465 lines
17 KiB
Python
465 lines
17 KiB
Python
"""
|
|
MCP Server module for MIDI-to-Hue application.
|
|
Provides MCP tools for controlling Philips Hue lights through an MCP server.
|
|
"""
|
|
from typing import List, Dict, Any, Optional
|
|
import asyncio
|
|
import time
|
|
import re
|
|
import colorsys
|
|
from datetime import datetime, timedelta
|
|
from mcp.server.fastmcp import FastMCP
|
|
from hue_controller import HueController
|
|
|
|
# Scheduled lighting changes, keyed by task ID. Each entry is the dict that
# schedule_lighting() stores: the asyncio task plus the light name, parameter,
# value, and target time of the pending change.
scheduled_tasks: Dict[str, Dict[str, Any]] = {}

# FastMCP server instance; the tool functions below register themselves on it
# through the @mcp.tool() decorator.
mcp = FastMCP("hue_control")

# Module-wide HueController used by every tool. It stays None until
# initialize_hue_controller() has been called.
hue_controller: Optional[HueController] = None
|
|
|
|
def initialize_hue_controller(bridge_ip: str, update_interval: float = 0.05) -> "HueController":
    """Create the global HueController instance and return it.

    Args:
        bridge_ip: IP address of the Hue Bridge
        update_interval: Time between updates in seconds

    Returns:
        The newly created controller. The same object is stored in the
        module-level ``hue_controller`` that the MCP tools read.
    """
    global hue_controller
    hue_controller = HueController(bridge_ip, update_interval)
    return hue_controller
|
|
|
|
|
|
# Tier 1: Core Functionality (Essential)
|
|
|
|
@mcp.tool()
async def list_all_lights() -> str:
    """Report every Hue light known to the bridge together with its state.

    Returns one bullet line per light (on/off, brightness, reachability),
    a placeholder when the bridge reports no lights, or an error message.
    """
    try:
        if not (hue_controller and hue_controller.bridge):
            return "Error: Hue controller not initialized"

        lights_by_name = hue_controller.bridge.get_light_objects('name')
        lines = [
            f"• {name}: {'ON' if light.on else 'OFF'}, "
            f"Brightness: {light.brightness}/254, Reachable: {light.reachable}"
            for name, light in lights_by_name.items()
        ]

        if not lines:
            return "No lights found."
        return "\n".join(lines)
    except Exception as e:
        return f"Error listing lights: {e}"
|
|
|
|
@mcp.tool()
async def set_light_state(light_name: str, on: bool) -> str:
    """Switch a single light on or off.

    Args:
        light_name: Name of the light to control
        on: True to turn on, False to turn off

    Returns:
        A confirmation message, or an error message if the controller is
        missing or the update raises.
    """
    try:
        if hue_controller:
            hue_controller.update_light(light_name, "on", on)
            return f"Successfully turned {light_name} {'on' if on else 'off'}"
        return "Error: Hue controller not initialized"
    except Exception as e:
        return f"Error setting light state: {e}"
|
|
|
|
@mcp.tool()
async def set_light_brightness(light_name: str, brightness: int) -> str:
    """Change how bright a single light is.

    Args:
        light_name: Name of the light to control
        brightness: Brightness value (0-254)

    Returns:
        A confirmation message, a range error for values outside 0-254,
        or an error message if the controller is missing or the update raises.
    """
    lowest, highest = 0, 254
    try:
        if not hue_controller:
            return "Error: Hue controller not initialized"

        if brightness < lowest or brightness > highest:
            return "Brightness must be between 0 and 254"

        hue_controller.update_light(light_name, "bri", brightness)
    except Exception as e:
        return f"Error setting brightness: {e}"
    else:
        return f"Successfully set {light_name} brightness to {brightness}"
|
|
|
|
@mcp.tool()
async def flash_alert(light_name: str, flashes: int = 3, interval: float = 0.5) -> str:
    """Make a light blink/flash for notifications.

    The light's on/off state and brightness are captured before flashing and
    restored afterwards, including when an update fails or the coroutine is
    cancelled part-way through.

    Args:
        light_name: Name of the light to flash
        flashes: Number of times to flash the light (default: 3, minimum: 1)
        interval: Time in seconds between flashes (default: 0.5, not negative)

    Returns:
        A confirmation message, a validation message for bad arguments,
        or an error message if the light is unknown or an update raises.
    """
    try:
        if not hue_controller:
            return "Error: Hue controller not initialized"

        if flashes < 1:
            return "Flashes must be at least 1"
        if interval < 0:
            return "Interval must not be negative"

        light = hue_controller.get_light_by_name(light_name)
        if not light:
            return f"Light '{light_name}' not found"

        # Store original state
        original_state = light.on
        original_brightness = light.brightness

        try:
            # Flash the light
            for _ in range(flashes):
                hue_controller.update_light(light_name, "on", True)
                hue_controller.update_light(light_name, "bri", 254)  # Full brightness
                await asyncio.sleep(interval)
                hue_controller.update_light(light_name, "on", False)
                await asyncio.sleep(interval)
        finally:
            # Restore original state even if flashing was interrupted, so the
            # light is never left stuck at full brightness or switched off.
            hue_controller.update_light(light_name, "on", original_state)
            if original_state:
                hue_controller.update_light(light_name, "bri", original_brightness)

        return f"Successfully flashed {light_name} {flashes} times"
    except Exception as e:
        return f"Error flashing light: {e}"
|
|
|
|
# Tier 2: Common Controls (Very Important)
|
|
|
|
# Common color names mapped to (hue, saturation) values sent to the bridge.
_NAMED_COLORS = {
    'red': (0, 254),
    'green': (25500, 254),
    'blue': (46920, 254),
    'yellow': (12750, 254),
    'orange': (8500, 254),
    'purple': (50000, 254),
    'pink': (56100, 254),
    'white': (0, 0),
    'warm': (9000, 100),  # warm white
    'cool': (37000, 50),  # cool white
}

_HEX_COLOR_PATTERN = re.compile(r'#[0-9a-f]{6}')
_RGB_COLOR_PATTERN = re.compile(r'rgb\(\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*\)')


def _color_to_hue_sat(color: str) -> tuple:
    """Translate a color string into a ``(hue, saturation)`` pair.

    Accepts ``#RRGGBB``, ``rgb(r,g,b)`` with components 0-255, or one of the
    names in ``_NAMED_COLORS``. Matching ignores case and surrounding
    whitespace.

    Raises:
        ValueError: If the string is malformed or names an unknown color.
            The exception message is the text to show to the caller.
    """
    text = color.strip().lower()

    if text.startswith('#') and len(text) == 7:
        # Hex color format (#RRGGBB)
        if not _HEX_COLOR_PATTERN.fullmatch(text):
            raise ValueError(f"Invalid hex color format: {color}")
        red, green, blue = (int(text[i:i + 2], 16) for i in (1, 3, 5))
    elif text.startswith('rgb(') and text.endswith(')'):
        # RGB format (rgb(r,g,b)); anything the pattern rejects is an error
        # rather than a silent no-op.
        match = _RGB_COLOR_PATTERN.fullmatch(text)
        if match is None:
            raise ValueError(f"Invalid RGB color format: {color}")
        red, green, blue = (int(part) for part in match.groups())
        if max(red, green, blue) > 255:
            raise ValueError(f"Invalid RGB color format: {color}")
    elif text in _NAMED_COLORS:
        return _NAMED_COLORS[text]
    else:
        raise ValueError(f"Unknown color name: {color}. Try a hex color (#RRGGBB) instead.")

    # Only hue and saturation are used; the value (brightness) component of
    # the conversion is discarded.
    h, s, _ = colorsys.rgb_to_hsv(red / 255.0, green / 255.0, blue / 255.0)
    return int(h * 65535), int(s * 254)


@mcp.tool()
async def set_light_color(light_name: str, color: str) -> str:
    """Set a light to a specific color using hex, RGB, or common names.

    Args:
        light_name: Name of the light to control
        color: Color in hex format (#RRGGBB), RGB format (rgb(r,g,b)),
            or common name (red, blue, green, etc.)

    Returns:
        A confirmation message once hue and saturation have been updated,
        a description of what is wrong with ``color`` if it cannot be parsed,
        or an error message if the light is unknown or an update raises.
    """
    try:
        if not hue_controller:
            return "Error: Hue controller not initialized"

        light = hue_controller.get_light_by_name(light_name)
        if not light:
            return f"Light '{light_name}' not found"

        try:
            hue_val, sat_val = _color_to_hue_sat(color)
        except ValueError as parse_error:
            return str(parse_error)

        # Update the light
        hue_controller.update_light(light_name, "hue", hue_val)
        hue_controller.update_light(light_name, "sat", sat_val)

        return f"Successfully set {light_name} to color {color}"
    except Exception as e:
        return f"Error setting light color: {e}"
|
|
|
|
@mcp.tool()
async def schedule_lighting(light_name: str, parameter: str, value: Any,
                            minutes_from_now: int, task_id: Optional[str] = None) -> str:
    """Schedule a light change for a future time.

    Args:
        light_name: Name of the light to control
        parameter: Parameter to change (on, bri, hue, sat, ct, xy)
        value: Value to set for the parameter
        minutes_from_now: Minutes from now to execute the change (not negative)
        task_id: Optional identifier for the task (defaults to auto-generated)

    Returns:
        A confirmation message containing the target time and task ID,
        a validation message for bad arguments or a duplicate task ID,
        or an error message if scheduling raises.
    """
    # Local import keeps this block self-contained. Diagnostics go to stderr
    # because stdout carries the protocol stream when the server runs over
    # stdio.
    import sys

    try:
        if not hue_controller:
            return "Error: Hue controller not initialized"

        # Validate parameter
        valid_params = ["on", "bri", "hue", "sat", "ct", "xy"]
        if parameter not in valid_params:
            return f"Invalid parameter. Must be one of: {', '.join(valid_params)}"

        if minutes_from_now < 0:
            return "minutes_from_now must not be negative"

        if task_id:
            # Caller-chosen IDs must not clash with a pending task
            if task_id in scheduled_tasks:
                return f"A task with ID '{task_id}' already exists. Choose a different ID or cancel the existing task."
        else:
            # Auto-generated IDs have one-second resolution, so add a numeric
            # suffix when the same light/parameter is scheduled twice within
            # a second instead of rejecting the request.
            base_id = f"{light_name}_{parameter}_{int(time.time())}"
            task_id = base_id
            attempt = 1
            while task_id in scheduled_tasks:
                attempt += 1
                task_id = f"{base_id}_{attempt}"

        # Calculate the target time
        target_time = datetime.now() + timedelta(minutes=minutes_from_now)

        async def scheduled_task():
            try:
                # Calculate seconds to wait
                wait_seconds = (target_time - datetime.now()).total_seconds()
                if wait_seconds > 0:
                    await asyncio.sleep(wait_seconds)

                # Apply the change
                hue_controller.update_light(light_name, parameter, value)
                print(f"Scheduled task executed: {light_name}.{parameter} = {value}", file=sys.stderr)
            except Exception as e:
                print(f"Error in scheduled task: {e}", file=sys.stderr)
            finally:
                # Always drop the bookkeeping entry, including after a failed
                # update. Only remove it if it still belongs to this task, so
                # a cancelled task cannot delete a newer entry that reuses
                # the same ID.
                entry = scheduled_tasks.get(task_id)
                if entry is not None and entry["task"] is asyncio.current_task():
                    del scheduled_tasks[task_id]

        # Store and start the task
        task = asyncio.create_task(scheduled_task())
        scheduled_tasks[task_id] = {
            "task": task,
            "light_name": light_name,
            "parameter": parameter,
            "value": value,
            "target_time": target_time,
        }

        formatted_time = target_time.strftime("%H:%M:%S")
        return f"Scheduled change: {light_name}.{parameter} = {value} at {formatted_time} (Task ID: {task_id})"
    except Exception as e:
        return f"Error scheduling light change: {e}"
|
|
|
|
@mcp.tool()
async def list_scheduled_tasks() -> str:
    """Describe every pending entry in the scheduled-task registry.

    Returns a header followed by one bullet line per task, a placeholder
    when nothing is scheduled, or an error message if formatting raises.
    """
    def describe(task_id, info):
        # Format the target time first, then the summary line for one task.
        when = info["target_time"].strftime("%H:%M:%S")
        return f"• ID: {task_id} - {info['light_name']}.{info['parameter']} = {info['value']} at {when}"

    try:
        if scheduled_tasks:
            lines = ["Scheduled lighting tasks:"]
            lines.extend(describe(task_id, info) for task_id, info in scheduled_tasks.items())
            return "\n".join(lines)
        return "No scheduled tasks found."
    except Exception as e:
        return f"Error listing scheduled tasks: {e}"
|
|
|
|
@mcp.tool()
async def cancel_scheduled_task(task_id: str) -> str:
    """Cancel a pending lighting task and forget it.

    Args:
        task_id: ID of the task to cancel

    Returns:
        A confirmation message, a not-found message for an unknown ID,
        or an error message if cancelling raises.
    """
    try:
        if task_id in scheduled_tasks:
            # Stop the asyncio task first, then remove its registry entry
            scheduled_tasks[task_id]["task"].cancel()
            del scheduled_tasks[task_id]
            return f"Successfully cancelled task: {task_id}"
        return f"No task found with ID '{task_id}'"
    except Exception as e:
        return f"Error cancelling task: {e}"
|
|
|
|
@mcp.tool()
async def group_control(group_name: Optional[str] = None, parameter: Optional[str] = None,
                        value: Any = None, light_names: Optional[List[str]] = None) -> str:
    """Control multiple lights as a group.

    The lights of ``group_name`` (matched case-insensitively against the
    bridge's groups) and any explicit ``light_names`` are combined, with
    duplicates removed and first-seen order kept. When both ``parameter`` and
    ``value`` are given, every target light is updated; otherwise the target
    lights are only listed.

    Args:
        group_name: Optional name of predefined group (e.g., room name)
        parameter: Parameter to change (on, bri, hue, sat)
        value: Value to set for the parameter
        light_names: Optional list of specific light names to control as a group

    Returns:
        Per-light update results, the list of lights in the group, or an
        error message if the group is unknown or a lookup raises.
    """
    try:
        if not hue_controller:
            return "Error: Hue controller not initialized"

        # Get lights to control
        target_lights = []

        # If group_name is provided, try to find it in bridge
        if group_name:
            try:
                groups = hue_controller.bridge.get_group()
                wanted = group_name.lower()
                group_data = next(
                    (data for data in groups.values() if data['name'].lower() == wanted),
                    None,
                )

                if group_data is None:
                    return f"Group '{group_name}' not found. Available groups: {', '.join([g['name'] for g in groups.values()])}"

                # The group listing already carries each group's light IDs, so
                # no second bridge request is needed. The previous per-group
                # lookup subscripted its result with 'lights', which fails if
                # that call returns the ID list itself.
                # NOTE(review): assumes the bridge is a phue Bridge - confirm.
                all_lights = hue_controller.bridge.get_light_objects('id')
                for light_id in group_data.get('lights', []):
                    light = all_lights.get(int(light_id))
                    if light:
                        target_lights.append(light.name)
            except Exception as e:
                return f"Error finding group '{group_name}': {e}"

        # If light_names is provided, use those instead/additionally
        if light_names:
            target_lights.extend(light_names)

        # Remove duplicates while keeping a stable, first-seen order so the
        # reported results are deterministic.
        target_lights = list(dict.fromkeys(target_lights))

        if not target_lights:
            return "No lights to control. Provide either group_name or light_names."

        # Without both parameter and value this is a read-only query
        if not (parameter and value is not None):
            return f"Available lights in group: {', '.join(target_lights)}"

        results = []
        for light_name in target_lights:
            # One failing light must not stop the rest of the group
            try:
                hue_controller.update_light(light_name, parameter, value)
                results.append(f"Updated {light_name}")
            except Exception as e:
                results.append(f"Failed to update {light_name}: {e}")

        return "Group control results:\n" + "\n".join(results)
    except Exception as e:
        return f"Error in group control: {e}"
|
|
|
|
@mcp.tool()
async def get_light_info(light_name: str) -> str:
    """Produce a multi-line report about one light.

    Args:
        light_name: Name of the light to query

    Returns:
        Device details and current state as text, a not-found message, or an
        error message if a bridge lookup raises.
    """
    try:
        if not hue_controller:
            return "Error: Hue controller not initialized"

        if not hue_controller.get_light_by_name(light_name):
            return f"Light '{light_name}' not found"

        # Pick the raw bridge record whose name matches exactly
        details = next(
            (
                data
                for _, data in hue_controller.bridge.get_light().items()
                if data['name'] == light_name
            ),
            {},
        )

        if not details:
            return f"Could not retrieve detailed information for light '{light_name}'"

        state = details.get('state', {})
        report = [
            f"Light: {light_name}",
            f"Type: {details.get('type', 'Unknown')}",
            f"Model ID: {details.get('modelid', 'Unknown')}",
            f"Manufacturer: {details.get('manufacturername', 'Unknown')}",
            f"Software Version: {details.get('swversion', 'Unknown')}",
            "\nCurrent State:",
            f"  On: {state.get('on', 'Unknown')}",
            f"  Brightness: {state.get('bri', 'N/A')}/254",
            f"  Hue: {state.get('hue', 'N/A')}/65535",
            f"  Saturation: {state.get('sat', 'N/A')}/254",
            f"  Color Temperature: {state.get('ct', 'N/A')} mired",
            f"  Reachable: {state.get('reachable', 'Unknown')}",
        ]
        return "\n".join(report)
    except Exception as e:
        return f"Error getting light information: {e}"
|
|
|
|
# Add more tools here as development continues...
|
|
|
|
def main(bridge_ip: Optional[str] = None, update_interval: float = 0.05) -> None:
    """
    Run the MCP server.

    Status and error messages are written to stderr rather than stdout,
    because stdout carries the protocol stream when the server runs over
    stdio.

    Args:
        bridge_ip: IP address of the Hue Bridge (optional, can be set from config)
        update_interval: Time between updates in seconds (default: 0.05)
    """
    import sys

    # Import here to avoid circular import
    from config import ConfigManager

    def log(message: str) -> None:
        # Single place that keeps diagnostics off stdout
        print(message, file=sys.stderr)

    if not bridge_ip:
        # If no bridge IP is provided, try to get it from config
        config = ConfigManager()
        bridge_ip = config.get_value("bridge_ip")

    if not bridge_ip:
        log("Error: No bridge IP provided and none found in config")
        return

    # Initialize the Hue controller
    try:
        log(f"Initializing Hue controller with bridge IP: {bridge_ip}")
        initialize_hue_controller(bridge_ip, update_interval)
        log("Hue controller initialized successfully")
    except Exception as e:
        log(f"Error initializing Hue controller: {e}")
        return

    # Run the MCP server
    log("Starting MCP server...")
    mcp.run()


if __name__ == "__main__":
    main()
|