diff --git a/cli_memory/playback.py b/cli_memory/playback.py new file mode 100644 index 0000000..b73e505 --- /dev/null +++ b/cli_memory/playback.py @@ -0,0 +1,175 @@ +import os +import subprocess +import time +import logging +from datetime import datetime +from typing import Optional, List, Dict, Any, Callable + +from .config import Config +from .models import Workflow, Command +from .database import Database + +logger = logging.getLogger(__name__) + + +class WorkflowPlayback: + def __init__(self, config: Optional[Config] = None, db: Optional[Database] = None): + self.config = config or Config() + self.db = db or Database() + + def playback( + self, + workflow_id: int, + speed: float = 1.0, + confirm_each: bool = False, + dry_run: bool = False, + on_command: Optional[Callable[[Command, int], None]] = None, + ) -> Dict[str, Any]: + workflow = self.db.get_workflow(workflow_id) + if not workflow: + raise ValueError(f"Workflow {workflow_id} not found") + + result = { + "workflow_id": workflow_id, + "workflow_name": workflow.name, + "total_commands": len(workflow.commands), + "executed": 0, + "succeeded": 0, + "failed": 0, + "skipped": 0, + "start_time": datetime.utcnow().isoformat(), + "end_time": None, + "commands": [], + } + + for i, cmd in enumerate(workflow.commands): + if self.config.get("playback.pause_on_error", True) and result["failed"] > 0: + result["skipped"] += 1 + result["commands"].append({ + "index": i, + "command": cmd.command, + "status": "skipped", + "reason": "Previous command failed", + }) + continue + + if on_command: + on_command(cmd, i) + + if confirm_each: + response = input(f"Execute '{cmd.command}'? [y/n/s/q]: ") + if response.lower() == "q": + break + elif response.lower() == "s": + result["skipped"] += 1 + result["commands"].append({ + "index": i, + "command": cmd.command, + "status": "skipped", + "reason": "User skipped", + }) + continue + elif response.lower() != "y": + result["skipped"] += 1 + continue + + result["executed"] += 1 + + if dry_run: + result["succeeded"] += 1 + result["commands"].append({ + "index": i, + "command": cmd.command, + "status": "dry_run_success", + }) + delay = 1.0 / speed if speed > 0 else 0 + time.sleep(delay) + else: + try: + exit_code = self._execute_command(cmd) + if exit_code == 0: + result["succeeded"] += 1 + status = "success" + else: + result["failed"] += 1 + status = f"failed_exit_code_{exit_code}" + result["commands"].append({ + "index": i, + "command": cmd.command, + "status": status, + "exit_code": exit_code, + }) + except Exception as e: + result["failed"] += 1 + result["commands"].append({ + "index": i, + "command": cmd.command, + "status": "error", + "error": str(e), + }) + + result["end_time"] = datetime.utcnow().isoformat() + self.db.update_workflow_usage(workflow_id) + + return result + + def _execute_command(self, command: Command) -> int: + try: + result = subprocess.run( + command.command, + shell=True, + capture_output=True, + timeout=300, + ) + return result.returncode + except subprocess.TimeoutExpired: + return -1 + except Exception: + return -2 + + def preview_workflow( + self, workflow_id: int, max_commands: int = 50 + ) -> List[Dict[str, Any]]: + workflow = self.db.get_workflow(workflow_id) + if not workflow: + raise ValueError(f"Workflow {workflow_id} not found") + + preview = [] + for i, cmd in enumerate(workflow.commands[:max_commands]): + preview.append({ + "index": i, + "command": cmd.command, + "working_directory": cmd.working_directory, + "timestamp": cmd.timestamp.isoformat(), + "type": cmd.command_type.value, + }) + + if len(workflow.commands) > max_commands: + preview.append({ + "index": max_commands, + "command": f"... and {len(workflow.commands) - max_commands} more commands", + "working_directory": "", + "timestamp": "", + "type": "info", + }) + + return preview + + def simulate_workflow(self, workflow_id: int) -> Dict[str, Any]: + workflow = self.db.get_workflow(workflow_id) + if not workflow: + raise ValueError(f"Workflow {workflow_id} not found") + + return { + "workflow_id": workflow_id, + "name": workflow.name, + "total_commands": len(workflow.commands), + "estimated_duration_seconds": sum( + cmd.duration_ms or 1000 for cmd in workflow.commands + ) + / 1000.0, + "command_types": list(set(cmd.command_type.value for cmd in workflow.commands)), + "working_directories": list( + set(cmd.working_directory for cmd in workflow.commands) + ), + }