ship-controller/llm_processor.py
2025-07-03 01:53:10 +02:00

248 lines
9 KiB
Python

"""
LLM Processor module for MIDI-to-Hue application.
Processes speech-to-text results through an LLM with MCP tools.
"""
import asyncio
import contextlib
import json
import os
from typing import Dict, Any, List, Optional, Callable
from threading import Thread
# Try to import Anthropic - suggest installation if not found
try:
from anthropic import Anthropic
HAS_ANTHROPIC = True
except ImportError:
HAS_ANTHROPIC = False
print("Warning: Anthropic package not found. Install with: pip install anthropic")
# Try to import dotenv - suggest installation if not found
try:
from dotenv import load_dotenv
load_dotenv() # Load environment variables from .env file
HAS_DOTENV = True
except ImportError:
HAS_DOTENV = False
print("Warning: python-dotenv package not found. Install with: pip install python-dotenv")
# MCP imports
from mcp import ClientSession, StdioServerParameters, types
from mcp.client.stdio import stdio_client
class LLMProcessor:
"""Processes text through LLM and executes MCP tools."""
def __init__(self, mcp_script_path: str):
"""
Initialize the LLM processor.
Args:
mcp_script_path: Path to the MCP server script
"""
self.mcp_script_path = mcp_script_path
self.anthropic = None
self.session = None
self.available_tools = []
self.loop = asyncio.new_event_loop()
self.initialized = False
self.is_processing = False
# Start initialization in background
self.init_thread = Thread(target=self._run_init, daemon=True)
self.init_thread.start()
def _run_init(self):
"""Run initialization in a separate thread."""
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self._initialize())
async def _initialize(self):
"""Initialize LLM and MCP client."""
try:
# Initialize Anthropic client
if HAS_ANTHROPIC:
self.anthropic = Anthropic()
print("Initialized Anthropic client")
else:
print("Anthropic package not available, LLM processing disabled")
return
# Connect to MCP server
print(f"Connecting to MCP server: {self.mcp_script_path}")
await self._connect_to_server()
self.initialized = True
print("LLM Processor fully initialized and ready")
except Exception as e:
print(f"Error initializing LLM processor: {e}")
async def _connect_to_server(self):
"""Connect to the MCP server."""
command = "python" # Assuming Python script
server_params = StdioServerParameters(
command=command,
args=[self.mcp_script_path, "--mcp-only"],
env=None
)
self.exit_stack = contextlib.AsyncExitStack()
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
stdio, write = stdio_transport
self.session = await self.exit_stack.enter_async_context(ClientSession(stdio, write))
await self.session.initialize()
# List available tools
response = await self.session.list_tools()
self.available_tools = [{
"name": tool.name,
"description": tool.description,
"input_schema": tool.inputSchema
} for tool in response.tools]
print(f"Connected to MCP server with {len(self.available_tools)} tools")
def process_text(self, text: str, callback: Optional[Callable[[str], None]] = None):
"""
Process text through LLM and execute any tool calls.
Args:
text: The text to process (from speech-to-text)
callback: Optional callback function for results
"""
if self.is_processing:
print("Already processing a request, please wait")
if callback:
callback("Already processing a request, please wait.")
return
if not self.initialized:
print("LLM processor not yet initialized, please wait")
if callback:
callback("LLM processor not yet initialized, please wait.")
return
self.is_processing = True
print("trace: creating task for text processing")
# Start async processing in background
task = self.loop.create_task(self._process_text_async(text, callback))
# Add done callback to handle exceptions
task.add_done_callback(lambda t: self._process_done(t))
def _process_done(self, task):
"""Handle completed processing task."""
self.is_processing = False
# Check for exceptions
if task.exception():
print(f"Error during text processing: {task.exception()}")
async def _process_text_async(self, text: str, callback: Optional[Callable[[str], None]] = None):
print("trace: starting to process text")
"""Process text asynchronously."""
try:
if not self.anthropic or not self.session:
error = "LLM or MCP session not initialized"
print("trace: ", error)
print(error)
if callback:
callback(error)
return
prompt = f"The user said: \"{text}\"\n\nProcess this request to control Philips Hue lights. Use the available tools to fulfill the request."
messages = [
{
"role": "user",
"content": prompt
}
]
print("trace: sending request to LLM", messages)
# Initial LLM call
response = self.anthropic.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1000,
messages=messages,
tools=self.available_tools
)
results = []
final_text = []
# Process response content
for content in response.content:
if content.type == 'text':
final_text.append(content.text)
elif content.type == 'tool_use':
tool_name = content.name
tool_args = content.input
# Execute tool call
result_message = f"\n[Calling tool {tool_name}]"
print(result_message)
final_text.append(result_message)
# Execute the tool
result = await self.session.call_tool(tool_name, tool_args)
results.append({"call": tool_name, "result": result.content})
result_message = f"Result: {result.content}"
print(result_message)
final_text.append(result_message)
# Continue conversation with tool results
if hasattr(content, 'text') and content.text:
messages.append({
"role": "assistant",
"content": content.text
})
messages.append({
"role": "user",
"content": f"Tool result: {result.content}"
})
# Get next response from LLM
follow_up = self.anthropic.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1000,
messages=messages,
)
if follow_up.content:
follow_up_text = follow_up.content[0].text
final_text.append(follow_up_text)
print(follow_up_text)
# Combine all text for final result
final_result = "\n".join(final_text)
# Call the callback with final result
if callback:
callback(final_result)
return final_result
except Exception as e:
error_message = f"Error processing text: {str(e)}"
print(error_message)
if callback:
callback(error_message)
return error_message
def cleanup(self):
"""Clean up resources."""
async def _cleanup():
if hasattr(self, 'exit_stack'):
await self.exit_stack.aclose()
if self.loop and self.loop.is_running():
self.loop.create_task(_cleanup())
else:
# Create a new event loop if needed
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(_cleanup())