"""Local LLM client, response models, and provider registry for MCP Server CLI."""

import json
from typing import Any, AsyncIterator, Dict, List, Optional

import httpx
from pydantic import BaseModel

from mcp_server_cli.models import LocalLLMConfig


class LLMMessage(BaseModel):
    """A message in an LLM conversation."""

    role: str
    content: str


class LLMChoice(BaseModel):
    """A choice in an LLM response."""

    index: int
    message: LLMMessage
    finish_reason: Optional[str] = None


class LLMResponse(BaseModel):
    """Response from an LLM provider."""

    id: str
    object: str
    created: int
    model: str
    choices: List[LLMChoice]
    usage: Optional[Dict[str, Any]] = None


class ChatCompletionRequest(BaseModel):
    """Request for chat completion."""

    messages: List[Dict[str, str]]
    model: str
    temperature: Optional[float] = None
    max_tokens: Optional[int] = None
    stream: Optional[bool] = False


class LocalLLMClient:
    """Client for interacting with local LLM providers.

    Requests are sent to ``{base_url}/v1/chat/completions``. A new
    ``httpx.AsyncClient`` is opened for every call, so instances hold no
    open connections between calls.
    """

    # Endpoints probed (in order) by ``test_connection``. ``/api/tags`` is the
    # model-listing route of Ollama-style servers; ``/v1/models`` is the
    # OpenAI-compatible one.
    _PROBE_PATHS = ("/api/tags", "/v1/models")

    def __init__(self, config: LocalLLMConfig) -> None:
        """Initialize the LLM client.

        Args:
            config: Local LLM configuration.
        """
        self.config = config
        # Strip trailing slashes so path concatenation never produces "//".
        self.base_url = config.base_url.rstrip("/")
        self.model = config.model
        self.timeout = config.timeout

    def _build_payload(
        self,
        messages: List[Dict[str, str]],
        temperature: Optional[float],
        max_tokens: Optional[int],
        stream: bool,
    ) -> Dict[str, Any]:
        """Build the JSON body shared by the streaming and non-streaming calls.

        Config defaults are applied only when an argument is ``None``. An
        explicit falsy value such as ``temperature=0.0`` is a legitimate
        request and must be sent as given.
        """
        return {
            "messages": messages,
            "model": self.model,
            "temperature": (
                self.config.temperature if temperature is None else temperature
            ),
            "max_tokens": (
                self.config.max_tokens if max_tokens is None else max_tokens
            ),
            "stream": stream,
        }

    @staticmethod
    def _parse_choice(choice: Dict[str, Any]) -> LLMChoice:
        """Convert one raw ``choices`` entry into an ``LLMChoice``.

        A missing or ``null`` message, role, or content falls back to an
        assistant message with empty content, so the result always satisfies
        the ``str`` fields of ``LLMMessage``.
        """
        message = choice.get("message") or {}
        return LLMChoice(
            index=choice.get("index", 0),
            message=LLMMessage(
                role=message.get("role") or "assistant",
                content=message.get("content") or "",
            ),
            finish_reason=choice.get("finish_reason"),
        )

    @staticmethod
    def _extract_delta_content(chunk: Any) -> str:
        """Return the text carried by one decoded stream chunk, or ``""``.

        Tolerates chunks that are not objects, have a missing or empty
        ``choices`` list, or carry a ``null``/non-object ``delta``; none of
        those contain text to emit.
        """
        if not isinstance(chunk, dict):
            return ""
        choices = chunk.get("choices")
        if not isinstance(choices, list) or not choices:
            return ""
        first = choices[0]
        if not isinstance(first, dict):
            return ""
        delta = first.get("delta")
        if not isinstance(delta, dict):
            return ""
        content = delta.get("content")
        return content if isinstance(content, str) else ""

    async def chat_complete(
        self,
        messages: List[Dict[str, str]],
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None,
        stream: bool = False,
    ) -> LLMResponse:
        """Send a chat completion request to the local LLM.

        Args:
            messages: List of conversation messages.
            temperature: Sampling temperature. ``None`` uses the configured
                default; any other value (including ``0.0``) is sent as is.
            max_tokens: Maximum tokens to generate. ``None`` uses the
                configured default.
            stream: Forwarded verbatim in the request body. The response is
                still decoded as a single JSON document, so use
                ``stream_chat_complete`` to consume a streamed reply.

        Returns:
            LLM response with generated text. Fields absent from the server
            reply are filled with placeholder defaults.

        Raises:
            httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
            httpx.RequestError: If the request cannot be completed.
        """
        payload = self._build_payload(messages, temperature, max_tokens, stream)

        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.post(
                f"{self.base_url}/v1/chat/completions",
                json=payload,
                headers={"Content-Type": "application/json"},
            )
            response.raise_for_status()
            data = response.json()

        return LLMResponse(
            id=data.get("id", "local-llm"),
            object=data.get("object", "chat.completion"),
            created=data.get("created", 0),
            model=data.get("model", self.model),
            choices=[
                self._parse_choice(choice)
                for choice in data.get("choices") or []
            ],
            usage=data.get("usage"),
        )

    async def stream_chat_complete(
        self,
        messages: List[Dict[str, str]],
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None,
    ) -> AsyncIterator[str]:
        """Stream a chat completion from the local LLM.

        Args:
            messages: List of conversation messages.
            temperature: Sampling temperature. ``None`` uses the configured
                default; any other value (including ``0.0``) is sent as is.
            max_tokens: Maximum tokens to generate. ``None`` uses the
                configured default.

        Yields:
            Chunks of generated text.

        Raises:
            httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
            httpx.RequestError: If the request cannot be completed.
        """
        payload = self._build_payload(messages, temperature, max_tokens, True)

        async with httpx.AsyncClient(timeout=self.timeout) as client:
            async with client.stream(
                "POST",
                f"{self.base_url}/v1/chat/completions",
                json=payload,
                headers={"Content-Type": "application/json"},
            ) as response:
                # Surface HTTP errors instead of yielding an empty stream;
                # this matches what chat_complete does.
                response.raise_for_status()

                async for line in response.aiter_lines():
                    # Only "data:" lines carry payloads; the space after the
                    # colon is optional, so strip rather than slice a fixed 6.
                    if not line.startswith("data:"):
                        continue
                    data = line[len("data:"):].strip()
                    if data == "[DONE]":
                        break
                    try:
                        chunk = json.loads(data)
                    except json.JSONDecodeError:
                        # Skip malformed events rather than abort the stream.
                        continue
                    content = self._extract_delta_content(chunk)
                    if content:
                        yield content

    async def test_connection(self) -> Dict[str, Any]:
        """Test the connection to the local LLM.

        Each probe path is tried in order; the first one that answers with
        HTTP 200 and a JSON body wins. Connection errors, non-200 statuses,
        and undecodable bodies all fall through to the next probe.

        Returns:
            Dictionary with connection status and details.
        """
        async with httpx.AsyncClient(timeout=10) as client:
            for path in self._PROBE_PATHS:
                try:
                    response = await client.get(f"{self.base_url}{path}")
                    if response.status_code == 200:
                        return {
                            "status": "connected",
                            "details": response.json(),
                        }
                except (httpx.RequestError, ValueError):
                    # ValueError covers a 200 reply whose body is not JSON.
                    continue

        return {
            "status": "failed",
            "error": "Could not connect to local LLM server",
        }


class LLMProviderRegistry:
    """Registry for managing LLM providers."""

    def __init__(self) -> None:
        """Initialize the provider registry."""
        self._providers: Dict[str, LocalLLMClient] = {}

    def register(self, name: str, client: LocalLLMClient) -> None:
        """Register an LLM provider.

        An existing provider registered under the same name is replaced.

        Args:
            name: Provider name.
            client: LLM client instance.
        """
        self._providers[name] = client

    def get(self, name: str) -> Optional[LocalLLMClient]:
        """Get an LLM provider by name.

        Args:
            name: Provider name.

        Returns:
            LLM client or None if not found.
        """
        return self._providers.get(name)

    def list_providers(self) -> List[str]:
        """List all registered provider names.

        Returns:
            List of provider names.
        """
        return list(self._providers)

    def create_default(self, config: LocalLLMConfig) -> LocalLLMClient:
        """Create and register the default LLM provider.

        The client is stored under the name ``"default"``.

        Args:
            config: Local LLM configuration.

        Returns:
            Created LLM client.
        """
        client = LocalLLMClient(config)
        self.register("default", client)
        return client


def create_llm_client(config: LocalLLMConfig) -> LocalLLMClient:
    """Create an LLM client from configuration.

    Args:
        config: Local LLM configuration.

    Returns:
        Configured LLM client.
    """
    return LocalLLMClient(config)