"""
LLM Processor module for MIDI-to-Hue application.
Processes speech-to-text results through an LLM with MCP tools.
"""

import asyncio
import contextlib
import json
import os
import sys
from threading import Thread, current_thread
from typing import Dict, Any, List, Optional, Callable

# Try to import Anthropic - suggest installation if not found
try:
    from anthropic import Anthropic
    HAS_ANTHROPIC = True
except ImportError:
    HAS_ANTHROPIC = False
    print("Warning: Anthropic package not found. Install with: pip install anthropic")

# Try to import dotenv - suggest installation if not found
try:
    from dotenv import load_dotenv
    load_dotenv()  # Load environment variables from .env file
    HAS_DOTENV = True
except ImportError:
    HAS_DOTENV = False
    print("Warning: python-dotenv package not found. Install with: pip install python-dotenv")

# MCP imports - guarded like the optional packages above so that a missing
# package disables LLM processing instead of breaking the module import.
try:
    from mcp import ClientSession, StdioServerParameters, types
    from mcp.client.stdio import stdio_client
    HAS_MCP = True
except ImportError:
    HAS_MCP = False
    ClientSession = StdioServerParameters = types = stdio_client = None
    print("Warning: mcp package not found. Install with: pip install mcp")


class LLMProcessor:
    """Processes text through LLM and executes MCP tools.

    The processor owns a private asyncio event loop that runs forever on a
    daemon thread. Initialization, text processing and cleanup are all
    submitted to that loop, so the public methods can be called from any
    thread.
    """

    DEFAULT_MODEL = "claude-3-5-sonnet-20241022"
    DEFAULT_MAX_TOKENS = 1000
    # Seconds cleanup() waits for the MCP connection to close.
    CLEANUP_TIMEOUT = 5.0

    def __init__(
        self,
        mcp_script_path: str,
        model: str = DEFAULT_MODEL,
        max_tokens: int = DEFAULT_MAX_TOKENS,
    ):
        """
        Initialize the LLM processor.

        Args:
            mcp_script_path: Path to the MCP server script
            model: Anthropic model name used for every request
            max_tokens: Maximum number of tokens per LLM response
        """
        self.mcp_script_path = mcp_script_path
        self.model = model
        self.max_tokens = max_tokens
        self.anthropic = None
        self.session = None
        self.exit_stack = None
        self.available_tools = []
        self.loop = asyncio.new_event_loop()
        self.initialized = False
        self.is_processing = False

        # The loop must keep running after initialization, otherwise the
        # coroutines scheduled by process_text() would never be executed.
        self.loop_thread = Thread(target=self._run_loop, daemon=True)
        self.loop_thread.start()

        # Start initialization in background
        self.init_thread = Thread(target=self._run_init, daemon=True)
        self.init_thread.start()

    def _run_loop(self) -> None:
        """Run the private event loop until cleanup() stops it."""
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def _run_init(self) -> None:
        """Run initialization on the event loop and wait for it to finish."""
        future = asyncio.run_coroutine_threadsafe(self._initialize(), self.loop)
        try:
            future.result()
        except Exception as e:
            print(f"Error initializing LLM processor: {e}")

    async def _initialize(self) -> None:
        """Initialize LLM and MCP client."""
        try:
            # Initialize Anthropic client
            if HAS_ANTHROPIC:
                self.anthropic = Anthropic()
                print("Initialized Anthropic client")
            else:
                print("Anthropic package not available, LLM processing disabled")
                return

            if not HAS_MCP:
                print("MCP package not available, LLM processing disabled")
                return

            # Connect to MCP server
            print(f"Connecting to MCP server: {self.mcp_script_path}")
            await self._connect_to_server()

            self.initialized = True
            print("LLM Processor fully initialized and ready")
        except Exception as e:
            print(f"Error initializing LLM processor: {e}")

    async def _connect_to_server(self) -> None:
        """Connect to the MCP server and cache its tool list.

        The server script is launched with the interpreter running this
        process so that it sees the same installed packages.
        """
        command = sys.executable or "python"
        server_params = StdioServerParameters(
            command=command,
            args=[self.mcp_script_path, "--mcp-only"],
            env=None,
        )

        exit_stack = contextlib.AsyncExitStack()
        try:
            stdio, write = await exit_stack.enter_async_context(
                stdio_client(server_params)
            )
            session = await exit_stack.enter_async_context(
                ClientSession(stdio, write)
            )
            await session.initialize()

            # List available tools
            response = await session.list_tools()
        except BaseException:
            # Do not leak the spawned server if the handshake fails midway.
            await exit_stack.aclose()
            raise

        self.exit_stack = exit_stack
        self.session = session
        self.available_tools = [
            {
                "name": tool.name,
                "description": tool.description,
                "input_schema": tool.inputSchema,
            }
            for tool in response.tools
        ]
        print(f"Connected to MCP server with {len(self.available_tools)} tools")

    def process_text(self, text: str, callback: Optional[Callable[[str], None]] = None):
        """
        Process text through LLM and execute any tool calls.

        The work is scheduled on the processor's event loop and this method
        returns immediately; the callback is invoked from the loop thread.

        Args:
            text: The text to process (from speech-to-text)
            callback: Optional callback function for results
        """
        if self.is_processing:
            print("Already processing a request, please wait")
            if callback:
                callback("Already processing a request, please wait.")
            return

        if not self.initialized:
            print("LLM processor not yet initialized, please wait")
            if callback:
                callback("LLM processor not yet initialized, please wait.")
            return

        self.is_processing = True
        print("trace: creating task for text processing")

        # Submit to the loop thread; loop.create_task() is not thread-safe.
        coro = self._process_text_async(text, callback)
        try:
            future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        except RuntimeError as e:
            # The loop is closed; release the busy flag so later calls are
            # not blocked forever.
            coro.close()
            self.is_processing = False
            error_message = f"Error processing text: {str(e)}"
            print(error_message)
            if callback:
                callback(error_message)
            return

        # Add done callback to handle exceptions
        future.add_done_callback(self._process_done)

    def _process_done(self, task) -> None:
        """Handle completed processing task.

        Args:
            task: The finished future returned by the scheduling call
        """
        self.is_processing = False

        # exception() raises on a cancelled future, so check that first.
        if task.cancelled():
            print("Text processing was cancelled")
            return

        error = task.exception()
        if error:
            print(f"Error during text processing: {error}")

    async def _create_message(self, messages: List[Dict[str, Any]], tools=None):
        """Call the Anthropic Messages API without blocking the event loop.

        Args:
            messages: Conversation so far, in Anthropic message format
            tools: Tool definitions to offer, or None to offer none

        Returns:
            The response object returned by the Anthropic client
        """
        kwargs = {
            "model": self.model,
            "max_tokens": self.max_tokens,
            "messages": messages,
        }
        if tools is not None:
            kwargs["tools"] = tools

        # The client call is synchronous; run it in a worker thread so the
        # loop keeps servicing the MCP stdio streams in the meantime.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, lambda: self.anthropic.messages.create(**kwargs)
        )

    async def _process_text_async(self, text: str, callback: Optional[Callable[[str], None]] = None):
        """Process text asynchronously.

        Sends the request to the LLM, executes every tool call it returns
        through the MCP session and asks the LLM for a follow-up after each
        tool result.

        Args:
            text: The text to process
            callback: Optional callback receiving the final text or an
                error message

        Returns:
            The combined result text, an error message string, or None when
            the LLM client or the MCP session is missing.
        """
        print("trace: starting to process text")
        try:
            if not self.anthropic or not self.session:
                error = "LLM or MCP session not initialized"
                print(error)
                if callback:
                    callback(error)
                return None

            prompt = f"The user said: \"{text}\"\n\nProcess this request to control Philips Hue lights. Use the available tools to fulfill the request."

            messages = [
                {
                    "role": "user",
                    "content": prompt
                }
            ]

            print("trace: sending request to LLM", messages)
            # Initial LLM call
            response = await self._create_message(messages, tools=self.available_tools)

            final_text = []
            # Assistant text seen since the last tool call; sent back as
            # context with the next tool result.
            pending_assistant_text = []

            # Process response content
            for content in response.content:
                if content.type == 'text':
                    final_text.append(content.text)
                    pending_assistant_text.append(content.text)
                elif content.type == 'tool_use':
                    tool_name = content.name
                    tool_args = content.input

                    result_message = f"\n[Calling tool {tool_name}]"
                    print(result_message)
                    final_text.append(result_message)

                    # Execute the tool
                    result = await self.session.call_tool(tool_name, tool_args)

                    result_message = f"Result: {result.content}"
                    print(result_message)
                    final_text.append(result_message)

                    # Continue conversation with tool results
                    assistant_text = "\n".join(pending_assistant_text)
                    pending_assistant_text = []
                    if assistant_text.strip():
                        messages.append({
                            "role": "assistant",
                            "content": assistant_text
                        })

                    messages.append({
                        "role": "user",
                        "content": f"Tool result: {result.content}"
                    })

                    # Get next response from LLM
                    follow_up = await self._create_message(messages)

                    # Do not assume the first block is a text block.
                    follow_up_text = "\n".join(
                        block.text
                        for block in (follow_up.content or [])
                        if getattr(block, "type", None) == "text" and block.text
                    )
                    if follow_up_text:
                        final_text.append(follow_up_text)
                        print(follow_up_text)
                        if follow_up_text.strip():
                            messages.append({
                                "role": "assistant",
                                "content": follow_up_text
                            })

            # Combine all text for final result
            final_result = "\n".join(final_text)

            # Call the callback with final result
            if callback:
                callback(final_result)

            return final_result

        except Exception as e:
            error_message = f"Error processing text: {str(e)}"
            print(error_message)
            if callback:
                callback(error_message)
            return error_message

    def cleanup(self):
        """Clean up resources.

        Closes the MCP connection on the loop that opened it and then stops
        the loop. Waits up to CLEANUP_TIMEOUT seconds unless called from the
        loop thread itself, where waiting would deadlock.
        """
        exit_stack = getattr(self, 'exit_stack', None)
        self.exit_stack = None
        self.initialized = False

        async def _cleanup():
            if exit_stack is not None:
                await exit_stack.aclose()
            self.session = None

        loop = self.loop
        if loop is None or loop.is_closed():
            return

        loop_thread = getattr(self, 'loop_thread', None)
        loop_alive = loop_thread is not None and loop_thread.is_alive()

        if not loop_alive:
            if exit_stack is None:
                self.session = None
                return
            # The loop thread is gone, so drive the loop from this thread.
            try:
                loop.run_until_complete(_cleanup())
            except Exception as e:
                print(f"Error during LLM processor cleanup: {e!r}")
            return

        if current_thread() is loop_thread:
            # Blocking here would stall the loop that has to run the task.
            loop.create_task(_cleanup())
            return

        future = asyncio.run_coroutine_threadsafe(_cleanup(), loop)
        try:
            future.result(timeout=self.CLEANUP_TIMEOUT)
        except Exception as e:
            print(f"Error during LLM processor cleanup: {e!r}")
        finally:
            loop.call_soon_threadsafe(loop.stop)