359 lines
12 KiB
Python
359 lines
12 KiB
Python
"""File operation tools for MCP Server CLI."""
|
|
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Any, Dict
|
|
|
|
import aiofiles
|
|
|
|
from mcp_server_cli.models import ToolParameter, ToolSchema
|
|
from mcp_server_cli.tools.base import ToolBase, ToolResult
|
|
|
|
|
|
class FileTools(ToolBase):
|
|
"""Built-in tools for file operations."""
|
|
|
|
def __init__(self):
|
|
"""Initialize file tools."""
|
|
super().__init__(
|
|
name="file_tools",
|
|
description="Built-in file operations: read, write, list, search, glob",
|
|
)
|
|
|
|
def _create_input_schema(self) -> ToolSchema:
|
|
return ToolSchema(
|
|
properties={
|
|
"operation": ToolParameter(
|
|
name="operation",
|
|
type="string",
|
|
description="Operation to perform: read, write, list, search, glob",
|
|
required=True,
|
|
enum=["read", "write", "list", "search", "glob"],
|
|
),
|
|
"path": ToolParameter(
|
|
name="path",
|
|
type="string",
|
|
description="File or directory path",
|
|
required=True,
|
|
),
|
|
"content": ToolParameter(
|
|
name="content",
|
|
type="string",
|
|
description="Content to write (for write operation)",
|
|
),
|
|
"pattern": ToolParameter(
|
|
name="pattern",
|
|
type="string",
|
|
description="Search pattern or glob pattern",
|
|
),
|
|
"recursive": ToolParameter(
|
|
name="recursive",
|
|
type="boolean",
|
|
description="Search recursively",
|
|
default=True,
|
|
),
|
|
},
|
|
required=["operation", "path"],
|
|
)
|
|
|
|
async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
|
|
"""Execute file operation."""
|
|
operation = arguments.get("operation")
|
|
path = arguments.get("path", "")
|
|
|
|
try:
|
|
if operation == "read":
|
|
return await self._read(path)
|
|
elif operation == "write":
|
|
return await self._write(path, arguments.get("content", ""))
|
|
elif operation == "list":
|
|
return await self._list(path)
|
|
elif operation == "search":
|
|
return await self._search(path, arguments.get("pattern", ""), arguments.get("recursive", True))
|
|
elif operation == "glob":
|
|
return await self._glob(path, arguments.get("pattern", "*"))
|
|
else:
|
|
return ToolResult(success=False, output="", error=f"Unknown operation: {operation}")
|
|
except Exception as e:
|
|
return ToolResult(success=False, output="", error=str(e))
|
|
|
|
async def _read(self, path: str) -> ToolResult:
|
|
"""Read a file."""
|
|
if not Path(path).exists():
|
|
return ToolResult(success=False, output="", error=f"File not found: {path}")
|
|
|
|
if Path(path).is_dir():
|
|
return ToolResult(success=False, output="", error=f"Path is a directory: {path}")
|
|
|
|
async with aiofiles.open(path, "r", encoding="utf-8") as f:
|
|
content = await f.read()
|
|
|
|
return ToolResult(success=True, output=content)
|
|
|
|
async def _write(self, path: str, content: str) -> ToolResult:
|
|
"""Write to a file."""
|
|
path_obj = Path(path)
|
|
|
|
if path_obj.exists() and path_obj.is_dir():
|
|
return ToolResult(success=False, output="", error=f"Path is a directory: {path}")
|
|
|
|
path_obj.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
async with aiofiles.open(path, "w", encoding="utf-8") as f:
|
|
await f.write(content)
|
|
|
|
return ToolResult(success=True, output=f"Written to {path}")
|
|
|
|
async def _list(self, path: str) -> ToolResult:
|
|
"""List directory contents."""
|
|
path_obj = Path(path)
|
|
|
|
if not path_obj.exists():
|
|
return ToolResult(success=False, output="", error=f"Directory not found: {path}")
|
|
|
|
if not path_obj.is_dir():
|
|
return ToolResult(success=False, output="", error=f"Path is not a directory: {path}")
|
|
|
|
items = []
|
|
for item in sorted(path_obj.iterdir()):
|
|
item_type = "DIR" if item.is_dir() else "FILE"
|
|
items.append(f"[{item_type}] {item.name}")
|
|
|
|
return ToolResult(success=True, output="\n".join(items))
|
|
|
|
async def _search(self, path: str, pattern: str, recursive: bool = True) -> ToolResult:
|
|
"""Search for pattern in files."""
|
|
if not Path(path).exists():
|
|
return ToolResult(success=False, output="", error=f"Path not found: {path}")
|
|
|
|
results = []
|
|
pattern_re = re.compile(pattern)
|
|
|
|
if Path(path).is_file():
|
|
file_paths = [Path(path)]
|
|
else:
|
|
glob_pattern = "**/*" if recursive else "*"
|
|
file_paths = list(Path(path).glob(glob_pattern))
|
|
file_paths = [p for p in file_paths if p.is_file()]
|
|
|
|
for file_path in file_paths:
|
|
try:
|
|
async with aiofiles.open(file_path, "r", encoding="utf-8", errors="ignore") as f:
|
|
lines = await f.readlines()
|
|
for i, line in enumerate(lines, 1):
|
|
if pattern_re.search(line):
|
|
results.append(f"{file_path}:{i}: {line.strip()}")
|
|
except Exception:
|
|
continue
|
|
|
|
if not results:
|
|
return ToolResult(success=True, output="No matches found")
|
|
|
|
return ToolResult(success=True, output="\n".join(results[:100]))
|
|
|
|
async def _glob(self, path: str, pattern: str) -> ToolResult:
|
|
"""Find files matching glob pattern."""
|
|
base_path = Path(path)
|
|
|
|
if not base_path.exists():
|
|
return ToolResult(success=False, output="", error=f"Path not found: {path}")
|
|
|
|
matches = list(base_path.glob(pattern))
|
|
|
|
if not matches:
|
|
return ToolResult(success=True, output="No matches found")
|
|
|
|
results = [str(m) for m in sorted(matches)]
|
|
return ToolResult(success=True, output="\n".join(results))
|
|
|
|
|
|
class ReadFileTool(ToolBase):
|
|
"""Tool for reading files."""
|
|
|
|
def __init__(self):
|
|
super().__init__(
|
|
name="read_file",
|
|
description="Read the contents of a file",
|
|
)
|
|
|
|
def _create_input_schema(self) -> ToolSchema:
|
|
return ToolSchema(
|
|
properties={
|
|
"path": ToolParameter(
|
|
name="path",
|
|
type="string",
|
|
description="Path to the file to read",
|
|
required=True,
|
|
),
|
|
},
|
|
required=["path"],
|
|
)
|
|
|
|
async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
|
|
"""Read file contents."""
|
|
path = arguments.get("path", "")
|
|
|
|
if not path:
|
|
return ToolResult(success=False, output="", error="Path is required")
|
|
|
|
path_obj = Path(path)
|
|
|
|
if not path_obj.exists():
|
|
return ToolResult(success=False, output="", error=f"File not found: {path}")
|
|
|
|
if path_obj.is_dir():
|
|
return ToolResult(success=False, output="", error=f"Path is a directory: {path}")
|
|
|
|
try:
|
|
async with aiofiles.open(path, "r", encoding="utf-8") as f:
|
|
content = await f.read()
|
|
return ToolResult(success=True, output=content)
|
|
except Exception as e:
|
|
return ToolResult(success=False, output="", error=str(e))
|
|
|
|
|
|
class WriteFileTool(ToolBase):
|
|
"""Tool for writing files."""
|
|
|
|
def __init__(self):
|
|
super().__init__(
|
|
name="write_file",
|
|
description="Write content to a file",
|
|
)
|
|
|
|
def _create_input_schema(self) -> ToolSchema:
|
|
return ToolSchema(
|
|
properties={
|
|
"path": ToolParameter(
|
|
name="path",
|
|
type="string",
|
|
description="Path to the file to write",
|
|
required=True,
|
|
),
|
|
"content": ToolParameter(
|
|
name="content",
|
|
type="string",
|
|
description="Content to write",
|
|
required=True,
|
|
),
|
|
},
|
|
required=["path", "content"],
|
|
)
|
|
|
|
async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
|
|
"""Write content to a file."""
|
|
path = arguments.get("path", "")
|
|
content = arguments.get("content", "")
|
|
|
|
if not path:
|
|
return ToolResult(success=False, output="", error="Path is required")
|
|
|
|
if content is None:
|
|
return ToolResult(success=False, output="", error="Content is required")
|
|
|
|
try:
|
|
path_obj = Path(path)
|
|
path_obj.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
async with aiofiles.open(path, "w", encoding="utf-8") as f:
|
|
await f.write(content)
|
|
|
|
return ToolResult(success=True, output=f"Successfully wrote to {path}")
|
|
except Exception as e:
|
|
return ToolResult(success=False, output="", error=str(e))
|
|
|
|
|
|
class ListDirectoryTool(ToolBase):
|
|
"""Tool for listing directory contents."""
|
|
|
|
def __init__(self):
|
|
super().__init__(
|
|
name="list_directory",
|
|
description="List contents of a directory",
|
|
)
|
|
|
|
def _create_input_schema(self) -> ToolSchema:
|
|
return ToolSchema(
|
|
properties={
|
|
"path": ToolParameter(
|
|
name="path",
|
|
type="string",
|
|
description="Path to the directory",
|
|
required=True,
|
|
),
|
|
},
|
|
required=["path"],
|
|
)
|
|
|
|
async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
|
|
"""List directory contents."""
|
|
path = arguments.get("path", "")
|
|
|
|
if not path:
|
|
return ToolResult(success=False, output="", error="Path is required")
|
|
|
|
path_obj = Path(path)
|
|
|
|
if not path_obj.exists():
|
|
return ToolResult(success=False, output="", error=f"Directory not found: {path}")
|
|
|
|
if not path_obj.is_dir():
|
|
return ToolResult(success=False, output="", error=f"Path is not a directory: {path}")
|
|
|
|
items = []
|
|
for item in sorted(path_obj.iterdir()):
|
|
item_type = "DIR" if item.is_dir() else "FILE"
|
|
items.append(f"[{item_type}] {item.name}")
|
|
|
|
return ToolResult(success=True, output="\n".join(items))
|
|
|
|
|
|
class GlobFilesTool(ToolBase):
|
|
"""Tool for finding files with glob patterns."""
|
|
|
|
def __init__(self):
|
|
super().__init__(
|
|
name="glob_files",
|
|
description="Find files matching a glob pattern",
|
|
)
|
|
|
|
def _create_input_schema(self) -> ToolSchema:
|
|
return ToolSchema(
|
|
properties={
|
|
"path": ToolParameter(
|
|
name="path",
|
|
type="string",
|
|
description="Base path to search from",
|
|
required=True,
|
|
),
|
|
"pattern": ToolParameter(
|
|
name="pattern",
|
|
type="string",
|
|
description="Glob pattern",
|
|
required=True,
|
|
),
|
|
},
|
|
required=["path", "pattern"],
|
|
)
|
|
|
|
async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
|
|
"""Find files matching glob pattern."""
|
|
path = arguments.get("path", "")
|
|
pattern = arguments.get("pattern", "*")
|
|
|
|
if not path:
|
|
return ToolResult(success=False, output="", error="Path is required")
|
|
|
|
base_path = Path(path)
|
|
|
|
if not base_path.exists():
|
|
return ToolResult(success=False, output="", error=f"Path not found: {path}")
|
|
|
|
matches = list(base_path.glob(pattern))
|
|
|
|
if not matches:
|
|
return ToolResult(success=True, output="No matches found")
|
|
|
|
results = [str(m) for m in sorted(matches)]
|
|
return ToolResult(success=True, output="\n".join(results))
|