Add tool implementations (file, git, shell, custom)
Some checks failed
CI / test (push) Has been cancelled

This commit is contained in:
2026-02-05 12:33:33 +00:00
parent 460f1a6b65
commit 79cb5e0381

View File

@@ -0,0 +1,307 @@
"""Custom tool loader for YAML/JSON defined tools."""
import asyncio
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Callable
from datetime import datetime
import yaml
from mcp_server_cli.tools.base import ToolBase, ToolResult, ToolRegistry
from mcp_server_cli.models import ToolSchema, ToolParameter, ToolDefinition
class CustomToolLoader:
"""Loader for dynamically loading custom tools from YAML/JSON files."""
def __init__(self, registry: Optional[ToolRegistry] = None):
"""Initialize the custom tool loader.
Args:
registry: Optional tool registry to register loaded tools.
"""
self.registry = registry or ToolRegistry()
self._loaded_tools: Dict[str, Dict[str, Any]] = {}
self._file_watchers: Dict[str, float] = {}
def load_file(self, file_path: str) -> List[ToolDefinition]:
"""Load tools from a YAML or JSON file.
Args:
file_path: Path to the tool definition file.
Returns:
List of loaded tool definitions.
"""
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"Tool file not found: {file_path}")
with open(path, "r") as f:
if path.suffix == ".json":
data = json.load(f)
else:
data = yaml.safe_load(f)
if not isinstance(data, list):
data = [data]
tools = []
for tool_data in data:
tool = self._parse_tool_definition(tool_data, file_path)
if tool:
tools.append(tool)
self._loaded_tools[tool.name] = {
"definition": tool,
"file_path": file_path,
"loaded_at": datetime.now().isoformat(),
}
return tools
def _parse_tool_definition(self, data: Dict[str, Any], source: str) -> Optional[ToolDefinition]:
"""Parse a tool definition from raw data.
Args:
data: Raw tool definition data.
source: Source file path for error messages.
Returns:
ToolDefinition or None if invalid.
"""
try:
name = data.get("name")
if not name:
raise ValueError(f"Tool missing 'name' field in {source}")
description = data.get("description", "")
input_schema_data = data.get("input_schema", {})
properties = {}
required_fields = input_schema_data.get("required", [])
for prop_name, prop_data in input_schema_data.get("properties", {}).items():
param = ToolParameter(
name=prop_name,
type=prop_data.get("type", "string"),
description=prop_data.get("description"),
required=prop_name in required_fields,
enum=prop_data.get("enum"),
default=prop_data.get("default"),
)
properties[prop_name] = param
input_schema = ToolSchema(
type=input_schema_data.get("type", "object"),
properties=properties,
required=required_fields,
)
return ToolDefinition(
name=name,
description=description,
input_schema=input_schema,
annotations=data.get("annotations"),
)
except Exception as e:
raise ValueError(f"Invalid tool definition in {source}: {e}")
def load_directory(self, directory: str, pattern: str = "*.yaml") -> List[ToolDefinition]:
"""Load all tool files from a directory.
Args:
directory: Directory to scan.
pattern: File pattern to match.
Returns:
List of loaded tool definitions.
"""
dir_path = Path(directory)
if not dir_path.exists():
raise FileNotFoundError(f"Directory not found: {directory}")
tools = []
for file_path in dir_path.glob(pattern):
try:
loaded = self.load_file(str(file_path))
tools.extend(loaded)
except Exception as e:
print(f"Warning: Failed to load {file_path}: {e}")
for json_path in dir_path.glob("*.json"):
try:
loaded = self.load_file(str(json_path))
tools.extend(loaded)
except Exception as e:
print(f"Warning: Failed to load {json_path}: {e}")
return tools
def create_tool_from_definition(
self,
definition: ToolDefinition,
executor: Optional[Callable[[Dict[str, Any]], Any]] = None,
) -> ToolBase:
"""Create a ToolBase instance from a definition.
Args:
definition: Tool definition.
executor: Optional executor function.
Returns:
ToolBase implementation.
"""
class DynamicTool(ToolBase):
def __init__(self, defn, exec_fn):
self._defn = defn
self._exec_fn = exec_fn
super().__init__(
name=defn.name,
description=defn.description,
annotations=defn.annotations,
)
def _create_input_schema(self) -> ToolSchema:
return self._defn.input_schema
async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
if self._exec_fn:
try:
result = self._exec_fn(arguments)
if asyncio.iscoroutine(result):
result = await result
return ToolResult(success=True, output=str(result))
except Exception as e:
return ToolResult(success=False, output="", error=str(e))
return ToolResult(success=False, output="", error="No executor configured")
return DynamicTool(definition, executor)
def register_tool_from_file(
self,
file_path: str,
executor: Optional[Callable[[Dict[str, Any]], Any]] = None,
) -> Optional[ToolBase]:
"""Load and register a tool from file.
Args:
file_path: Path to tool definition file.
executor: Optional executor function.
Returns:
Registered tool or None.
"""
tools = self.load_file(file_path)
for tool_def in tools:
tool = self.create_tool_from_definition(tool_def, executor)
self.registry.register(tool)
return tool
return None
def reload_if_changed(self) -> List[ToolDefinition]:
"""Reload tools if files have changed.
Returns:
List of reloaded tool definitions.
"""
reloaded = []
for file_path, last_mtime in list(self._file_watchers.items()):
path = Path(file_path)
if not path.exists():
continue
current_mtime = path.stat().st_mtime
if current_mtime > last_mtime:
try:
tools = self.load_file(file_path)
reloaded.extend(tools)
self._file_watchers[file_path] = current_mtime
except Exception as e:
print(f"Warning: Failed to reload {file_path}: {e}")
return reloaded
def watch_file(self, file_path: str):
"""Add a file to be watched for changes.
Args:
file_path: Path to watch.
"""
path = Path(file_path)
if path.exists():
self._file_watchers[file_path] = path.stat().st_mtime
def list_loaded(self) -> Dict[str, Dict[str, Any]]:
"""List all loaded custom tools.
Returns:
Dictionary of tool name to metadata.
"""
return dict(self._loaded_tools)
def get_registry(self) -> ToolRegistry:
"""Get the internal tool registry.
Returns:
ToolRegistry with all loaded tools.
"""
return self.registry
class DynamicTool(ToolBase):
"""A dynamically created tool from a definition."""
def __init__(
self,
name: str,
description: str,
input_schema: ToolSchema,
executor: Callable[[Dict[str, Any]], Any],
annotations: Optional[Dict[str, Any]] = None,
):
"""Initialize a dynamic tool.
Args:
name: Tool name.
description: Tool description.
input_schema: Tool input schema.
executor: Function to execute the tool.
annotations: Optional annotations.
"""
super().__init__(name=name, description=description, annotations=annotations)
self._input_schema = input_schema
self._executor = executor
def _create_input_schema(self) -> ToolSchema:
return self._input_schema
async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
"""Execute the dynamic tool."""
try:
result = self._executor(arguments)
if asyncio.iscoroutine(result):
result = await result
return ToolResult(success=True, output=str(result))
except Exception as e:
return ToolResult(success=False, output="", error=str(e))
def create_python_executor(module_path: str, function_name: str) -> Callable:
"""Create an executor from a Python function.
Args:
module_path: Path to Python module.
function_name: Name of function to call.
Returns:
Callable executor function.
"""
import importlib.util
spec = importlib.util.spec_from_file_location("dynamic_tool", module_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return getattr(module, function_name)