ship-controller/mcp_server.py
2025-07-01 01:22:36 +02:00

465 lines
17 KiB
Python

"""
MCP Server module for MIDI-to-Hue application.
Provides MCP tools for controlling Philips Hue lights through an MCP server.
"""
from typing import List, Dict, Any, Optional
import asyncio
import time
import re
import colorsys
from datetime import datetime, timedelta
from mcp.server.fastmcp import FastMCP
from hue_controller import HueController
# Dictionary to store scheduled tasks
scheduled_tasks = {}
# Initialize FastMCP server
mcp = FastMCP("hue_control")
# Global reference to the HueController instance
hue_controller = None
def initialize_hue_controller(bridge_ip: str, update_interval: float = 0.05) -> None:
"""Initialize the global HueController instance.
Args:
bridge_ip: IP address of the Hue Bridge
update_interval: Time between updates in seconds
"""
global hue_controller
hue_controller = HueController(bridge_ip, update_interval)
return hue_controller
# Tier 1: Core Functionality (Essential)
@mcp.tool()
async def list_all_lights() -> str:
"""List all available Hue lights and their current states."""
try:
if not hue_controller or not hue_controller.bridge:
return "Error: Hue controller not initialized"
lights = hue_controller.bridge.get_light_objects('name')
result = []
for name, light in lights.items():
state = "ON" if light.on else "OFF"
result.append(f"{name}: {state}, Brightness: {light.brightness}/254, Reachable: {light.reachable}")
return "\n".join(result) if result else "No lights found."
except Exception as e:
return f"Error listing lights: {e}"
@mcp.tool()
async def set_light_state(light_name: str, on: bool) -> str:
"""Turn a specific light on or off.
Args:
light_name: Name of the light to control
on: True to turn on, False to turn off
"""
try:
if not hue_controller:
return "Error: Hue controller not initialized"
hue_controller.update_light(light_name, "on", on)
state = "on" if on else "off"
return f"Successfully turned {light_name} {state}"
except Exception as e:
return f"Error setting light state: {e}"
@mcp.tool()
async def set_light_brightness(light_name: str, brightness: int) -> str:
"""Set the brightness of a specific light.
Args:
light_name: Name of the light to control
brightness: Brightness value (0-254)
"""
try:
if not hue_controller:
return "Error: Hue controller not initialized"
if brightness < 0 or brightness > 254:
return "Brightness must be between 0 and 254"
hue_controller.update_light(light_name, "bri", brightness)
return f"Successfully set {light_name} brightness to {brightness}"
except Exception as e:
return f"Error setting brightness: {e}"
@mcp.tool()
async def flash_alert(light_name: str, flashes: int = 3, interval: float = 0.5) -> str:
"""Make a light blink/flash for notifications.
Args:
light_name: Name of the light to flash
flashes: Number of times to flash the light (default: 3)
interval: Time in seconds between flashes (default: 0.5)
"""
try:
if not hue_controller:
return "Error: Hue controller not initialized"
light = hue_controller.get_light_by_name(light_name)
if not light:
return f"Light '{light_name}' not found"
# Store original state
original_state = light.on
original_brightness = light.brightness
# Flash the light
for _ in range(flashes):
hue_controller.update_light(light_name, "on", True)
hue_controller.update_light(light_name, "bri", 254) # Full brightness
await asyncio.sleep(interval)
hue_controller.update_light(light_name, "on", False)
await asyncio.sleep(interval)
# Restore original state
hue_controller.update_light(light_name, "on", original_state)
if original_state:
hue_controller.update_light(light_name, "bri", original_brightness)
return f"Successfully flashed {light_name} {flashes} times"
except Exception as e:
return f"Error flashing light: {e}"
# Tier 2: Common Controls (Very Important)
@mcp.tool()
async def set_light_color(light_name: str, color: str) -> str:
"""Set a light to a specific color using hex, RGB, or common names.
Args:
light_name: Name of the light to control
color: Color in hex format (#RRGGBB), RGB format (rgb(r,g,b)),
or common name (red, blue, green, etc.)
"""
try:
if not hue_controller:
return "Error: Hue controller not initialized"
light = hue_controller.get_light_by_name(light_name)
if not light:
return f"Light '{light_name}' not found"
# Convert color to hue/sat values based on format
h, s, v = None, None, None
# Hex color format (#RRGGBB)
if color.startswith('#') and len(color) == 7:
try:
r = int(color[1:3], 16) / 255.0
g = int(color[3:5], 16) / 255.0
b = int(color[5:7], 16) / 255.0
h, s, v = colorsys.rgb_to_hsv(r, g, b)
except ValueError:
return f"Invalid hex color format: {color}"
# RGB format (rgb(r,g,b))
elif color.lower().startswith('rgb(') and color.endswith(')'):
try:
rgb_values = re.search(r'rgb\((\d+),\s*(\d+),\s*(\d+)\)', color.lower())
if rgb_values:
r = int(rgb_values.group(1)) / 255.0
g = int(rgb_values.group(2)) / 255.0
b = int(rgb_values.group(3)) / 255.0
h, s, v = colorsys.rgb_to_hsv(r, g, b)
except (ValueError, AttributeError):
return f"Invalid RGB color format: {color}"
# Common color names
else:
color_map = {
'red': (0, 254), # hue, saturation
'green': (25500, 254),
'blue': (46920, 254),
'yellow': (12750, 254),
'orange': (8500, 254),
'purple': (50000, 254),
'pink': (56100, 254),
'white': (0, 0),
'warm': (9000, 100), # warm white
'cool': (37000, 50) # cool white
}
if color.lower() in color_map:
hue_val, sat_val = color_map[color.lower()]
# Update with direct values
hue_controller.update_light(light_name, "hue", hue_val)
hue_controller.update_light(light_name, "sat", sat_val)
return f"Successfully set {light_name} to color {color}"
else:
return f"Unknown color name: {color}. Try a hex color (#RRGGBB) instead."
if h is not None and s is not None:
# Convert to Hue's color format
hue_val = int(h * 65535)
sat_val = int(s * 254)
# Update the light
hue_controller.update_light(light_name, "hue", hue_val)
hue_controller.update_light(light_name, "sat", sat_val)
return f"Successfully set {light_name} to color {color}"
except Exception as e:
return f"Error setting light color: {e}"
@mcp.tool()
async def schedule_lighting(light_name: str, parameter: str, value: Any,
minutes_from_now: int, task_id: str = None) -> str:
"""Schedule a light change for a future time.
Args:
light_name: Name of the light to control
parameter: Parameter to change (on, bri, hue, sat)
value: Value to set for the parameter
minutes_from_now: Minutes from now to execute the change
task_id: Optional identifier for the task (defaults to auto-generated)
"""
try:
if not hue_controller:
return "Error: Hue controller not initialized"
# Validate parameter
valid_params = ["on", "bri", "hue", "sat", "ct", "xy"]
if parameter not in valid_params:
return f"Invalid parameter. Must be one of: {', '.join(valid_params)}"
# Create a task ID if none provided
if not task_id:
task_id = f"{light_name}_{parameter}_{int(time.time())}"
# Check if there's already a task with this ID
if task_id in scheduled_tasks:
return f"A task with ID '{task_id}' already exists. Choose a different ID or cancel the existing task."
# Calculate the target time
target_time = datetime.now() + timedelta(minutes=minutes_from_now)
# Create and schedule the task
async def scheduled_task():
# Calculate seconds to wait
wait_seconds = (target_time - datetime.now()).total_seconds()
if wait_seconds > 0:
await asyncio.sleep(wait_seconds)
# Apply the change
try:
hue_controller.update_light(light_name, parameter, value)
print(f"Scheduled task executed: {light_name}.{parameter} = {value}")
# Remove from scheduled tasks
if task_id in scheduled_tasks:
del scheduled_tasks[task_id]
except Exception as e:
print(f"Error in scheduled task: {e}")
# Store and start the task
task = asyncio.create_task(scheduled_task())
scheduled_tasks[task_id] = {
"task": task,
"light_name": light_name,
"parameter": parameter,
"value": value,
"target_time": target_time
}
formatted_time = target_time.strftime("%H:%M:%S")
return f"Scheduled change: {light_name}.{parameter} = {value} at {formatted_time} (Task ID: {task_id})"
except Exception as e:
return f"Error scheduling light change: {e}"
@mcp.tool()
async def list_scheduled_tasks() -> str:
"""List all scheduled lighting tasks."""
try:
if not scheduled_tasks:
return "No scheduled tasks found."
result = ["Scheduled lighting tasks:"]
for task_id, info in scheduled_tasks.items():
time_str = info["target_time"].strftime("%H:%M:%S")
result.append(f"• ID: {task_id} - {info['light_name']}.{info['parameter']} = {info['value']} at {time_str}")
return "\n".join(result)
except Exception as e:
return f"Error listing scheduled tasks: {e}"
@mcp.tool()
async def cancel_scheduled_task(task_id: str) -> str:
"""Cancel a scheduled lighting task.
Args:
task_id: ID of the task to cancel
"""
try:
if task_id not in scheduled_tasks:
return f"No task found with ID '{task_id}'"
# Cancel the task
task_info = scheduled_tasks[task_id]
task_info["task"].cancel()
del scheduled_tasks[task_id]
return f"Successfully cancelled task: {task_id}"
except Exception as e:
return f"Error cancelling task: {e}"
@mcp.tool()
async def group_control(group_name: str = None, parameter: str = None, value: Any = None, light_names: List[str] = None) -> str:
"""Control multiple lights as a group.
Args:
group_name: Optional name of predefined group (e.g., room name)
parameter: Parameter to change (on, bri, hue, sat)
value: Value to set for the parameter
light_names: Optional list of specific light names to control as a group
"""
try:
if not hue_controller:
return "Error: Hue controller not initialized"
# Get lights to control
target_lights = []
# If group_name is provided, try to find it in bridge
if group_name:
try:
# Get groups from bridge
groups = hue_controller.bridge.get_group()
group_found = False
for group_id, group_data in groups.items():
if group_data['name'].lower() == group_name.lower():
# Found the group, get its lights
group = hue_controller.bridge.get_group(int(group_id), 'lights')
light_ids = group['lights']
# Convert light IDs to names
all_lights = hue_controller.bridge.get_light_objects('id')
for light_id in light_ids:
light = all_lights.get(int(light_id))
if light:
target_lights.append(light.name)
group_found = True
break
if not group_found:
return f"Group '{group_name}' not found. Available groups: {', '.join([g['name'] for g in groups.values()])}"
except Exception as e:
return f"Error finding group '{group_name}': {e}"
# If light_names is provided, use those instead/additionally
if light_names:
target_lights.extend(light_names)
# Remove duplicates
target_lights = list(set(target_lights))
if not target_lights:
return "No lights to control. Provide either group_name or light_names."
# If parameter and value are provided, update all target lights
if parameter and value is not None:
results = []
for light_name in target_lights:
try:
hue_controller.update_light(light_name, parameter, value)
results.append(f"Updated {light_name}")
except Exception as e:
results.append(f"Failed to update {light_name}: {e}")
return f"Group control results:\n" + "\n".join(results)
else:
return f"Available lights in group: {', '.join(target_lights)}"
except Exception as e:
return f"Error in group control: {e}"
@mcp.tool()
async def get_light_info(light_name: str) -> str:
"""Get detailed information about a specific light.
Args:
light_name: Name of the light to query
"""
try:
if not hue_controller:
return "Error: Hue controller not initialized"
light = hue_controller.get_light_by_name(light_name)
if not light:
return f"Light '{light_name}' not found"
# Get all light details from the bridge
light_details = {}
for light_id, light_data in hue_controller.bridge.get_light().items():
if light_data['name'] == light_name:
light_details = light_data
break
# Format the details
if light_details:
state = light_details.get('state', {})
result = [
f"Light: {light_name}",
f"Type: {light_details.get('type', 'Unknown')}",
f"Model ID: {light_details.get('modelid', 'Unknown')}",
f"Manufacturer: {light_details.get('manufacturername', 'Unknown')}",
f"Software Version: {light_details.get('swversion', 'Unknown')}",
"\nCurrent State:",
f" On: {state.get('on', 'Unknown')}",
f" Brightness: {state.get('bri', 'N/A')}/254",
f" Hue: {state.get('hue', 'N/A')}/65535",
f" Saturation: {state.get('sat', 'N/A')}/254",
f" Color Temperature: {state.get('ct', 'N/A')} mired",
f" Reachable: {state.get('reachable', 'Unknown')}"
]
return "\n".join(result)
else:
return f"Could not retrieve detailed information for light '{light_name}'"
except Exception as e:
return f"Error getting light information: {e}"
# Add more tools here as development continues...
def main(bridge_ip: str = None, update_interval: float = 0.05) -> None:
"""
Run the MCP server.
Args:
bridge_ip: IP address of the Hue Bridge (optional, can be set from config)
update_interval: Time between updates in seconds (default: 0.05)
"""
# Import here to avoid circular import
from config import ConfigManager
if not bridge_ip:
# If no bridge IP is provided, try to get it from config
config = ConfigManager()
bridge_ip = config.get_value("bridge_ip")
if not bridge_ip:
print("Error: No bridge IP provided and none found in config")
return
# Initialize the Hue controller
try:
print(f"Initializing Hue controller with bridge IP: {bridge_ip}")
initialize_hue_controller(bridge_ip, update_interval)
print("Hue controller initialized successfully")
except Exception as e:
print(f"Error initializing Hue controller: {e}")
return
# Run the MCP server
print("Starting MCP server...")
mcp.run()
if __name__ == "__main__":
main()