Add source files: core modules (auth, config, models)
Some checks failed
CI / test (push) Has been cancelled

This commit is contained in:
2026-02-05 12:29:04 +00:00
parent 0cee3a4178
commit 4eaf8acfda

243
src/mcp_server_cli/auth.py Normal file
View File

@@ -0,0 +1,243 @@
"""Authentication and local LLM configuration for MCP Server CLI."""
from typing import Any, Dict, List, Optional
from pydantic import BaseModel
import httpx
import json
from mcp_server_cli.models import LocalLLMConfig
class LLMMessage(BaseModel):
"""A message in an LLM conversation."""
role: str
content: str
class LLMChoice(BaseModel):
"""A choice in an LLM response."""
index: int
message: LLMMessage
finish_reason: Optional[str] = None
class LLMResponse(BaseModel):
"""Response from an LLM provider."""
id: str
object: str
created: int
model: str
choices: List[LLMChoice]
usage: Optional[Dict[str, Any]] = None
class ChatCompletionRequest(BaseModel):
"""Request for chat completion."""
messages: List[Dict[str, str]]
model: str
temperature: Optional[float] = None
max_tokens: Optional[int] = None
stream: Optional[bool] = False
class LocalLLMClient:
"""Client for interacting with local LLM providers."""
def __init__(self, config: LocalLLMConfig):
"""Initialize the LLM client.
Args:
config: Local LLM configuration.
"""
self.config = config
self.base_url = config.base_url.rstrip("/")
self.model = config.model
self.timeout = config.timeout
async def chat_complete(
self,
messages: List[Dict[str, str]],
temperature: Optional[float] = None,
max_tokens: Optional[int] = None,
stream: bool = False,
) -> LLMResponse:
"""Send a chat completion request to the local LLM.
Args:
messages: List of conversation messages.
temperature: Sampling temperature.
max_tokens: Maximum tokens to generate.
stream: Whether to stream the response.
Returns:
LLM response with generated text.
"""
payload = {
"messages": messages,
"model": self.model,
"temperature": temperature or self.config.temperature,
"max_tokens": max_tokens or self.config.max_tokens,
"stream": stream,
}
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.post(
f"{self.base_url}/v1/chat/completions",
json=payload,
headers={"Content-Type": "application/json"},
)
response.raise_for_status()
data = response.json()
return LLMResponse(
id=data.get("id", "local-llm"),
object=data.get("object", "chat.completion"),
created=data.get("created", 0),
model=data.get("model", self.model),
choices=[
LLMChoice(
index=choice.get("index", 0),
message=LLMMessage(
role=choice.get("message", {}).get("role", "assistant"),
content=choice.get("message", {}).get("content", ""),
),
finish_reason=choice.get("finish_reason"),
)
for choice in data.get("choices", [])
],
usage=data.get("usage"),
)
async def stream_chat_complete(
self,
messages: List[Dict[str, str]],
temperature: Optional[float] = None,
max_tokens: Optional[int] = None,
):
"""Stream a chat completion from the local LLM.
Args:
messages: List of conversation messages.
temperature: Sampling temperature.
max_tokens: Maximum tokens to generate.
Yields:
Chunks of generated text.
"""
payload = {
"messages": messages,
"model": self.model,
"temperature": temperature or self.config.temperature,
"max_tokens": max_tokens or self.config.max_tokens,
"stream": True,
}
async with httpx.AsyncClient(timeout=self.timeout) as client:
async with client.stream(
"POST",
f"{self.base_url}/v1/chat/completions",
json=payload,
headers={"Content-Type": "application/json"},
) as response:
async for line in response.aiter_lines():
if line.startswith("data: "):
data = line[6:]
if data == "[DONE]":
break
try:
chunk = json.loads(data)
delta = chunk.get("choices", [{}])[0].get("delta", {})
content = delta.get("content", "")
if content:
yield content
except json.JSONDecodeError:
continue
async def test_connection(self) -> Dict[str, Any]:
"""Test the connection to the local LLM.
Returns:
Dictionary with connection status and details.
"""
try:
async with httpx.AsyncClient(timeout=10) as client:
response = await client.get(f"{self.base_url}/api/tags")
if response.status_code == 200:
return {"status": "connected", "details": response.json()}
except httpx.RequestError:
pass
try:
async with httpx.AsyncClient(timeout=10) as client:
response = await client.get(f"{self.base_url}/v1/models")
if response.status_code == 200:
return {"status": "connected", "details": response.json()}
except httpx.RequestError:
pass
return {"status": "failed", "error": "Could not connect to local LLM server"}
class LLMProviderRegistry:
"""Registry for managing LLM providers."""
def __init__(self):
"""Initialize the provider registry."""
self._providers: Dict[str, LocalLLMClient] = {}
def register(self, name: str, client: LocalLLMClient):
"""Register an LLM provider.
Args:
name: Provider name.
client: LLM client instance.
"""
self._providers[name] = client
def get(self, name: str) -> Optional[LocalLLMClient]:
"""Get an LLM provider by name.
Args:
name: Provider name.
Returns:
LLM client or None if not found.
"""
return self._providers.get(name)
def list_providers(self) -> List[str]:
"""List all registered provider names.
Returns:
List of provider names.
"""
return list(self._providers.keys())
def create_default(self, config: LocalLLMConfig) -> LocalLLMClient:
"""Create and register the default LLM provider.
Args:
config: Local LLM configuration.
Returns:
Created LLM client.
"""
client = LocalLLMClient(config)
self.register("default", client)
return client
def create_llm_client(config: LocalLLMConfig) -> LocalLLMClient:
"""Create an LLM client from configuration.
Args:
config: Local LLM configuration.
Returns:
Configured LLM client.
"""
return LocalLLMClient(config)