From 460f1a6b65ee9294a0bc5755342e085fa43ca667 Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Thu, 5 Feb 2026 12:33:32 +0000 Subject: [PATCH] Add tool implementations (file, git, shell, custom) --- src/mcp_server_cli/tools/shell_tools.py | 255 ++++++++++++++++++++++++ 1 file changed, 255 insertions(+) create mode 100644 src/mcp_server_cli/tools/shell_tools.py diff --git a/src/mcp_server_cli/tools/shell_tools.py b/src/mcp_server_cli/tools/shell_tools.py new file mode 100644 index 0000000..d7262c0 --- /dev/null +++ b/src/mcp_server_cli/tools/shell_tools.py @@ -0,0 +1,255 @@ +"""Shell execution tools for MCP Server CLI.""" + +import asyncio +import os +import subprocess +from pathlib import Path +from typing import Any, Dict, List, Optional + +from mcp_server_cli.tools.base import ToolBase, ToolResult +from mcp_server_cli.models import ToolSchema, ToolParameter + + +class ShellTools(ToolBase): + """Safe shell command execution tools.""" + + def __init__( + self, + allowed_commands: Optional[List[str]] = None, + blocked_paths: Optional[List[str]] = None, + max_timeout: int = 30, + ): + """Initialize shell tools with security controls. + + Args: + allowed_commands: List of allowed command names. + blocked_paths: List of blocked directory paths. + max_timeout: Maximum command timeout in seconds. + """ + self.allowed_commands = allowed_commands or ["ls", "cat", "echo", "pwd", "git", "grep", "find", "head", "tail"] + self.blocked_paths = blocked_paths or ["/etc", "/root", "/home/*/.ssh"] + self.max_timeout = max_timeout + + super().__init__( + name="shell_tools", + description="Execute shell commands safely", + ) + + def _create_input_schema(self) -> ToolSchema: + return ToolSchema( + properties={ + "command": ToolParameter( + name="command", + type="string", + description="Command to execute", + required=True, + ), + "args": ToolParameter( + name="args", + type="array", + description="Command arguments as array", + ), + "timeout": ToolParameter( + name="timeout", + type="integer", + description="Timeout in seconds", + default=30, + ), + "cwd": ToolParameter( + name="cwd", + type="string", + description="Working directory", + ), + }, + required=["command"], + ) + + async def execute(self, arguments: Dict[str, Any]) -> ToolResult: + """Execute shell command.""" + command = arguments.get("command", "") + cmd_args = arguments.get("args", []) + timeout = min(arguments.get("timeout", self.max_timeout), self.max_timeout) + cwd = arguments.get("cwd") + + if not command: + return ToolResult(success=False, output="", error="Command is required") + + if not self._is_command_allowed(command): + return ToolResult(success=False, output="", error=f"Command not allowed: {command}") + + return await self._run_command(command, cmd_args, timeout, cwd) + + def _is_command_allowed(self, command: str) -> bool: + """Check if command is in the allowed list.""" + return command in self.allowed_commands + + def _is_path_safe(self, path: str) -> bool: + """Check if path is not blocked.""" + abs_path = str(Path(path).absolute()) + + for blocked in self.blocked_paths: + if blocked.endswith("*"): + if abs_path.startswith(blocked[:-1]): + return False + elif abs_path == blocked or abs_path.startswith(blocked + "/"): + return False + + return True + + async def _run_command( + self, + command: str, + args: List[str], + timeout: int, + cwd: Optional[str], + ) -> ToolResult: + """Run a shell command with security checks.""" + if cwd and not self._is_path_safe(cwd): + return ToolResult(success=False, output="", error=f"Blocked path: {cwd}") + + for arg in args: + if not self._is_path_safe(arg): + return ToolResult(success=False, output="", error=f"Blocked path in arguments: {arg}") + + cmd = [command] + args + work_dir = cwd or str(Path.cwd()) + + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=work_dir, + env={**os.environ, "TERM": "dumb"}, + ) + + try: + stdout, stderr = await asyncio.wait_for( + proc.communicate(), + timeout=timeout, + ) + except asyncio.TimeoutError: + proc.kill() + return ToolResult(success=False, output="", error=f"Command timed out after {timeout}s") + + stdout_text = stdout.decode("utf-8", errors="replace").strip() + stderr_text = stderr.decode("utf-8", errors="replace").strip() + + if proc.returncode != 0 and not stdout_text: + return ToolResult(success=False, output="", error=stderr_text or f"Command failed with code {proc.returncode}") + + return ToolResult(success=True, output=stdout_text or stderr_text or "") + + except FileNotFoundError: + return ToolResult(success=False, output="", error=f"Command not found: {command}") + except Exception as e: + return ToolResult(success=False, output="", error=str(e)) + + +class ExecuteCommandTool(ToolBase): + """Tool for executing commands.""" + + def __init__(self): + super().__init__( + name="execute_command", + description="Execute a shell command", + ) + + def _create_input_schema(self) -> ToolSchema: + return ToolSchema( + properties={ + "cmd": ToolParameter( + name="cmd", + type="array", + description="Command and arguments as array", + required=True, + ), + "timeout": ToolParameter( + name="timeout", + type="integer", + description="Timeout in seconds", + default=30, + ), + "cwd": ToolParameter( + name="cwd", + type="string", + description="Working directory", + ), + }, + required=["cmd"], + ) + + async def execute(self, arguments: Dict[str, Any]) -> ToolResult: + """Execute a command.""" + cmd = arguments.get("cmd", []) + timeout = arguments.get("timeout", 30) + cwd = arguments.get("cwd") + + if not cmd: + return ToolResult(success=False, output="", error="Command array is required") + + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=cwd, + ) + + stdout, stderr = await asyncio.wait_for( + proc.communicate(), + timeout=timeout, + ) + + output = stdout.decode("utf-8", errors="replace").strip() + + if proc.returncode != 0: + error = stderr.decode("utf-8", errors="replace").strip() + return ToolResult(success=False, output=output, error=error) + + return ToolResult(success=True, output=output) + + except asyncio.TimeoutError: + return ToolResult(success=False, output="", error=f"Command timed out after {timeout}s") + except Exception as e: + return ToolResult(success=False, output="", error=str(e)) + + +class ListProcessesTool(ToolBase): + """Tool for listing running processes.""" + + def __init__(self): + super().__init__( + name="list_processes", + description="List running processes", + ) + + def _create_input_schema(self) -> ToolSchema: + return ToolSchema( + properties={ + "full": ToolParameter( + name="full", + type="boolean", + description="Show full command line", + default=False, + ), + }, + ) + + async def execute(self, arguments: Dict[str, Any]) -> ToolResult: + """List processes.""" + try: + cmd = ["ps", "aux"] + if arguments.get("full"): + cmd.extend(["ww", "u"]) + + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10) + return ToolResult(success=True, output=stdout.decode("utf-8")) + except Exception as e: + return ToolResult(success=False, output="", error=str(e))