# ship-controller/speech_to_text.py
# 2025-06-30 03:59:00 +02:00
#
# 294 lines
# 10 KiB
# Python

"""
Speech-to-Text module for MIDI-to-Hue application.
Handles audio recording and cloud-based speech recognition.
"""
import os
import time
import threading
import requests
import io
import wave
import pyaudio
import numpy as np
from typing import Optional, Dict, Any, Callable
from gtts import gTTS
import tempfile
class SpeechToText:
    """Manage push-to-talk audio recording, cloud transcription and TTS feedback.

    Typical lifecycle: construct with a config dict, call ``initialize()``,
    optionally ``set_callback()``, wrap each utterance in
    ``start_recording()`` / ``stop_recording()``, and call ``cleanup()`` when
    done.  Capture, transcription and playback each run on their own thread.
    """

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize the Speech-to-Text controller.

        Args:
            config: Configuration dictionary with STT settings. Recognised
                keys: ``audio_device_index``, ``language``, ``api_key``,
                ``api_endpoint`` and ``request_timeout`` (seconds).
        """
        self.config = config
        self.recording = False
        self.stream = None
        self.frames = []
        self.p = None
        self.recording_thread = None
        self.callback = None

        # Audio settings from config or defaults
        self.format = pyaudio.paInt16
        self.channels = 1
        self.rate = 44100
        self.chunk = 4096
        self.device_index = config.get("audio_device_index", 1)
        self.language = config.get("language", "de")
        self.api_key = config.get("api_key", "")
        self.api_endpoint = config.get(
            "api_endpoint", "https://api.openai.com/v1/audio/transcriptions"
        )
        # Without a timeout a stalled connection would block the worker
        # thread forever.
        self.request_timeout = config.get("request_timeout", 30.0)

    def initialize(self) -> bool:
        """Initialize PyAudio and check if the configured device is available.

        Returns:
            True when the device index is valid, False otherwise (the reason
            is printed).
        """
        try:
            # Reuse an existing PyAudio instance so that repeated calls do not
            # leak one instance per call.
            if self.p is None:
                self.p = pyaudio.PyAudio()

            device_count = self.p.get_device_count()
            if not 0 <= self.device_index < device_count:
                print(f"Error: Audio device index {self.device_index} out of range.")
                print(f"Available devices: {device_count}")
                return False

            # Get device info to display
            device_info = self.p.get_device_info_by_index(self.device_index)
            print(f"Using audio device: {device_info['name']}")
            return True
        except Exception as e:
            print(f"Error initializing audio: {e}")
            return False

    def set_callback(self, callback: Callable[[str], None]) -> None:
        """Set callback function to receive transcription results."""
        self.callback = callback

    def start_recording(self) -> bool:
        """Start recording audio when MIDI button is pressed.

        Returns:
            True when capture started; False when a recording is already in
            progress, audio was never initialized, or the stream failed to
            open.
        """
        if self.recording:
            return False  # Already recording

        if self.p is None:
            print("Error starting recording: audio not initialized.")
            return False

        # A capture that aborted on a read error leaves its stream open;
        # release it before opening a new one.
        self._close_stream()

        try:
            print("Starting audio recording...")
            self.recording = True
            # Rebind (rather than clear) so a transcription still running on
            # the previous list is not affected.
            self.frames = []

            # Open audio stream
            self.stream = self.p.open(
                format=self.format,
                channels=self.channels,
                rate=self.rate,
                input=True,
                input_device_index=self.device_index,
                frames_per_buffer=self.chunk,
            )

            # Start recording thread
            self.recording_thread = threading.Thread(
                target=self._record_audio, daemon=True
            )
            self.recording_thread.start()
            return True
        except Exception as e:
            print(f"Error starting recording: {e}")
            self.recording = False
            self._close_stream()
            return False

    def _record_audio(self) -> None:
        """Record audio in a separate thread while recording flag is True."""
        try:
            while self.recording:
                data = self.stream.read(self.chunk, exception_on_overflow=False)
                self.frames.append(data)
        except Exception as e:
            print(f"Error during recording: {e}")
            # The stream is intentionally left for stop_recording() /
            # start_recording() / cleanup() to close.
            self.recording = False

    def _close_stream(self) -> None:
        """Stop and close the current input stream, if any, ignoring errors."""
        stream, self.stream = self.stream, None
        if stream is None:
            return
        try:
            try:
                stream.stop_stream()
            finally:
                stream.close()
        except Exception as e:
            print(f"Error closing audio stream: {e}")

    def _join_recording_thread(self) -> None:
        """Wait briefly for the capture thread to notice the cleared flag."""
        thread, self.recording_thread = self.recording_thread, None
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=1.0)

    def stop_recording(self) -> None:
        """Stop recording and process the audio.

        Also handles the case where the capture thread already aborted on an
        error: the flag is then False but the stream is still open and the
        frames captured so far are still worth transcribing.
        """
        if not self.recording and self.stream is None:
            return  # Not recording and nothing left to release

        print("Stopping recording and processing audio...")
        self.recording = False

        # Wait for recording thread to finish before closing its stream
        self._join_recording_thread()
        self._close_stream()

        # Hand the worker its own reference so a new recording (which rebinds
        # self.frames) cannot change the audio being transcribed.
        frames = self.frames
        if frames:
            threading.Thread(target=self._process_audio, args=(frames,)).start()

    def _process_audio(self, frames: Optional[list] = None) -> None:
        """Process recorded audio and send to the cloud STT service.

        Args:
            frames: Raw audio chunks to transcribe. Defaults to the frames of
                the most recent recording.
        """
        try:
            if frames is None:
                frames = self.frames
            if not frames:
                print("No audio frames recorded.")
                return

            # Check if we have API key before doing any encoding work
            if not self.api_key:
                print("Error: No API key provided for STT service.")
                return

            # Prepare WAV file in memory
            wav_buffer = io.BytesIO()
            with wave.open(wav_buffer, 'wb') as wf:
                wf.setnchannels(self.channels)
                wf.setsampwidth(self.p.get_sample_size(self.format))
                wf.setframerate(self.rate)
                wf.writeframes(b''.join(frames))

            # Reset buffer position
            wav_buffer.seek(0)

            # Send to API
            headers = {"Authorization": f"Bearer {self.api_key}"}
            files = {"file": ("audio.wav", wav_buffer, "audio/wav")}
            data = {"model": "whisper-1", "language": self.language}

            print("Sending audio to STT service...")
            response = requests.post(
                self.api_endpoint,
                headers=headers,
                files=files,
                data=data,
                timeout=self.request_timeout,
            )

            if response.status_code != 200:
                print(f"Error: {response.status_code}, {response.text}")
                return

            text = response.json().get("text", "")
            print(f"Recognized: {text}")
            if not text:
                return

            # Play back the recognized text via TTS
            threading.Thread(target=self.speak_text, args=(text,)).start()

            # Call callback with result if provided
            if self.callback:
                self.callback(text)
        except Exception as e:
            print(f"Error processing audio: {e}")

    def speak_text(self, text: str) -> None:
        """Convert text to speech and play it back on the audio device.

        All temporary files and audio handles are released on every exit
        path, including failures.

        Args:
            text: Text to synthesize in ``self.language``.
        """
        mp3_path = ""
        wav_path = ""
        wf = None
        output_stream = None
        try:
            print("Converting text to speech...")

            # Create a temporary file to store the TTS audio
            with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as temp_file:
                mp3_path = temp_file.name

            # Generate speech using gTTS
            gTTS(text=text, lang=self.language).save(mp3_path)

            print("Playing TTS feedback...")

            # Convert MP3 to WAV with matching sample rate
            wav_path = self._convert_mp3_to_wav(mp3_path)
            if not wav_path:
                print("Failed to convert speech audio for playback")
                return

            wf = wave.open(wav_path, 'rb')

            # Use the same sample rate we know works for the device
            try:
                output_stream = self.p.open(
                    format=self.p.get_format_from_width(wf.getsampwidth()),
                    channels=wf.getnchannels(),
                    rate=self.rate,
                    output=True,
                    output_device_index=self.device_index,
                )
            except Exception as e:
                print(f"Failed to open audio output stream: {e}")
                # Try again with device default settings.
                # NOTE(review): the WAV data is encoded at self.rate, so if the
                # device default differs, playback will be pitch-shifted.
                device_info = self.p.get_device_info_by_index(self.device_index)
                output_stream = self.p.open(
                    format=pyaudio.paInt16,
                    channels=self.channels,
                    rate=int(device_info['defaultSampleRate']),
                    output=True,
                    output_device_index=self.device_index,
                )
                print(f"Using device default sample rate: {device_info['defaultSampleRate']}")

            # Play the audio
            chunk_size = 1024
            data = wf.readframes(chunk_size)
            while data:
                output_stream.write(data)
                data = wf.readframes(chunk_size)
        except Exception as e:
            print(f"Error generating or playing speech: {e}")
        finally:
            if output_stream is not None:
                try:
                    output_stream.stop_stream()
                    output_stream.close()
                except Exception as e:
                    print(f"Error closing audio output stream: {e}")
            if wf is not None:
                wf.close()
            # Remove temporary files (best effort)
            for path in (mp3_path, wav_path):
                if path:
                    try:
                        os.unlink(path)
                    except OSError:
                        pass

    def _convert_mp3_to_wav(self, mp3_file: str) -> str:
        """Convert MP3 to WAV format with correct sample rate for PyAudio.

        Args:
            mp3_file: Path of the MP3 file to convert.

        Returns:
            Path of the generated WAV file, or an empty string when ffmpeg is
            unavailable or the conversion failed.
        """
        import subprocess

        # Swap only the final extension; never let output equal input, since
        # ffmpeg runs with -y and would overwrite the source.
        root, ext = os.path.splitext(mp3_file)
        wav_file = root + '.wav'
        if ext.lower() == '.wav':
            wav_file = root + '_converted.wav'

        try:
            # Use ffmpeg to convert MP3 to WAV with the same sample rate as the
            # recording. This ensures the device can play it back properly.
            result = subprocess.run(
                ['ffmpeg', '-y', '-i', mp3_file, '-ar', str(self.rate),
                 '-ac', str(self.channels), wav_file],
                stdin=subprocess.DEVNULL,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=False,
            )
        except Exception as e:
            print(f"Error converting MP3 to WAV: {e}")
            return ""

        if result.returncode != 0 or not os.path.isfile(wav_file):
            print(f"Error converting MP3 to WAV: ffmpeg exited with code {result.returncode}")
            # Do not leave a partial output file behind.
            try:
                os.unlink(wav_file)
            except OSError:
                pass
            return ""

        print(f"Converted MP3 to WAV with sample rate {self.rate} Hz")
        return wav_file

    def cleanup(self) -> None:
        """Clean up resources. Safe to call more than once."""
        self.recording = False
        # Let the capture thread leave its read loop before the stream closes.
        self._join_recording_thread()
        self._close_stream()

        if self.p is not None:
            try:
                self.p.terminate()
            finally:
                # Prevents a second cleanup() from terminating twice.
                self.p = None