122 lines
4.3 KiB
Python
122 lines
4.3 KiB
Python
import os
|
|
import subprocess
|
|
import re
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Optional, List, Dict, Any
|
|
from pathlib import Path
|
|
|
|
from .config import Config
|
|
from .models import Command, CommandType, Project
|
|
from .project import ProjectDetector
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CommandRecorder:
|
|
def __init__(self, config: Optional[Config] = None):
|
|
self.config = config or Config()
|
|
self.project_detector = ProjectDetector()
|
|
self._current_project: Optional[Project] = None
|
|
|
|
def _classify_command(self, command: str) -> CommandType:
|
|
cmd_lower = command.lower().strip()
|
|
if cmd_lower.startswith("git "):
|
|
return CommandType.GIT
|
|
if cmd_lower.startswith("docker ") or cmd_lower.startswith("docker-compose "):
|
|
return CommandType.DOCKER
|
|
if any(cmd_lower.startswith(kw) for kw in ["make ", "cmake ", "gradlew ", "mvn ", "npm run "]):
|
|
return CommandType.BUILD
|
|
if any(cmd_lower.startswith(kw) for kw in ["pytest ", "npm test ", "cargo test ", "go test "]):
|
|
return CommandType.TEST
|
|
if any(cmd_lower.startswith(kw) for kw in ["kubectl ", "helm ", "aws ", "gcloud "]):
|
|
return CommandType.DEPLOY
|
|
if any(cmd_lower.startswith(kw) for kw in ["rm ", "mv ", "cp ", "mkdir ", "chmod ", "chown "]):
|
|
return CommandType.FILE_OP
|
|
if any(cmd_lower.startswith(kw) for kw in ["sudo ", "apt ", "yum ", "systemctl "]):
|
|
return CommandType.SYSTEM
|
|
return CommandType.OTHER
|
|
|
|
def _extract_tags(self, command: str) -> List[str]:
|
|
tags = []
|
|
cmd_lower = command.lower()
|
|
if "git" in cmd_lower:
|
|
tags.append("git")
|
|
if "test" in cmd_lower:
|
|
tags.append("testing")
|
|
if any(kw in cmd_lower for kw in ["deploy", "push", "release"]):
|
|
tags.append("deployment")
|
|
if any(kw in cmd_lower for kw in ["debug", "log", "print"]):
|
|
tags.append("debugging")
|
|
return tags
|
|
|
|
def record_command(
|
|
self,
|
|
command: str,
|
|
working_directory: Optional[str] = None,
|
|
project_id: Optional[int] = None,
|
|
workflow_id: Optional[int] = None,
|
|
capture_duration: bool = True,
|
|
capture_exit_code: bool = True,
|
|
) -> Command:
|
|
working_dir = working_directory or os.getcwd()
|
|
if project_id is None:
|
|
project = self.project_detector.detect(working_dir)
|
|
project_id = project.id if project else None
|
|
|
|
exit_code = None
|
|
duration_ms = None
|
|
if capture_exit_code or capture_duration:
|
|
start_time = datetime.utcnow()
|
|
try:
|
|
result = subprocess.run(
|
|
command,
|
|
shell=True,
|
|
capture_output=True,
|
|
timeout=1,
|
|
)
|
|
if capture_exit_code:
|
|
exit_code = result.returncode
|
|
except subprocess.TimeoutExpired:
|
|
if capture_exit_code:
|
|
exit_code = -1
|
|
except Exception as e:
|
|
logger.warning(f"Failed to capture command result: {e}")
|
|
if capture_exit_code:
|
|
exit_code = -2
|
|
if capture_duration:
|
|
duration_ms = int((datetime.utcnow() - start_time).total_seconds() * 1000)
|
|
|
|
cmd = Command(
|
|
command=command,
|
|
command_type=self._classify_command(command),
|
|
exit_code=exit_code,
|
|
duration_ms=duration_ms,
|
|
working_directory=working_dir,
|
|
timestamp=datetime.utcnow(),
|
|
tags=self._extract_tags(command),
|
|
project_id=project_id,
|
|
workflow_id=workflow_id,
|
|
)
|
|
|
|
return cmd
|
|
|
|
def record_workflow_commands(
|
|
self,
|
|
commands: List[str],
|
|
working_directory: Optional[str] = None,
|
|
project_id: Optional[int] = None,
|
|
) -> List[Command]:
|
|
recorded = []
|
|
for cmd_str in commands:
|
|
cmd = self.record_command(cmd_str, working_directory, project_id)
|
|
recorded.append(cmd)
|
|
return recorded
|
|
|
|
def sanitize_command(self, command: str) -> str:
|
|
command = command.strip()
|
|
command = re.sub(r"^\\s*\\\\\\s*", "", command)
|
|
if command.startswith("cd "):
|
|
return command
|
|
return command
|