From d83cf8ce00fa007828e3321837945a36aadeb31c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20H=C3=A4usler?= Date: Mon, 30 Jun 2025 01:52:10 +0200 Subject: [PATCH] featsss: initial commit --- README.md | 54 +++++++++++ __init__.py | 10 ++ config.py | 81 ++++++++++++++++ hue_controller.py | 132 ++++++++++++++++++++++++++ main.py | 75 +++++++++++++++ mapper.py | 163 ++++++++++++++++++++++++++++++++ midi_controller.py | 231 +++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 746 insertions(+) create mode 100644 README.md create mode 100644 __init__.py create mode 100644 config.py create mode 100644 hue_controller.py create mode 100644 main.py create mode 100644 mapper.py create mode 100644 midi_controller.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..04bbd2f --- /dev/null +++ b/README.md @@ -0,0 +1,54 @@ +# MIDI-to-Hue Controller + +A modular Python application for controlling Philips Hue lights with MIDI controllers. + +## Features + +- Map MIDI notes and control changes to Hue light parameters +- Throttled updates to prevent overwhelming the Hue Bridge +- Configurable mappings via JSON +- LED feedback support for controller visualization +- Customizable value transformations + +## Structure + +This application has been refactored to follow modular design principles: + +- `config.py` - Configuration management module +- `hue_controller.py` - Philips Hue bridge and light control +- `midi_controller.py` - MIDI device management +- `mapper.py` - Mapping logic between MIDI and Hue +- `main.py` - Application entry point + +## Usage + +1. Connect your MIDI device +2. Edit `midi_hue_config.json` to configure your mappings +3. Run `python main.py` + +## Configuration + +The default configuration file (`midi_hue_config.json`) contains: + +```json +{ + "midi_device": 1, + "update_interval_ms": 50, + "bridge_ip": "192.168.178.35", + "mappings": [ + { + "midi_channel": 5, + "midi_control": 1, + "light_name": "Zimmer Decke", + "parameter": "bri", + "value_transform": "value * 2" + }, + ... + ] +} +``` + +## Dependencies + +- mido +- phue diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..0cc9d75 --- /dev/null +++ b/__init__.py @@ -0,0 +1,10 @@ +""" +MIDI-to-Hue package. +Allows controlling Philips Hue lights with MIDI controllers. +""" +from .config import ConfigManager +from .hue_controller import HueController, ThrottledUpdater +from .midi_controller import MidiController, DeviceMappingManager +from .mapper import MidiToHueMapper + +__version__ = '1.0.0' diff --git a/config.py b/config.py new file mode 100644 index 0000000..62ec41b --- /dev/null +++ b/config.py @@ -0,0 +1,81 @@ +""" +Configuration module for MIDI-to-Hue application. +Handles loading, creating and validating configuration. +""" +import os +import json +from typing import Dict, Any, Optional + +# Default config file path +DEFAULT_CONFIG_FILE = 'midi_hue_config.json' + +# Default MIDI-to-Hue mapping if no config file exists +DEFAULT_CONFIG = { + "midi_device": 1, # Index of MIDI device to use (from available inputs list) + "update_interval_ms": 50, # Throttling interval in milliseconds + "bridge_ip": "192.168.178.35", # Default Bridge IP + "mappings": [ + { + "midi_channel": 5, + "midi_control": 1, + "light_name": "Zimmer Decke", + "parameter": "bri", + "value_transform": "value * 2" # MIDI range 0-127 to Hue brightness 0-254 + }, + { + "midi_channel": 6, + "midi_control": 1, + "light_name": "Fernseher links", + "parameter": "bri", + "value_transform": "value * 2" # MIDI range 0-127 to Hue brightness 0-254 + }, + ] +} + +class ConfigManager: + """Manages configuration loading, validation and access.""" + + def __init__(self, config_file: str = DEFAULT_CONFIG_FILE): + """Initialize the config manager with a specific config file.""" + self.config_file = config_file + self.config = self._load_or_create_config() + + def _load_or_create_config(self) -> Dict[str, Any]: + """Load config from file or create with defaults if it doesn't exist.""" + try: + if os.path.exists(self.config_file): + with open(self.config_file, 'r') as f: + config = json.load(f) + print(f"Loaded configuration from {self.config_file}") + return config + except Exception as e: + print(f"Error loading config file: {e}") + + # If we get here, either the file doesn't exist or there was an error + # Create the default config file + with open(self.config_file, 'w') as f: + json.dump(DEFAULT_CONFIG, f, indent=4) + print(f"Created default configuration file at {self.config_file}") + + return DEFAULT_CONFIG + + def get(self, key: str, default: Any = None) -> Any: + """Get a configuration value by key with a default fallback.""" + return self.config.get(key, default) + + def get_mappings(self) -> list: + """Get all mappings from the configuration.""" + return self.config.get("mappings", []) + + def get_bridge_ip(self) -> str: + """Get the Bridge IP address.""" + return self.config.get("bridge_ip", "192.168.178.35") + + def get_update_interval_sec(self) -> float: + """Get the update interval in seconds.""" + update_interval_ms = self.config.get("update_interval_ms", 50) + return update_interval_ms / 1000 # Convert to seconds + + def get_midi_device_index(self) -> int: + """Get the MIDI device index.""" + return self.config.get("midi_device", 1) diff --git a/hue_controller.py b/hue_controller.py new file mode 100644 index 0000000..5e56045 --- /dev/null +++ b/hue_controller.py @@ -0,0 +1,132 @@ +""" +Hue Controller module for MIDI-to-Hue application. +Manages connections to the Hue Bridge and light controls. +""" +from phue import Bridge +import time +from threading import Timer +from typing import Dict, Any, List, Optional + +class ThrottledUpdater: + """ + Updates Hue lights with rate limiting to prevent overwhelming the bridge. + Collects and consolidates multiple rapid updates into fewer bridge calls. + """ + def __init__(self, bridge, update_interval: float = 0.05): + """ + Initialize the throttled updater. + + Args: + bridge: The Hue Bridge object + update_interval: Time between updates in seconds + """ + self.bridge = bridge + self.update_interval = update_interval + self.last_update_time = 0 + self.pending_updates = {} # {light_name: {param: value, ...}, ...} + self.update_timer = None + print(f"ThrottledUpdater initialized with interval: {update_interval * 1000:.0f}ms") + + def schedule_update(self, light_name: str, param: str, value: Any) -> None: + """ + Schedule an update for a specific light and parameter. + + Args: + light_name: Name of the light to update + param: Parameter to update (e.g., 'bri', 'hue', 'on') + value: New value for the parameter + """ + # Store the most recent value for this light and parameter + if light_name not in self.pending_updates: + self.pending_updates[light_name] = {} + self.pending_updates[light_name][param] = value + + current_time = time.time() + time_since_last_update = current_time - self.last_update_time + + # If no timer is active and it's been longer than our interval, update immediately + if self.update_timer is None and time_since_last_update >= self.update_interval: + self._perform_updates() + # Otherwise, ensure we have a timer scheduled + elif self.update_timer is None: + # Calculate remaining time until next update + remaining_time = max(0, self.update_interval - time_since_last_update) + self.update_timer = Timer(remaining_time, self._perform_updates) + self.update_timer.start() + + def _perform_updates(self) -> None: + """Apply all pending updates to the Hue bridge.""" + if self.update_timer: + self.update_timer.cancel() + self.update_timer = None + + # Apply all pending updates + for light_name, params in self.pending_updates.items(): + for param, value in params.items(): + try: + self.bridge.set_light(light_name, param, value) + print(f"Updated {light_name} {param} = {value}") + except Exception as e: + print(f"Error updating light {light_name}: {e}") + + # Clear pending updates and update timestamp + self.pending_updates = {} + self.last_update_time = time.time() + + +class HueController: + """Manages interactions with the Philips Hue system.""" + + def __init__(self, bridge_ip: str, update_interval: float = 0.05): + """ + Initialize the Hue controller. + + Args: + bridge_ip: IP address of the Hue Bridge + update_interval: Time between updates in seconds + """ + self.bridge_ip = bridge_ip + self.bridge = None + self.updater = None + self.update_interval = update_interval + self.connect() + + def connect(self) -> None: + """Connect to the Hue Bridge and initialize the updater.""" + try: + self.bridge = Bridge(self.bridge_ip) + self.bridge.connect() + self.updater = ThrottledUpdater(self.bridge, self.update_interval) + print(f"Connected to Hue Bridge at {self.bridge_ip}") + except Exception as e: + print(f"Failed to connect to Hue Bridge: {e}") + raise + + def list_lights(self) -> None: + """Print available lights for reference.""" + print("Available Hue lights:") + for light in self.bridge.lights: + print(f" {light.name} - Current brightness: {light.brightness}") + + def get_light_by_name(self, light_name: str): + """Get a light object by its name.""" + try: + return self.bridge.get_light_objects('name')[light_name] + except KeyError: + print(f"Light '{light_name}' not found.") + return None + + def update_light(self, light_name: str, parameter: str, value: Any) -> None: + """ + Schedule an update for a light parameter. + + Args: + light_name: Name of the light to update + parameter: Parameter to update (e.g., 'bri', 'hue', 'on') + value: New value for the parameter + """ + if value is None: + print("Skipping update due to None value") + return + + self.updater.schedule_update(light_name, parameter, value) diff --git a/main.py b/main.py new file mode 100644 index 0000000..d294427 --- /dev/null +++ b/main.py @@ -0,0 +1,75 @@ +""" +Main module for MIDI-to-Hue application. +Ties together the config, MIDI controller, Hue controller, and mapper. +""" +import sys +import time +from config import ConfigManager +from hue_controller import HueController +from midi_controller import MidiController, DeviceMappingManager +from mapper import MidiToHueMapper + +def main(): + """Main application entry point.""" + # Load configuration + config_manager = ConfigManager() + + # Initialize Hue controller + try: + hue_controller = HueController( + config_manager.get_bridge_ip(), + config_manager.get_update_interval_sec() + ) + hue_controller.list_lights() + except Exception as e: + print(f"Failed to initialize Hue controller: {e}") + return 1 + + # Initialize MIDI controller + midi_controller = MidiController(config_manager.get_midi_device_index()) + midi_controller.list_devices() + + # Setup device mappings + device_mappings = { + "traktor_kontrol_s2": { + "note_on/1/3": 'left_deck_1', + "note_on/1/4": 'left_deck_2', + "note_on/1/5": 'left_deck_3', + "note_on/1/6": 'left_deck_4', + "note_on/1/7": 'left_deck_5', + "note_on/1/8": 'left_deck_6', + "note_on/1/9": 'left_deck_7', + "note_on/1/10": 'left_deck_8', + "control_change/5/1": 'left_volume_slider', + "control_change/6/1": 'right_volume_slider', + } + } + device_mapper = DeviceMappingManager(device_mappings) + device_mapper.set_active_device("traktor_kontrol_s2") + + # Bind the device mapper to the MIDI controller + midi_controller.get_input_name = device_mapper.get_input_name + + # Create MIDI-to-Hue mapper with configuration + mapper = MidiToHueMapper( + hue_controller, + midi_controller, + config_manager.get_mappings() + ) + + # Open MIDI port and process messages + if not midi_controller.open(): + print("Failed to open MIDI port.") + return 1 + + try: + midi_controller.process_messages() + except KeyboardInterrupt: + print("\nExiting program...") + finally: + midi_controller.close() + + return 0 + +if __name__ == "__main__": + sys.exit(main()) diff --git a/mapper.py b/mapper.py new file mode 100644 index 0000000..00136b7 --- /dev/null +++ b/mapper.py @@ -0,0 +1,163 @@ +""" +Mapper module for MIDI-to-Hue application. +Handles the mapping logic between MIDI messages and Hue light controls. +""" +from typing import Dict, Any, Optional +import mido + +class MidiToHueMapper: + """Maps MIDI messages to Hue light commands according to configuration.""" + + def __init__(self, hue_controller, midi_controller, mappings: list): + """ + Initialize the MIDI-to-Hue mapper. + + Args: + hue_controller: HueController instance + midi_controller: MidiController instance + mappings: List of mapping dictionaries from config + """ + self.hue_controller = hue_controller + self.midi_controller = midi_controller + self.mappings = mappings + self.last_note_hit_light_name = None + + # Register as a handler for MIDI messages + self.midi_controller.register_handler(self.process_midi_message) + + def process_midi_message(self, msg) -> None: + """ + Process a MIDI message and update lights if it matches any mappings. + + Args: + msg: MIDI message object from mido + """ + # Extract MIDI message properties + if not hasattr(msg, 'type') or not hasattr(msg, 'channel'): + return # Skip messages without required attributes + + # Extract value (could be in different attributes depending on message type) + default_value = None + if hasattr(msg, 'value'): + default_value = msg.value + elif hasattr(msg, 'velocity'): + default_value = msg.velocity + + # Get the input name for this message if available + input_name = None + if hasattr(self.midi_controller, 'get_input_name'): + input_name = self.midi_controller.get_input_name(msg) + + # Process message against our mappings + for mapping in self.mappings: + # Match by input_name if available, otherwise fall back to channel/control matching + if input_name and 'input_name' in mapping: + if input_name != mapping['input_name']: + continue + elif msg.type == "note_on" and 'midi_channel' in mapping and 'midi_control' in mapping: + if msg.channel != mapping['midi_channel'] or (hasattr(msg, 'note') and msg.note != mapping['midi_control']): + continue + elif msg.type == "control_change" and 'midi_channel' in mapping and 'midi_control' in mapping: + if msg.channel != mapping['midi_channel'] or (hasattr(msg, 'control') and msg.control != mapping['midi_control']): + continue + else: + continue # No match criteria found + + # Determine the light name + light_name = self._resolve_light_name(mapping, msg) + if light_name is None: + print("No light name specified, skipping update.") + continue + + # Update last note hit for potential use in value_transform + if msg.type == "note_on": + self.last_note_hit_light_name = light_name + + # Verify the light exists + light = self.hue_controller.get_light_by_name(light_name) + if not light: + continue + + # Apply value transformation + transformed_value = self._transform_value(mapping, msg, light, default_value) + if transformed_value is None: + print("Skipping update due to None value") + continue + + # Update LED feedback if needed + if msg.type == "note_on" and mapping.get('parameter') == 'on': + self._send_led_feedback(msg, transformed_value) + + # Schedule the light update + self.hue_controller.update_light( + light_name, + mapping['parameter'], + transformed_value + ) + + print(f"MIDI: {msg} → {light_name}.{mapping['parameter']} = {transformed_value}") + + def _resolve_light_name(self, mapping: Dict[str, Any], msg) -> Optional[str]: + """ + Resolve the light name based on mapping configuration. + + Args: + mapping: Mapping configuration dictionary + msg: MIDI message object + + Returns: + Resolved light name or None if not resolved + """ + if "light_name_eval" in mapping: + try: + return eval(mapping['light_name_eval'], { + 'msg': msg, + 'last_note_hit_light_name': self.last_note_hit_light_name, + }) + except Exception as e: + print(f"Error evaluating light_name_eval: {e}") + return None + return mapping.get('light_name') + + def _transform_value(self, mapping: Dict[str, Any], msg, light, default_value: Any) -> Any: + """ + Transform a MIDI value according to mapping configuration. + + Args: + mapping: Mapping configuration dictionary + msg: MIDI message object + light: Light object from hue controller + default_value: Default value from MIDI message + + Returns: + Transformed value or None if transformation failed + """ + if 'value_transform' not in mapping: + return default_value + + try: + return eval(mapping['value_transform'], { + 'value': default_value, + 'msg': msg, + 'light': light, + }) + except Exception as e: + print(f"Error transforming value: {e}") + return default_value + + def _send_led_feedback(self, msg, value: Any) -> None: + """ + Send LED feedback to the MIDI device. + + Args: + msg: Original MIDI message + value: Transformed value (usually boolean) + """ + try: + led_on = mido.Message('note_on', + note=msg.note, + velocity=127 if value else 0, + channel=msg.channel) + self.midi_controller.send_message(led_on) + except Exception as e: + print(f"Error sending LED feedback: {e}") diff --git a/midi_controller.py b/midi_controller.py new file mode 100644 index 0000000..e04a900 --- /dev/null +++ b/midi_controller.py @@ -0,0 +1,231 @@ +""" +MIDI Controller module for MIDI-to-Hue application. +Handles MIDI device connections, message processing, and event handling. +""" +import mido +import time +import threading +from typing import Dict, List, Callable, Optional, Any, Union + + +class MidiController: + """Manages MIDI device connections and message handling.""" + + def __init__(self, midi_device_index: int = 1, reconnect_interval: int = 5): + """ + Initialize the MIDI controller. + + Args: + midi_device_index: Index of the MIDI device to use + reconnect_interval: Seconds between reconnection attempts (if connection lost) + """ + self.midi_device_index = midi_device_index + self.reconnect_interval = reconnect_interval + self.port = None + self.input_names = [] + self.output_names = [] + self.ioport_names = [] + self.message_handlers = [] + self._running = False + self._reconnect_thread = None + self._connection_healthy = False + + # Initialize MIDI + self._initialize() + + def _initialize(self) -> None: + """Initialize MIDI ports and discover available devices.""" + # Get available MIDI inputs/outputs + self.input_names = mido.get_input_names() + self.output_names = mido.get_output_names() + self.ioport_names = mido.get_ioport_names() + + def list_devices(self) -> None: + """Print available MIDI devices.""" + print("\nAvailable MIDI inputs:") + for i, name in enumerate(self.input_names): + print(f" {i}: {name}") + + print("\nAvailable MIDI outputs:") + for i, name in enumerate(self.output_names): + print(f" {i}: {name}") + + print("\nAvailable MIDI I/O ports:") + for i, name in enumerate(self.ioport_names): + print(f" {i}: {name}") + + def open(self) -> bool: + """Open the configured MIDI port.""" + try: + if not self.input_names: + print("No MIDI inputs found. Please connect a MIDI device.") + return False + + if len(self.input_names) <= self.midi_device_index: + print(f"Error: MIDI device index {self.midi_device_index} out of range.") + print(f"Available devices: {len(self.input_names)}") + return False + + selected_input = self.input_names[self.midi_device_index] + print(f"\nOpening MIDI input: {selected_input}") + self.port = mido.open_ioport(selected_input) + self._connection_healthy = True + return True + + except Exception as e: + print(f"Error opening MIDI device: {e}") + self._connection_healthy = False + return False + + def close(self) -> None: + """Close the MIDI port if open.""" + self._running = False + if self._reconnect_thread and self._reconnect_thread.is_alive(): + self._reconnect_thread.join(timeout=1.0) + + if self.port: + self.port.close() + self.port = None + + self._connection_healthy = False + + def register_handler(self, handler: Callable) -> None: + """Register a function to handle incoming MIDI messages.""" + self.message_handlers.append(handler) + + def send_message(self, msg) -> None: + """Send a MIDI message through the current port.""" + if self.port: + self.port.send(msg) + else: + print("Error: No MIDI port open") + + def _reconnect_monitor(self) -> None: + """ + Monitor thread that attempts to reconnect if the MIDI connection is lost. + """ + while self._running: + if not self._connection_healthy and not self.port: + print("MIDI connection lost. Attempting to reconnect...") + # Refresh available devices + self._initialize() + # Try to open the port again + if self.open(): + print("Successfully reconnected to MIDI device!") + else: + print(f"Reconnect failed. Retrying in {self.reconnect_interval} seconds...") + + # Wait before checking again + time.sleep(self.reconnect_interval) + + def process_messages(self) -> None: + """Start processing MIDI messages, calling registered handlers.""" + if not self.port: + print("Error: No MIDI port open") + return + + # Start the reconnect monitor thread + self._running = True + self._reconnect_thread = threading.Thread( + target=self._reconnect_monitor, + daemon=True + ) + self._reconnect_thread.start() + + print("Waiting for MIDI messages... Press Ctrl+C to exit.") + + while self._running: + try: + if not self.port: + time.sleep(0.5) # If port is None, wait a bit + continue + + # Non-blocking way to check for messages + msg = self.port.poll() + if msg: + print(f"Received: {msg}") + # Call all registered handlers + for handler in self.message_handlers: + handler(msg) + # Connection is healthy since we received a message + self._connection_healthy = True + else: + # Small sleep to prevent CPU hogging in the loop + time.sleep(0.001) + + except (IOError, AttributeError) as e: + # These errors typically indicate a connection problem + print(f"MIDI connection error: {e}") + self._connection_healthy = False + if self.port: + try: + self.port.close() + except: + pass + self.port = None + time.sleep(1) # Wait a bit before continuing the loop + + except KeyboardInterrupt: + print("\nExiting MIDI processing...") + self._running = False + break + + except Exception as e: + print(f"Error processing MIDI messages: {e}") + + +class DeviceMappingManager: + """Manages device-specific MIDI mappings and message types.""" + + def __init__(self, device_mappings: Dict[str, Dict[str, str]] = None): + """ + Initialize with device mappings. + + Args: + device_mappings: Dictionary mapping devices to their control mappings + """ + self.device_mappings = device_mappings or {} + self.connected_device = None + + def set_active_device(self, device_name: str) -> bool: + """ + Set the active MIDI device mapping. + + Args: + device_name: Name of the device to activate + + Returns: + True if device was found and set, False otherwise + """ + if device_name in self.device_mappings: + self.connected_device = self.device_mappings[device_name] + print(f"Activated mapping for device: {device_name}") + return True + else: + print(f"No mapping found for device: {device_name}") + self.connected_device = {} + return False + + def get_input_name(self, msg) -> Optional[str]: + """ + Get the mapped input name for a MIDI message. + + Args: + msg: MIDI message object + + Returns: + Mapped input name or None if no mapping found + """ + if not self.connected_device or not hasattr(msg, 'type') or not hasattr(msg, 'channel'): + return None + + message_key = None + if msg.type == "note_on" and hasattr(msg, 'note'): + message_key = f"note_on/{msg.channel}/{msg.note}" + elif msg.type == "control_change" and hasattr(msg, 'control'): + message_key = f"control_change/{msg.channel}/{msg.control}" + + if message_key and message_key in self.connected_device: + return self.connected_device[message_key] + + return None