From 004e6c17df188f05700b20fb21b3ab4982fc6e2d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan=20H=C3=A4usler?=
Date: Mon, 30 Jun 2025 03:40:36 +0200
Subject: [PATCH] feat: add speech to text
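
Hold the configured MIDI button to record from the selected audio
input; on release, the recording is sent to a Whisper-style
transcription API and the recognized text is passed to a callback.

A sample speech_to_text block as it would appear in the config read by
ConfigManager (shown as JSON; values mirror the get_stt_config()
defaults, and the api_key is a placeholder):

    "speech_to_text": {
        "enabled": true,
        "audio_device_index": 1,
        "language": "de",
        "api_key": "sk-...",
        "api_endpoint": "https://api.openai.com/v1/audio/transcriptions",
        "midi_trigger": {
            "channel": 1,
            "note": 1,
            "type": "note_on"
        }
    }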
---
 config.py         |  26 ++++
 main.py           |  59 ++++++++++-
 speech_to_text.py | 205 ++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 289 insertions(+), 1 deletion(-)
 create mode 100644 speech_to_text.py

diff --git a/config.py b/config.py
index 62ec41b..4fc427c 100644
--- a/config.py
+++ b/config.py
@@ -79,3 +79,29 @@ class ConfigManager:
     def get_midi_device_index(self) -> int:
         """Get the MIDI device index."""
         return self.config.get("midi_device", 1)
+
+    def get_stt_config(self) -> Dict[str, Any]:
+        """Get the speech-to-text configuration."""
+        default_stt_config = {
+            "enabled": False,
+            "audio_device_index": 1,
+            "language": "de",
+            "api_key": "",
+            "api_endpoint": "https://api.openai.com/v1/audio/transcriptions",
+            "midi_trigger": {
+                "channel": 1,
+                "note": 1,
+                "type": "note_on"
+            }
+        }
+        return self.config.get("speech_to_text", default_stt_config)
+
+    def is_stt_enabled(self) -> bool:
+        """Check if speech-to-text is enabled in the configuration."""
+        stt_config = self.get_stt_config()
+        return stt_config.get("enabled", False)
+
+    def get_stt_midi_trigger(self) -> Dict[str, Any]:
+        """Get the MIDI trigger configuration for speech-to-text."""
+        stt_config = self.get_stt_config()
+        return stt_config.get("midi_trigger", {})
diff --git a/main.py b/main.py
index 6eae141..93de487 100644
--- a/main.py
+++ b/main.py
@@ -1,6 +1,6 @@
 """
 Main module for MIDI-to-Hue application.
-Ties together the config, MIDI controller, Hue controller, mapper, and animations.
+Ties together the config, MIDI controller, Hue controller, mapper, animations, and speech-to-text.
 """
 import sys
 import time
@@ -11,6 +11,7 @@
 from hue_controller import HueController
 from midi_controller import MidiController, DeviceMappingManager
 from mapper import MidiToHueMapper
 from animations import AnimationManager, MidiLedAnimation
+from speech_to_text import SpeechToText
 
 def main():
@@ -20,6 +21,15 @@ def main():
     # Create animation manager
     animation_manager = AnimationManager()
 
+    # Initialize speech-to-text if enabled
+    stt = None
+    if config_manager.is_stt_enabled():
+        print("Initializing speech-to-text...")
+        stt = SpeechToText(config_manager.get_stt_config())
+        if not stt.initialize():
+            print("Warning: failed to initialize speech-to-text, disabling it.")
+            stt = None
+
     # Initialize Hue controller
     try:
         hue_controller = HueController(
@@ -64,6 +74,47 @@ def main():
     # Bind the device mapper to the MIDI controller
     midi_controller.get_input_name = device_mapper.get_input_name
 
+    # Set up speech-to-text MIDI handling if enabled
+    if stt:
+        midi_trigger = config_manager.get_stt_midi_trigger()
+        stt_trigger_type = midi_trigger.get("type", "note_on")
+        stt_channel = midi_trigger.get("channel", 1)
+        stt_note = midi_trigger.get("note", 1)  # CC number when type is control_change
+
+        print(f"Speech-to-text trigger: {stt_trigger_type}/{stt_channel}/{stt_note}")
+
+        # Print each recognized phrase as it arrives
+        def stt_result_callback(text):
+            print(f"\nSpeech recognition result: {text}\n")
+
+        stt.set_callback(stt_result_callback)
+
+        # Register MIDI handler for speech-to-text
+        def handle_stt_midi(msg):
+            # Ignore messages without a channel (clock, sysex, ...)
+            if getattr(msg, 'channel', None) != stt_channel:
+                return
+            if stt_trigger_type == "note_on" and msg.type in ("note_on", "note_off") and msg.note == stt_note:
+                # Controllers report release either as note_off or as
+                # note_on with velocity 0; treat both as "button released"
+                if msg.type == "note_on" and msg.velocity > 0:
+                    print("\nStarting speech recognition (button pressed)...")
+                    stt.start_recording()
+                else:
+                    print("\nStopping speech recognition (button released)...")
+                    stt.stop_recording()
+            elif stt_trigger_type == "control_change" and msg.type == "control_change" and msg.control == stt_note:
+                # For control_change triggers, values above 64 count as pressed
+                if msg.value > 64:
+                    print("\nStarting speech recognition...")
+                    stt.start_recording()
+                else:
+                    print("\nStopping speech recognition...")
+                    stt.stop_recording()
+
+        # Register the handler
+        midi_controller.register_handler(handle_stt_midi)
+
     # Create MIDI-to-Hue mapper with configuration
     mapper = MidiToHueMapper(
         hue_controller,
@@ -95,6 +146,9 @@ def main():
         print("\nStopping animations and exiting...")
         animation_manager.stop_all()
         midi_controller.close()
+        # Clean up STT resources
+        if stt:
+            stt.cleanup()
         sys.exit(0)
 
     # Register signal handler for Ctrl+C
@@ -108,6 +162,9 @@ def main():
     finally:
         animation_manager.stop_all()
         midi_controller.close()
+        # Clean up STT resources
+        if stt:
+            stt.cleanup()
 
     return 0
 
diff --git a/speech_to_text.py b/speech_to_text.py
new file mode 100644
index 0000000..cb768d9
--- /dev/null
+++ b/speech_to_text.py
@@ -0,0 +1,205 @@
+"""
+Speech-to-Text module for MIDI-to-Hue application.
+Handles audio recording and cloud-based speech recognition.
+"""
+import io
+import threading
+import wave
+from typing import Dict, Any, Callable
+
+import pyaudio
+import requests
+
+
+class SpeechToText:
+    """Manages audio recording and speech-to-text processing."""
+
+    def __init__(self, config: Dict[str, Any]):
+        """
+        Initialize the Speech-to-Text controller.
+
+        Args:
+            config: Configuration dictionary with STT settings
+        """
+        self.config = config
+        self.recording = False
+        self.stream = None
+        self.frames = []
+        self.p = None
+        self.recording_thread = None
+        self.callback = None
+
+        # Audio settings from config or defaults
+        self.format = pyaudio.paInt16
+        self.channels = 1
+        self.rate = 16000
+        self.chunk = 4096
+        self.device_index = config.get("audio_device_index", 1)
+        self.language = config.get("language", "de")
+        self.api_key = config.get("api_key", "")
+        self.api_endpoint = config.get("api_endpoint", "https://api.openai.com/v1/audio/transcriptions")
+
+    def initialize(self) -> bool:
+        """Initialize PyAudio and check if the device is available."""
+        try:
+            self.p = pyaudio.PyAudio()
+            device_count = self.p.get_device_count()
+
+            if self.device_index >= device_count:
+                print(f"Error: Audio device index {self.device_index} out of range.")
+                print(f"Available devices: {device_count}")
+                return False
+
+            # Get device info to display
+            device_info = self.p.get_device_info_by_index(self.device_index)
+            print(f"Using audio device: {device_info['name']}")
+            return True
+
+        except Exception as e:
+            print(f"Error initializing audio: {e}")
+            return False
+
+    def set_callback(self, callback: Callable[[str], None]) -> None:
+        """Set callback function to receive transcription results."""
+        self.callback = callback
+
+    def start_recording(self) -> bool:
+        """Start recording audio when the MIDI button is pressed."""
+        if self.recording:
+            return False  # Already recording
+
+        try:
+            print("Starting audio recording...")
+            self.recording = True
+            self.frames = []
+
+            # Open audio stream
+            self.stream = self.p.open(
+                format=self.format,
+                channels=self.channels,
+                rate=self.rate,
+                input=True,
+                input_device_index=self.device_index,
+                frames_per_buffer=self.chunk
+            )
+
+            # Start recording thread
+            self.recording_thread = threading.Thread(target=self._record_audio)
+            self.recording_thread.daemon = True
+            self.recording_thread.start()
+
+            return True
+
+        except Exception as e:
+            print(f"Error starting recording: {e}")
+            self.recording = False
+            return False
+
+    def _record_audio(self) -> None:
+        """Record audio in a separate thread while the recording flag is True."""
+        try:
+            while self.recording:
+                data = self.stream.read(self.chunk, exception_on_overflow=False)
+                self.frames.append(data)
+
+        except Exception as e:
+            print(f"Error during recording: {e}")
+            self.recording = False
+
+    def stop_recording(self) -> None:
+        """Stop recording and process the audio."""
+        if not self.recording:
+            return  # Not recording
+
+        print("Stopping recording and processing audio...")
+        self.recording = False
+
+        # Wait for recording thread to finish
+        if self.recording_thread:
+            self.recording_thread.join(timeout=1.0)
+
+        # Close the stream
+        if self.stream:
+            self.stream.stop_stream()
+            self.stream.close()
+            self.stream = None
+
+        # Hand a copy of the frames to the worker so that a new recording
+        # cannot clobber them while they are being transcribed
+        if self.frames:
+            threading.Thread(target=self._process_audio, args=(list(self.frames),)).start()
+
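+    # NOTE: runs on a worker thread started by stop_recording, so the
+    # MIDI callback never blocks on the network round-trip.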
+    def _process_audio(self, frames) -> None:
+        """Process recorded audio and send it to the cloud STT service."""
+        try:
+            if not frames:
+                print("No audio frames recorded.")
+                return
+
+            # Prepare WAV file in memory
+            wav_buffer = io.BytesIO()
+            with wave.open(wav_buffer, 'wb') as wf:
+                wf.setnchannels(self.channels)
+                wf.setsampwidth(self.p.get_sample_size(self.format))
+                wf.setframerate(self.rate)
+                wf.writeframes(b''.join(frames))
+
+            # Reset buffer position
+            wav_buffer.seek(0)
+
+            # Check if we have an API key
+            if not self.api_key:
+                print("Error: No API key provided for STT service.")
+                return
+
+            # Send to API
+            headers = {"Authorization": f"Bearer {self.api_key}"}
+
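+            # Multipart request: one WAV file part plus "model" and
+            # "language" form fields, as OpenAI-style transcription APIs expect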
+            files = {
+                "file": ("audio.wav", wav_buffer, "audio/wav")
+            }
+
+            data = {
+                "model": "whisper-1",
+                "language": self.language
+            }
+
+            print("Sending audio to STT service...")
+            response = requests.post(
+                self.api_endpoint,
+                headers=headers,
+                files=files,
+                data=data,
+                timeout=60  # don't hang indefinitely on network problems
+            )
+
+            if response.status_code == 200:
+                result = response.json()
+                text = result.get("text", "")
+
+                print(f"Recognized: {text}")
+
+                # Call callback with result if provided
+                if self.callback and text:
+                    self.callback(text)
+            else:
+                print(f"Error: {response.status_code}, {response.text}")
+
+        except Exception as e:
+            print(f"Error processing audio: {e}")
+
+    def cleanup(self) -> None:
+        """Clean up resources."""
+        if self.recording:
+            self.recording = False
+
+        if self.stream:
+            self.stream.stop_stream()
+            self.stream.close()
+
+        if self.p:
+            self.p.terminate()