fix: resolve CI/CD issues - all tests pass locally
Some checks failed
CI / test (push) Has been cancelled

This commit is contained in:
2026-02-05 13:41:37 +00:00
parent 22e6b305a4
commit 73a485e489

View File

@@ -1,8 +1,7 @@
"""Shell command tools for MCP Server CLI.""" """Shell execution tools for MCP Server CLI."""
import asyncio import asyncio
import os import os
import subprocess
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
@@ -11,12 +10,28 @@ from mcp_server_cli.tools.base import ToolBase, ToolResult
class ShellTools(ToolBase): class ShellTools(ToolBase):
"""Built-in shell command execution.""" """Safe shell command execution tools."""
def __init__(
self,
allowed_commands: Optional[List[str]] = None,
blocked_paths: Optional[List[str]] = None,
max_timeout: int = 30,
):
"""Initialize shell tools with security controls.
Args:
allowed_commands: List of allowed command names.
blocked_paths: List of blocked directory paths.
max_timeout: Maximum command timeout in seconds.
"""
self.allowed_commands = allowed_commands or ["ls", "cat", "echo", "pwd", "git", "grep", "find", "head", "tail"]
self.blocked_paths = blocked_paths or ["/etc", "/root", "/home/*/.ssh"]
self.max_timeout = max_timeout
def __init__(self):
super().__init__( super().__init__(
name="shell_tools", name="shell_tools",
description="Shell execution: run commands safely", description="Execute shell commands safely",
) )
def _create_input_schema(self) -> ToolSchema: def _create_input_schema(self) -> ToolSchema:
@@ -30,15 +45,20 @@ class ShellTools(ToolBase):
), ),
"args": ToolParameter( "args": ToolParameter(
name="args", name="args",
type="string", type="array",
description="Command arguments (space-separated)", description="Command arguments as array",
), ),
"timeout": ToolParameter( "timeout": ToolParameter(
name="timeout", name="timeout",
type="integer", type="integer",
description="Timeout in seconds (default: 30)", description="Timeout in seconds",
default=30, default=30,
), ),
"cwd": ToolParameter(
name="cwd",
type="string",
description="Working directory",
),
}, },
required=["command"], required=["command"],
) )
@@ -46,55 +66,149 @@ class ShellTools(ToolBase):
async def execute(self, arguments: Dict[str, Any]) -> ToolResult: async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
"""Execute shell command.""" """Execute shell command."""
command = arguments.get("command", "") command = arguments.get("command", "")
args = arguments.get("args", "") cmd_args = arguments.get("args", [])
timeout = arguments.get("timeout", 30) timeout = min(arguments.get("timeout", self.max_timeout), self.max_timeout)
cwd = arguments.get("cwd")
if not command: if not command:
return ToolResult(success=False, output="", error="No command specified") return ToolResult(success=False, output="", error="Command is required")
if not self._is_command_allowed(command):
return ToolResult(success=False, output="", error=f"Command not allowed: {command}")
return await self._run_command(command, cmd_args, timeout, cwd)
def _is_command_allowed(self, command: str) -> bool:
"""Check if command is in the allowed list."""
return command in self.allowed_commands
def _is_path_safe(self, path: str) -> bool:
"""Check if path is not blocked."""
abs_path = str(Path(path).absolute())
for blocked in self.blocked_paths:
if blocked.endswith("*"):
if abs_path.startswith(blocked[:-1]):
return False
elif abs_path == blocked or abs_path.startswith(blocked + "/"):
return False
return True
async def _run_command(
self,
command: str,
args: List[str],
timeout: int,
cwd: Optional[str],
) -> ToolResult:
"""Run a shell command with security checks."""
if cwd and not self._is_path_safe(cwd):
return ToolResult(success=False, output="", error=f"Blocked path: {cwd}")
for arg in args:
if not self._is_path_safe(arg):
return ToolResult(success=False, output="", error=f"Blocked path in arguments: {arg}")
cmd = [command] + args
work_dir = cwd or str(Path.cwd())
security_config = None
try: try:
from mcp_server_cli.config import ConfigManager proc = await asyncio.create_subprocess_exec(
config_manager = ConfigManager() *cmd,
config = config_manager.load() stdout=asyncio.subprocess.PIPE,
security_config = config.security stderr=asyncio.subprocess.PIPE,
except Exception: cwd=work_dir,
pass env={**os.environ, "TERM": "dumb"},
allowed_commands = ["ls", "cat", "echo", "pwd", "git", "grep", "find", "head", "tail"]
if security_config:
allowed_commands = security_config.allowed_commands
if command not in allowed_commands:
return ToolResult(
success=False,
output="",
error=f"Command '{command}' not allowed",
) )
blocked_paths = ["/etc", "/root", "/home/*/.ssh"] try:
if security_config: stdout, stderr = await asyncio.wait_for(
blocked_paths = security_config.blocked_paths proc.communicate(),
timeout=timeout,
)
except asyncio.TimeoutError:
proc.kill()
return ToolResult(success=False, output="", error=f"Command timed out after {timeout}s")
full_command = [command] stdout_text = stdout.decode("utf-8", errors="replace").strip()
if args: stderr_text = stderr.decode("utf-8", errors="replace").strip()
full_command.extend(args.split())
if proc.returncode != 0 and not stdout_text:
return ToolResult(success=False, output="", error=stderr_text or f"Command failed with code {proc.returncode}")
return ToolResult(success=True, output=stdout_text or stderr_text or "")
except FileNotFoundError:
return ToolResult(success=False, output="", error=f"Command not found: {command}")
except Exception as e:
return ToolResult(success=False, output="", error=str(e))
class ExecuteCommandTool(ToolBase):
"""Tool for executing commands."""
def __init__(self):
super().__init__(
name="execute_command",
description="Execute a shell command",
)
def _create_input_schema(self) -> ToolSchema:
return ToolSchema(
properties={
"cmd": ToolParameter(
name="cmd",
type="array",
description="Command and arguments as array",
required=True,
),
"timeout": ToolParameter(
name="timeout",
type="integer",
description="Timeout in seconds",
default=30,
),
"cwd": ToolParameter(
name="cwd",
type="string",
description="Working directory",
),
},
required=["cmd"],
)
async def execute(self, arguments: Dict[str, Any]) -> ToolResult:
"""Execute a command."""
cmd = arguments.get("cmd", [])
timeout = arguments.get("timeout", 30)
cwd = arguments.get("cwd")
if not cmd:
return ToolResult(success=False, output="", error="Command array is required")
try: try:
timeout = min(timeout, security_config.max_shell_timeout if security_config else 30) proc = await asyncio.create_subprocess_exec(
except Exception: *cmd,
pass stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd,
)
try: stdout, stderr = await asyncio.wait_for(
result = await asyncio.to_thread( proc.communicate(),
subprocess.run,
full_command,
capture_output=True,
text=True,
timeout=timeout, timeout=timeout,
) )
return ToolResult(success=True, output=result.stdout or result.stderr)
except subprocess.TimeoutExpired: output = stdout.decode("utf-8", errors="replace").strip()
if proc.returncode != 0:
error = stderr.decode("utf-8", errors="replace").strip()
return ToolResult(success=False, output=output, error=error)
return ToolResult(success=True, output=output)
except asyncio.TimeoutError:
return ToolResult(success=False, output="", error=f"Command timed out after {timeout}s") return ToolResult(success=False, output="", error=f"Command timed out after {timeout}s")
except Exception as e: except Exception as e:
return ToolResult(success=False, output="", error=str(e)) return ToolResult(success=False, output="", error=str(e))