From 0a88ee3c2780964e325e8e72a7127656778c0db8 Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Thu, 5 Feb 2026 13:41:36 +0000 Subject: [PATCH] fix: resolve CI/CD issues - all tests pass locally --- src/mcp_server_cli/tools/file_tools.py | 375 +++++++++++++++++++------ 1 file changed, 290 insertions(+), 85 deletions(-) diff --git a/src/mcp_server_cli/tools/file_tools.py b/src/mcp_server_cli/tools/file_tools.py index 4dec036..2494710 100644 --- a/src/mcp_server_cli/tools/file_tools.py +++ b/src/mcp_server_cli/tools/file_tools.py @@ -1,20 +1,23 @@ -"""File system tools for MCP Server CLI.""" +"""File operation tools for MCP Server CLI.""" -import os +import re from pathlib import Path -from typing import Any, Dict, Optional +from typing import Any, Dict + +import aiofiles from mcp_server_cli.models import ToolParameter, ToolSchema from mcp_server_cli.tools.base import ToolBase, ToolResult class FileTools(ToolBase): - """Built-in file system operations.""" + """Built-in tools for file operations.""" def __init__(self): + """Initialize file tools.""" super().__init__( name="file_tools", - description="File operations: list, read, write, glob, find", + description="Built-in file operations: read, write, list, search, glob", ) def _create_input_schema(self) -> ToolSchema: @@ -23,31 +26,31 @@ class FileTools(ToolBase): "operation": ToolParameter( name="operation", type="string", - description="File operation: list, read, write, glob, find", + description="Operation to perform: read, write, list, search, glob", required=True, - enum=["list", "read", "write", "glob", "find"], + enum=["read", "write", "list", "search", "glob"], ), "path": ToolParameter( name="path", type="string", - description="Path to operate on", + description="File or directory path", required=True, ), - "pattern": ToolParameter( - name="pattern", - type="string", - description="Glob pattern for glob/find operations", - ), "content": ToolParameter( name="content", type="string", description="Content to write (for write operation)", ), + "pattern": ToolParameter( + name="pattern", + type="string", + description="Search pattern or glob pattern", + ), "recursive": ToolParameter( name="recursive", type="boolean", - description="Recursively list/find (default: false)", - default=False, + description="Search recursively", + default=True, ), }, required=["operation", "path"], @@ -56,98 +59,300 @@ class FileTools(ToolBase): async def execute(self, arguments: Dict[str, Any]) -> ToolResult: """Execute file operation.""" operation = arguments.get("operation") - path = arguments.get("path", ".") - pattern = arguments.get("pattern") - content = arguments.get("content") - recursive = arguments.get("recursive", False) - - base_path = Path(path).expanduser().resolve() - - if not base_path.exists(): - return ToolResult(success=False, output="", error=f"Path does not exist: {path}") + path = arguments.get("path", "") try: - if operation == "list": - return await self._list(base_path, recursive) - elif operation == "read": - return await self._read(base_path) + if operation == "read": + return await self._read(path) elif operation == "write": - return await self._write(base_path, content) + return await self._write(path, arguments.get("content", "")) + elif operation == "list": + return await self._list(path) + elif operation == "search": + return await self._search(path, arguments.get("pattern", ""), arguments.get("recursive", True)) elif operation == "glob": - return await self._glob(base_path, pattern) - elif operation == "find": - return await self._find(base_path, pattern) + return await self._glob(path, arguments.get("pattern", "*")) else: return ToolResult(success=False, output="", error=f"Unknown operation: {operation}") except Exception as e: return ToolResult(success=False, output="", error=str(e)) - async def _list(self, path: Path, recursive: bool) -> ToolResult: - """List directory contents.""" - try: - items = list(path.iterdir()) - result = [] - for item in sorted(items, key=lambda x: (x.is_file(), x.name)): - item_type = "file" if item.is_file() else "dir" - result.append(f"{item_type}: {item.relative_to(path.parent)}") - return ToolResult(success=True, output="\n".join(result)) - except PermissionError: - return ToolResult(success=False, output="", error=f"Permission denied: {path}") + async def _read(self, path: str) -> ToolResult: + """Read a file.""" + if not Path(path).exists(): + return ToolResult(success=False, output="", error=f"File not found: {path}") - async def _read(self, path: Path) -> ToolResult: + if Path(path).is_dir(): + return ToolResult(success=False, output="", error=f"Path is a directory: {path}") + + async with aiofiles.open(path, "r", encoding="utf-8") as f: + content = await f.read() + + return ToolResult(success=True, output=content) + + async def _write(self, path: str, content: str) -> ToolResult: + """Write to a file.""" + path_obj = Path(path) + + if path_obj.exists() and path_obj.is_dir(): + return ToolResult(success=False, output="", error=f"Path is a directory: {path}") + + path_obj.parent.mkdir(parents=True, exist_ok=True) + + async with aiofiles.open(path, "w", encoding="utf-8") as f: + await f.write(content) + + return ToolResult(success=True, output=f"Written to {path}") + + async def _list(self, path: str) -> ToolResult: + """List directory contents.""" + path_obj = Path(path) + + if not path_obj.exists(): + return ToolResult(success=False, output="", error=f"Directory not found: {path}") + + if not path_obj.is_dir(): + return ToolResult(success=False, output="", error=f"Path is not a directory: {path}") + + items = [] + for item in sorted(path_obj.iterdir()): + item_type = "DIR" if item.is_dir() else "FILE" + items.append(f"[{item_type}] {item.name}") + + return ToolResult(success=True, output="\n".join(items)) + + async def _search(self, path: str, pattern: str, recursive: bool = True) -> ToolResult: + """Search for pattern in files.""" + if not Path(path).exists(): + return ToolResult(success=False, output="", error=f"Path not found: {path}") + + results = [] + pattern_re = re.compile(pattern) + + if Path(path).is_file(): + file_paths = [Path(path)] + else: + glob_pattern = "**/*" if recursive else "*" + file_paths = list(Path(path).glob(glob_pattern)) + file_paths = [p for p in file_paths if p.is_file()] + + for file_path in file_paths: + try: + async with aiofiles.open(file_path, "r", encoding="utf-8", errors="ignore") as f: + lines = await f.readlines() + for i, line in enumerate(lines, 1): + if pattern_re.search(line): + results.append(f"{file_path}:{i}: {line.strip()}") + except Exception: + continue + + if not results: + return ToolResult(success=True, output="No matches found") + + return ToolResult(success=True, output="\n".join(results[:100])) + + async def _glob(self, path: str, pattern: str) -> ToolResult: + """Find files matching glob pattern.""" + base_path = Path(path) + + if not base_path.exists(): + return ToolResult(success=False, output="", error=f"Path not found: {path}") + + matches = list(base_path.glob(pattern)) + + if not matches: + return ToolResult(success=True, output="No matches found") + + results = [str(m) for m in sorted(matches)] + return ToolResult(success=True, output="\n".join(results)) + + +class ReadFileTool(ToolBase): + """Tool for reading files.""" + + def __init__(self): + super().__init__( + name="read_file", + description="Read the contents of a file", + ) + + def _create_input_schema(self) -> ToolSchema: + return ToolSchema( + properties={ + "path": ToolParameter( + name="path", + type="string", + description="Path to the file to read", + required=True, + ), + }, + required=["path"], + ) + + async def execute(self, arguments: Dict[str, Any]) -> ToolResult: """Read file contents.""" - if path.is_dir(): + path = arguments.get("path", "") + + if not path: + return ToolResult(success=False, output="", error="Path is required") + + path_obj = Path(path) + + if not path_obj.exists(): + return ToolResult(success=False, output="", error=f"File not found: {path}") + + if path_obj.is_dir(): return ToolResult(success=False, output="", error=f"Path is a directory: {path}") try: - content = path.read_text(encoding="utf-8") + async with aiofiles.open(path, "r", encoding="utf-8") as f: + content = await f.read() return ToolResult(success=True, output=content) - except UnicodeDecodeError: - with open(path, "rb") as f: - content = f.read() - return ToolResult(success=True, output=f"") - except PermissionError: - return ToolResult(success=False, output="", error=f"Permission denied: {path}") + except Exception as e: + return ToolResult(success=False, output="", error=str(e)) + + +class WriteFileTool(ToolBase): + """Tool for writing files.""" + + def __init__(self): + super().__init__( + name="write_file", + description="Write content to a file", + ) + + def _create_input_schema(self) -> ToolSchema: + return ToolSchema( + properties={ + "path": ToolParameter( + name="path", + type="string", + description="Path to the file to write", + required=True, + ), + "content": ToolParameter( + name="content", + type="string", + description="Content to write", + required=True, + ), + }, + required=["path", "content"], + ) + + async def execute(self, arguments: Dict[str, Any]) -> ToolResult: + """Write content to a file.""" + path = arguments.get("path", "") + content = arguments.get("content", "") + + if not path: + return ToolResult(success=False, output="", error="Path is required") - async def _write(self, path: Path, content: Optional[str]) -> ToolResult: - """Write content to file.""" if content is None: - return ToolResult(success=False, output="", error="No content provided for write operation") + return ToolResult(success=False, output="", error="Content is required") try: - path.parent.mkdir(parents=True, exist_ok=True) - path.write_text(content, encoding="utf-8") - return ToolResult(success=True, output=f"Written to: {path}") - except PermissionError: - return ToolResult(success=False, output="", error=f"Permission denied: {path}") + path_obj = Path(path) + path_obj.parent.mkdir(parents=True, exist_ok=True) - async def _glob(self, path: Path, pattern: Optional[str]) -> ToolResult: + async with aiofiles.open(path, "w", encoding="utf-8") as f: + await f.write(content) + + return ToolResult(success=True, output=f"Successfully wrote to {path}") + except Exception as e: + return ToolResult(success=False, output="", error=str(e)) + + +class ListDirectoryTool(ToolBase): + """Tool for listing directory contents.""" + + def __init__(self): + super().__init__( + name="list_directory", + description="List contents of a directory", + ) + + def _create_input_schema(self) -> ToolSchema: + return ToolSchema( + properties={ + "path": ToolParameter( + name="path", + type="string", + description="Path to the directory", + required=True, + ), + }, + required=["path"], + ) + + async def execute(self, arguments: Dict[str, Any]) -> ToolResult: + """List directory contents.""" + path = arguments.get("path", "") + + if not path: + return ToolResult(success=False, output="", error="Path is required") + + path_obj = Path(path) + + if not path_obj.exists(): + return ToolResult(success=False, output="", error=f"Directory not found: {path}") + + if not path_obj.is_dir(): + return ToolResult(success=False, output="", error=f"Path is not a directory: {path}") + + items = [] + for item in sorted(path_obj.iterdir()): + item_type = "DIR" if item.is_dir() else "FILE" + items.append(f"[{item_type}] {item.name}") + + return ToolResult(success=True, output="\n".join(items)) + + +class GlobFilesTool(ToolBase): + """Tool for finding files with glob patterns.""" + + def __init__(self): + super().__init__( + name="glob_files", + description="Find files matching a glob pattern", + ) + + def _create_input_schema(self) -> ToolSchema: + return ToolSchema( + properties={ + "path": ToolParameter( + name="path", + type="string", + description="Base path to search from", + required=True, + ), + "pattern": ToolParameter( + name="pattern", + type="string", + description="Glob pattern", + required=True, + ), + }, + required=["path", "pattern"], + ) + + async def execute(self, arguments: Dict[str, Any]) -> ToolResult: """Find files matching glob pattern.""" - if not pattern: - return ToolResult(success=False, output="", error="Pattern required for glob operation") + path = arguments.get("path", "") + pattern = arguments.get("pattern", "*") - try: - matches = list(path.glob(pattern)) - if not matches: - return ToolResult(success=True, output="No matches found") - result = [str(m.relative_to(path)) for m in sorted(matches)] - return ToolResult(success=True, output="\n".join(result)) - except Exception as e: - return ToolResult(success=False, output="", error=str(e)) + if not path: + return ToolResult(success=False, output="", error="Path is required") - async def _find(self, path: Path, pattern: Optional[str]) -> ToolResult: - """Find files by name pattern.""" - if not pattern: - return ToolResult(success=False, output="", error="Pattern required for find operation") + base_path = Path(path) - if not path.is_dir(): - return ToolResult(success=False, output="", error=f"Not a directory: {path}") + if not base_path.exists(): + return ToolResult(success=False, output="", error=f"Path not found: {path}") - try: - matches = [p for p in path.rglob(pattern) if p.is_file()] - if not matches: - return ToolResult(success=True, output="No matches found") - result = [str(m.relative_to(path)) for m in sorted(matches)[:50]] - return ToolResult(success=True, output="\n".join(result)) - except Exception as e: - return ToolResult(success=False, output="", error=str(e)) + matches = list(base_path.glob(pattern)) + + if not matches: + return ToolResult(success=True, output="No matches found") + + results = [str(m) for m in sorted(matches)] + return ToolResult(success=True, output="\n".join(results))