"""Click commands for listing, inspecting, replaying and exporting workflows."""

from contextlib import closing

import click
from rich import print as rprint
from rich.panel import Panel
from rich.table import Table

from ..config import Config
from ..database import Database
from ..generator import ScriptGenerator
from ..playback import WorkflowPlayback

# NOTE: click uses each command's docstring as its ``--help`` text, so the
# docstrings below are user-facing output. Implementation notes therefore
# live in ``#`` comments instead of the docstrings.


@click.group()
def workflow():
    """Manage workflows."""
    pass


# The function name shadows the builtin ``list`` inside this module. It is
# kept because click derives the sub-command name ("list") from it and other
# modules may import it under this name; nothing below relies on the builtin.
@workflow.command()
def list():
    """List all workflows."""
    # ``closing`` guarantees db.close() runs even if the query raises.
    with closing(Database()) as db:
        workflows = db.get_all_workflows()

    if not workflows:
        rprint("[yellow]No workflows recorded yet[/yellow]")
        return

    table = Table(title="Workflows")
    table.add_column("ID", style="cyan")
    table.add_column("Name", style="magenta")
    table.add_column("Commands", style="green")
    table.add_column("Usage", style="yellow")
    for w in workflows:
        table.add_row(str(w.id), w.name, str(len(w.commands)), str(w.usage_count))

    rprint(Panel(table, title=f"{len(workflows)} Workflows"))


@workflow.command()
@click.argument("workflow_id", type=int)
def show(workflow_id):
    """Show workflow details."""
    with closing(Database()) as db:
        # Named ``record`` so the local does not shadow the ``workflow`` group.
        record = db.get_workflow(workflow_id)

    if not record:
        rprint(f"[yellow]Workflow {workflow_id} not found[/yellow]")
        return

    table = Table(title=f"Workflow: {record.name}")
    table.add_column("Property", style="cyan")
    table.add_column("Value", style="magenta")
    table.add_row("ID", str(record.id))
    table.add_row("Description", record.description or "N/A")
    table.add_row("Commands", str(len(record.commands)))
    table.add_row("Usage Count", str(record.usage_count))

    rprint(Panel(table, title="Workflow Details"))


@workflow.command()
@click.argument("workflow_id", type=int)
@click.option("--speed", "-s", default=1.0, help="Playback speed multiplier")
@click.option("--dry-run", "-n", is_flag=True, help="Dry run (no execution)")
def play(workflow_id, speed, dry_run):
    """Playback a workflow."""
    # The connection is closed on every exit path, including exceptions other
    # than ValueError (previously those skipped db.close()).
    with closing(Database()) as db:
        playback = WorkflowPlayback(db=db)
        try:
            result = playback.playback(
                workflow_id,
                speed=speed,
                dry_run=dry_run,
            )
        except ValueError as e:
            # ValueError is reported to the user rather than propagated.
            rprint(f"[yellow]{e}[/yellow]")
            return

        rprint("[green]Workflow playback completed[/green]")
        rprint(f" Executed: {result['executed']}")
        rprint(f" Succeeded: {result['succeeded']}")
        rprint(f" Failed: {result['failed']}")


@workflow.command()
@click.argument("workflow_id", type=int)
def preview(workflow_id):
    """Preview workflow commands."""
    with closing(Database()) as db:
        playback = WorkflowPlayback(db=db)
        try:
            # Named ``steps`` so the local does not shadow this command function.
            steps = playback.preview_workflow(workflow_id)
        except ValueError as e:
            rprint(f"[yellow]{e}[/yellow]")
            return

        table = Table(title="Workflow Preview")
        table.add_column("#", style="cyan")
        table.add_column("Command", style="magenta")
        for item in steps:
            table.add_row(str(item["index"]), item["command"])

        rprint(Panel(table, title="Preview"))


@workflow.command()
@click.argument("workflow_id", type=int)
@click.option("--name", "-n", help="Script name")
@click.option("--output", "-o", help="Output directory")
def generate(workflow_id, name, output):
    """Generate script from workflow."""
    # A single ``closing`` block replaces the two separate db.close() calls and
    # also covers the case where generate_script raises.
    with closing(Database()) as db:
        generator = ScriptGenerator(db=db)
        record = db.get_workflow(workflow_id)
        if not record:
            rprint(f"[yellow]Workflow {workflow_id} not found[/yellow]")
            return

        script_path = generator.generate_script(
            record,
            name=name,
            output_dir=output,
        )

    rprint(f"[green]Script generated: {script_path}[/green]")