"""Record command module.""" import json from typing import Dict, List, Optional import click from rich.console import Console from rich.progress import BarColumn, Progress, SpinnerColumn, TaskProgressColumn, TextColumn from rich.table import Table from api_snapshot.recorder.recorder import RecordingSession, RequestResponsePair from api_snapshot.snapshot.manager import SnapshotManager console = Console() def parse_headers(header_str: str) -> Dict[str, str]: """Parse headers from a string format.""" headers: Dict[str, str] = {} if not header_str: return headers for pair in header_str.split(","): if ":" in pair: key, value = pair.split(":", 1) headers[key.strip()] = value.strip() return headers def show_recording_preview(recordings: List[RequestResponsePair]) -> None: """Show a preview of recorded requests.""" if not recordings: console.print("[yellow]No requests recorded[/yellow]") return table = Table(title="Recording Preview") table.add_column("#", style="dim") table.add_column("Method", style="cyan") table.add_column("URL", style="green") table.add_column("Status", style="magenta") table.add_column("Latency", style="blue") for i, pair in enumerate(recordings, 1): url_len = len(pair.request.url) url_display = pair.request.url[:60] + "..." if url_len > 60 else pair.request.url table.add_row( str(i), pair.request.method, url_display, str(pair.response.status_code), f"{pair.response.latency_ms}ms" ) console.print(table) @click.command(name="record") @click.argument("url") @click.option("--name", "-n", required=True, help="Name for the snapshot") @click.option("--method", "-m", default="GET", help="HTTP method") @click.option("--headers", "-H", help="Request headers (comma-separated key:value pairs)") @click.option("--body", "-d", help="Request body (string or @file:path)") @click.option("--output-dir", default=None, help="Override snapshot directory") @click.option("--description", "-D", default="", help="Snapshot description") @click.option("--tag", multiple=True, help="Tags for the snapshot") @click.option("--interactive", "-i", is_flag=True, help="Interactive recording mode") @click.option("--count", "-c", default=1, help="Number of requests to make", type=int) @click.option("--delay", default=0, help="Delay between requests in seconds", type=float) @click.pass_context def record_command( ctx: click.Context, url: str, name: str, method: str, headers: Optional[str], body: Optional[str], output_dir: Optional[str], description: str, tag: tuple, interactive: bool, count: int, delay: float ) -> None: """Record HTTP API traffic and save as a snapshot. URL is the endpoint to record. Use --name to specify the snapshot name. Examples: api-snapshot record https://api.example.com/users --name my-api api-snapshot record https://api.example.com/data -m POST -d '{"key":"value"}' -n my-post """ snapshot_dir = output_dir or ctx.obj.get("snapshot_dir", "./snapshots") verbose = ctx.obj.get("verbose", False) manager = SnapshotManager(snapshot_dir) if manager.snapshot_exists(name): if not click.confirm(f"Snapshot '{name}' already exists. Overwrite?"): console.print("[yellow]Cancelled[/yellow]") return if body and body.startswith("@file:"): file_path = body[5:] try: with open(file_path, "r", encoding="utf-8") as f: body = f.read() except Exception as e: console.print(f"[red]Error reading body file: {e}[/red]") raise click.Abort() parsed_headers = parse_headers(headers) if headers else {} recordings: List[RequestResponsePair] = [] if interactive: with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), BarColumn(), TaskProgressColumn(), console=console ) as progress: task = progress.add_task(f"Recording {count} request(s)...", total=count) def on_record(pair: RequestResponsePair) -> None: recordings.append(pair) if verbose: msg = f" [cyan]{pair.request.method}[/cyan] {pair.request.url}" msg += f" -> {pair.response.status_code}" console.print(msg) session = RecordingSession(on_request=on_record) for i in range(count): progress.update(task, advance=1, description=f"Request {i+1}/{count}") try: session.record_request( method=method.upper(), url=url, headers=parsed_headers, body=body ) except Exception as e: console.print(f"[red]Request failed: {e}[/red]") if i < count - 1 and delay > 0: import time time.sleep(delay) progress.update(task, completed=True) else: def on_record(pair: RequestResponsePair) -> None: recordings.append(pair) session = RecordingSession(on_request=on_record) for i in range(count): try: session.record_request( method=method.upper(), url=url, headers=parsed_headers, body=body ) except Exception as e: console.print(f"[red]Request failed: {e}[/red]") if i < count - 1 and delay > 0: import time time.sleep(delay) if not recordings: console.print("[red]No requests were recorded[/red]") raise click.Abort() try: path = manager.save_snapshot( name=name, requests=recordings, description=description, source_url=url, tags=list(tag) if tag else [] ) console.print(f"[green]Snapshot saved: {path}[/green]") console.print(f"[green]Recorded {len(recordings)} request(s)[/green]") if verbose or interactive: show_recording_preview(recordings) except Exception as e: console.print(f"[red]Error saving snapshot: {e}[/red]") raise click.Abort() @click.command(name="record-multi") @click.argument("config_file", type=click.File()) @click.option("--name", "-n", required=True, help="Name for the snapshot") @click.option("--output-dir", default=None, help="Override snapshot directory") @click.option("--description", "-D", default="", help="Snapshot description") @click.option("--base-url", "-b", help="Base URL for relative URLs") @click.pass_context def record_multi_command( ctx: click.Context, config_file: click.File, name: str, output_dir: Optional[str], description: str, base_url: Optional[str] ) -> None: """Record multiple requests from a JSON config file. CONFIG_FILE should contain a JSON array of request objects with: - method: HTTP method - url: Request URL - headers: Optional headers object - body: Optional request body Example config: [ {"method": "GET", "url": "/users"}, {"method": "POST", "url": "/users", "body": {"name": "test"}} ] """ snapshot_dir = output_dir or ctx.obj.get("snapshot_dir", "./snapshots") verbose = ctx.obj.get("verbose", False) try: config_content = config_file.read() # type: ignore[attr-defined] config = json.loads(config_content) except json.JSONDecodeError as e: console.print(f"[red]Invalid JSON in config file: {e}[/red]") raise click.Abort() if not isinstance(config, list): console.print("[red]Config file must contain a JSON array[/red]") raise click.Abort() manager = SnapshotManager(snapshot_dir) if manager.snapshot_exists(name): if not click.confirm(f"Snapshot '{name}' already exists. Overwrite?"): console.print("[yellow]Cancelled[/yellow]") return recordings: List[RequestResponsePair] = [] def on_record(pair: RequestResponsePair) -> None: recordings.append(pair) if verbose: msg = f" [cyan]{pair.request.method}[/cyan] {pair.request.url}" msg += f" -> {pair.response.status_code}" console.print(msg) console.print(f"[cyan]Recording {len(config)} request(s)...[/cyan]") from api_snapshot.recorder.recorder import record_multiple recordings = record_multiple( requests_config=config, base_url=base_url, on_record=on_record ) if not recordings: console.print("[red]No requests were recorded[/red]") raise click.Abort() try: path = manager.save_snapshot( name=name, requests=recordings, description=description, source_url=base_url, tags=[] ) console.print(f"[green]Snapshot saved: {path}[/green]") console.print(f"[green]Recorded {len(recordings)} request(s)[/green]") if verbose: show_recording_preview(recordings) except Exception as e: console.print(f"[red]Error saving snapshot: {e}[/red]") raise click.Abort()