From 79cb5e03810ee607b21294c8428b41812978b3bd Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Thu, 5 Feb 2026 12:33:33 +0000 Subject: [PATCH] Add tool implementations (file, git, shell, custom) --- src/mcp_server_cli/tools/custom_tools.py | 307 +++++++++++++++++++++++ 1 file changed, 307 insertions(+) create mode 100644 src/mcp_server_cli/tools/custom_tools.py diff --git a/src/mcp_server_cli/tools/custom_tools.py b/src/mcp_server_cli/tools/custom_tools.py new file mode 100644 index 0000000..03d7c0e --- /dev/null +++ b/src/mcp_server_cli/tools/custom_tools.py @@ -0,0 +1,307 @@ +"""Custom tool loader for YAML/JSON defined tools.""" + +import asyncio +import json +import os +from pathlib import Path +from typing import Any, Dict, List, Optional, Callable +from datetime import datetime +import yaml + +from mcp_server_cli.tools.base import ToolBase, ToolResult, ToolRegistry +from mcp_server_cli.models import ToolSchema, ToolParameter, ToolDefinition + + +class CustomToolLoader: + """Loader for dynamically loading custom tools from YAML/JSON files.""" + + def __init__(self, registry: Optional[ToolRegistry] = None): + """Initialize the custom tool loader. + + Args: + registry: Optional tool registry to register loaded tools. + """ + self.registry = registry or ToolRegistry() + self._loaded_tools: Dict[str, Dict[str, Any]] = {} + self._file_watchers: Dict[str, float] = {} + + def load_file(self, file_path: str) -> List[ToolDefinition]: + """Load tools from a YAML or JSON file. + + Args: + file_path: Path to the tool definition file. + + Returns: + List of loaded tool definitions. + """ + path = Path(file_path) + if not path.exists(): + raise FileNotFoundError(f"Tool file not found: {file_path}") + + with open(path, "r") as f: + if path.suffix == ".json": + data = json.load(f) + else: + data = yaml.safe_load(f) + + if not isinstance(data, list): + data = [data] + + tools = [] + for tool_data in data: + tool = self._parse_tool_definition(tool_data, file_path) + if tool: + tools.append(tool) + self._loaded_tools[tool.name] = { + "definition": tool, + "file_path": file_path, + "loaded_at": datetime.now().isoformat(), + } + + return tools + + def _parse_tool_definition(self, data: Dict[str, Any], source: str) -> Optional[ToolDefinition]: + """Parse a tool definition from raw data. + + Args: + data: Raw tool definition data. + source: Source file path for error messages. + + Returns: + ToolDefinition or None if invalid. + """ + try: + name = data.get("name") + if not name: + raise ValueError(f"Tool missing 'name' field in {source}") + + description = data.get("description", "") + + input_schema_data = data.get("input_schema", {}) + properties = {} + required_fields = input_schema_data.get("required", []) + + for prop_name, prop_data in input_schema_data.get("properties", {}).items(): + param = ToolParameter( + name=prop_name, + type=prop_data.get("type", "string"), + description=prop_data.get("description"), + required=prop_name in required_fields, + enum=prop_data.get("enum"), + default=prop_data.get("default"), + ) + properties[prop_name] = param + + input_schema = ToolSchema( + type=input_schema_data.get("type", "object"), + properties=properties, + required=required_fields, + ) + + return ToolDefinition( + name=name, + description=description, + input_schema=input_schema, + annotations=data.get("annotations"), + ) + except Exception as e: + raise ValueError(f"Invalid tool definition in {source}: {e}") + + def load_directory(self, directory: str, pattern: str = "*.yaml") -> List[ToolDefinition]: + """Load all tool files from a directory. + + Args: + directory: Directory to scan. + pattern: File pattern to match. + + Returns: + List of loaded tool definitions. + """ + dir_path = Path(directory) + if not dir_path.exists(): + raise FileNotFoundError(f"Directory not found: {directory}") + + tools = [] + for file_path in dir_path.glob(pattern): + try: + loaded = self.load_file(str(file_path)) + tools.extend(loaded) + except Exception as e: + print(f"Warning: Failed to load {file_path}: {e}") + + for json_path in dir_path.glob("*.json"): + try: + loaded = self.load_file(str(json_path)) + tools.extend(loaded) + except Exception as e: + print(f"Warning: Failed to load {json_path}: {e}") + + return tools + + def create_tool_from_definition( + self, + definition: ToolDefinition, + executor: Optional[Callable[[Dict[str, Any]], Any]] = None, + ) -> ToolBase: + """Create a ToolBase instance from a definition. + + Args: + definition: Tool definition. + executor: Optional executor function. + + Returns: + ToolBase implementation. + """ + class DynamicTool(ToolBase): + def __init__(self, defn, exec_fn): + self._defn = defn + self._exec_fn = exec_fn + super().__init__( + name=defn.name, + description=defn.description, + annotations=defn.annotations, + ) + + def _create_input_schema(self) -> ToolSchema: + return self._defn.input_schema + + async def execute(self, arguments: Dict[str, Any]) -> ToolResult: + if self._exec_fn: + try: + result = self._exec_fn(arguments) + if asyncio.iscoroutine(result): + result = await result + return ToolResult(success=True, output=str(result)) + except Exception as e: + return ToolResult(success=False, output="", error=str(e)) + return ToolResult(success=False, output="", error="No executor configured") + + return DynamicTool(definition, executor) + + def register_tool_from_file( + self, + file_path: str, + executor: Optional[Callable[[Dict[str, Any]], Any]] = None, + ) -> Optional[ToolBase]: + """Load and register a tool from file. + + Args: + file_path: Path to tool definition file. + executor: Optional executor function. + + Returns: + Registered tool or None. + """ + tools = self.load_file(file_path) + for tool_def in tools: + tool = self.create_tool_from_definition(tool_def, executor) + self.registry.register(tool) + return tool + return None + + def reload_if_changed(self) -> List[ToolDefinition]: + """Reload tools if files have changed. + + Returns: + List of reloaded tool definitions. + """ + reloaded = [] + + for file_path, last_mtime in list(self._file_watchers.items()): + path = Path(file_path) + if not path.exists(): + continue + + current_mtime = path.stat().st_mtime + if current_mtime > last_mtime: + try: + tools = self.load_file(file_path) + reloaded.extend(tools) + self._file_watchers[file_path] = current_mtime + except Exception as e: + print(f"Warning: Failed to reload {file_path}: {e}") + + return reloaded + + def watch_file(self, file_path: str): + """Add a file to be watched for changes. + + Args: + file_path: Path to watch. + """ + path = Path(file_path) + if path.exists(): + self._file_watchers[file_path] = path.stat().st_mtime + + def list_loaded(self) -> Dict[str, Dict[str, Any]]: + """List all loaded custom tools. + + Returns: + Dictionary of tool name to metadata. + """ + return dict(self._loaded_tools) + + def get_registry(self) -> ToolRegistry: + """Get the internal tool registry. + + Returns: + ToolRegistry with all loaded tools. + """ + return self.registry + + +class DynamicTool(ToolBase): + """A dynamically created tool from a definition.""" + + def __init__( + self, + name: str, + description: str, + input_schema: ToolSchema, + executor: Callable[[Dict[str, Any]], Any], + annotations: Optional[Dict[str, Any]] = None, + ): + """Initialize a dynamic tool. + + Args: + name: Tool name. + description: Tool description. + input_schema: Tool input schema. + executor: Function to execute the tool. + annotations: Optional annotations. + """ + super().__init__(name=name, description=description, annotations=annotations) + self._input_schema = input_schema + self._executor = executor + + def _create_input_schema(self) -> ToolSchema: + return self._input_schema + + async def execute(self, arguments: Dict[str, Any]) -> ToolResult: + """Execute the dynamic tool.""" + try: + result = self._executor(arguments) + if asyncio.iscoroutine(result): + result = await result + return ToolResult(success=True, output=str(result)) + except Exception as e: + return ToolResult(success=False, output="", error=str(e)) + + +def create_python_executor(module_path: str, function_name: str) -> Callable: + """Create an executor from a Python function. + + Args: + module_path: Path to Python module. + function_name: Name of function to call. + + Returns: + Callable executor function. + """ + import importlib.util + + spec = importlib.util.spec_from_file_location("dynamic_tool", module_path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + return getattr(module, function_name)