"""Generate executable bash scripts from recorded workflows."""

import logging
import os
import shlex
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from .config import Config
from .database import Database
from .models import Command, Workflow

logger = logging.getLogger(__name__)

# Fallback script directory used when neither the caller nor the config
# supplies one.
_DEFAULT_OUTPUT_DIR = "~/.cli_memory/scripts"

# Maximum number of command characters echoed in the per-command comment.
_COMMENT_PREVIEW_LEN = 50


def _single_line(text: Any) -> str:
    """Collapse *text* onto one line so it cannot escape a ``#`` shell comment."""
    return " ".join(str(text).splitlines())


class ScriptGenerator:
    """Render workflows as bash scripts and list previously generated ones."""

    def __init__(self, config: Optional[Config] = None, db: Optional[Database] = None):
        """Store the given config/database, creating defaults when omitted."""
        self.config = config or Config()
        self.db = db or Database()

    def _resolve_output_dir(self, output_dir: Optional[str]) -> str:
        """Return the user-expanded script directory (argument, config, or default)."""
        chosen = output_dir or self.config.get("script.output_dir", _DEFAULT_OUTPUT_DIR)
        return os.path.expanduser(chosen)

    def generate_script(
        self,
        workflow: Workflow,
        name: Optional[str] = None,
        output_dir: Optional[str] = None,
        include_error_handling: bool = True,
        include_logging: bool = True,
    ) -> str:
        """Write *workflow* to an executable ``.sh`` file and return its path.

        Args:
            workflow: Workflow whose commands are rendered into the script.
            name: Script base name; falls back to ``workflow.name`` and then
                ``workflow_<id>``. Characters other than alphanumerics, ``_``
                and ``-`` are replaced with ``_``.
            output_dir: Target directory; falls back to the
                ``script.output_dir`` config value. Created if missing.
            include_error_handling: Wrap each command with a failure handler.
            include_logging: Emit the header and the file-backed ``log`` helper.

        Returns:
            The path of the generated script.
        """
        target_dir = self._resolve_output_dir(output_dir)
        os.makedirs(target_dir, exist_ok=True)

        script_name = name or workflow.name or f"workflow_{workflow.id}"
        safe_name = "".join(c if c.isalnum() or c in "_-" else "_" for c in script_name)
        script_path = os.path.join(target_dir, f"{safe_name}.sh")

        script_content = self._build_script_content(
            workflow,
            include_error_handling=include_error_handling,
            include_logging=include_logging,
        )

        with open(script_path, "w", encoding="utf-8") as f:
            f.write(script_content)
        os.chmod(script_path, 0o755)

        logger.info("Generated script: %s", script_path)
        return script_path

    def _build_script_content(
        self,
        workflow: Workflow,
        include_error_handling: bool = True,
        include_logging: bool = True,
    ) -> str:
        """Return the bash source for *workflow* as a single string.

        With ``include_logging`` the script gets a descriptive header and a
        ``log`` function that tees timestamped messages into
        ``<script dir>/logs/<timestamp>.log``. Without it, a minimal ``log``
        that writes to stderr is still defined whenever
        ``include_error_handling`` is set, because the per-command handlers
        call ``log``.
        """
        commands = workflow.commands or []
        lines = ["#!/bin/bash", ""]

        if include_logging:
            generated_at = datetime.now(timezone.utc).isoformat()
            description = workflow.description or "No description"
            lines.extend([
                f"# Generated by CLI Command Memory on {generated_at}",
                # Flatten free text so an embedded newline cannot turn the
                # remainder of the name/description into executable code.
                f"# Workflow: {_single_line(workflow.name)}",
                f"# Description: {_single_line(description)}",
                "",
                "set -e",
                "",
                'SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"',
                'LOG_FILE="${SCRIPT_DIR}/logs/$(date +%Y%m%d_%H%M%S).log"',
                # tee -a fails if the logs directory does not exist yet.
                'mkdir -p "$(dirname "$LOG_FILE")"',
                "",
                "log() {",
                "    echo \"[$(date '+%Y-%m-%d %H:%M:%S')] $1\" | tee -a \"$LOG_FILE\"",
                "}",
                "",
            ])
        else:
            lines.extend(["set -e", ""])
            if include_error_handling:
                # The error handlers below call log(); provide a stderr-only
                # version so the script does not die on an unknown command.
                lines.extend(["log() {", '    echo "$1" >&2', "}", ""])

        lines.append("# Workflow commands")
        lines.append(f"# Total commands: {len(commands)}")
        lines.append("")

        for index, cmd in enumerate(commands, 1):
            flat = _single_line(cmd.command)
            if len(flat) > _COMMENT_PREVIEW_LEN:
                preview = flat[:_COMMENT_PREVIEW_LEN] + "..."
            else:
                preview = flat
            lines.append(f"# Command {index}: {preview}")

            if include_error_handling:
                # shlex.quote yields one safely quoted shell word, so quotes,
                # $ and backticks inside the command are logged literally.
                executing = shlex.quote(f"Executing: {cmd.command}")
                failure = shlex.quote(f"ERROR: Command failed: {cmd.command}")
                lines.append(f"log {executing}")
                lines.append(f"{cmd.command} || {{ log {failure}; exit 1; }}")
            else:
                lines.append(cmd.command)
            lines.append("")

        if include_logging:
            lines.extend([
                "log 'Workflow completed successfully'",
                "exit 0",
            ])

        return "\n".join(lines)

    def generate_from_commands(
        self,
        commands: List[Command],
        name: str,
        output_dir: Optional[str] = None,
    ) -> str:
        """Wrap *commands* in an ad-hoc workflow and generate its script.

        Returns:
            The path of the generated script.
        """
        workflow = Workflow(
            name=name,
            description="Generated from command history",
            commands=commands,
        )
        return self.generate_script(workflow, name, output_dir)

    def list_generated_scripts(self, output_dir: Optional[str] = None) -> List[Dict[str, Any]]:
        """Describe the ``.sh`` files in the script directory, newest first.

        Returns:
            One dict per script with ``name``, ``path``, ``size``,
            ``modified`` (ISO timestamp of the mtime) and ``executable``.
            Empty when the directory does not exist or is not a directory.
        """
        target_dir = self._resolve_output_dir(output_dir)
        # isdir (not exists): a regular file at this path would make
        # os.listdir raise NotADirectoryError.
        if not os.path.isdir(target_dir):
            return []

        scripts = []
        for filename in os.listdir(target_dir):
            if not filename.endswith(".sh"):
                continue
            filepath = os.path.join(target_dir, filename)
            stat = os.stat(filepath)
            scripts.append({
                "name": filename,
                "path": filepath,
                "size": stat.st_size,
                "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
                "executable": os.access(filepath, os.X_OK),
            })

        return sorted(scripts, key=lambda s: s["modified"], reverse=True)