"""
Speech-to-Text module for MIDI-to-Hue application.
Handles audio recording and cloud-based speech recognition.
"""

import os
import time
import threading
import requests
import io
import wave
import pyaudio
import numpy as np
from typing import Optional, Dict, Any, Callable
from gtts import gTTS
import tempfile


class SpeechToText:
    """Manages audio recording and speech-to-text processing.

    Typical lifecycle: ``initialize()`` once, then ``start_recording()`` /
    ``stop_recording()`` pairs, and finally ``cleanup()``. Recording, the
    request to the STT service, and TTS playback each run on their own thread.
    """

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize the Speech-to-Text controller.

        Args:
            config: Configuration dictionary with STT settings. Keys read here:
                ``audio_device_index`` (default 1), ``language`` (default
                ``"de"``), ``api_key`` (default ``""``), ``api_endpoint``
                (default: the OpenAI transcription URL) and
                ``request_timeout`` in seconds (default 60).
        """
        self.config = config
        self.recording = False
        self.stream = None
        self.frames = []
        self.p = None
        self.recording_thread = None
        self.callback: Optional[Callable[[str], None]] = None

        # Audio settings from config or defaults
        self.format = pyaudio.paInt16
        self.channels = 1
        self.rate = 44100
        self.chunk = 4096
        self.device_index = config.get("audio_device_index", 1)
        self.language = config.get("language", "de")
        self.api_key = config.get("api_key", "")
        self.api_endpoint = config.get(
            "api_endpoint", "https://api.openai.com/v1/audio/transcriptions"
        )
        # Without a timeout a stalled connection would block the processing
        # thread forever.
        self.request_timeout = config.get("request_timeout", 60)

    def initialize(self) -> bool:
        """Initialize PyAudio and check if the device is available.

        Returns:
            True when the configured device index exists, False otherwise or
            when PyAudio raises during setup.
        """
        try:
            self.p = pyaudio.PyAudio()
            device_count = self.p.get_device_count()

            if not 0 <= self.device_index < device_count:
                print(f"Error: Audio device index {self.device_index} out of range.")
                print(f"Available devices: {device_count}")
                return False

            # Get device info to display
            device_info = self.p.get_device_info_by_index(self.device_index)
            print(f"Using audio device: {device_info['name']}")
            return True
        except Exception as e:
            print(f"Error initializing audio: {e}")
            return False

    def set_callback(self, callback: Callable[[str], None]) -> None:
        """Set callback function to receive transcription results."""
        self.callback = callback

    def _close_stream(self) -> None:
        """Stop and close the current input stream, if any (best effort)."""
        stream, self.stream = self.stream, None
        if stream is None:
            return
        try:
            stream.stop_stream()
            stream.close()
        except OSError as e:
            print(f"Error closing audio stream: {e}")

    def start_recording(self) -> bool:
        """Start recording audio when MIDI button is pressed.

        Returns:
            True when a recording thread was started; False when a recording
            is already running, audio was never initialized, or the stream
            could not be opened.
        """
        if self.recording:
            return False  # Already recording

        if self.p is None:
            print("Error starting recording: audio not initialized")
            return False

        try:
            print("Starting audio recording...")
            # A previous recording thread may have aborted and left its
            # stream open; release it before opening a new one.
            self._close_stream()

            self.recording = True
            self.frames = []

            # Open audio stream
            self.stream = self.p.open(
                format=self.format,
                channels=self.channels,
                rate=self.rate,
                input=True,
                input_device_index=self.device_index,
                frames_per_buffer=self.chunk,
            )

            # Start recording thread
            self.recording_thread = threading.Thread(target=self._record_audio)
            self.recording_thread.daemon = True
            self.recording_thread.start()
            return True
        except Exception as e:
            print(f"Error starting recording: {e}")
            self.recording = False
            self._close_stream()
            return False

    def _record_audio(self) -> None:
        """Record audio in a separate thread while recording flag is True."""
        try:
            while self.recording:
                data = self.stream.read(self.chunk, exception_on_overflow=False)
                self.frames.append(data)
        except Exception as e:
            print(f"Error during recording: {e}")
            # The stream is left for stop_recording()/cleanup() to close.
            self.recording = False

    def stop_recording(self) -> None:
        """Stop recording and process the audio.

        Also runs when the recording thread already aborted on its own (flag
        cleared but stream still open), so the stream is released and any
        frames captured before the error are still processed.
        """
        if not self.recording and self.stream is None:
            return  # Not recording

        print("Stopping recording and processing audio...")
        self.recording = False

        # Wait for recording thread to finish
        if self.recording_thread:
            self.recording_thread.join(timeout=1.0)

        # Close the stream
        self._close_stream()

        # Process the recorded audio
        if self.frames:
            threading.Thread(target=self._process_audio).start()

    def _process_audio(self) -> None:
        """Process recorded audio and send to the cloud STT service.

        On success the recognized text is spoken back on a separate thread
        and passed to the registered callback. All errors are printed, never
        raised.
        """
        try:
            # Work on the list that was current when processing started; a new
            # recording rebinds self.frames.
            frames = self.frames
            if not frames:
                print("No audio frames recorded.")
                return

            # Check if we have API key before doing any encoding work
            if not self.api_key:
                print("Error: No API key provided for STT service.")
                return

            # Prepare WAV file in memory
            wav_buffer = io.BytesIO()
            with wave.open(wav_buffer, 'wb') as wf:
                wf.setnchannels(self.channels)
                # Module-level helper: does not need a live PyAudio instance.
                wf.setsampwidth(pyaudio.get_sample_size(self.format))
                wf.setframerate(self.rate)
                wf.writeframes(b''.join(frames))

            # Reset buffer position
            wav_buffer.seek(0)

            # Send to API
            headers = {"Authorization": f"Bearer {self.api_key}"}
            files = {"file": ("audio.wav", wav_buffer, "audio/wav")}
            data = {"model": "whisper-1", "language": self.language}

            print("Sending audio to STT service...")
            response = requests.post(
                self.api_endpoint,
                headers=headers,
                files=files,
                data=data,
                timeout=self.request_timeout,
            )

            if response.status_code == 200:
                result = response.json()
                text = result.get("text", "")
                print(f"Recognized: {text}")

                # Play back the recognized text via TTS
                if text:
                    threading.Thread(target=self.speak_text, args=(text,)).start()

                # Call callback with result if provided
                if self.callback and text:
                    self.callback(text)
            else:
                print(f"Error: {response.status_code}, {response.text}")
        except Exception as e:
            print(f"Error processing audio: {e}")

    def speak_text(self, text: str) -> None:
        """Convert text to speech and play it back on the audio device.

        The speech is generated with gTTS into a temporary MP3, converted to
        WAV via ffmpeg and written to an output stream on the configured
        device. Temporary files and audio handles are released on every exit
        path; errors are printed, never raised.

        Args:
            text: The text to speak, in the configured language.
        """
        mp3_path = ""
        wav_path = ""
        wav_reader = None
        output_stream = None
        try:
            print("Converting text to speech...")
            # Create a temporary file to store the TTS audio
            with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as temp_file:
                mp3_path = temp_file.name

            # Generate speech using gTTS
            tts = gTTS(text=text, lang=self.language)
            tts.save(mp3_path)

            # Play the audio file
            print("Playing TTS feedback...")

            # Convert MP3 to WAV with matching sample rate
            wav_path = self._convert_mp3_to_wav(mp3_path)
            if not wav_path:
                print("Failed to convert speech audio for playback")
                return

            # Open the audio file
            wav_reader = wave.open(wav_path, 'rb')

            # Use the same sample rate we know works for the device
            try:
                output_stream = self.p.open(
                    format=self.p.get_format_from_width(wav_reader.getsampwidth()),
                    channels=wav_reader.getnchannels(),
                    rate=self.rate,  # Use the known working sample rate
                    output=True,
                    output_device_index=self.device_index,
                )
            except Exception as e:
                print(f"Failed to open audio output stream: {e}")
                # Try again with device default settings
                device_info = self.p.get_device_info_by_index(self.device_index)
                output_stream = self.p.open(
                    format=pyaudio.paInt16,
                    channels=self.channels,
                    rate=int(device_info['defaultSampleRate']),
                    output=True,
                    output_device_index=self.device_index,
                )
                print(f"Using device default sample rate: {device_info['defaultSampleRate']}")

            # Larger chunk size to reduce stuttering
            chunk_size = 8192

            # Pre-buffer the whole file so playback does no file I/O
            audio_data = []
            while True:
                data = wav_reader.readframes(chunk_size)
                if not data:
                    break
                audio_data.append(data)

            buffer_size = chunk_size * 4
            print(f"Playing audio with buffer size {buffer_size} bytes")

            for data in audio_data:
                output_stream.write(data)
                # Small sleep to reduce CPU load and allow buffer to process
                time.sleep(0.005)
        except Exception as e:
            print(f"Error generating or playing speech: {e}")
        finally:
            # Clean up resources on every path, including early return/errors
            if output_stream is not None:
                try:
                    output_stream.stop_stream()
                    output_stream.close()
                except OSError:
                    pass
            if wav_reader is not None:
                wav_reader.close()
            # Remove temporary files (best effort)
            for path in (mp3_path, wav_path):
                if path:
                    try:
                        os.unlink(path)
                    except OSError:
                        pass

    def _convert_mp3_to_wav(self, mp3_file: str) -> str:
        """Convert MP3 to WAV format with correct sample rate for PyAudio.

        Args:
            mp3_file: Path of the MP3 file to convert.

        Returns:
            Path of the WAV file written next to the input, or ``""`` when
            ffmpeg could not be run or exited with a non-zero status.
        """
        try:
            import subprocess

            # Swap only the extension; str.replace would also rewrite any
            # ".mp3" occurring earlier in the path.
            wav_file = os.path.splitext(mp3_file)[0] + '.wav'

            # Use ffmpeg to convert MP3 to WAV with the same sample rate as the recording
            # This ensures the device can play it back properly
            return_code = subprocess.call(
                ['ffmpeg', '-y', '-i', mp3_file,
                 '-ar', str(self.rate), '-ac', str(self.channels), wav_file],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
            if return_code != 0:
                print(f"Error converting MP3 to WAV: ffmpeg exited with code {return_code}")
                # Do not leave a partial output file behind
                if os.path.exists(wav_file):
                    try:
                        os.unlink(wav_file)
                    except OSError:
                        pass
                return ""

            print(f"Converted MP3 to WAV with sample rate {self.rate} Hz")
            return wav_file
        except Exception as e:
            print(f"Error converting MP3 to WAV: {e}")
            return ""

    def cleanup(self) -> None:
        """Clean up resources.

        Stops any running recording, closes the input stream and terminates
        PyAudio. Safe to call more than once.
        """
        self.recording = False

        # Let the recording thread leave its read loop before the stream is
        # closed underneath it.
        if self.recording_thread and self.recording_thread.is_alive():
            self.recording_thread.join(timeout=1.0)

        self._close_stream()

        if self.p:
            self.p.terminate()
            self.p = None