From 703150713e8419a7271b6f633eac220a2304338b Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Thu, 5 Feb 2026 12:33:31 +0000 Subject: [PATCH] Add tool implementations (file, git, shell, custom) --- src/mcp_server_cli/tools/file_tools.py | 359 +++++++++++++++++++++++++ 1 file changed, 359 insertions(+) create mode 100644 src/mcp_server_cli/tools/file_tools.py diff --git a/src/mcp_server_cli/tools/file_tools.py b/src/mcp_server_cli/tools/file_tools.py new file mode 100644 index 0000000..ba4d87f --- /dev/null +++ b/src/mcp_server_cli/tools/file_tools.py @@ -0,0 +1,359 @@ +"""File operation tools for MCP Server CLI.""" + +import os +import glob as glob_lib +import aiofiles +from pathlib import Path +from typing import Any, Dict, List, Optional +import re + +from mcp_server_cli.tools.base import ToolBase, ToolResult +from mcp_server_cli.models import ToolSchema, ToolParameter + + +class FileTools(ToolBase): + """Built-in tools for file operations.""" + + def __init__(self): + """Initialize file tools.""" + super().__init__( + name="file_tools", + description="Built-in file operations: read, write, list, search, glob", + ) + + def _create_input_schema(self) -> ToolSchema: + return ToolSchema( + properties={ + "operation": ToolParameter( + name="operation", + type="string", + description="Operation to perform: read, write, list, search, glob", + required=True, + enum=["read", "write", "list", "search", "glob"], + ), + "path": ToolParameter( + name="path", + type="string", + description="File or directory path", + required=True, + ), + "content": ToolParameter( + name="content", + type="string", + description="Content to write (for write operation)", + ), + "pattern": ToolParameter( + name="pattern", + type="string", + description="Search pattern or glob pattern", + ), + "recursive": ToolParameter( + name="recursive", + type="boolean", + description="Search recursively", + default=True, + ), + }, + required=["operation", "path"], + ) + + async def execute(self, arguments: Dict[str, Any]) -> ToolResult: + """Execute file operation.""" + operation = arguments.get("operation") + path = arguments.get("path", "") + + try: + if operation == "read": + return await self._read(path) + elif operation == "write": + return await self._write(path, arguments.get("content", "")) + elif operation == "list": + return await self._list(path) + elif operation == "search": + return await self._search(path, arguments.get("pattern", ""), arguments.get("recursive", True)) + elif operation == "glob": + return await self._glob(path, arguments.get("pattern", "*")) + else: + return ToolResult(success=False, output="", error=f"Unknown operation: {operation}") + except Exception as e: + return ToolResult(success=False, output="", error=str(e)) + + async def _read(self, path: str) -> ToolResult: + """Read a file.""" + if not Path(path).exists(): + return ToolResult(success=False, output="", error=f"File not found: {path}") + + if Path(path).is_dir(): + return ToolResult(success=False, output="", error=f"Path is a directory: {path}") + + async with aiofiles.open(path, "r", encoding="utf-8") as f: + content = await f.read() + + return ToolResult(success=True, output=content) + + async def _write(self, path: str, content: str) -> ToolResult: + """Write to a file.""" + path_obj = Path(path) + + if path_obj.exists() and path_obj.is_dir(): + return ToolResult(success=False, output="", error=f"Path is a directory: {path}") + + path_obj.parent.mkdir(parents=True, exist_ok=True) + + async with aiofiles.open(path, "w", encoding="utf-8") as f: + await f.write(content) + + return ToolResult(success=True, output=f"Written to {path}") + + async def _list(self, path: str) -> ToolResult: + """List directory contents.""" + path_obj = Path(path) + + if not path_obj.exists(): + return ToolResult(success=False, output="", error=f"Directory not found: {path}") + + if not path_obj.is_dir(): + return ToolResult(success=False, output="", error=f"Path is not a directory: {path}") + + items = [] + for item in sorted(path_obj.iterdir()): + item_type = "DIR" if item.is_dir() else "FILE" + items.append(f"[{item_type}] {item.name}") + + return ToolResult(success=True, output="\n".join(items)) + + async def _search(self, path: str, pattern: str, recursive: bool = True) -> ToolResult: + """Search for pattern in files.""" + if not Path(path).exists(): + return ToolResult(success=False, output="", error=f"Path not found: {path}") + + results = [] + pattern_re = re.compile(pattern) + + if Path(path).is_file(): + file_paths = [Path(path)] + else: + glob_pattern = "**/*" if recursive else "*" + file_paths = list(Path(path).glob(glob_pattern)) + file_paths = [p for p in file_paths if p.is_file()] + + for file_path in file_paths: + try: + async with aiofiles.open(file_path, "r", encoding="utf-8", errors="ignore") as f: + lines = await f.readlines() + for i, line in enumerate(lines, 1): + if pattern_re.search(line): + results.append(f"{file_path}:{i}: {line.strip()}") + except Exception: + continue + + if not results: + return ToolResult(success=True, output="No matches found") + + return ToolResult(success=True, output="\n".join(results[:100])) + + async def _glob(self, path: str, pattern: str) -> ToolResult: + """Find files matching glob pattern.""" + base_path = Path(path) + + if not base_path.exists(): + return ToolResult(success=False, output="", error=f"Path not found: {path}") + + matches = list(base_path.glob(pattern)) + + if not matches: + return ToolResult(success=True, output="No matches found") + + results = [str(m) for m in sorted(matches)] + return ToolResult(success=True, output="\n".join(results)) + + +class ReadFileTool(ToolBase): + """Tool for reading files.""" + + def __init__(self): + super().__init__( + name="read_file", + description="Read the contents of a file", + ) + + def _create_input_schema(self) -> ToolSchema: + return ToolSchema( + properties={ + "path": ToolParameter( + name="path", + type="string", + description="Path to the file to read", + required=True, + ), + }, + required=["path"], + ) + + async def execute(self, arguments: Dict[str, Any]) -> ToolResult: + """Read file contents.""" + path = arguments.get("path", "") + + if not path: + return ToolResult(success=False, output="", error="Path is required") + + path_obj = Path(path) + + if not path_obj.exists(): + return ToolResult(success=False, output="", error=f"File not found: {path}") + + if path_obj.is_dir(): + return ToolResult(success=False, output="", error=f"Path is a directory: {path}") + + try: + async with aiofiles.open(path, "r", encoding="utf-8") as f: + content = await f.read() + return ToolResult(success=True, output=content) + except Exception as e: + return ToolResult(success=False, output="", error=str(e)) + + +class WriteFileTool(ToolBase): + """Tool for writing files.""" + + def __init__(self): + super().__init__( + name="write_file", + description="Write content to a file", + ) + + def _create_input_schema(self) -> ToolSchema: + return ToolSchema( + properties={ + "path": ToolParameter( + name="path", + type="string", + description="Path to the file to write", + required=True, + ), + "content": ToolParameter( + name="content", + type="string", + description="Content to write", + required=True, + ), + }, + required=["path", "content"], + ) + + async def execute(self, arguments: Dict[str, Any]) -> ToolResult: + """Write content to a file.""" + path = arguments.get("path", "") + content = arguments.get("content", "") + + if not path: + return ToolResult(success=False, output="", error="Path is required") + + if content is None: + return ToolResult(success=False, output="", error="Content is required") + + try: + path_obj = Path(path) + path_obj.parent.mkdir(parents=True, exist_ok=True) + + async with aiofiles.open(path, "w", encoding="utf-8") as f: + await f.write(content) + + return ToolResult(success=True, output=f"Successfully wrote to {path}") + except Exception as e: + return ToolResult(success=False, output="", error=str(e)) + + +class ListDirectoryTool(ToolBase): + """Tool for listing directory contents.""" + + def __init__(self): + super().__init__( + name="list_directory", + description="List contents of a directory", + ) + + def _create_input_schema(self) -> ToolSchema: + return ToolSchema( + properties={ + "path": ToolParameter( + name="path", + type="string", + description="Path to the directory", + required=True, + ), + }, + required=["path"], + ) + + async def execute(self, arguments: Dict[str, Any]) -> ToolResult: + """List directory contents.""" + path = arguments.get("path", "") + + if not path: + return ToolResult(success=False, output="", error="Path is required") + + path_obj = Path(path) + + if not path_obj.exists(): + return ToolResult(success=False, output="", error=f"Directory not found: {path}") + + if not path_obj.is_dir(): + return ToolResult(success=False, output="", error=f"Path is not a directory: {path}") + + items = [] + for item in sorted(path_obj.iterdir()): + item_type = "DIR" if item.is_dir() else "FILE" + items.append(f"[{item_type}] {item.name}") + + return ToolResult(success=True, output="\n".join(items)) + + +class GlobFilesTool(ToolBase): + """Tool for finding files with glob patterns.""" + + def __init__(self): + super().__init__( + name="glob_files", + description="Find files matching a glob pattern", + ) + + def _create_input_schema(self) -> ToolSchema: + return ToolSchema( + properties={ + "path": ToolParameter( + name="path", + type="string", + description="Base path to search from", + required=True, + ), + "pattern": ToolParameter( + name="pattern", + type="string", + description="Glob pattern", + required=True, + ), + }, + required=["path", "pattern"], + ) + + async def execute(self, arguments: Dict[str, Any]) -> ToolResult: + """Find files matching glob pattern.""" + path = arguments.get("path", "") + pattern = arguments.get("pattern", "*") + + if not path: + return ToolResult(success=False, output="", error="Path is required") + + base_path = Path(path) + + if not base_path.exists(): + return ToolResult(success=False, output="", error=f"Path not found: {path}") + + matches = list(base_path.glob(pattern)) + + if not matches: + return ToolResult(success=True, output="No matches found") + + results = [str(m) for m in sorted(matches)] + return ToolResult(success=True, output="\n".join(results))