"""
Speech-to-Text module for MIDI-to-Hue application.

Handles audio recording and cloud-based speech recognition.
"""

import io
import os
import subprocess
import tempfile
import threading
import time
import wave
from typing import Optional, Dict, Any, Callable

import numpy as np
import pyaudio
import pygame  # For smoother audio playback
import requests
from gtts import gTTS


class SpeechToText:
    """Manages audio recording and speech-to-text processing.

    Typical lifecycle: construct with a config dict, call ``initialize()``
    once, then pair ``start_recording()`` / ``stop_recording()`` calls.
    ``stop_recording()`` hands the captured audio to a background thread that
    posts it to the configured transcription endpoint, speaks the recognized
    text back and passes it to the callback registered via ``set_callback()``.
    Call ``cleanup()`` when done to release the audio resources.
    """

    # Seconds to wait for the STT HTTP request when the config carries no
    # "request_timeout" entry. Without any timeout a stalled connection
    # would block the processing thread forever.
    DEFAULT_REQUEST_TIMEOUT = 60.0

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize the Speech-to-Text controller.

        Args:
            config: Configuration dictionary with STT settings. Keys read
                here: "audio_device_index", "language", "api_key",
                "api_endpoint" and the optional "request_timeout" (seconds).
        """
        self.config = config
        self.recording = False
        self.stream = None
        self.frames = []
        self.p = None
        self.recording_thread = None
        self.callback = None

        # Initialize pygame for audio playback
        if not pygame.get_init():
            try:
                pygame.init()
            except Exception as e:
                print(f"Warning: Failed to initialize pygame: {e}")

        # Fixed capture format: 16-bit mono at 44.1 kHz, read in 4096-frame chunks
        self.format = pyaudio.paInt16
        self.channels = 1
        self.rate = 44100
        self.chunk = 4096

        # Settings taken from config, with defaults
        self.device_index = config.get("audio_device_index", 1)
        self.language = config.get("language", "de")
        self.api_key = config.get("api_key", "")
        self.api_endpoint = config.get("api_endpoint", "https://api.openai.com/v1/audio/transcriptions")
        self.request_timeout = config.get("request_timeout", self.DEFAULT_REQUEST_TIMEOUT)

    def initialize(self) -> bool:
        """Initialize PyAudio and check if the device is available.

        Returns:
            True when the configured device index exists, False when it is
            out of range or PyAudio reports an error.
        """
        try:
            # Reuse an existing PyAudio instance so repeated calls do not
            # leak the previous one.
            if self.p is None:
                self.p = pyaudio.PyAudio()
            device_count = self.p.get_device_count()

            # Reject negative indices as well as indices past the end.
            if not 0 <= self.device_index < device_count:
                print(f"Error: Audio device index {self.device_index} out of range.")
                print(f"Available devices: {device_count}")
                return False

            # Get device info to display
            device_info = self.p.get_device_info_by_index(self.device_index)
            print(f"Using audio device: {device_info['name']}")
            return True

        except Exception as e:
            print(f"Error initializing audio: {e}")
            return False

    def set_callback(self, callback: Callable[[str], None]) -> None:
        """Set callback function to receive transcription results."""
        self.callback = callback

    def start_recording(self) -> bool:
        """Start recording audio when MIDI button is pressed.

        Returns:
            True when the input stream was opened and the recording thread
            started; False when a recording is already running, audio was
            never initialized, or opening the stream failed.
        """
        if self.recording:
            return False  # Already recording

        if self.p is None:
            print("Error starting recording: audio not initialized")
            return False

        try:
            print("Starting audio recording...")
            self.recording = True
            self.frames = []

            # Open audio stream
            self.stream = self.p.open(
                format=self.format,
                channels=self.channels,
                rate=self.rate,
                input=True,
                input_device_index=self.device_index,
                frames_per_buffer=self.chunk,
            )

            # Start recording thread
            self.recording_thread = threading.Thread(target=self._record_audio)
            self.recording_thread.daemon = True
            self.recording_thread.start()

            return True

        except Exception as e:
            print(f"Error starting recording: {e}")
            self.recording = False
            # Release the stream if it was opened before the failure.
            self._close_stream()
            return False

    def _record_audio(self) -> None:
        """Record audio in a separate thread while recording flag is True."""
        # Keep a local reference so the loop is unaffected when another
        # thread resets self.stream.
        stream = self.stream
        try:
            while self.recording:
                data = stream.read(self.chunk, exception_on_overflow=False)
                self.frames.append(data)

        except Exception as e:
            print(f"Error during recording: {e}")
            self.recording = False

    def stop_recording(self) -> None:
        """Stop recording and process the audio.

        When no recording is active, this only releases a stream left open
        by a recording thread that ended with an error.
        """
        if not self.recording:
            self._close_stream()
            return  # Not recording

        print("Stopping recording and processing audio...")
        self.recording = False

        # Wait for recording thread to finish
        self._join_recording_thread()

        # Close the stream
        self._close_stream()

        # Process a snapshot of the recorded audio so a recording started
        # afterwards cannot swap the data under the processing thread.
        captured = list(self.frames)
        if captured:
            threading.Thread(target=self._process_audio, args=(captured,)).start()

    def _process_audio(self, frames: Optional[list] = None) -> None:
        """Process recorded audio and send to the cloud STT service.

        Args:
            frames: Raw audio chunks to transcribe. Defaults to a snapshot of
                ``self.frames`` when omitted.
        """
        try:
            if frames is None:
                frames = list(self.frames)

            if not frames:
                print("No audio frames recorded.")
                return

            # Check the API key before doing any encoding work
            if not self.api_key:
                print("Error: No API key provided for STT service.")
                return

            # Prepare WAV file in memory
            wav_buffer = io.BytesIO()
            with wave.open(wav_buffer, 'wb') as wf:
                wf.setnchannels(self.channels)
                wf.setsampwidth(self.p.get_sample_size(self.format))
                wf.setframerate(self.rate)
                wf.writeframes(b''.join(frames))

            # Reset buffer position
            wav_buffer.seek(0)

            # Send to API
            headers = {"Authorization": f"Bearer {self.api_key}"}
            files = {"file": ("audio.wav", wav_buffer, "audio/wav")}
            data = {"model": "whisper-1", "language": self.language}

            print("Sending audio to STT service...")
            response = requests.post(
                self.api_endpoint,
                headers=headers,
                files=files,
                data=data,
                timeout=self.request_timeout,
            )

            if response.status_code != 200:
                print(f"Error: {response.status_code}, {response.text}")
                return

            text = response.json().get("text", "")
            print(f"Recognized: {text}")
            if not text:
                return

            # Play back the recognized text via TTS
            threading.Thread(target=self.speak_text, args=(text,)).start()

            # Call callback with result if provided
            if self.callback:
                self.callback(text)

        except Exception as e:
            print(f"Error processing audio: {e}")

    def speak_text(self, text: str) -> None:
        """Convert text to speech and play it back using pygame mixer (smoother playback).

        Falls back to ffplay when pygame playback fails. The temporary MP3
        file is removed in every case.
        """
        temp_filename = None
        try:
            print("speaking text: ", text)

            print("Converting text to speech...")
            # Create a temporary file to store the TTS audio
            with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as temp_file:
                temp_filename = temp_file.name

            # Generate speech using gTTS
            tts = gTTS(text=text, lang=self.language)
            tts.save(temp_filename)

            print("Playing TTS feedback...")

            try:
                # Check the mixer itself: pygame.get_init() only reports
                # whether pygame.init() ran, not whether the mixer is usable.
                if not pygame.mixer.get_init():
                    pygame.mixer.init(frequency=self.rate, channels=self.channels)

                # Use pygame mixer for smoother playback
                pygame.mixer.music.set_volume(1.0)
                pygame.mixer.music.load(temp_filename)
                pygame.mixer.music.play()

                # Wait for playback to finish
                while pygame.mixer.music.get_busy():
                    # Using a short sleep to not consume CPU
                    pygame.time.wait(100)  # Wait 100ms between checks

                print("TTS playback completed")

            except Exception as e:
                print(f"Error during pygame playback: {e}")
                # Fall back to ffplay for playback
                self._play_with_ffplay(temp_filename)

        except Exception as e:
            print(f"Error generating or playing speech: {e}")

        finally:
            # Remove temporary file, also when TTS generation failed
            if temp_filename is not None:
                try:
                    os.unlink(temp_filename)
                except OSError as e:
                    print(f"Warning: could not remove temporary file {temp_filename}: {e}")

    def _play_with_ffplay(self, audio_file: str) -> None:
        """Play audio file using ffplay as a fallback method."""
        try:
            print("Trying ffplay fallback playback...")
            # The -nodisp flag disables the graphical window
            # -autoexit will close ffplay when playback finishes
            return_code = subprocess.call(
                ['ffplay', '-nodisp', '-autoexit', audio_file],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
            if return_code == 0:
                print("ffplay playback completed")
            else:
                print(f"Error using ffplay for playback: exit code {return_code}")
        except Exception as e:
            print(f"Error using ffplay for playback: {e}")

    def _join_recording_thread(self, timeout: float = 1.0) -> None:
        """Wait up to ``timeout`` seconds for the recording thread to end."""
        thread = self.recording_thread
        # Joining an unstarted thread or the current thread raises RuntimeError.
        if thread is not None and thread.is_alive() and thread is not threading.current_thread():
            thread.join(timeout=timeout)

    def _close_stream(self) -> None:
        """Stop and close the input stream, if any, and forget it."""
        stream, self.stream = self.stream, None
        if stream is None:
            return
        # Attempt both steps even when the first one fails so the device
        # handle is released whenever possible.
        for action in (stream.stop_stream, stream.close):
            try:
                action()
            except Exception as e:
                print(f"Warning: error while closing audio stream: {e}")

    def cleanup(self) -> None:
        """Clean up resources. Safe to call more than once."""
        self.recording = False

        # Let the recording loop see the cleared flag before the stream goes away
        self._join_recording_thread()
        self._close_stream()

        if self.p:
            try:
                self.p.terminate()
            finally:
                self.p = None