feat: try adding LLM
This commit is contained in:
parent
d3ed8d1ee0
commit
30244b942d
4 changed files with 822 additions and 5 deletions
61
README.md
61
README.md
|
|
@ -52,3 +52,64 @@ The default configuration file (`midi_hue_config.json`) contains:
|
||||||
|
|
||||||
- mido
|
- mido
|
||||||
- phue
|
- phue
|
||||||
|
- mcp (FastMCP)
|
||||||
|
|
||||||
|
## MCP Server Integration
|
||||||
|
|
||||||
|
The application now includes MCP (Machine Control Protocol) server integration, allowing you to control your Hue lights through an MCP client. This enables voice assistants and other AI agents to control your lighting system.
|
||||||
|
|
||||||
|
### Running the MCP Server
|
||||||
|
|
||||||
|
By default, the MCP server starts automatically alongside the main application. To run only the MCP server:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
python main.py --mcp-only
|
||||||
|
```
|
||||||
|
|
||||||
|
### Available MCP Tools
|
||||||
|
|
||||||
|
The following tools are available through the MCP server:
|
||||||
|
|
||||||
|
#### Core Light Control
|
||||||
|
|
||||||
|
- **list_all_lights** - List all available lights and their current states
|
||||||
|
- **set_light_state** - Turn a specific light on or off
|
||||||
|
- **set_light_brightness** - Adjust brightness of a specific light (0-254)
|
||||||
|
- **flash_alert** - Make a light blink/flash for notifications
|
||||||
|
|
||||||
|
#### Advanced Light Control
|
||||||
|
|
||||||
|
- **set_light_color** - Set a light to a specific color using hex (#RRGGBB), RGB format, or common names
|
||||||
|
- **get_light_info** - Get detailed information about a specific light
|
||||||
|
- **group_control** - Control multiple lights as a group
|
||||||
|
|
||||||
|
#### Scheduling
|
||||||
|
|
||||||
|
- **schedule_lighting** - Schedule a light change for a future time
|
||||||
|
- **list_scheduled_tasks** - List all scheduled lighting tasks
|
||||||
|
- **cancel_scheduled_task** - Cancel a scheduled lighting task
|
||||||
|
|
||||||
|
### Example MCP Usage
|
||||||
|
|
||||||
|
```python
|
||||||
|
# Using the list_all_lights tool to see available lights
|
||||||
|
response = await hue_control.list_all_lights()
|
||||||
|
print(response)
|
||||||
|
|
||||||
|
# Turn on a specific light
|
||||||
|
response = await hue_control.set_light_state(light_name="Living Room", on=True)
|
||||||
|
print(response)
|
||||||
|
|
||||||
|
# Set a light to a specific color
|
||||||
|
response = await hue_control.set_light_color(light_name="Bedroom", color="#FF0000")
|
||||||
|
print(response)
|
||||||
|
|
||||||
|
# Schedule a light to turn off in 30 minutes
|
||||||
|
response = await hue_control.schedule_lighting(
|
||||||
|
light_name="Kitchen",
|
||||||
|
parameter="on",
|
||||||
|
value=False,
|
||||||
|
minutes_from_now=30
|
||||||
|
)
|
||||||
|
print(response)
|
||||||
|
```
|
||||||
|
|
|
||||||
243
llm_processor.py
Normal file
243
llm_processor.py
Normal file
|
|
@ -0,0 +1,243 @@
|
||||||
|
"""
|
||||||
|
LLM Processor module for MIDI-to-Hue application.
|
||||||
|
Processes speech-to-text results through an LLM with MCP tools.
|
||||||
|
"""
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
from typing import Dict, Any, List, Optional, Callable
|
||||||
|
from threading import Thread
|
||||||
|
|
||||||
|
# Try to import Anthropic - suggest installation if not found
|
||||||
|
try:
|
||||||
|
from anthropic import Anthropic
|
||||||
|
HAS_ANTHROPIC = True
|
||||||
|
except ImportError:
|
||||||
|
HAS_ANTHROPIC = False
|
||||||
|
print("Warning: Anthropic package not found. Install with: pip install anthropic")
|
||||||
|
|
||||||
|
# Try to import dotenv - suggest installation if not found
|
||||||
|
try:
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
load_dotenv() # Load environment variables from .env file
|
||||||
|
HAS_DOTENV = True
|
||||||
|
except ImportError:
|
||||||
|
HAS_DOTENV = False
|
||||||
|
print("Warning: python-dotenv package not found. Install with: pip install python-dotenv")
|
||||||
|
|
||||||
|
# MCP imports
|
||||||
|
from mcp import ClientSession
|
||||||
|
from mcp.client.stdio import stdio_client
|
||||||
|
from mcp.client.param.stdio import StdioServerParameters
|
||||||
|
|
||||||
|
|
||||||
|
class LLMProcessor:
|
||||||
|
"""Processes text through LLM and executes MCP tools."""
|
||||||
|
|
||||||
|
def __init__(self, mcp_script_path: str):
|
||||||
|
"""
|
||||||
|
Initialize the LLM processor.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
mcp_script_path: Path to the MCP server script
|
||||||
|
"""
|
||||||
|
self.mcp_script_path = mcp_script_path
|
||||||
|
self.anthropic = None
|
||||||
|
self.session = None
|
||||||
|
self.available_tools = []
|
||||||
|
self.loop = asyncio.new_event_loop()
|
||||||
|
self.initialized = False
|
||||||
|
self.is_processing = False
|
||||||
|
|
||||||
|
# Start initialization in background
|
||||||
|
self.init_thread = Thread(target=self._run_init, daemon=True)
|
||||||
|
self.init_thread.start()
|
||||||
|
|
||||||
|
def _run_init(self):
|
||||||
|
"""Run initialization in a separate thread."""
|
||||||
|
asyncio.set_event_loop(self.loop)
|
||||||
|
self.loop.run_until_complete(self._initialize())
|
||||||
|
|
||||||
|
async def _initialize(self):
|
||||||
|
"""Initialize LLM and MCP client."""
|
||||||
|
try:
|
||||||
|
# Initialize Anthropic client
|
||||||
|
if HAS_ANTHROPIC:
|
||||||
|
self.anthropic = Anthropic()
|
||||||
|
print("Initialized Anthropic client")
|
||||||
|
else:
|
||||||
|
print("Anthropic package not available, LLM processing disabled")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Connect to MCP server
|
||||||
|
print(f"Connecting to MCP server: {self.mcp_script_path}")
|
||||||
|
await self._connect_to_server()
|
||||||
|
self.initialized = True
|
||||||
|
print("LLM Processor fully initialized and ready")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error initializing LLM processor: {e}")
|
||||||
|
|
||||||
|
async def _connect_to_server(self):
|
||||||
|
"""Connect to the MCP server."""
|
||||||
|
command = "python" # Assuming Python script
|
||||||
|
server_params = StdioServerParameters(
|
||||||
|
command=command,
|
||||||
|
args=[self.mcp_script_path, "--mcp-only"],
|
||||||
|
env=None
|
||||||
|
)
|
||||||
|
|
||||||
|
self.exit_stack = asyncio.AsyncExitStack()
|
||||||
|
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
|
||||||
|
stdio, write = stdio_transport
|
||||||
|
self.session = await self.exit_stack.enter_async_context(ClientSession(stdio, write))
|
||||||
|
|
||||||
|
await self.session.initialize()
|
||||||
|
|
||||||
|
# List available tools
|
||||||
|
response = await self.session.list_tools()
|
||||||
|
self.available_tools = [{
|
||||||
|
"name": tool.name,
|
||||||
|
"description": tool.description,
|
||||||
|
"input_schema": tool.inputSchema
|
||||||
|
} for tool in response.tools]
|
||||||
|
|
||||||
|
print(f"Connected to MCP server with {len(self.available_tools)} tools")
|
||||||
|
|
||||||
|
def process_text(self, text: str, callback: Optional[Callable[[str], None]] = None):
|
||||||
|
"""
|
||||||
|
Process text through LLM and execute any tool calls.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
text: The text to process (from speech-to-text)
|
||||||
|
callback: Optional callback function for results
|
||||||
|
"""
|
||||||
|
if self.is_processing:
|
||||||
|
print("Already processing a request, please wait")
|
||||||
|
if callback:
|
||||||
|
callback("Already processing a request, please wait.")
|
||||||
|
return
|
||||||
|
|
||||||
|
if not self.initialized:
|
||||||
|
print("LLM processor not yet initialized, please wait")
|
||||||
|
if callback:
|
||||||
|
callback("LLM processor not yet initialized, please wait.")
|
||||||
|
return
|
||||||
|
|
||||||
|
self.is_processing = True
|
||||||
|
|
||||||
|
# Start async processing in background
|
||||||
|
task = self.loop.create_task(self._process_text_async(text, callback))
|
||||||
|
|
||||||
|
# Add done callback to handle exceptions
|
||||||
|
task.add_done_callback(lambda t: self._process_done(t))
|
||||||
|
|
||||||
|
def _process_done(self, task):
|
||||||
|
"""Handle completed processing task."""
|
||||||
|
self.is_processing = False
|
||||||
|
# Check for exceptions
|
||||||
|
if task.exception():
|
||||||
|
print(f"Error during text processing: {task.exception()}")
|
||||||
|
|
||||||
|
async def _process_text_async(self, text: str, callback: Optional[Callable[[str], None]] = None):
|
||||||
|
"""Process text asynchronously."""
|
||||||
|
try:
|
||||||
|
if not self.anthropic or not self.session:
|
||||||
|
error = "LLM or MCP session not initialized"
|
||||||
|
print(error)
|
||||||
|
if callback:
|
||||||
|
callback(error)
|
||||||
|
return
|
||||||
|
|
||||||
|
prompt = f"The user said: \"{text}\"\n\nProcess this request to control Philips Hue lights. Use the available tools to fulfill the request."
|
||||||
|
|
||||||
|
messages = [
|
||||||
|
{
|
||||||
|
"role": "user",
|
||||||
|
"content": prompt
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
# Initial LLM call
|
||||||
|
response = self.anthropic.messages.create(
|
||||||
|
model="claude-3-5-sonnet-20241022",
|
||||||
|
max_tokens=1000,
|
||||||
|
messages=messages,
|
||||||
|
tools=self.available_tools
|
||||||
|
)
|
||||||
|
|
||||||
|
results = []
|
||||||
|
final_text = []
|
||||||
|
|
||||||
|
# Process response content
|
||||||
|
for content in response.content:
|
||||||
|
if content.type == 'text':
|
||||||
|
final_text.append(content.text)
|
||||||
|
elif content.type == 'tool_use':
|
||||||
|
tool_name = content.name
|
||||||
|
tool_args = content.input
|
||||||
|
|
||||||
|
# Execute tool call
|
||||||
|
result_message = f"\n[Calling tool {tool_name}]"
|
||||||
|
print(result_message)
|
||||||
|
final_text.append(result_message)
|
||||||
|
|
||||||
|
# Execute the tool
|
||||||
|
result = await self.session.call_tool(tool_name, tool_args)
|
||||||
|
results.append({"call": tool_name, "result": result.content})
|
||||||
|
|
||||||
|
result_message = f"Result: {result.content}"
|
||||||
|
print(result_message)
|
||||||
|
final_text.append(result_message)
|
||||||
|
|
||||||
|
# Continue conversation with tool results
|
||||||
|
if hasattr(content, 'text') and content.text:
|
||||||
|
messages.append({
|
||||||
|
"role": "assistant",
|
||||||
|
"content": content.text
|
||||||
|
})
|
||||||
|
messages.append({
|
||||||
|
"role": "user",
|
||||||
|
"content": f"Tool result: {result.content}"
|
||||||
|
})
|
||||||
|
|
||||||
|
# Get next response from LLM
|
||||||
|
follow_up = self.anthropic.messages.create(
|
||||||
|
model="claude-3-5-sonnet-20241022",
|
||||||
|
max_tokens=1000,
|
||||||
|
messages=messages,
|
||||||
|
)
|
||||||
|
|
||||||
|
if follow_up.content:
|
||||||
|
follow_up_text = follow_up.content[0].text
|
||||||
|
final_text.append(follow_up_text)
|
||||||
|
print(follow_up_text)
|
||||||
|
|
||||||
|
# Combine all text for final result
|
||||||
|
final_result = "\n".join(final_text)
|
||||||
|
|
||||||
|
# Call the callback with final result
|
||||||
|
if callback:
|
||||||
|
callback(final_result)
|
||||||
|
|
||||||
|
return final_result
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
error_message = f"Error processing text: {str(e)}"
|
||||||
|
print(error_message)
|
||||||
|
if callback:
|
||||||
|
callback(error_message)
|
||||||
|
return error_message
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
"""Clean up resources."""
|
||||||
|
async def _cleanup():
|
||||||
|
if hasattr(self, 'exit_stack'):
|
||||||
|
await self.exit_stack.aclose()
|
||||||
|
|
||||||
|
if self.loop and self.loop.is_running():
|
||||||
|
self.loop.create_task(_cleanup())
|
||||||
|
else:
|
||||||
|
# Create a new event loop if needed
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
loop.run_until_complete(_cleanup())
|
||||||
56
main.py
56
main.py
|
|
@ -13,6 +13,9 @@ from mapper import MidiToHueMapper
|
||||||
from animations import AnimationManager, MidiLedAnimation
|
from animations import AnimationManager, MidiLedAnimation
|
||||||
from speech_to_text import SpeechToText
|
from speech_to_text import SpeechToText
|
||||||
import mido
|
import mido
|
||||||
|
from mcp_server import initialize_hue_controller as mcp_init_hue, main as mcp_main
|
||||||
|
from llm_processor import LLMProcessor
|
||||||
|
import os
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
"""Main application entry point."""
|
"""Main application entry point."""
|
||||||
|
|
@ -22,6 +25,13 @@ def main():
|
||||||
# Create animation manager
|
# Create animation manager
|
||||||
animation_manager = AnimationManager()
|
animation_manager = AnimationManager()
|
||||||
|
|
||||||
|
# Initialize LLM processor if enabled
|
||||||
|
llm_processor = None
|
||||||
|
if config_manager.get_value("enable_llm", False):
|
||||||
|
print("Initializing LLM processor...")
|
||||||
|
script_path = os.path.abspath(__file__)
|
||||||
|
llm_processor = LLMProcessor(script_path)
|
||||||
|
|
||||||
# Initialize speech-to-text if enabled
|
# Initialize speech-to-text if enabled
|
||||||
stt = None
|
stt = None
|
||||||
if config_manager.is_stt_enabled():
|
if config_manager.is_stt_enabled():
|
||||||
|
|
@ -30,13 +40,28 @@ def main():
|
||||||
if not stt.initialize():
|
if not stt.initialize():
|
||||||
print("Warning: Failed to initialize speech-to-text.")
|
print("Warning: Failed to initialize speech-to-text.")
|
||||||
|
|
||||||
|
# Make LLM processor available to STT callback
|
||||||
|
if stt and llm_processor:
|
||||||
|
stt.llm_processor = llm_processor
|
||||||
|
|
||||||
# Initialize Hue controller
|
# Initialize Hue controller
|
||||||
try:
|
try:
|
||||||
hue_controller = HueController(
|
bridge_ip = config_manager.get_bridge_ip()
|
||||||
config_manager.get_bridge_ip(),
|
update_interval = config_manager.get_update_interval_sec()
|
||||||
config_manager.get_update_interval_sec()
|
|
||||||
)
|
hue_controller = HueController(bridge_ip, update_interval)
|
||||||
hue_controller.list_lights()
|
hue_controller.list_lights()
|
||||||
|
|
||||||
|
# Initialize the MCP server with the same Hue controller
|
||||||
|
# This runs in a separate thread
|
||||||
|
if config_manager.get_value("enable_mcp", True): # Enable by default
|
||||||
|
mcp_thread = threading.Thread(
|
||||||
|
target=run_mcp_server,
|
||||||
|
args=(bridge_ip, update_interval),
|
||||||
|
daemon=True # Make thread exit when main thread exits
|
||||||
|
)
|
||||||
|
mcp_thread.start()
|
||||||
|
print("MCP server thread started")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Failed to initialize Hue controller: {e}")
|
print(f"Failed to initialize Hue controller: {e}")
|
||||||
return 1
|
return 1
|
||||||
|
|
@ -87,6 +112,11 @@ def main():
|
||||||
def stt_result_callback(text):
|
def stt_result_callback(text):
|
||||||
print(f"\nSpeech recognition result: {text}\n")
|
print(f"\nSpeech recognition result: {text}\n")
|
||||||
|
|
||||||
|
# Process the text through LLM if configured
|
||||||
|
if hasattr(stt, 'llm_processor') and stt.llm_processor:
|
||||||
|
print("Processing through LLM...")
|
||||||
|
stt.llm_processor.process_text(text, lambda result: print(f"\nLLM Response:\n{result}\n"))
|
||||||
|
|
||||||
# Set result callback
|
# Set result callback
|
||||||
stt.set_callback(stt_result_callback)
|
stt.set_callback(stt_result_callback)
|
||||||
|
|
||||||
|
|
@ -167,8 +197,26 @@ def main():
|
||||||
# Clean up STT resources
|
# Clean up STT resources
|
||||||
if stt:
|
if stt:
|
||||||
stt.cleanup()
|
stt.cleanup()
|
||||||
|
# Clean up LLM processor resources
|
||||||
|
if llm_processor:
|
||||||
|
llm_processor.cleanup()
|
||||||
|
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
def run_mcp_server(bridge_ip, update_interval):
|
||||||
|
"""Run the MCP server in a separate thread."""
|
||||||
|
try:
|
||||||
|
print("Starting MCP server...")
|
||||||
|
mcp_main(bridge_ip, update_interval)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error in MCP server: {e}")
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
# Check if we should run only the MCP server
|
||||||
|
if len(sys.argv) > 1 and sys.argv[1] == "--mcp-only":
|
||||||
|
# Load config and run only MCP server
|
||||||
|
config_manager = ConfigManager()
|
||||||
|
run_mcp_server(config_manager.get_bridge_ip(), config_manager.get_update_interval_sec())
|
||||||
|
else:
|
||||||
|
# Run full application
|
||||||
sys.exit(main())
|
sys.exit(main())
|
||||||
|
|
|
||||||
465
mcp_server.py
Normal file
465
mcp_server.py
Normal file
|
|
@ -0,0 +1,465 @@
|
||||||
|
"""
|
||||||
|
MCP Server module for MIDI-to-Hue application.
|
||||||
|
Provides MCP tools for controlling Philips Hue lights through an MCP server.
|
||||||
|
"""
|
||||||
|
from typing import List, Dict, Any, Optional
|
||||||
|
import asyncio
|
||||||
|
import time
|
||||||
|
import re
|
||||||
|
import colorsys
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from mcp.server.fastmcp import FastMCP
|
||||||
|
from hue_controller import HueController
|
||||||
|
|
||||||
|
# Dictionary to store scheduled tasks
|
||||||
|
scheduled_tasks = {}
|
||||||
|
|
||||||
|
# Initialize FastMCP server
|
||||||
|
mcp = FastMCP("hue_control")
|
||||||
|
|
||||||
|
# Global reference to the HueController instance
|
||||||
|
hue_controller = None
|
||||||
|
|
||||||
|
def initialize_hue_controller(bridge_ip: str, update_interval: float = 0.05) -> None:
|
||||||
|
"""Initialize the global HueController instance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
bridge_ip: IP address of the Hue Bridge
|
||||||
|
update_interval: Time between updates in seconds
|
||||||
|
"""
|
||||||
|
global hue_controller
|
||||||
|
hue_controller = HueController(bridge_ip, update_interval)
|
||||||
|
return hue_controller
|
||||||
|
|
||||||
|
|
||||||
|
# Tier 1: Core Functionality (Essential)
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def list_all_lights() -> str:
|
||||||
|
"""List all available Hue lights and their current states."""
|
||||||
|
try:
|
||||||
|
if not hue_controller or not hue_controller.bridge:
|
||||||
|
return "Error: Hue controller not initialized"
|
||||||
|
|
||||||
|
lights = hue_controller.bridge.get_light_objects('name')
|
||||||
|
result = []
|
||||||
|
|
||||||
|
for name, light in lights.items():
|
||||||
|
state = "ON" if light.on else "OFF"
|
||||||
|
result.append(f"• {name}: {state}, Brightness: {light.brightness}/254, Reachable: {light.reachable}")
|
||||||
|
|
||||||
|
return "\n".join(result) if result else "No lights found."
|
||||||
|
except Exception as e:
|
||||||
|
return f"Error listing lights: {e}"
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def set_light_state(light_name: str, on: bool) -> str:
|
||||||
|
"""Turn a specific light on or off.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
light_name: Name of the light to control
|
||||||
|
on: True to turn on, False to turn off
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if not hue_controller:
|
||||||
|
return "Error: Hue controller not initialized"
|
||||||
|
|
||||||
|
hue_controller.update_light(light_name, "on", on)
|
||||||
|
state = "on" if on else "off"
|
||||||
|
return f"Successfully turned {light_name} {state}"
|
||||||
|
except Exception as e:
|
||||||
|
return f"Error setting light state: {e}"
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def set_light_brightness(light_name: str, brightness: int) -> str:
|
||||||
|
"""Set the brightness of a specific light.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
light_name: Name of the light to control
|
||||||
|
brightness: Brightness value (0-254)
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if not hue_controller:
|
||||||
|
return "Error: Hue controller not initialized"
|
||||||
|
|
||||||
|
if brightness < 0 or brightness > 254:
|
||||||
|
return "Brightness must be between 0 and 254"
|
||||||
|
|
||||||
|
hue_controller.update_light(light_name, "bri", brightness)
|
||||||
|
return f"Successfully set {light_name} brightness to {brightness}"
|
||||||
|
except Exception as e:
|
||||||
|
return f"Error setting brightness: {e}"
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def flash_alert(light_name: str, flashes: int = 3, interval: float = 0.5) -> str:
|
||||||
|
"""Make a light blink/flash for notifications.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
light_name: Name of the light to flash
|
||||||
|
flashes: Number of times to flash the light (default: 3)
|
||||||
|
interval: Time in seconds between flashes (default: 0.5)
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if not hue_controller:
|
||||||
|
return "Error: Hue controller not initialized"
|
||||||
|
|
||||||
|
light = hue_controller.get_light_by_name(light_name)
|
||||||
|
if not light:
|
||||||
|
return f"Light '{light_name}' not found"
|
||||||
|
|
||||||
|
# Store original state
|
||||||
|
original_state = light.on
|
||||||
|
original_brightness = light.brightness
|
||||||
|
|
||||||
|
# Flash the light
|
||||||
|
for _ in range(flashes):
|
||||||
|
hue_controller.update_light(light_name, "on", True)
|
||||||
|
hue_controller.update_light(light_name, "bri", 254) # Full brightness
|
||||||
|
await asyncio.sleep(interval)
|
||||||
|
hue_controller.update_light(light_name, "on", False)
|
||||||
|
await asyncio.sleep(interval)
|
||||||
|
|
||||||
|
# Restore original state
|
||||||
|
hue_controller.update_light(light_name, "on", original_state)
|
||||||
|
if original_state:
|
||||||
|
hue_controller.update_light(light_name, "bri", original_brightness)
|
||||||
|
|
||||||
|
return f"Successfully flashed {light_name} {flashes} times"
|
||||||
|
except Exception as e:
|
||||||
|
return f"Error flashing light: {e}"
|
||||||
|
|
||||||
|
# Tier 2: Common Controls (Very Important)
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def set_light_color(light_name: str, color: str) -> str:
|
||||||
|
"""Set a light to a specific color using hex, RGB, or common names.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
light_name: Name of the light to control
|
||||||
|
color: Color in hex format (#RRGGBB), RGB format (rgb(r,g,b)),
|
||||||
|
or common name (red, blue, green, etc.)
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if not hue_controller:
|
||||||
|
return "Error: Hue controller not initialized"
|
||||||
|
|
||||||
|
light = hue_controller.get_light_by_name(light_name)
|
||||||
|
if not light:
|
||||||
|
return f"Light '{light_name}' not found"
|
||||||
|
|
||||||
|
# Convert color to hue/sat values based on format
|
||||||
|
h, s, v = None, None, None
|
||||||
|
|
||||||
|
# Hex color format (#RRGGBB)
|
||||||
|
if color.startswith('#') and len(color) == 7:
|
||||||
|
try:
|
||||||
|
r = int(color[1:3], 16) / 255.0
|
||||||
|
g = int(color[3:5], 16) / 255.0
|
||||||
|
b = int(color[5:7], 16) / 255.0
|
||||||
|
h, s, v = colorsys.rgb_to_hsv(r, g, b)
|
||||||
|
except ValueError:
|
||||||
|
return f"Invalid hex color format: {color}"
|
||||||
|
|
||||||
|
# RGB format (rgb(r,g,b))
|
||||||
|
elif color.lower().startswith('rgb(') and color.endswith(')'):
|
||||||
|
try:
|
||||||
|
rgb_values = re.search(r'rgb\((\d+),\s*(\d+),\s*(\d+)\)', color.lower())
|
||||||
|
if rgb_values:
|
||||||
|
r = int(rgb_values.group(1)) / 255.0
|
||||||
|
g = int(rgb_values.group(2)) / 255.0
|
||||||
|
b = int(rgb_values.group(3)) / 255.0
|
||||||
|
h, s, v = colorsys.rgb_to_hsv(r, g, b)
|
||||||
|
except (ValueError, AttributeError):
|
||||||
|
return f"Invalid RGB color format: {color}"
|
||||||
|
|
||||||
|
# Common color names
|
||||||
|
else:
|
||||||
|
color_map = {
|
||||||
|
'red': (0, 254), # hue, saturation
|
||||||
|
'green': (25500, 254),
|
||||||
|
'blue': (46920, 254),
|
||||||
|
'yellow': (12750, 254),
|
||||||
|
'orange': (8500, 254),
|
||||||
|
'purple': (50000, 254),
|
||||||
|
'pink': (56100, 254),
|
||||||
|
'white': (0, 0),
|
||||||
|
'warm': (9000, 100), # warm white
|
||||||
|
'cool': (37000, 50) # cool white
|
||||||
|
}
|
||||||
|
|
||||||
|
if color.lower() in color_map:
|
||||||
|
hue_val, sat_val = color_map[color.lower()]
|
||||||
|
# Update with direct values
|
||||||
|
hue_controller.update_light(light_name, "hue", hue_val)
|
||||||
|
hue_controller.update_light(light_name, "sat", sat_val)
|
||||||
|
return f"Successfully set {light_name} to color {color}"
|
||||||
|
else:
|
||||||
|
return f"Unknown color name: {color}. Try a hex color (#RRGGBB) instead."
|
||||||
|
|
||||||
|
if h is not None and s is not None:
|
||||||
|
# Convert to Hue's color format
|
||||||
|
hue_val = int(h * 65535)
|
||||||
|
sat_val = int(s * 254)
|
||||||
|
|
||||||
|
# Update the light
|
||||||
|
hue_controller.update_light(light_name, "hue", hue_val)
|
||||||
|
hue_controller.update_light(light_name, "sat", sat_val)
|
||||||
|
|
||||||
|
return f"Successfully set {light_name} to color {color}"
|
||||||
|
except Exception as e:
|
||||||
|
return f"Error setting light color: {e}"
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def schedule_lighting(light_name: str, parameter: str, value: Any,
|
||||||
|
minutes_from_now: int, task_id: str = None) -> str:
|
||||||
|
"""Schedule a light change for a future time.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
light_name: Name of the light to control
|
||||||
|
parameter: Parameter to change (on, bri, hue, sat)
|
||||||
|
value: Value to set for the parameter
|
||||||
|
minutes_from_now: Minutes from now to execute the change
|
||||||
|
task_id: Optional identifier for the task (defaults to auto-generated)
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if not hue_controller:
|
||||||
|
return "Error: Hue controller not initialized"
|
||||||
|
|
||||||
|
# Validate parameter
|
||||||
|
valid_params = ["on", "bri", "hue", "sat", "ct", "xy"]
|
||||||
|
if parameter not in valid_params:
|
||||||
|
return f"Invalid parameter. Must be one of: {', '.join(valid_params)}"
|
||||||
|
|
||||||
|
# Create a task ID if none provided
|
||||||
|
if not task_id:
|
||||||
|
task_id = f"{light_name}_{parameter}_{int(time.time())}"
|
||||||
|
|
||||||
|
# Check if there's already a task with this ID
|
||||||
|
if task_id in scheduled_tasks:
|
||||||
|
return f"A task with ID '{task_id}' already exists. Choose a different ID or cancel the existing task."
|
||||||
|
|
||||||
|
# Calculate the target time
|
||||||
|
target_time = datetime.now() + timedelta(minutes=minutes_from_now)
|
||||||
|
|
||||||
|
# Create and schedule the task
|
||||||
|
async def scheduled_task():
|
||||||
|
# Calculate seconds to wait
|
||||||
|
wait_seconds = (target_time - datetime.now()).total_seconds()
|
||||||
|
if wait_seconds > 0:
|
||||||
|
await asyncio.sleep(wait_seconds)
|
||||||
|
|
||||||
|
# Apply the change
|
||||||
|
try:
|
||||||
|
hue_controller.update_light(light_name, parameter, value)
|
||||||
|
print(f"Scheduled task executed: {light_name}.{parameter} = {value}")
|
||||||
|
|
||||||
|
# Remove from scheduled tasks
|
||||||
|
if task_id in scheduled_tasks:
|
||||||
|
del scheduled_tasks[task_id]
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error in scheduled task: {e}")
|
||||||
|
|
||||||
|
# Store and start the task
|
||||||
|
task = asyncio.create_task(scheduled_task())
|
||||||
|
scheduled_tasks[task_id] = {
|
||||||
|
"task": task,
|
||||||
|
"light_name": light_name,
|
||||||
|
"parameter": parameter,
|
||||||
|
"value": value,
|
||||||
|
"target_time": target_time
|
||||||
|
}
|
||||||
|
|
||||||
|
formatted_time = target_time.strftime("%H:%M:%S")
|
||||||
|
return f"Scheduled change: {light_name}.{parameter} = {value} at {formatted_time} (Task ID: {task_id})"
|
||||||
|
except Exception as e:
|
||||||
|
return f"Error scheduling light change: {e}"
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def list_scheduled_tasks() -> str:
|
||||||
|
"""List all scheduled lighting tasks."""
|
||||||
|
try:
|
||||||
|
if not scheduled_tasks:
|
||||||
|
return "No scheduled tasks found."
|
||||||
|
|
||||||
|
result = ["Scheduled lighting tasks:"]
|
||||||
|
for task_id, info in scheduled_tasks.items():
|
||||||
|
time_str = info["target_time"].strftime("%H:%M:%S")
|
||||||
|
result.append(f"• ID: {task_id} - {info['light_name']}.{info['parameter']} = {info['value']} at {time_str}")
|
||||||
|
|
||||||
|
return "\n".join(result)
|
||||||
|
except Exception as e:
|
||||||
|
return f"Error listing scheduled tasks: {e}"
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def cancel_scheduled_task(task_id: str) -> str:
|
||||||
|
"""Cancel a scheduled lighting task.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
task_id: ID of the task to cancel
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if task_id not in scheduled_tasks:
|
||||||
|
return f"No task found with ID '{task_id}'"
|
||||||
|
|
||||||
|
# Cancel the task
|
||||||
|
task_info = scheduled_tasks[task_id]
|
||||||
|
task_info["task"].cancel()
|
||||||
|
del scheduled_tasks[task_id]
|
||||||
|
|
||||||
|
return f"Successfully cancelled task: {task_id}"
|
||||||
|
except Exception as e:
|
||||||
|
return f"Error cancelling task: {e}"
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def group_control(group_name: str = None, parameter: str = None, value: Any = None, light_names: List[str] = None) -> str:
|
||||||
|
"""Control multiple lights as a group.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
group_name: Optional name of predefined group (e.g., room name)
|
||||||
|
parameter: Parameter to change (on, bri, hue, sat)
|
||||||
|
value: Value to set for the parameter
|
||||||
|
light_names: Optional list of specific light names to control as a group
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if not hue_controller:
|
||||||
|
return "Error: Hue controller not initialized"
|
||||||
|
|
||||||
|
# Get lights to control
|
||||||
|
target_lights = []
|
||||||
|
|
||||||
|
# If group_name is provided, try to find it in bridge
|
||||||
|
if group_name:
|
||||||
|
try:
|
||||||
|
# Get groups from bridge
|
||||||
|
groups = hue_controller.bridge.get_group()
|
||||||
|
group_found = False
|
||||||
|
|
||||||
|
for group_id, group_data in groups.items():
|
||||||
|
if group_data['name'].lower() == group_name.lower():
|
||||||
|
# Found the group, get its lights
|
||||||
|
group = hue_controller.bridge.get_group(int(group_id), 'lights')
|
||||||
|
light_ids = group['lights']
|
||||||
|
|
||||||
|
# Convert light IDs to names
|
||||||
|
all_lights = hue_controller.bridge.get_light_objects('id')
|
||||||
|
for light_id in light_ids:
|
||||||
|
light = all_lights.get(int(light_id))
|
||||||
|
if light:
|
||||||
|
target_lights.append(light.name)
|
||||||
|
|
||||||
|
group_found = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if not group_found:
|
||||||
|
return f"Group '{group_name}' not found. Available groups: {', '.join([g['name'] for g in groups.values()])}"
|
||||||
|
except Exception as e:
|
||||||
|
return f"Error finding group '{group_name}': {e}"
|
||||||
|
|
||||||
|
# If light_names is provided, use those instead/additionally
|
||||||
|
if light_names:
|
||||||
|
target_lights.extend(light_names)
|
||||||
|
|
||||||
|
# Remove duplicates
|
||||||
|
target_lights = list(set(target_lights))
|
||||||
|
|
||||||
|
if not target_lights:
|
||||||
|
return "No lights to control. Provide either group_name or light_names."
|
||||||
|
|
||||||
|
# If parameter and value are provided, update all target lights
|
||||||
|
if parameter and value is not None:
|
||||||
|
results = []
|
||||||
|
for light_name in target_lights:
|
||||||
|
try:
|
||||||
|
hue_controller.update_light(light_name, parameter, value)
|
||||||
|
results.append(f"Updated {light_name}")
|
||||||
|
except Exception as e:
|
||||||
|
results.append(f"Failed to update {light_name}: {e}")
|
||||||
|
|
||||||
|
return f"Group control results:\n" + "\n".join(results)
|
||||||
|
else:
|
||||||
|
return f"Available lights in group: {', '.join(target_lights)}"
|
||||||
|
except Exception as e:
|
||||||
|
return f"Error in group control: {e}"
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def get_light_info(light_name: str) -> str:
|
||||||
|
"""Get detailed information about a specific light.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
light_name: Name of the light to query
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if not hue_controller:
|
||||||
|
return "Error: Hue controller not initialized"
|
||||||
|
|
||||||
|
light = hue_controller.get_light_by_name(light_name)
|
||||||
|
if not light:
|
||||||
|
return f"Light '{light_name}' not found"
|
||||||
|
|
||||||
|
# Get all light details from the bridge
|
||||||
|
light_details = {}
|
||||||
|
for light_id, light_data in hue_controller.bridge.get_light().items():
|
||||||
|
if light_data['name'] == light_name:
|
||||||
|
light_details = light_data
|
||||||
|
break
|
||||||
|
|
||||||
|
# Format the details
|
||||||
|
if light_details:
|
||||||
|
state = light_details.get('state', {})
|
||||||
|
result = [
|
||||||
|
f"Light: {light_name}",
|
||||||
|
f"Type: {light_details.get('type', 'Unknown')}",
|
||||||
|
f"Model ID: {light_details.get('modelid', 'Unknown')}",
|
||||||
|
f"Manufacturer: {light_details.get('manufacturername', 'Unknown')}",
|
||||||
|
f"Software Version: {light_details.get('swversion', 'Unknown')}",
|
||||||
|
"\nCurrent State:",
|
||||||
|
f" On: {state.get('on', 'Unknown')}",
|
||||||
|
f" Brightness: {state.get('bri', 'N/A')}/254",
|
||||||
|
f" Hue: {state.get('hue', 'N/A')}/65535",
|
||||||
|
f" Saturation: {state.get('sat', 'N/A')}/254",
|
||||||
|
f" Color Temperature: {state.get('ct', 'N/A')} mired",
|
||||||
|
f" Reachable: {state.get('reachable', 'Unknown')}"
|
||||||
|
]
|
||||||
|
return "\n".join(result)
|
||||||
|
else:
|
||||||
|
return f"Could not retrieve detailed information for light '{light_name}'"
|
||||||
|
except Exception as e:
|
||||||
|
return f"Error getting light information: {e}"
|
||||||
|
|
||||||
|
# Add more tools here as development continues...
|
||||||
|
|
||||||
|
def main(bridge_ip: str = None, update_interval: float = 0.05) -> None:
|
||||||
|
"""
|
||||||
|
Run the MCP server.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
bridge_ip: IP address of the Hue Bridge (optional, can be set from config)
|
||||||
|
update_interval: Time between updates in seconds (default: 0.05)
|
||||||
|
"""
|
||||||
|
# Import here to avoid circular import
|
||||||
|
from config import ConfigManager
|
||||||
|
|
||||||
|
if not bridge_ip:
|
||||||
|
# If no bridge IP is provided, try to get it from config
|
||||||
|
config = ConfigManager()
|
||||||
|
bridge_ip = config.get_value("bridge_ip")
|
||||||
|
|
||||||
|
if not bridge_ip:
|
||||||
|
print("Error: No bridge IP provided and none found in config")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Initialize the Hue controller
|
||||||
|
try:
|
||||||
|
print(f"Initializing Hue controller with bridge IP: {bridge_ip}")
|
||||||
|
initialize_hue_controller(bridge_ip, update_interval)
|
||||||
|
print("Hue controller initialized successfully")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error initializing Hue controller: {e}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Run the MCP server
|
||||||
|
print("Starting MCP server...")
|
||||||
|
mcp.run()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Loading…
Add table
Reference in a new issue