""" MCP Server module for MIDI-to-Hue application. Provides MCP tools for controlling Philips Hue lights through an MCP server. """ from typing import List, Dict, Any, Optional import asyncio import time import re import colorsys from datetime import datetime, timedelta from mcp.server.fastmcp import FastMCP from hue_controller import HueController # Dictionary to store scheduled tasks scheduled_tasks = {} # Initialize FastMCP server mcp = FastMCP("hue_control") # Global reference to the HueController instance hue_controller = None def initialize_hue_controller(bridge_ip: str, update_interval: float = 0.05) -> None: """Initialize the global HueController instance. Args: bridge_ip: IP address of the Hue Bridge update_interval: Time between updates in seconds """ global hue_controller hue_controller = HueController(bridge_ip, update_interval) return hue_controller # Tier 1: Core Functionality (Essential) @mcp.tool() async def list_all_lights() -> str: """List all available Hue lights and their current states.""" try: if not hue_controller or not hue_controller.bridge: return "Error: Hue controller not initialized" lights = hue_controller.bridge.get_light_objects('name') result = [] for name, light in lights.items(): state = "ON" if light.on else "OFF" result.append(f"• {name}: {state}, Brightness: {light.brightness}/254, Reachable: {light.reachable}") return "\n".join(result) if result else "No lights found." except Exception as e: return f"Error listing lights: {e}" @mcp.tool() async def set_light_state(light_name: str, on: bool) -> str: """Turn a specific light on or off. 
Args: light_name: Name of the light to control on: True to turn on, False to turn off """ try: if not hue_controller: return "Error: Hue controller not initialized" hue_controller.update_light(light_name, "on", on) state = "on" if on else "off" return f"Successfully turned {light_name} {state}" except Exception as e: return f"Error setting light state: {e}" @mcp.tool() async def set_light_brightness(light_name: str, brightness: int) -> str: """Set the brightness of a specific light. Args: light_name: Name of the light to control brightness: Brightness value (0-254) """ try: if not hue_controller: return "Error: Hue controller not initialized" if brightness < 0 or brightness > 254: return "Brightness must be between 0 and 254" hue_controller.update_light(light_name, "bri", brightness) return f"Successfully set {light_name} brightness to {brightness}" except Exception as e: return f"Error setting brightness: {e}" @mcp.tool() async def flash_alert(light_name: str, flashes: int = 3, interval: float = 0.5) -> str: """Make a light blink/flash for notifications. 
Args: light_name: Name of the light to flash flashes: Number of times to flash the light (default: 3) interval: Time in seconds between flashes (default: 0.5) """ try: if not hue_controller: return "Error: Hue controller not initialized" light = hue_controller.get_light_by_name(light_name) if not light: return f"Light '{light_name}' not found" # Store original state original_state = light.on original_brightness = light.brightness # Flash the light for _ in range(flashes): hue_controller.update_light(light_name, "on", True) hue_controller.update_light(light_name, "bri", 254) # Full brightness await asyncio.sleep(interval) hue_controller.update_light(light_name, "on", False) await asyncio.sleep(interval) # Restore original state hue_controller.update_light(light_name, "on", original_state) if original_state: hue_controller.update_light(light_name, "bri", original_brightness) return f"Successfully flashed {light_name} {flashes} times" except Exception as e: return f"Error flashing light: {e}" # Tier 2: Common Controls (Very Important) @mcp.tool() async def set_light_color(light_name: str, color: str) -> str: """Set a light to a specific color using hex, RGB, or common names. Args: light_name: Name of the light to control color: Color in hex format (#RRGGBB), RGB format (rgb(r,g,b)), or common name (red, blue, green, etc.) 
""" try: if not hue_controller: return "Error: Hue controller not initialized" light = hue_controller.get_light_by_name(light_name) if not light: return f"Light '{light_name}' not found" # Convert color to hue/sat values based on format h, s, v = None, None, None # Hex color format (#RRGGBB) if color.startswith('#') and len(color) == 7: try: r = int(color[1:3], 16) / 255.0 g = int(color[3:5], 16) / 255.0 b = int(color[5:7], 16) / 255.0 h, s, v = colorsys.rgb_to_hsv(r, g, b) except ValueError: return f"Invalid hex color format: {color}" # RGB format (rgb(r,g,b)) elif color.lower().startswith('rgb(') and color.endswith(')'): try: rgb_values = re.search(r'rgb\((\d+),\s*(\d+),\s*(\d+)\)', color.lower()) if rgb_values: r = int(rgb_values.group(1)) / 255.0 g = int(rgb_values.group(2)) / 255.0 b = int(rgb_values.group(3)) / 255.0 h, s, v = colorsys.rgb_to_hsv(r, g, b) except (ValueError, AttributeError): return f"Invalid RGB color format: {color}" # Common color names else: color_map = { 'red': (0, 254), # hue, saturation 'green': (25500, 254), 'blue': (46920, 254), 'yellow': (12750, 254), 'orange': (8500, 254), 'purple': (50000, 254), 'pink': (56100, 254), 'white': (0, 0), 'warm': (9000, 100), # warm white 'cool': (37000, 50) # cool white } if color.lower() in color_map: hue_val, sat_val = color_map[color.lower()] # Update with direct values hue_controller.update_light(light_name, "hue", hue_val) hue_controller.update_light(light_name, "sat", sat_val) return f"Successfully set {light_name} to color {color}" else: return f"Unknown color name: {color}. Try a hex color (#RRGGBB) instead." 
if h is not None and s is not None: # Convert to Hue's color format hue_val = int(h * 65535) sat_val = int(s * 254) # Update the light hue_controller.update_light(light_name, "hue", hue_val) hue_controller.update_light(light_name, "sat", sat_val) return f"Successfully set {light_name} to color {color}" except Exception as e: return f"Error setting light color: {e}" @mcp.tool() async def schedule_lighting(light_name: str, parameter: str, value: Any, minutes_from_now: int, task_id: str = None) -> str: """Schedule a light change for a future time. Args: light_name: Name of the light to control parameter: Parameter to change (on, bri, hue, sat) value: Value to set for the parameter minutes_from_now: Minutes from now to execute the change task_id: Optional identifier for the task (defaults to auto-generated) """ try: if not hue_controller: return "Error: Hue controller not initialized" # Validate parameter valid_params = ["on", "bri", "hue", "sat", "ct", "xy"] if parameter not in valid_params: return f"Invalid parameter. Must be one of: {', '.join(valid_params)}" # Create a task ID if none provided if not task_id: task_id = f"{light_name}_{parameter}_{int(time.time())}" # Check if there's already a task with this ID if task_id in scheduled_tasks: return f"A task with ID '{task_id}' already exists. Choose a different ID or cancel the existing task." 
# Calculate the target time target_time = datetime.now() + timedelta(minutes=minutes_from_now) # Create and schedule the task async def scheduled_task(): # Calculate seconds to wait wait_seconds = (target_time - datetime.now()).total_seconds() if wait_seconds > 0: await asyncio.sleep(wait_seconds) # Apply the change try: hue_controller.update_light(light_name, parameter, value) print(f"Scheduled task executed: {light_name}.{parameter} = {value}") # Remove from scheduled tasks if task_id in scheduled_tasks: del scheduled_tasks[task_id] except Exception as e: print(f"Error in scheduled task: {e}") # Store and start the task task = asyncio.create_task(scheduled_task()) scheduled_tasks[task_id] = { "task": task, "light_name": light_name, "parameter": parameter, "value": value, "target_time": target_time } formatted_time = target_time.strftime("%H:%M:%S") return f"Scheduled change: {light_name}.{parameter} = {value} at {formatted_time} (Task ID: {task_id})" except Exception as e: return f"Error scheduling light change: {e}" @mcp.tool() async def list_scheduled_tasks() -> str: """List all scheduled lighting tasks.""" try: if not scheduled_tasks: return "No scheduled tasks found." result = ["Scheduled lighting tasks:"] for task_id, info in scheduled_tasks.items(): time_str = info["target_time"].strftime("%H:%M:%S") result.append(f"• ID: {task_id} - {info['light_name']}.{info['parameter']} = {info['value']} at {time_str}") return "\n".join(result) except Exception as e: return f"Error listing scheduled tasks: {e}" @mcp.tool() async def cancel_scheduled_task(task_id: str) -> str: """Cancel a scheduled lighting task. 
Args: task_id: ID of the task to cancel """ try: if task_id not in scheduled_tasks: return f"No task found with ID '{task_id}'" # Cancel the task task_info = scheduled_tasks[task_id] task_info["task"].cancel() del scheduled_tasks[task_id] return f"Successfully cancelled task: {task_id}" except Exception as e: return f"Error cancelling task: {e}" @mcp.tool() async def group_control(group_name: str = None, parameter: str = None, value: Any = None, light_names: List[str] = None) -> str: """Control multiple lights as a group. Args: group_name: Optional name of predefined group (e.g., room name) parameter: Parameter to change (on, bri, hue, sat) value: Value to set for the parameter light_names: Optional list of specific light names to control as a group """ try: if not hue_controller: return "Error: Hue controller not initialized" # Get lights to control target_lights = [] # If group_name is provided, try to find it in bridge if group_name: try: # Get groups from bridge groups = hue_controller.bridge.get_group() group_found = False for group_id, group_data in groups.items(): if group_data['name'].lower() == group_name.lower(): # Found the group, get its lights group = hue_controller.bridge.get_group(int(group_id), 'lights') light_ids = group['lights'] # Convert light IDs to names all_lights = hue_controller.bridge.get_light_objects('id') for light_id in light_ids: light = all_lights.get(int(light_id)) if light: target_lights.append(light.name) group_found = True break if not group_found: return f"Group '{group_name}' not found. Available groups: {', '.join([g['name'] for g in groups.values()])}" except Exception as e: return f"Error finding group '{group_name}': {e}" # If light_names is provided, use those instead/additionally if light_names: target_lights.extend(light_names) # Remove duplicates target_lights = list(set(target_lights)) if not target_lights: return "No lights to control. Provide either group_name or light_names." 
# If parameter and value are provided, update all target lights if parameter and value is not None: results = [] for light_name in target_lights: try: hue_controller.update_light(light_name, parameter, value) results.append(f"Updated {light_name}") except Exception as e: results.append(f"Failed to update {light_name}: {e}") return f"Group control results:\n" + "\n".join(results) else: return f"Available lights in group: {', '.join(target_lights)}" except Exception as e: return f"Error in group control: {e}" @mcp.tool() async def get_light_info(light_name: str) -> str: """Get detailed information about a specific light. Args: light_name: Name of the light to query """ try: if not hue_controller: return "Error: Hue controller not initialized" light = hue_controller.get_light_by_name(light_name) if not light: return f"Light '{light_name}' not found" # Get all light details from the bridge light_details = {} for light_id, light_data in hue_controller.bridge.get_light().items(): if light_data['name'] == light_name: light_details = light_data break # Format the details if light_details: state = light_details.get('state', {}) result = [ f"Light: {light_name}", f"Type: {light_details.get('type', 'Unknown')}", f"Model ID: {light_details.get('modelid', 'Unknown')}", f"Manufacturer: {light_details.get('manufacturername', 'Unknown')}", f"Software Version: {light_details.get('swversion', 'Unknown')}", "\nCurrent State:", f" On: {state.get('on', 'Unknown')}", f" Brightness: {state.get('bri', 'N/A')}/254", f" Hue: {state.get('hue', 'N/A')}/65535", f" Saturation: {state.get('sat', 'N/A')}/254", f" Color Temperature: {state.get('ct', 'N/A')} mired", f" Reachable: {state.get('reachable', 'Unknown')}" ] return "\n".join(result) else: return f"Could not retrieve detailed information for light '{light_name}'" except Exception as e: return f"Error getting light information: {e}" # Add more tools here as development continues... 
def main(bridge_ip: Optional[str] = None, update_interval: float = 0.05) -> None:
    """
    Run the MCP server.

    Status and error messages are written to stderr rather than stdout:
    ``mcp.run()`` is called with its default transport, and with the stdio
    transport stdout carries the protocol messages, so stray text there can
    confuse the client.

    Args:
        bridge_ip: IP address of the Hue Bridge (optional, can be set from config)
        update_interval: Time between updates in seconds (default: 0.05)
    """
    import sys

    # Import here to avoid circular import
    from config import ConfigManager

    def _log(message: str) -> None:
        print(message, file=sys.stderr)

    if not bridge_ip:
        # If no bridge IP is provided, try to get it from config
        config = ConfigManager()
        bridge_ip = config.get_value("bridge_ip")
        if not bridge_ip:
            _log("Error: No bridge IP provided and none found in config")
            return

    # Initialize the Hue controller
    try:
        _log(f"Initializing Hue controller with bridge IP: {bridge_ip}")
        initialize_hue_controller(bridge_ip, update_interval)
        _log("Hue controller initialized successfully")
    except Exception as e:
        _log(f"Error initializing Hue controller: {e}")
        return

    # Run the MCP server
    _log("Starting MCP server...")
    mcp.run()


if __name__ == "__main__":
    main()