Files
cli-command-memory/.cli_memory/recorder.py
7000pctAUTO 5bd64e6a1f
Some checks failed
CI / test (push) Has been cancelled
Add recorder, project, context, search, and history modules
2026-01-31 08:27:30 +00:00

122 lines
4.3 KiB
Python

import os
import subprocess
import re
import logging
from datetime import datetime
from typing import Optional, List, Dict, Any
from pathlib import Path
from .config import Config
from .models import Command, CommandType, Project
from .project import ProjectDetector
logger = logging.getLogger(__name__)
class CommandRecorder:
def __init__(self, config: Optional[Config] = None):
self.config = config or Config()
self.project_detector = ProjectDetector()
self._current_project: Optional[Project] = None
def _classify_command(self, command: str) -> CommandType:
cmd_lower = command.lower().strip()
if cmd_lower.startswith("git "):
return CommandType.GIT
if cmd_lower.startswith("docker ") or cmd_lower.startswith("docker-compose "):
return CommandType.DOCKER
if any(cmd_lower.startswith(kw) for kw in ["make ", "cmake ", "gradlew ", "mvn ", "npm run "]):
return CommandType.BUILD
if any(cmd_lower.startswith(kw) for kw in ["pytest ", "npm test ", "cargo test ", "go test "]):
return CommandType.TEST
if any(cmd_lower.startswith(kw) for kw in ["kubectl ", "helm ", "aws ", "gcloud "]):
return CommandType.DEPLOY
if any(cmd_lower.startswith(kw) for kw in ["rm ", "mv ", "cp ", "mkdir ", "chmod ", "chown "]):
return CommandType.FILE_OP
if any(cmd_lower.startswith(kw) for kw in ["sudo ", "apt ", "yum ", "systemctl "]):
return CommandType.SYSTEM
return CommandType.OTHER
def _extract_tags(self, command: str) -> List[str]:
tags = []
cmd_lower = command.lower()
if "git" in cmd_lower:
tags.append("git")
if "test" in cmd_lower:
tags.append("testing")
if any(kw in cmd_lower for kw in ["deploy", "push", "release"]):
tags.append("deployment")
if any(kw in cmd_lower for kw in ["debug", "log", "print"]):
tags.append("debugging")
return tags
def record_command(
self,
command: str,
working_directory: Optional[str] = None,
project_id: Optional[int] = None,
workflow_id: Optional[int] = None,
capture_duration: bool = True,
capture_exit_code: bool = True,
) -> Command:
working_dir = working_directory or os.getcwd()
if project_id is None:
project = self.project_detector.detect(working_dir)
project_id = project.id if project else None
exit_code = None
duration_ms = None
if capture_exit_code or capture_duration:
start_time = datetime.utcnow()
try:
result = subprocess.run(
command,
shell=True,
capture_output=True,
timeout=1,
)
if capture_exit_code:
exit_code = result.returncode
except subprocess.TimeoutExpired:
if capture_exit_code:
exit_code = -1
except Exception as e:
logger.warning(f"Failed to capture command result: {e}")
if capture_exit_code:
exit_code = -2
if capture_duration:
duration_ms = int((datetime.utcnow() - start_time).total_seconds() * 1000)
cmd = Command(
command=command,
command_type=self._classify_command(command),
exit_code=exit_code,
duration_ms=duration_ms,
working_directory=working_dir,
timestamp=datetime.utcnow(),
tags=self._extract_tags(command),
project_id=project_id,
workflow_id=workflow_id,
)
return cmd
def record_workflow_commands(
self,
commands: List[str],
working_directory: Optional[str] = None,
project_id: Optional[int] = None,
) -> List[Command]:
recorded = []
for cmd_str in commands:
cmd = self.record_command(cmd_str, working_directory, project_id)
recorded.append(cmd)
return recorded
def sanitize_command(self, command: str) -> str:
command = command.strip()
command = re.sub(r"^\\s*\\\\\\s*", "", command)
if command.startswith("cd "):
return command
return command