"""CLI commands for HTTP Log Explorer."""

import sys

import click
from rich.console import Console
from rich.markup import escape

from http_log_explorer.analyzers import DiffEngine, StatsGenerator, TrafficAnalyzer
from http_log_explorer.cli.formatter import Formatter
from http_log_explorer.exporters import CodeExporter, CurlExporter, JSONExporter
from http_log_explorer.generators import OpenAPIGenerator
from http_log_explorer.models import FilterCriteria, HTTPEntry
from http_log_explorer.parsers import get_parser

console = Console()
formatter = Formatter()

# NOTE(review): this store lives only in process memory; nothing in this module
# persists it, so entries loaded by one CLI invocation are not visible to a
# separate invocation of another command.
_entries_store: list[HTTPEntry] = []

# Shared warning shown by every command that needs previously loaded entries.
_NO_ENTRIES_MESSAGE = "[yellow]No entries loaded. Use 'load' command first.[/yellow]"


def reset_entries() -> None:
    """Reset the global entries store. Used for testing."""
    global _entries_store
    _entries_store = []


def _require_entries() -> list[HTTPEntry]:
    """Return the loaded entries, warning the user when there are none.

    Returns:
        The module-level entries store. When it is empty, the standard
        "No entries loaded" warning has already been printed, so callers only
        need to return early on a falsy result.
    """
    if not _entries_store:
        console.print(_NO_ENTRIES_MESSAGE)
    return _entries_store


@click.group()
@click.version_option(version="0.1.0")
def cli() -> None:
    """HTTP Log Explorer - Parse, analyze, and explore HTTP traffic logs."""
    pass


@cli.command()
@click.argument("file", type=click.Path(exists=True))
@click.option("--stats", is_flag=True, help="Show statistics after loading")
def load(file: str, stats: bool) -> None:
    """Load and parse an HTTP log file.

    Supports HAR files, curl -v output, and Chrome DevTools exports.
    """
    global _entries_store

    # Undecodable bytes are replaced rather than raising, so only OS-level
    # failures (permissions, directory given, I/O errors) can occur here.
    try:
        with open(file, encoding="utf-8", errors="replace") as f:
            content = f.read()
    except OSError as e:
        # Escape the message: it contains the user-supplied path, which may
        # include "[...]" sequences that Rich would interpret as markup.
        console.print(f"[red]Error reading file: {escape(str(e))}[/red]")
        sys.exit(1)

    if not content.strip():
        console.print("[red]Error: File is empty[/red]")
        sys.exit(1)

    try:
        parser = get_parser(content)
        console.print(f"[green]Using parser: {parser.get_parser_name()}[/green]")
        entries = parser.parse(content, source_file=file)
    except ValueError as e:
        console.print(f"[red]Parse error: {escape(str(e))}[/red]")
        console.print("[yellow]Supported formats:[/yellow]")
        console.print(" - HAR files (HTTP Archive format)")
        console.print(" - curl -v output")
        console.print(" - Chrome DevTools network exports")
        sys.exit(1)

    _entries_store = entries
    console.print(f"[green]Loaded {len(entries)} entries[/green]")

    if stats and entries:
        _show_stats(entries)


@cli.command()
@click.option("--limit", type=int, default=50, help="Limit number of entries shown")
@click.option("--method", multiple=True, help="Filter by method (e.g., GET, POST)")
@click.option("--status", multiple=True, type=int, help="Filter by status code")
@click.option("--url", help="Filter by URL pattern (regex)")
@click.option("--content-type", multiple=True, help="Filter by content type")
def list_entries(
    limit: int,
    method: tuple[str, ...],
    status: tuple[int, ...],
    url: str | None,
    content_type: tuple[str, ...],
) -> None:
    """List loaded HTTP entries with optional filtering."""
    loaded = _require_entries()
    if not loaded:
        return

    # Work on a copy so the analyzer never sees the shared store itself.
    entries = list(loaded)

    # Empty option tuples mean "no constraint", which FilterCriteria receives
    # as None.
    criteria = FilterCriteria(
        methods=list(method) if method else None,
        status_codes=list(status) if status else None,
        url_pattern=url,
        content_types=list(content_type) if content_type else None,
    )

    analyzer = TrafficAnalyzer(entries)
    filtered = analyzer.filter(criteria)

    table = formatter.format_entry_table(filtered, limit=limit)
    console.print(table)
    console.print(f"\n[dim]Showing {min(limit, len(filtered))} of {len(filtered)} entries[/dim]")


@cli.command()
@click.argument("query")
@click.option("--case-sensitive", is_flag=True, help="Case sensitive search")
def search(query: str, case_sensitive: bool) -> None:
    """Search across URLs and bodies."""
    entries = _require_entries()
    if not entries:
        return

    analyzer = TrafficAnalyzer(entries)
    results = analyzer.search(query, case_sensitive=case_sensitive)

    # The table is capped at 50 rows; the summary line reports the full count.
    table = formatter.format_entry_table(results, limit=50)
    console.print(table)
    console.print(f"\n[dim]Found {len(results)} matching entries[/dim]")


@cli.command()
@click.argument("entry_id1")
@click.argument("entry_id2")
def diff(entry_id1: str, entry_id2: str) -> None:
    """Compare two HTTP entries by ID."""
    entries = _require_entries()
    if not entries:
        return

    analyzer = TrafficAnalyzer(entries)
    entry1 = analyzer.get_entry_by_id(entry_id1)
    entry2 = analyzer.get_entry_by_id(entry_id2)

    # IDs come straight from the command line, so escape them before they are
    # embedded in Rich markup.
    if not entry1:
        console.print(f"[red]Entry not found: {escape(entry_id1)}[/red]")
        return
    if not entry2:
        console.print(f"[red]Entry not found: {escape(entry_id2)}[/red]")
        return

    engine = DiffEngine()
    diff_result = engine.diff(entry1, entry2)
    diff_output = engine.unified_diff_output(diff_result)
    console.print(diff_output)


@cli.command()
def stats() -> None:
    """Show statistics for loaded entries."""
    entries = _require_entries()
    if not entries:
        return

    _show_stats(entries)


def _show_stats(entries: list[HTTPEntry]) -> None:
    """Print a traffic statistics report for ``entries`` to the console.

    The report is built from ``StatsGenerator(entries).to_dict()`` and covers
    the total request count, method distribution, status code breakdown, the
    ten most frequent endpoints and, when the average is positive, response
    time figures.

    Args:
        entries: The HTTP entries to summarize.
    """
    generator = StatsGenerator(entries)
    stats_data = generator.to_dict()

    console.print("\n[bold cyan]Traffic Statistics[/bold cyan]")
    console.print(f"Total Requests: {stats_data['total_requests']}")

    # Keys below originate from captured traffic, so they are escaped to keep
    # text such as "/users/[id]" from being parsed as Rich markup.
    console.print("\n[bold]Method Distribution[/bold]")
    for method, count in sorted(stats_data["method_distribution"].items()):
        console.print(f" {escape(str(method))}: {count}")

    console.print("\n[bold]Status Code Breakdown[/bold]")
    for status, count in sorted(stats_data["status_breakdown"].items()):
        console.print(f" {escape(str(status))}: {count}")

    console.print("\n[bold]Top Endpoints[/bold]")
    # Rank by count explicitly instead of trusting the mapping's order. The
    # sort is stable, so a mapping that is already ordered by descending count
    # is printed exactly as before.
    top_endpoints = sorted(
        stats_data["endpoint_count"].items(),
        key=lambda item: item[1],
        reverse=True,
    )[:10]
    for endpoint, count in top_endpoints:
        console.print(f" {escape(str(endpoint))}: {count}")

    # Tolerate both a missing key and an explicit None value.
    rt = stats_data.get("response_time_stats") or {}
    if rt.get("avg", 0) > 0:
        console.print("\n[bold]Response Times[/bold]")
        console.print(f" Min: {rt.get('min', 0):.2f}ms")
        console.print(f" Max: {rt.get('max', 0):.2f}ms")
        console.print(f" Avg: {rt.get('avg', 0):.2f}ms")
        console.print(f" Median: {rt.get('median', 0):.2f}ms")
        console.print(f" P95: {rt.get('p95', 0):.2f}ms")
        console.print(f" P99: {rt.get('p99', 0):.2f}ms")


@cli.command("export-json")
@click.argument("output", type=click.Path())
@click.option("--compact", is_flag=True, help="Export compact JSON")
@click.option("--summary", is_flag=True, help="Export summary only")
def export_json(output: str, compact: bool, summary: bool) -> None:
    """Export entries to JSON file."""
    entries = _require_entries()
    if not entries:
        return

    exporter = JSONExporter()

    # NOTE(review): export failures are reported but the command still exits
    # with status 0; confirm whether callers rely on that.
    try:
        # --summary takes precedence over --compact when both are given.
        if summary:
            content = exporter.export_summary(entries)
        elif compact:
            content = exporter.export_compact(entries)
        else:
            content = exporter.export(entries)

        # Write UTF-8 explicitly so the result does not depend on the
        # platform's default encoding (input is read as UTF-8 by `load`).
        with open(output, "w", encoding="utf-8") as f:
            f.write(content)
        console.print(f"[green]Exported to {escape(output)}[/green]")
    except Exception as e:
        console.print(f"[red]Export error: {escape(str(e))}[/red]")


@cli.command("export-curl")
@click.argument("output", type=click.Path())
def export_curl(output: str) -> None:
    """Export entries as cURL commands."""
    entries = _require_entries()
    if not entries:
        return

    exporter = CurlExporter()

    try:
        exporter.to_file(entries, output)
        console.print(f"[green]Exported to {escape(output)}[/green]")
    except Exception as e:
        console.print(f"[red]Export error: {escape(str(e))}[/red]")


@cli.command("export-code")
@click.argument("output", type=click.Path())
@click.option(
    "--language",
    type=click.Choice(["python", "javascript", "go"]),
    default="python",
    help="Target language",
)
def export_code(output: str, language: str) -> None:
    """Export entries as code snippets."""
    entries = _require_entries()
    if not entries:
        return

    exporter = CodeExporter()

    try:
        exporter.to_file(entries, output, language)
        console.print(f"[green]Exported {len(entries)} snippets to {escape(output)}[/green]")
    except Exception as e:
        console.print(f"[red]Export error: {escape(str(e))}[/red]")


@cli.command("export-openapi")
@click.argument("output", type=click.Path())
@click.option("--title", default="API", help="API title")
@click.option("--version", default="1.0.0", help="API version")
@click.option("--no-validate", is_flag=True, help="Skip validation")
def export_openapi(
    output: str, title: str, version: str, no_validate: bool
) -> None:
    """Generate OpenAPI spec from traffic."""
    entries = _require_entries()
    if not entries:
        return

    generator = OpenAPIGenerator(entries)

    try:
        spec = generator.generate(
            title=title,
            version=version,
            validate_spec=not no_validate,
        )

        # Write UTF-8 explicitly so the result does not depend on the
        # platform's default encoding.
        with open(output, "w", encoding="utf-8") as f:
            f.write(generator.to_json(spec))

        console.print(f"[green]OpenAPI spec exported to {escape(output)}[/green]")
    except ValueError as e:
        # ValueError is reported as a validation failure; anything else is a
        # generic export error.
        console.print(f"[red]Validation error: {escape(str(e))}[/red]")
    except Exception as e:
        console.print(f"[red]Export error: {escape(str(e))}[/red]")


@cli.command()
@click.option("--method", multiple=True, help="Filter by method")
@click.option("--status", multiple=True, type=int, help="Filter by status code")
@click.option("--url", help="Filter by URL pattern")
@click.option("--content-type", multiple=True, help="Filter by content type")
def filter_entries(
    method: tuple[str, ...],
    status: tuple[int, ...],
    url: str | None,
    content_type: tuple[str, ...],
) -> None:
    """Filter entries and show results (alias for list with filters)."""
    # Delegate to `list_entries` through Click, with the row limit fixed at 50.
    ctx = click.get_current_context()
    ctx.invoke(
        list_entries,
        limit=50,
        method=method,
        status=status,
        url=url,
        content_type=content_type,
    )


def main() -> None:
    """Main entry point."""
    cli()


if __name__ == "__main__":
    main()