From 1fa0636efcf9fbed9e156b08bf888d76db55fca4 Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Wed, 4 Feb 2026 14:16:12 +0000 Subject: [PATCH] fix: resolve CI/CD test, lint, and type-check failures --- app/api_snapshot/cli/record.py | 287 +++++++++++++++++++++++++++++++++ 1 file changed, 287 insertions(+) create mode 100644 app/api_snapshot/cli/record.py diff --git a/app/api_snapshot/cli/record.py b/app/api_snapshot/cli/record.py new file mode 100644 index 0000000..6e84837 --- /dev/null +++ b/app/api_snapshot/cli/record.py @@ -0,0 +1,287 @@ +"""Record command module.""" + +import json +from typing import Dict, List, Optional + +import click +from rich.console import Console +from rich.progress import BarColumn, Progress, SpinnerColumn, TaskProgressColumn, TextColumn +from rich.table import Table + +from api_snapshot.recorder.recorder import RecordingSession, RequestResponsePair +from api_snapshot.snapshot.manager import SnapshotManager + +console = Console() + + +def parse_headers(header_str: str) -> Dict[str, str]: + """Parse headers from a string format.""" + headers: Dict[str, str] = {} + if not header_str: + return headers + + for pair in header_str.split(","): + if ":" in pair: + key, value = pair.split(":", 1) + headers[key.strip()] = value.strip() + + return headers + + +def show_recording_preview(recordings: List[RequestResponsePair]) -> None: + """Show a preview of recorded requests.""" + if not recordings: + console.print("[yellow]No requests recorded[/yellow]") + return + + table = Table(title="Recording Preview") + table.add_column("#", style="dim") + table.add_column("Method", style="cyan") + table.add_column("URL", style="green") + table.add_column("Status", style="magenta") + table.add_column("Latency", style="blue") + + for i, pair in enumerate(recordings, 1): + url_len = len(pair.request.url) + url_display = pair.request.url[:60] + "..." if url_len > 60 else pair.request.url + table.add_row( + str(i), + pair.request.method, + url_display, + str(pair.response.status_code), + f"{pair.response.latency_ms}ms" + ) + + console.print(table) + + +@click.command(name="record") +@click.argument("url") +@click.option("--name", "-n", required=True, help="Name for the snapshot") +@click.option("--method", "-m", default="GET", help="HTTP method") +@click.option("--headers", "-H", help="Request headers (comma-separated key:value pairs)") +@click.option("--body", "-d", help="Request body (string or @file:path)") +@click.option("--output-dir", default=None, help="Override snapshot directory") +@click.option("--description", "-D", default="", help="Snapshot description") +@click.option("--tag", multiple=True, help="Tags for the snapshot") +@click.option("--interactive", "-i", is_flag=True, help="Interactive recording mode") +@click.option("--count", "-c", default=1, help="Number of requests to make", type=int) +@click.option("--delay", default=0, help="Delay between requests in seconds", type=float) +@click.pass_context +def record_command( + ctx: click.Context, + url: str, + name: str, + method: str, + headers: Optional[str], + body: Optional[str], + output_dir: Optional[str], + description: str, + tag: tuple, + interactive: bool, + count: int, + delay: float +) -> None: + """Record HTTP API traffic and save as a snapshot. + + URL is the endpoint to record. Use --name to specify the snapshot name. + + Examples: + api-snapshot record https://api.example.com/users --name my-api + api-snapshot record https://api.example.com/data -m POST -d '{"key":"value"}' -n my-post + """ + snapshot_dir = output_dir or ctx.obj.get("snapshot_dir", "./snapshots") + verbose = ctx.obj.get("verbose", False) + + manager = SnapshotManager(snapshot_dir) + + if manager.snapshot_exists(name): + if not click.confirm(f"Snapshot '{name}' already exists. Overwrite?"): + console.print("[yellow]Cancelled[/yellow]") + return + + if body and body.startswith("@file:"): + file_path = body[5:] + try: + with open(file_path, "r", encoding="utf-8") as f: + body = f.read() + except Exception as e: + console.print(f"[red]Error reading body file: {e}[/red]") + raise click.Abort() + + parsed_headers = parse_headers(headers) if headers else {} + + recordings: List[RequestResponsePair] = [] + + if interactive: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TaskProgressColumn(), + console=console + ) as progress: + task = progress.add_task(f"Recording {count} request(s)...", total=count) + + def on_record(pair: RequestResponsePair) -> None: + recordings.append(pair) + if verbose: + msg = f" [cyan]{pair.request.method}[/cyan] {pair.request.url}" + msg += f" -> {pair.response.status_code}" + console.print(msg) + + session = RecordingSession(on_request=on_record) + + for i in range(count): + progress.update(task, advance=1, description=f"Request {i+1}/{count}") + + try: + session.record_request( + method=method.upper(), + url=url, + headers=parsed_headers, + body=body + ) + except Exception as e: + console.print(f"[red]Request failed: {e}[/red]") + + if i < count - 1 and delay > 0: + import time + time.sleep(delay) + + progress.update(task, completed=True) + else: + def on_record(pair: RequestResponsePair) -> None: + recordings.append(pair) + + session = RecordingSession(on_request=on_record) + + for i in range(count): + try: + session.record_request( + method=method.upper(), + url=url, + headers=parsed_headers, + body=body + ) + except Exception as e: + console.print(f"[red]Request failed: {e}[/red]") + + if i < count - 1 and delay > 0: + import time + time.sleep(delay) + + if not recordings: + console.print("[red]No requests were recorded[/red]") + raise click.Abort() + + try: + path = manager.save_snapshot( + name=name, + requests=recordings, + description=description, + source_url=url, + tags=list(tag) if tag else [] + ) + + console.print(f"[green]Snapshot saved: {path}[/green]") + console.print(f"[green]Recorded {len(recordings)} request(s)[/green]") + + if verbose or interactive: + show_recording_preview(recordings) + + except Exception as e: + console.print(f"[red]Error saving snapshot: {e}[/red]") + raise click.Abort() + + +@click.command(name="record-multi") +@click.argument("config_file", type=click.File()) +@click.option("--name", "-n", required=True, help="Name for the snapshot") +@click.option("--output-dir", default=None, help="Override snapshot directory") +@click.option("--description", "-D", default="", help="Snapshot description") +@click.option("--base-url", "-b", help="Base URL for relative URLs") +@click.pass_context +def record_multi_command( + ctx: click.Context, + config_file: click.File, + name: str, + output_dir: Optional[str], + description: str, + base_url: Optional[str] +) -> None: + """Record multiple requests from a JSON config file. + + CONFIG_FILE should contain a JSON array of request objects with: + - method: HTTP method + - url: Request URL + - headers: Optional headers object + - body: Optional request body + + Example config: + [ + {"method": "GET", "url": "/users"}, + {"method": "POST", "url": "/users", "body": {"name": "test"}} + ] + """ + snapshot_dir = output_dir or ctx.obj.get("snapshot_dir", "./snapshots") + verbose = ctx.obj.get("verbose", False) + + try: + config_content = config_file.read() # type: ignore[attr-defined] + config = json.loads(config_content) + except json.JSONDecodeError as e: + console.print(f"[red]Invalid JSON in config file: {e}[/red]") + raise click.Abort() + + if not isinstance(config, list): + console.print("[red]Config file must contain a JSON array[/red]") + raise click.Abort() + + manager = SnapshotManager(snapshot_dir) + + if manager.snapshot_exists(name): + if not click.confirm(f"Snapshot '{name}' already exists. Overwrite?"): + console.print("[yellow]Cancelled[/yellow]") + return + + recordings: List[RequestResponsePair] = [] + + def on_record(pair: RequestResponsePair) -> None: + recordings.append(pair) + if verbose: + msg = f" [cyan]{pair.request.method}[/cyan] {pair.request.url}" + msg += f" -> {pair.response.status_code}" + console.print(msg) + + console.print(f"[cyan]Recording {len(config)} request(s)...[/cyan]") + + from api_snapshot.recorder.recorder import record_multiple + recordings = record_multiple( + requests_config=config, + base_url=base_url, + on_record=on_record + ) + + if not recordings: + console.print("[red]No requests were recorded[/red]") + raise click.Abort() + + try: + path = manager.save_snapshot( + name=name, + requests=recordings, + description=description, + source_url=base_url, + tags=[] + ) + + console.print(f"[green]Snapshot saved: {path}[/green]") + console.print(f"[green]Recorded {len(recordings)} request(s)[/green]") + + if verbose: + show_recording_preview(recordings) + + except Exception as e: + console.print(f"[red]Error saving snapshot: {e}[/red]") + raise click.Abort()