fix: undo reconnection, doesnt really work yet.

This commit is contained in:
Jan Häusler 2025-06-30 02:08:43 +02:00
parent d83cf8ce00
commit 4cfd750afc

View file

@ -3,32 +3,25 @@ MIDI Controller module for MIDI-to-Hue application.
Handles MIDI device connections, message processing, and event handling.
"""
import mido
import time
import threading
from typing import Dict, List, Callable, Optional, Any, Union
class MidiController:
"""Manages MIDI device connections and message handling."""
def __init__(self, midi_device_index: int = 1, reconnect_interval: int = 5):
def __init__(self, midi_device_index: int = 1):
"""
Initialize the MIDI controller.
Args:
midi_device_index: Index of the MIDI device to use
reconnect_interval: Seconds between reconnection attempts (if connection lost)
"""
self.midi_device_index = midi_device_index
self.reconnect_interval = reconnect_interval
self.port = None
self.input_names = []
self.output_names = []
self.ioport_names = []
self.message_handlers = []
self._running = False
self._reconnect_thread = None
self._connection_healthy = False
# Initialize MIDI
self._initialize()
@ -69,25 +62,17 @@ class MidiController:
selected_input = self.input_names[self.midi_device_index]
print(f"\nOpening MIDI input: {selected_input}")
self.port = mido.open_ioport(selected_input)
self._connection_healthy = True
return True
except Exception as e:
print(f"Error opening MIDI device: {e}")
self._connection_healthy = False
return False
def close(self) -> None:
"""Close the MIDI port if open."""
self._running = False
if self._reconnect_thread and self._reconnect_thread.is_alive():
self._reconnect_thread.join(timeout=1.0)
if self.port:
self.port.close()
self.port = None
self._connection_healthy = False
def register_handler(self, handler: Callable) -> None:
"""Register a function to handle incoming MIDI messages."""
@ -100,78 +85,23 @@ class MidiController:
else:
print("Error: No MIDI port open")
def _reconnect_monitor(self) -> None:
"""
Monitor thread that attempts to reconnect if the MIDI connection is lost.
"""
while self._running:
if not self._connection_healthy and not self.port:
print("MIDI connection lost. Attempting to reconnect...")
# Refresh available devices
self._initialize()
# Try to open the port again
if self.open():
print("Successfully reconnected to MIDI device!")
else:
print(f"Reconnect failed. Retrying in {self.reconnect_interval} seconds...")
# Wait before checking again
time.sleep(self.reconnect_interval)
def process_messages(self) -> None:
"""Start processing MIDI messages, calling registered handlers."""
if not self.port:
print("Error: No MIDI port open")
return
# Start the reconnect monitor thread
self._running = True
self._reconnect_thread = threading.Thread(
target=self._reconnect_monitor,
daemon=True
)
self._reconnect_thread.start()
print("Waiting for MIDI messages... Press Ctrl+C to exit.")
while self._running:
try:
if not self.port:
time.sleep(0.5) # If port is None, wait a bit
continue
# Non-blocking way to check for messages
msg = self.port.poll()
if msg:
print(f"Received: {msg}")
# Call all registered handlers
for handler in self.message_handlers:
handler(msg)
# Connection is healthy since we received a message
self._connection_healthy = True
else:
# Small sleep to prevent CPU hogging in the loop
time.sleep(0.001)
except (IOError, AttributeError) as e:
# These errors typically indicate a connection problem
print(f"MIDI connection error: {e}")
self._connection_healthy = False
if self.port:
try:
self.port.close()
except:
pass
self.port = None
time.sleep(1) # Wait a bit before continuing the loop
except KeyboardInterrupt:
print("\nExiting MIDI processing...")
self._running = False
break
except Exception as e:
print(f"Error processing MIDI messages: {e}")
try:
for msg in self.port:
print(f"Received: {msg}")
# Call all registered handlers
for handler in self.message_handlers:
handler(msg)
except KeyboardInterrupt:
print("\nExiting MIDI processing...")
except Exception as e:
print(f"Error processing MIDI messages: {e}")
class DeviceMappingManager: