feat: add script generator, history manager, and workflow playback
Some checks failed
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled
CI / test (push) Has been cancelled

This commit is contained in:
2026-01-31 10:25:30 +00:00
parent 4f12b73f45
commit 7b14c30952

175
cli_memory/playback.py Normal file
View File

@@ -0,0 +1,175 @@
import os
import subprocess
import time
import logging
from datetime import datetime
from typing import Optional, List, Dict, Any, Callable
from .config import Config
from .models import Workflow, Command
from .database import Database
logger = logging.getLogger(__name__)
class WorkflowPlayback:
def __init__(self, config: Optional[Config] = None, db: Optional[Database] = None):
self.config = config or Config()
self.db = db or Database()
def playback(
self,
workflow_id: int,
speed: float = 1.0,
confirm_each: bool = False,
dry_run: bool = False,
on_command: Optional[Callable[[Command, int], None]] = None,
) -> Dict[str, Any]:
workflow = self.db.get_workflow(workflow_id)
if not workflow:
raise ValueError(f"Workflow {workflow_id} not found")
result = {
"workflow_id": workflow_id,
"workflow_name": workflow.name,
"total_commands": len(workflow.commands),
"executed": 0,
"succeeded": 0,
"failed": 0,
"skipped": 0,
"start_time": datetime.utcnow().isoformat(),
"end_time": None,
"commands": [],
}
for i, cmd in enumerate(workflow.commands):
if self.config.get("playback.pause_on_error", True) and result["failed"] > 0:
result["skipped"] += 1
result["commands"].append({
"index": i,
"command": cmd.command,
"status": "skipped",
"reason": "Previous command failed",
})
continue
if on_command:
on_command(cmd, i)
if confirm_each:
response = input(f"Execute '{cmd.command}'? [y/n/s/q]: ")
if response.lower() == "q":
break
elif response.lower() == "s":
result["skipped"] += 1
result["commands"].append({
"index": i,
"command": cmd.command,
"status": "skipped",
"reason": "User skipped",
})
continue
elif response.lower() != "y":
result["skipped"] += 1
continue
result["executed"] += 1
if dry_run:
result["succeeded"] += 1
result["commands"].append({
"index": i,
"command": cmd.command,
"status": "dry_run_success",
})
delay = 1.0 / speed if speed > 0 else 0
time.sleep(delay)
else:
try:
exit_code = self._execute_command(cmd)
if exit_code == 0:
result["succeeded"] += 1
status = "success"
else:
result["failed"] += 1
status = f"failed_exit_code_{exit_code}"
result["commands"].append({
"index": i,
"command": cmd.command,
"status": status,
"exit_code": exit_code,
})
except Exception as e:
result["failed"] += 1
result["commands"].append({
"index": i,
"command": cmd.command,
"status": "error",
"error": str(e),
})
result["end_time"] = datetime.utcnow().isoformat()
self.db.update_workflow_usage(workflow_id)
return result
def _execute_command(self, command: Command) -> int:
try:
result = subprocess.run(
command.command,
shell=True,
capture_output=True,
timeout=300,
)
return result.returncode
except subprocess.TimeoutExpired:
return -1
except Exception:
return -2
def preview_workflow(
self, workflow_id: int, max_commands: int = 50
) -> List[Dict[str, Any]]:
workflow = self.db.get_workflow(workflow_id)
if not workflow:
raise ValueError(f"Workflow {workflow_id} not found")
preview = []
for i, cmd in enumerate(workflow.commands[:max_commands]):
preview.append({
"index": i,
"command": cmd.command,
"working_directory": cmd.working_directory,
"timestamp": cmd.timestamp.isoformat(),
"type": cmd.command_type.value,
})
if len(workflow.commands) > max_commands:
preview.append({
"index": max_commands,
"command": f"... and {len(workflow.commands) - max_commands} more commands",
"working_directory": "",
"timestamp": "",
"type": "info",
})
return preview
def simulate_workflow(self, workflow_id: int) -> Dict[str, Any]:
workflow = self.db.get_workflow(workflow_id)
if not workflow:
raise ValueError(f"Workflow {workflow_id} not found")
return {
"workflow_id": workflow_id,
"name": workflow.name,
"total_commands": len(workflow.commands),
"estimated_duration_seconds": sum(
cmd.duration_ms or 1000 for cmd in workflow.commands
)
/ 1000.0,
"command_types": list(set(cmd.command_type.value for cmd in workflow.commands)),
"working_directories": list(
set(cmd.working_directory for cmd in workflow.commands)
),
}