201 lines
6.6 KiB
Python
201 lines
6.6 KiB
Python
"""
Speech-to-Text module for MIDI-to-Hue application.

Handles audio recording and cloud-based speech recognition.
"""
|
|
import os
|
|
import time
|
|
import threading
|
|
import requests
|
|
import io
|
|
import wave
|
|
import pyaudio
|
|
import numpy as np
|
|
from typing import Optional, Dict, Any, Callable
|
|
|
|
|
|
class SpeechToText:
    """Manages audio recording and speech-to-text processing.

    Lifecycle: call ``initialize()`` once, then ``start_recording()`` /
    ``stop_recording()`` for every utterance, and ``cleanup()`` on shutdown.
    Recognized text is handed to the function registered via
    ``set_callback()``; it is invoked from a background worker thread.
    """

    # Seconds to wait for the reader thread to exit before its stream is closed.
    _THREAD_JOIN_TIMEOUT = 1.0
    # Default HTTP timeout (seconds) so a stalled connection cannot block the
    # non-daemon worker thread, and with it interpreter shutdown, forever.
    _DEFAULT_REQUEST_TIMEOUT = 60.0

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize the Speech-to-Text controller.

        Args:
            config: Configuration dictionary with STT settings. Keys read:
                ``audio_device_index`` (default 1), ``language`` (default
                "de"), ``api_key`` (default ""), ``api_endpoint`` (default
                OpenAI transcription URL) and ``request_timeout`` (seconds,
                default 60).
        """
        self.config = config
        self.recording = False        # True while a capture session is active
        self.stream = None            # open PyAudio input stream, if any
        self.frames = []              # raw audio chunks of the current session
        self.p = None                 # PyAudio instance, created by initialize()
        self.recording_thread = None  # thread running _record_audio()
        self.callback = None          # receives the recognized text

        # Fixed capture format: 16-bit mono PCM at 16 kHz.
        self.format = pyaudio.paInt16
        self.channels = 1
        self.rate = 16000
        self.chunk = 4096

        # Settings taken from config, with defaults.
        self.device_index = config.get("audio_device_index", 1)
        self.language = config.get("language", "de")
        self.api_key = config.get("api_key", "")
        self.api_endpoint = config.get("api_endpoint", "https://api.openai.com/v1/audio/transcriptions")
        self.request_timeout = config.get("request_timeout", self._DEFAULT_REQUEST_TIMEOUT)

    def initialize(self) -> bool:
        """Initialize PyAudio and check if the device is available.

        Returns:
            True when the configured device index exists and offers enough
            input channels, False otherwise (the reason is printed).
        """
        try:
            # Reuse an existing instance so repeated calls do not orphan one.
            if self.p is None:
                self.p = pyaudio.PyAudio()
            device_count = self.p.get_device_count()

            if not 0 <= self.device_index < device_count:
                print(f"Error: Audio device index {self.device_index} out of range.")
                print(f"Available devices: {device_count}")
                return False

            # Get device info to display
            device_info = self.p.get_device_info_by_index(self.device_index)

            # An output-only device could never be opened with input=True.
            if device_info.get("maxInputChannels", self.channels) < self.channels:
                print(f"Error: Audio device '{device_info['name']}' has no usable input channels.")
                return False

            print(f"Using audio device: {device_info['name']}")
            return True

        except Exception as e:
            print(f"Error initializing audio: {e}")
            return False

    def set_callback(self, callback: Callable[[str], None]) -> None:
        """Set callback function to receive transcription results."""
        self.callback = callback

    def start_recording(self) -> bool:
        """Start recording audio when MIDI button is pressed.

        Returns:
            True when a new capture session was started, False when one is
            already running, audio is not initialized, or the stream could
            not be opened.
        """
        if self.recording:
            return False  # Already recording

        if self.p is None:
            print("Error starting recording: audio not initialized (call initialize() first).")
            return False

        # A previous session that died on a read error leaves its stream
        # open; release it before opening a new one.
        self._finish_capture()

        try:
            print("Starting audio recording...")
            self.recording = True
            self.frames = []

            # Open audio stream
            self.stream = self.p.open(
                format=self.format,
                channels=self.channels,
                rate=self.rate,
                input=True,
                input_device_index=self.device_index,
                frames_per_buffer=self.chunk,
            )

            # Start recording thread
            self.recording_thread = threading.Thread(target=self._record_audio, daemon=True)
            self.recording_thread.start()

            return True

        except Exception as e:
            print(f"Error starting recording: {e}")
            self.recording = False
            # Do not leak a stream that was opened before the failure.
            self._finish_capture()
            return False

    def _record_audio(self) -> None:
        """Record audio in a separate thread while recording flag is True."""
        try:
            while self.recording:
                data = self.stream.read(self.chunk, exception_on_overflow=False)
                self.frames.append(data)

        except Exception as e:
            print(f"Error during recording: {e}")
            # stop_recording() still tears the session down: it also checks
            # for an open stream, not only for this flag.
            self.recording = False

    def stop_recording(self) -> None:
        """Stop recording and process the audio.

        Also handles a session whose reader thread already stopped itself
        after a read error: the stream is closed and whatever audio was
        captured up to that point is sent for transcription.
        """
        if not self.recording and self.stream is None:
            return  # Not recording

        print("Stopping recording and processing audio...")
        self.recording = False
        self._finish_capture()

        # Process the recorded audio. The worker gets its own snapshot so a
        # quick follow-up start_recording() cannot swap the buffer under it.
        if self.frames:
            threading.Thread(target=self._process_audio, args=(list(self.frames),)).start()

    def _finish_capture(self) -> None:
        """Wait for the reader thread to exit, then close the input stream."""
        thread, self.recording_thread = self.recording_thread, None
        if (
            thread is not None
            and thread.is_alive()
            and thread is not threading.current_thread()
        ):
            thread.join(timeout=self._THREAD_JOIN_TIMEOUT)

        stream, self.stream = self.stream, None
        if stream is not None:
            try:
                try:
                    stream.stop_stream()
                finally:
                    stream.close()
            except Exception as e:
                print(f"Error closing audio stream: {e}")

    def _encode_wav(self, frames: list) -> io.BytesIO:
        """Return an in-memory WAV file (rewound to 0) built from raw chunks."""
        wav_buffer = io.BytesIO()
        with wave.open(wav_buffer, 'wb') as wf:
            wf.setnchannels(self.channels)
            # Module-level helper: works even after cleanup() released self.p.
            wf.setsampwidth(pyaudio.get_sample_size(self.format))
            wf.setframerate(self.rate)
            wf.writeframes(b''.join(frames))

        # Reset buffer position
        wav_buffer.seek(0)
        return wav_buffer

    def _process_audio(self, frames: Optional[list] = None) -> None:
        """Process recorded audio and send to the cloud STT service.

        Args:
            frames: Raw audio chunks to transcribe. Defaults to the chunks of
                the most recent session (``self.frames``).

        Errors are printed, never raised, because this runs as the top level
        of a worker thread.
        """
        try:
            if frames is None:
                frames = self.frames

            if not frames:
                print("No audio frames recorded.")
                return

            # Check if we have API key
            if not self.api_key:
                print("Error: No API key provided for STT service.")
                return

            # Prepare WAV file in memory
            wav_buffer = self._encode_wav(frames)

            # Send to API
            headers = {"Authorization": f"Bearer {self.api_key}"}
            files = {
                "file": ("audio.wav", wav_buffer, "audio/wav")
            }
            data = {
                "model": "whisper-1",
                "language": self.language
            }

            print("Sending audio to STT service...")
            response = requests.post(
                self.api_endpoint,
                headers=headers,
                files=files,
                data=data,
                timeout=self.request_timeout,
            )

            if response.status_code != 200:
                print(f"Error: {response.status_code}, {response.text}")
                return

            result = response.json()
            text = result.get("text", "")

            print(f"Recognized: {text}")

            # Call callback with result if provided
            if self.callback and text:
                self.callback(text)

        except Exception as e:
            print(f"Error processing audio: {e}")

    def cleanup(self) -> None:
        """Clean up resources.

        Stops any running capture, closes the stream and terminates PyAudio.
        Safe to call more than once.
        """
        self.recording = False
        # Join the reader thread before closing the stream it is reading.
        self._finish_capture()

        audio, self.p = self.p, None
        if audio is not None:
            audio.terminate()