diff --git a/src/mcp_server_cli/tools/shell_tools.py b/src/mcp_server_cli/tools/shell_tools.py index d7262c0..d59f863 100644 --- a/src/mcp_server_cli/tools/shell_tools.py +++ b/src/mcp_server_cli/tools/shell_tools.py @@ -1,4 +1,4 @@ -"""Shell execution tools for MCP Server CLI.""" +"""Shell command tools for MCP Server CLI.""" import asyncio import os @@ -6,33 +6,17 @@ import subprocess from pathlib import Path from typing import Any, Dict, List, Optional +from mcp_server_cli.models import ToolParameter, ToolSchema from mcp_server_cli.tools.base import ToolBase, ToolResult -from mcp_server_cli.models import ToolSchema, ToolParameter class ShellTools(ToolBase): - """Safe shell command execution tools.""" - - def __init__( - self, - allowed_commands: Optional[List[str]] = None, - blocked_paths: Optional[List[str]] = None, - max_timeout: int = 30, - ): - """Initialize shell tools with security controls. - - Args: - allowed_commands: List of allowed command names. - blocked_paths: List of blocked directory paths. - max_timeout: Maximum command timeout in seconds. - """ - self.allowed_commands = allowed_commands or ["ls", "cat", "echo", "pwd", "git", "grep", "find", "head", "tail"] - self.blocked_paths = blocked_paths or ["/etc", "/root", "/home/*/.ssh"] - self.max_timeout = max_timeout + """Built-in shell command execution.""" + def __init__(self): super().__init__( name="shell_tools", - description="Execute shell commands safely", + description="Shell execution: run commands safely", ) def _create_input_schema(self) -> ToolSchema: @@ -46,20 +30,15 @@ class ShellTools(ToolBase): ), "args": ToolParameter( name="args", - type="array", - description="Command arguments as array", + type="string", + description="Command arguments (space-separated)", ), "timeout": ToolParameter( name="timeout", type="integer", - description="Timeout in seconds", + description="Timeout in seconds (default: 30)", default=30, ), - "cwd": ToolParameter( - name="cwd", - type="string", - description="Working directory", - ), }, required=["command"], ) @@ -67,189 +46,55 @@ class ShellTools(ToolBase): async def execute(self, arguments: Dict[str, Any]) -> ToolResult: """Execute shell command.""" command = arguments.get("command", "") - cmd_args = arguments.get("args", []) - timeout = min(arguments.get("timeout", self.max_timeout), self.max_timeout) - cwd = arguments.get("cwd") + args = arguments.get("args", "") + timeout = arguments.get("timeout", 30) if not command: - return ToolResult(success=False, output="", error="Command is required") - - if not self._is_command_allowed(command): - return ToolResult(success=False, output="", error=f"Command not allowed: {command}") - - return await self._run_command(command, cmd_args, timeout, cwd) - - def _is_command_allowed(self, command: str) -> bool: - """Check if command is in the allowed list.""" - return command in self.allowed_commands - - def _is_path_safe(self, path: str) -> bool: - """Check if path is not blocked.""" - abs_path = str(Path(path).absolute()) - - for blocked in self.blocked_paths: - if blocked.endswith("*"): - if abs_path.startswith(blocked[:-1]): - return False - elif abs_path == blocked or abs_path.startswith(blocked + "/"): - return False - - return True - - async def _run_command( - self, - command: str, - args: List[str], - timeout: int, - cwd: Optional[str], - ) -> ToolResult: - """Run a shell command with security checks.""" - if cwd and not self._is_path_safe(cwd): - return ToolResult(success=False, output="", error=f"Blocked path: {cwd}") - - for arg in args: - if not self._is_path_safe(arg): - return ToolResult(success=False, output="", error=f"Blocked path in arguments: {arg}") - - cmd = [command] + args - work_dir = cwd or str(Path.cwd()) + return ToolResult(success=False, output="", error="No command specified") + security_config = None try: - proc = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - cwd=work_dir, - env={**os.environ, "TERM": "dumb"}, + from mcp_server_cli.config import ConfigManager + config_manager = ConfigManager() + config = config_manager.load() + security_config = config.security + except Exception: + pass + + allowed_commands = ["ls", "cat", "echo", "pwd", "git", "grep", "find", "head", "tail"] + if security_config: + allowed_commands = security_config.allowed_commands + + if command not in allowed_commands: + return ToolResult( + success=False, + output="", + error=f"Command '{command}' not allowed", ) - try: - stdout, stderr = await asyncio.wait_for( - proc.communicate(), - timeout=timeout, - ) - except asyncio.TimeoutError: - proc.kill() - return ToolResult(success=False, output="", error=f"Command timed out after {timeout}s") + blocked_paths = ["/etc", "/root", "/home/*/.ssh"] + if security_config: + blocked_paths = security_config.blocked_paths - stdout_text = stdout.decode("utf-8", errors="replace").strip() - stderr_text = stderr.decode("utf-8", errors="replace").strip() - - if proc.returncode != 0 and not stdout_text: - return ToolResult(success=False, output="", error=stderr_text or f"Command failed with code {proc.returncode}") - - return ToolResult(success=True, output=stdout_text or stderr_text or "") - - except FileNotFoundError: - return ToolResult(success=False, output="", error=f"Command not found: {command}") - except Exception as e: - return ToolResult(success=False, output="", error=str(e)) - - -class ExecuteCommandTool(ToolBase): - """Tool for executing commands.""" - - def __init__(self): - super().__init__( - name="execute_command", - description="Execute a shell command", - ) - - def _create_input_schema(self) -> ToolSchema: - return ToolSchema( - properties={ - "cmd": ToolParameter( - name="cmd", - type="array", - description="Command and arguments as array", - required=True, - ), - "timeout": ToolParameter( - name="timeout", - type="integer", - description="Timeout in seconds", - default=30, - ), - "cwd": ToolParameter( - name="cwd", - type="string", - description="Working directory", - ), - }, - required=["cmd"], - ) - - async def execute(self, arguments: Dict[str, Any]) -> ToolResult: - """Execute a command.""" - cmd = arguments.get("cmd", []) - timeout = arguments.get("timeout", 30) - cwd = arguments.get("cwd") - - if not cmd: - return ToolResult(success=False, output="", error="Command array is required") + full_command = [command] + if args: + full_command.extend(args.split()) try: - proc = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - cwd=cwd, - ) + timeout = min(timeout, security_config.max_shell_timeout if security_config else 30) + except Exception: + pass - stdout, stderr = await asyncio.wait_for( - proc.communicate(), + try: + result = await asyncio.to_thread( + subprocess.run, + full_command, + capture_output=True, + text=True, timeout=timeout, ) - - output = stdout.decode("utf-8", errors="replace").strip() - - if proc.returncode != 0: - error = stderr.decode("utf-8", errors="replace").strip() - return ToolResult(success=False, output=output, error=error) - - return ToolResult(success=True, output=output) - - except asyncio.TimeoutError: + return ToolResult(success=True, output=result.stdout or result.stderr) + except subprocess.TimeoutExpired: return ToolResult(success=False, output="", error=f"Command timed out after {timeout}s") except Exception as e: return ToolResult(success=False, output="", error=str(e)) - - -class ListProcessesTool(ToolBase): - """Tool for listing running processes.""" - - def __init__(self): - super().__init__( - name="list_processes", - description="List running processes", - ) - - def _create_input_schema(self) -> ToolSchema: - return ToolSchema( - properties={ - "full": ToolParameter( - name="full", - type="boolean", - description="Show full command line", - default=False, - ), - }, - ) - - async def execute(self, arguments: Dict[str, Any]) -> ToolResult: - """List processes.""" - try: - cmd = ["ps", "aux"] - if arguments.get("full"): - cmd.extend(["ww", "u"]) - - proc = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - - stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=10) - return ToolResult(success=True, output=stdout.decode("utf-8")) - except Exception as e: - return ToolResult(success=False, output="", error=str(e))