feat: add script generator, history manager, and workflow playback
This commit is contained in:
131
cli_memory/generator.py
Normal file
131
cli_memory/generator.py
Normal file
@@ -0,0 +1,131 @@
|
||||
import os
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Optional, List, Dict, Any
|
||||
|
||||
from .config import Config
|
||||
from .models import Workflow, Command
|
||||
from .database import Database
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ScriptGenerator:
|
||||
def __init__(self, config: Optional[Config] = None, db: Optional[Database] = None):
|
||||
self.config = config or Config()
|
||||
self.db = db or Database()
|
||||
|
||||
def generate_script(
|
||||
self,
|
||||
workflow: Workflow,
|
||||
name: Optional[str] = None,
|
||||
output_dir: Optional[str] = None,
|
||||
include_error_handling: bool = True,
|
||||
include_logging: bool = True,
|
||||
) -> str:
|
||||
output_dir = output_dir or self.config.get("script.output_dir", "~/.cli_memory/scripts")
|
||||
output_dir = os.path.expanduser(output_dir)
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
script_name = name or workflow.name or f"workflow_{workflow.id}"
|
||||
safe_name = "".join(c if c.isalnum() or c in "_-" else "_" for c in script_name)
|
||||
script_path = os.path.join(output_dir, f"{safe_name}.sh")
|
||||
|
||||
script_content = self._build_script_content(
|
||||
workflow,
|
||||
include_error_handling=include_error_handling,
|
||||
include_logging=include_logging,
|
||||
)
|
||||
|
||||
with open(script_path, "w") as f:
|
||||
f.write(script_content)
|
||||
|
||||
os.chmod(script_path, 0o755)
|
||||
|
||||
logger.info(f"Generated script: {script_path}")
|
||||
return script_path
|
||||
|
||||
def _build_script_content(
|
||||
self,
|
||||
workflow: Workflow,
|
||||
include_error_handling: bool = True,
|
||||
include_logging: bool = True,
|
||||
) -> str:
|
||||
lines = ["#!/bin/bash", ""]
|
||||
|
||||
if include_logging:
|
||||
lines.extend([
|
||||
f"# Generated by CLI Command Memory on {datetime.utcnow().isoformat()}",
|
||||
f"# Workflow: {workflow.name}",
|
||||
f"# Description: {workflow.description or 'No description'}",
|
||||
"",
|
||||
"set -e",
|
||||
"",
|
||||
"SCRIPT_DIR=\"$(cd \"$(dirname \"${BASH_SOURCE[0]}\")\" && pwd)\"",
|
||||
"LOG_FILE=\"${SCRIPT_DIR}/logs/$(date +%Y%m%d_%H%M%S).log\"",
|
||||
"",
|
||||
"log() {",
|
||||
' echo "[$(date +%Y-%m-%d\\ %H:%M:%S)] $1\" | tee -a "$LOG_FILE"',
|
||||
"}",
|
||||
"",
|
||||
])
|
||||
else:
|
||||
lines.extend(["#!/bin/bash", "set -e", ""])
|
||||
|
||||
lines.append("# Workflow commands")
|
||||
lines.append("# Total commands: " + str(len(workflow.commands)))
|
||||
lines.append("")
|
||||
|
||||
for i, cmd in enumerate(workflow.commands, 1):
|
||||
comment = f"# Command {i}: {cmd.command[:50]}..." if len(cmd.command) > 50 else f"# Command {i}: {cmd.command}"
|
||||
lines.append(comment)
|
||||
|
||||
if include_error_handling:
|
||||
lines.append(f"log 'Executing: {cmd.command.replace(chr(39), chr(39)+chr(92)+chr(39))}'")
|
||||
lines.append(f"{cmd.command} || {{ log 'ERROR: Command failed: {cmd.command}'; exit 1; }}")
|
||||
else:
|
||||
lines.append(cmd.command)
|
||||
lines.append("")
|
||||
|
||||
if include_logging:
|
||||
lines.extend([
|
||||
"log 'Workflow completed successfully'",
|
||||
"exit 0",
|
||||
])
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def generate_from_commands(
|
||||
self,
|
||||
commands: List[Command],
|
||||
name: str,
|
||||
output_dir: Optional[str] = None,
|
||||
) -> str:
|
||||
workflow = Workflow(
|
||||
name=name,
|
||||
description="Generated from command history",
|
||||
commands=commands,
|
||||
)
|
||||
return self.generate_script(workflow, name, output_dir)
|
||||
|
||||
def list_generated_scripts(self, output_dir: Optional[str] = None) -> List[Dict[str, Any]]:
|
||||
output_dir = output_dir or self.config.get("script.output_dir", "~/.cli_memory/scripts")
|
||||
output_dir = os.path.expanduser(output_dir)
|
||||
|
||||
if not os.path.exists(output_dir):
|
||||
return []
|
||||
|
||||
scripts = []
|
||||
for filename in os.listdir(output_dir):
|
||||
if filename.endswith(".sh"):
|
||||
filepath = os.path.join(output_dir, filename)
|
||||
stat = os.stat(filepath)
|
||||
scripts.append({
|
||||
"name": filename,
|
||||
"path": filepath,
|
||||
"size": stat.st_size,
|
||||
"modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
|
||||
"executable": os.access(filepath, os.X_OK),
|
||||
})
|
||||
|
||||
return sorted(scripts, key=lambda s: s["modified"], reverse=True)
|
||||
Reference in New Issue
Block a user