import os import subprocess import re import logging from datetime import datetime from typing import Optional, List, Dict, Any from pathlib import Path from .config import Config from .models import Command, CommandType, Project from .project import ProjectDetector logger = logging.getLogger(__name__) class CommandRecorder: def __init__(self, config: Optional[Config] = None): self.config = config or Config() self.project_detector = ProjectDetector() self._current_project: Optional[Project] = None def _classify_command(self, command: str) -> CommandType: cmd_lower = command.lower().strip() if cmd_lower.startswith("git "): return CommandType.GIT if cmd_lower.startswith("docker ") or cmd_lower.startswith("docker-compose "): return CommandType.DOCKER if any(cmd_lower.startswith(kw) for kw in ["make ", "cmake ", "gradlew ", "mvn ", "npm run "]): return CommandType.BUILD if any(cmd_lower.startswith(kw) for kw in ["pytest ", "npm test ", "cargo test ", "go test "]): return CommandType.TEST if any(cmd_lower.startswith(kw) for kw in ["kubectl ", "helm ", "aws ", "gcloud "]): return CommandType.DEPLOY if any(cmd_lower.startswith(kw) for kw in ["rm ", "mv ", "cp ", "mkdir ", "chmod ", "chown "]): return CommandType.FILE_OP if any(cmd_lower.startswith(kw) for kw in ["sudo ", "apt ", "yum ", "systemctl "]): return CommandType.SYSTEM return CommandType.OTHER def _extract_tags(self, command: str) -> List[str]: tags = [] cmd_lower = command.lower() if "git" in cmd_lower: tags.append("git") if "test" in cmd_lower: tags.append("testing") if any(kw in cmd_lower for kw in ["deploy", "push", "release"]): tags.append("deployment") if any(kw in cmd_lower for kw in ["debug", "log", "print"]): tags.append("debugging") return tags def record_command( self, command: str, working_directory: Optional[str] = None, project_id: Optional[int] = None, workflow_id: Optional[int] = None, capture_duration: bool = True, capture_exit_code: bool = True, ) -> Command: working_dir = working_directory or os.getcwd() if project_id is None: project = self.project_detector.detect(working_dir) project_id = project.id if project else None exit_code = None duration_ms = None if capture_exit_code or capture_duration: start_time = datetime.utcnow() try: result = subprocess.run( command, shell=True, capture_output=True, timeout=1, ) if capture_exit_code: exit_code = result.returncode except subprocess.TimeoutExpired: if capture_exit_code: exit_code = -1 except Exception as e: logger.warning(f"Failed to capture command result: {e}") if capture_exit_code: exit_code = -2 if capture_duration: duration_ms = int((datetime.utcnow() - start_time).total_seconds() * 1000) cmd = Command( command=command, command_type=self._classify_command(command), exit_code=exit_code, duration_ms=duration_ms, working_directory=working_dir, timestamp=datetime.utcnow(), tags=self._extract_tags(command), project_id=project_id, workflow_id=workflow_id, ) return cmd def record_workflow_commands( self, commands: List[str], working_directory: Optional[str] = None, project_id: Optional[int] = None, ) -> List[Command]: recorded = [] for cmd_str in commands: cmd = self.record_command(cmd_str, working_directory, project_id) recorded.append(cmd) return recorded def sanitize_command(self, command: str) -> str: command = command.strip() command = re.sub(r"^\\s*\\\\\\s*", "", command) if command.startswith("cd "): return command return command