import os
import subprocess
import time
import logging
from datetime import datetime, timezone
from typing import Optional, List, Dict, Any, Callable

from .config import Config
from .models import Workflow, Command
from .database import Database

logger = logging.getLogger(__name__)


class WorkflowPlayback:
    """Replay, preview and summarise workflows loaded from the database.

    A workflow is fetched with ``db.get_workflow(workflow_id)`` and is
    expected to expose ``name`` and ``commands``; each command exposes at
    least ``command`` (the shell string that gets executed).
    """

    # Upper bound, in seconds, on how long a single command may run.
    COMMAND_TIMEOUT_SECONDS = 300

    def __init__(self, config: Optional[Config] = None, db: Optional[Database] = None):
        """Store the collaborators, creating defaults for any left as None."""
        # Compare against None explicitly so that a falsy-but-valid object
        # injected by the caller is not silently replaced by a new default.
        self.config = config if config is not None else Config()
        self.db = db if db is not None else Database()

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same format as the deprecated
        ``datetime.utcnow().isoformat()`` (no UTC offset suffix).
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @staticmethod
    def _record_skip(result: Dict[str, Any], index: int, cmd: Command, reason: str) -> None:
        """Count a skipped command and append its entry to the command log."""
        result["skipped"] += 1
        result["commands"].append({
            "index": index,
            "command": cmd.command,
            "status": "skipped",
            "reason": reason,
        })

    def playback(
        self,
        workflow_id: int,
        speed: float = 1.0,
        confirm_each: bool = False,
        dry_run: bool = False,
        on_command: Optional[Callable[[Command, int], None]] = None,
    ) -> Dict[str, Any]:
        """Run the commands of a workflow in order and report the outcome.

        Args:
            workflow_id: Identifier passed to ``db.get_workflow``.
            speed: Only used in dry-run mode, where each command sleeps for
                ``1.0 / speed`` seconds (no sleep when ``speed <= 0``).
            confirm_each: Prompt on stdin before every command. ``y`` runs
                it, ``s`` skips it, ``q`` stops the playback, and any other
                answer declines (skips) the command.
            dry_run: Do not execute anything; mark every command as
                ``dry_run_success``.
            on_command: Optional callback invoked as ``on_command(cmd, index)``
                before the confirmation prompt of each command that was not
                skipped because of an earlier failure.

        Returns:
            A dict with the counters ``total_commands``, ``executed``,
            ``succeeded``, ``failed`` and ``skipped``, the ``start_time`` /
            ``end_time`` ISO strings, and a ``commands`` list holding one
            entry per processed command.

        Raises:
            ValueError: If the workflow cannot be found.
        """
        workflow = self.db.get_workflow(workflow_id)
        if not workflow:
            raise ValueError(f"Workflow {workflow_id} not found")

        result: Dict[str, Any] = {
            "workflow_id": workflow_id,
            "workflow_name": workflow.name,
            "total_commands": len(workflow.commands),
            "executed": 0,
            "succeeded": 0,
            "failed": 0,
            "skipped": 0,
            "start_time": self._utc_now_iso(),
            "end_time": None,
            "commands": [],
        }

        # Loop-invariant values, computed once.
        pause_on_error = self.config.get("playback.pause_on_error", True)
        dry_run_delay = 1.0 / speed if speed > 0 else 0

        for i, cmd in enumerate(workflow.commands):
            # After the first failure every remaining command is skipped
            # when pause_on_error is enabled.
            if pause_on_error and result["failed"] > 0:
                self._record_skip(result, i, cmd, "Previous command failed")
                continue

            if on_command:
                on_command(cmd, i)

            if confirm_each:
                response = input(f"Execute '{cmd.command}'? [y/n/s/q]: ")
                # Tolerate surrounding whitespace and upper-case answers.
                answer = response.strip().lower()
                if answer == "q":
                    break
                if answer == "s":
                    self._record_skip(result, i, cmd, "User skipped")
                    continue
                if answer != "y":
                    # Record declined commands too, so the "skipped" counter
                    # always matches the entries in result["commands"].
                    self._record_skip(result, i, cmd, "User declined")
                    continue

            result["executed"] += 1

            if dry_run:
                result["succeeded"] += 1
                result["commands"].append({
                    "index": i,
                    "command": cmd.command,
                    "status": "dry_run_success",
                })
                time.sleep(dry_run_delay)
                continue

            try:
                exit_code = self._execute_command(cmd)
            except Exception as e:
                # _execute_command already maps its own failures to exit
                # codes; this guards against overriding implementations
                # that raise instead.
                logger.exception("Unexpected error while running command %d", i)
                result["failed"] += 1
                result["commands"].append({
                    "index": i,
                    "command": cmd.command,
                    "status": "error",
                    "error": str(e),
                })
                continue

            if exit_code == 0:
                result["succeeded"] += 1
                status = "success"
            else:
                result["failed"] += 1
                status = f"failed_exit_code_{exit_code}"
            result["commands"].append({
                "index": i,
                "command": cmd.command,
                "status": status,
                "exit_code": exit_code,
            })

        result["end_time"] = self._utc_now_iso()
        self.db.update_workflow_usage(workflow_id)
        return result

    def _execute_command(self, command: Command) -> int:
        """Run one command through the shell and return its exit code.

        Output is captured and discarded. Returns ``-1`` when the command
        exceeds ``COMMAND_TIMEOUT_SECONDS`` and ``-2`` when it could not be
        run for any other reason; both cases are logged.
        """
        # SECURITY: shell=True hands the stored string to the shell verbatim.
        # Only replay workflows that come from a trusted source.
        # NOTE(review): command.working_directory is not applied here, so the
        # command runs in the current process directory -- confirm intended.
        try:
            completed = subprocess.run(
                command.command,
                shell=True,
                capture_output=True,
                timeout=self.COMMAND_TIMEOUT_SECONDS,
            )
        except subprocess.TimeoutExpired:
            logger.warning(
                "Command timed out after %s seconds: %s",
                self.COMMAND_TIMEOUT_SECONDS,
                command.command,
            )
            return -1
        except Exception:
            logger.exception("Failed to run command: %s", command.command)
            return -2
        return completed.returncode

    def preview_workflow(
        self, workflow_id: int, max_commands: int = 50
    ) -> List[Dict[str, Any]]:
        """Describe up to ``max_commands`` commands without executing them.

        When the workflow holds more commands than the limit, a final
        ``"type": "info"`` row states how many were left out. A negative
        ``max_commands`` is treated as ``0``.

        Raises:
            ValueError: If the workflow cannot be found.
        """
        workflow = self.db.get_workflow(workflow_id)
        if not workflow:
            raise ValueError(f"Workflow {workflow_id} not found")

        # A negative limit would slice from the end of the list and make the
        # "more commands" count wrong, so clamp it.
        limit = max(max_commands, 0)

        preview: List[Dict[str, Any]] = [
            {
                "index": i,
                "command": cmd.command,
                "working_directory": cmd.working_directory,
                "timestamp": cmd.timestamp.isoformat(),
                "type": cmd.command_type.value,
            }
            for i, cmd in enumerate(workflow.commands[:limit])
        ]

        remaining = len(workflow.commands) - limit
        if remaining > 0:
            preview.append({
                "index": limit,
                "command": f"... and {remaining} more commands",
                "working_directory": "",
                "timestamp": "",
                "type": "info",
            })

        return preview

    def simulate_workflow(self, workflow_id: int) -> Dict[str, Any]:
        """Summarise a workflow without executing it.

        The estimated duration sums each command's ``duration_ms``, counting
        1000 ms for commands whose duration is missing or zero. Command
        types and working directories are de-duplicated in first-seen order.

        Raises:
            ValueError: If the workflow cannot be found.
        """
        workflow = self.db.get_workflow(workflow_id)
        if not workflow:
            raise ValueError(f"Workflow {workflow_id} not found")

        commands = workflow.commands
        total_ms = sum(cmd.duration_ms or 1000 for cmd in commands)

        return {
            "workflow_id": workflow_id,
            "name": workflow.name,
            "total_commands": len(commands),
            "estimated_duration_seconds": total_ms / 1000.0,
            # dict.fromkeys removes duplicates while keeping a stable order,
            # unlike set(), whose iteration order can vary between runs.
            "command_types": list(
                dict.fromkeys(cmd.command_type.value for cmd in commands)
            ),
            "working_directories": list(
                dict.fromkeys(cmd.working_directory for cmd in commands)
            ),
        }