ship-controller/midi_controller.py
2025-06-30 01:52:10 +02:00

231 lines
8.2 KiB
Python

"""
MIDI Controller module for MIDI-to-Hue application.
Handles MIDI device connections, message processing, and event handling.
"""
import mido
import time
import threading
from typing import Dict, List, Callable, Optional, Any, Union
class MidiController:
    """Manages a MIDI port: discovery, opening, polling and reconnection.

    Incoming messages are polled in :meth:`process_messages` and passed to
    every callable registered through :meth:`register_handler`. While
    polling is active, a daemon thread re-opens the port whenever the
    polling loop has flagged it as lost.
    """

    def __init__(self, midi_device_index: int = 1, reconnect_interval: int = 5):
        """
        Initialize the MIDI controller and discover available devices.

        Args:
            midi_device_index: Index of the MIDI device to use (an index
                into the list of input names reported by mido).
            reconnect_interval: Seconds between reconnection attempts
                (if connection lost).
        """
        self.midi_device_index = midi_device_index
        self.reconnect_interval = reconnect_interval
        self.port = None
        self.input_names: List[str] = []
        self.output_names: List[str] = []
        self.ioport_names: List[str] = []
        self.message_handlers: List[Callable] = []
        self._running = False
        self._reconnect_thread: Optional[threading.Thread] = None
        self._connection_healthy = False
        # Set when the monitor thread should stop; used as an interruptible
        # sleep so close() does not have to wait out a full reconnect interval.
        self._stop_event = threading.Event()
        # Initialize MIDI
        self._initialize()

    def _initialize(self) -> None:
        """Refresh the cached lists of available MIDI port names."""
        self.input_names = mido.get_input_names()
        self.output_names = mido.get_output_names()
        self.ioport_names = mido.get_ioport_names()

    def list_devices(self) -> None:
        """Print the cached MIDI input, output and I/O port names with indices."""
        sections = (
            ("\nAvailable MIDI inputs:", self.input_names),
            ("\nAvailable MIDI outputs:", self.output_names),
            ("\nAvailable MIDI I/O ports:", self.ioport_names),
        )
        for title, names in sections:
            print(title)
            for i, name in enumerate(names):
                print(f" {i}: {name}")

    def open(self) -> bool:
        """
        Open the configured MIDI port.

        Returns:
            True if the port was opened, False if no device is available,
            the configured index is out of range, or opening failed.
        """
        try:
            if not self.input_names:
                print("No MIDI inputs found. Please connect a MIDI device.")
                return False
            if len(self.input_names) <= self.midi_device_index:
                print(f"Error: MIDI device index {self.midi_device_index} out of range.")
                print(f"Available devices: {len(self.input_names)}")
                return False
            selected_input = self.input_names[self.midi_device_index]
            print(f"\nOpening MIDI input: {selected_input}")
            # NOTE(review): the name comes from the *input* list but is opened
            # as a combined I/O port (needed by send_message) — confirm this is
            # intended for devices that only expose an input.
            self.port = mido.open_ioport(selected_input)
            self._connection_healthy = True
            return True
        except Exception as e:
            print(f"Error opening MIDI device: {e}")
            self._connection_healthy = False
            return False

    def close(self) -> None:
        """Stop message processing and close the MIDI port if open."""
        self._running = False
        # Wake the monitor thread immediately instead of letting it finish
        # its reconnect-interval wait.
        self._stop_event.set()
        thread = self._reconnect_thread
        if thread is not None and thread.is_alive():
            thread.join(timeout=1.0)
        # Detach the port before closing so it is cleared even if close() raises.
        port, self.port = self.port, None
        self._connection_healthy = False
        if port is not None:
            port.close()

    def register_handler(self, handler: Callable) -> None:
        """Register a callable that is invoked with each incoming MIDI message."""
        self.message_handlers.append(handler)

    def send_message(self, msg) -> None:
        """Send a MIDI message through the current port, if one is open."""
        port = self.port
        if port is None:
            print("Error: No MIDI port open")
            return
        port.send(msg)

    def _reconnect_monitor(self) -> None:
        """
        Monitor thread that attempts to reconnect if the MIDI connection is lost.

        Runs until ``_running`` is cleared or the stop event is set.
        """
        while self._running:
            if not self._connection_healthy and self.port is None:
                print("MIDI connection lost. Attempting to reconnect...")
                # Refresh available devices
                self._initialize()
                # Try to open the port again
                if self.open():
                    print("Successfully reconnected to MIDI device!")
                else:
                    print(f"Reconnect failed. Retrying in {self.reconnect_interval} seconds...")
            # Interruptible wait before checking again; returns True as soon
            # as a shutdown was requested.
            if self._stop_event.wait(self.reconnect_interval):
                break

    def _dispatch(self, msg) -> None:
        """Call every registered handler with ``msg``, isolating handler errors."""
        # Iterate over a copy so a handler may register further handlers.
        for handler in list(self.message_handlers):
            try:
                handler(msg)
            except Exception as e:
                # A faulty handler must neither stop the remaining handlers
                # nor be mistaken for a MIDI connection failure.
                print(f"Error in MIDI message handler: {e}")

    def _discard_port(self) -> None:
        """Drop the current port after a connection error (best-effort close)."""
        port, self.port = self.port, None
        if port is not None:
            try:
                port.close()
            except Exception:
                # The port is already considered broken; a failing close
                # is expected here and deliberately ignored.
                pass

    def process_messages(self) -> None:
        """
        Poll the open port and pass each message to the registered handlers.

        Blocks until ``close()`` is called (from another thread or from a
        handler) or Ctrl+C is pressed. Returns immediately with an error
        message if no port is open.
        """
        if self.port is None:
            print("Error: No MIDI port open")
            return
        # Start the reconnect monitor thread
        self._running = True
        self._stop_event.clear()
        self._reconnect_thread = threading.Thread(
            target=self._reconnect_monitor,
            daemon=True,
        )
        self._reconnect_thread.start()
        print("Waiting for MIDI messages... Press Ctrl+C to exit.")
        try:
            while self._running:
                try:
                    # Snapshot: another thread may clear or replace self.port.
                    port = self.port
                    if port is None:
                        time.sleep(0.5)  # Wait for the monitor to reconnect
                        continue
                    # Non-blocking way to check for messages
                    msg = port.poll()
                except (OSError, AttributeError) as e:
                    # These errors typically indicate a connection problem
                    print(f"MIDI connection error: {e}")
                    self._connection_healthy = False
                    self._discard_port()
                    time.sleep(1)  # Wait a bit before continuing the loop
                    continue
                except KeyboardInterrupt:
                    print("\nExiting MIDI processing...")
                    self._running = False
                    break
                except Exception as e:
                    print(f"Error processing MIDI messages: {e}")
                    continue

                try:
                    if msg is not None:
                        print(f"Received: {msg}")
                        self._dispatch(msg)
                        # Connection is healthy since we received a message
                        self._connection_healthy = True
                    else:
                        # Small sleep to prevent CPU hogging in the loop
                        time.sleep(0.001)
                except KeyboardInterrupt:
                    print("\nExiting MIDI processing...")
                    self._running = False
                    break
        finally:
            # Let the monitor thread exit promptly once polling has stopped.
            self._stop_event.set()
class DeviceMappingManager:
    """Resolves MIDI messages to named inputs using per-device mapping tables."""

    # (message type, attribute holding the note/controller number) pairs that
    # can be mapped; checked in this order.
    _KEYED_MESSAGE_TYPES = (
        ("note_on", "note"),
        ("control_change", "control"),
    )

    def __init__(self, device_mappings: Dict[str, Dict[str, str]] = None):
        """
        Store the available device mappings; no device is active initially.

        Args:
            device_mappings: Dictionary mapping device names to their control
                mappings (keys of the form ``"<type>/<channel>/<number>"``).
        """
        self.connected_device = None
        self.device_mappings = device_mappings or {}

    def set_active_device(self, device_name: str) -> bool:
        """
        Select which device's mapping table is used for lookups.

        Args:
            device_name: Name of the device to activate.

        Returns:
            True if a mapping exists for the device; False otherwise, in
            which case the active mapping is reset to an empty table.
        """
        if device_name not in self.device_mappings:
            print(f"No mapping found for device: {device_name}")
            self.connected_device = {}
            return False

        self.connected_device = self.device_mappings[device_name]
        print(f"Activated mapping for device: {device_name}")
        return True

    def get_input_name(self, msg) -> Optional[str]:
        """
        Look up the mapped input name for a MIDI message.

        Args:
            msg: MIDI message object; only ``note_on`` and ``control_change``
                messages carrying a channel can be mapped.

        Returns:
            The mapped input name, or None if no mapping applies.
        """
        active = self.connected_device
        if not active:
            return None
        if not (hasattr(msg, 'type') and hasattr(msg, 'channel')):
            return None

        lookup_key = None
        for kind, number_attr in self._KEYED_MESSAGE_TYPES:
            if msg.type == kind and hasattr(msg, number_attr):
                lookup_key = f"{kind}/{msg.channel}/{getattr(msg, number_attr)}"
                break

        if lookup_key and lookup_key in active:
            return active[lookup_key]
        return None