Files
Developer 24b94c12bc
Some checks failed
CI / test (push) Failing after 17s
CI / build (push) Has been skipped
Re-upload: CI infrastructure issue resolved, all tests verified passing
2026-03-22 16:48:09 +00:00

340 lines
10 KiB
Python

"""CLI commands for HTTP Log Explorer."""
import sys
import click
from rich.console import Console
from http_log_explorer.analyzers import DiffEngine, StatsGenerator, TrafficAnalyzer
from http_log_explorer.cli.formatter import Formatter
from http_log_explorer.exporters import CodeExporter, CurlExporter, JSONExporter
from http_log_explorer.generators import OpenAPIGenerator
from http_log_explorer.models import FilterCriteria, HTTPEntry
from http_log_explorer.parsers import get_parser
# Shared Rich console that every command in this module prints through.
console = Console()
# Shared formatter; the commands below use it to build the entry tables.
formatter = Formatter()
# Module-level store of parsed entries: `load` rebinds it, the other commands
# only read it, and `reset_entries` empties it.
# NOTE(review): this list lives only in process memory, so it presumably does
# not survive between separate CLI invocations - confirm how `load` is meant
# to feed the other commands.
_entries_store: list[HTTPEntry] = []
def reset_entries() -> None:
    """Drop all loaded entries by rebinding the module-level store.

    The store is replaced with a brand-new empty list rather than cleared in
    place. Intended for tests that need a clean slate.
    """
    global _entries_store
    _entries_store = list()
@click.group()
@click.version_option(version="0.1.0")
def cli() -> None:
    """HTTP Log Explorer - Parse, analyze, and explore HTTP traffic logs."""
    # Root command group: it performs no work itself. Subcommands attach to it
    # through ``@cli.command``; the docstring above doubles as the help text.
@cli.command()
@click.argument("file", type=click.Path(exists=True))
@click.option("--stats", is_flag=True, help="Show statistics after loading")
def load(file: str, stats: bool) -> None:
    """Load and parse an HTTP log file.
    Supports HAR files, curl -v output, and Chrome DevTools exports.
    """
    # Reads FILE as UTF-8, picks a parser with ``get_parser`` and replaces the
    # module-level store with the parsed entries. Exits with status 1 when the
    # file cannot be read, contains only whitespace, or parsing raises
    # ValueError. With --stats, a summary is printed if any entry was parsed.
    global _entries_store

    # Local import keeps this command self-contained.
    from rich.markup import escape

    try:
        # Undecodable bytes are replaced instead of aborting the load.
        with open(file, encoding="utf-8", errors="replace") as f:
            content = f.read()
    except Exception as e:
        # Escape the message: OS errors embed the path, and a bracketed
        # sequence in it would otherwise be read by Rich as a markup tag
        # (text silently dropped, or MarkupError on a stray closing tag).
        console.print(f"[red]Error reading file: {escape(str(e))}[/red]")
        sys.exit(1)

    if not content.strip():
        console.print("[red]Error: File is empty[/red]")
        sys.exit(1)

    try:
        parser = get_parser(content)
        console.print(f"[green]Using parser: {parser.get_parser_name()}[/green]")
        entries = parser.parse(content, source_file=file)
    except ValueError as e:
        # Parser messages may quote fragments of the input, so escape them too.
        console.print(f"[red]Parse error: {escape(str(e))}[/red]")
        console.print("[yellow]Supported formats:[/yellow]")
        console.print(" - HAR files (HTTP Archive format)")
        console.print(" - curl -v output")
        console.print(" - Chrome DevTools network exports")
        sys.exit(1)

    _entries_store = entries
    console.print(f"[green]Loaded {len(entries)} entries[/green]")

    if stats and entries:
        _show_stats(entries)
@cli.command()
@click.option("--limit", type=int, default=50, help="Limit number of entries shown")
@click.option("--method", multiple=True, help="Filter by method (e.g., GET, POST)")
@click.option("--status", multiple=True, type=int, help="Filter by status code")
@click.option("--url", help="Filter by URL pattern (regex)")
@click.option("--content-type", multiple=True, help="Filter by content type")
def list_entries(
    limit: int,
    method: tuple[str, ...],
    status: tuple[int, ...],
    url: str | None,
    content_type: tuple[str, ...],
) -> None:
    """List loaded HTTP entries with optional filtering."""
    # Nothing to show until `load` has filled the store.
    if not _entries_store:
        console.print("[yellow]No entries loaded. Use 'load' command first.[/yellow]")
        return

    # The analyzer works on a copy of the store.
    snapshot = list(_entries_store)

    # An option that was not given arrives as an empty tuple and becomes None.
    criteria = FilterCriteria(
        methods=list(method) or None,
        status_codes=list(status) or None,
        url_pattern=url,
        content_types=list(content_type) or None,
    )

    matches = TrafficAnalyzer(snapshot).filter(criteria)
    console.print(formatter.format_entry_table(matches, limit=limit))

    total = len(matches)
    shown = min(limit, total)
    console.print(f"\n[dim]Showing {shown} of {total} entries[/dim]")
@cli.command()
@click.argument("query")
@click.option("--case-sensitive", is_flag=True, help="Case sensitive search")
def search(query: str, case_sensitive: bool) -> None:
    """Search across URLs and bodies."""
    # Nothing to search until `load` has filled the store.
    if not _entries_store:
        console.print("[yellow]No entries loaded. Use 'load' command first.[/yellow]")
        return

    hits = TrafficAnalyzer(_entries_store).search(query, case_sensitive=case_sensitive)

    # The table is capped at 50 rows; the count line reports every hit.
    console.print(formatter.format_entry_table(hits, limit=50))
    console.print(f"\n[dim]Found {len(hits)} matching entries[/dim]")
@cli.command()
@click.argument("entry_id1")
@click.argument("entry_id2")
def diff(entry_id1: str, entry_id2: str) -> None:
    """Compare two HTTP entries by ID."""
    # Looks both IDs up in the loaded entries and prints the DiffEngine's
    # unified diff. Prints a message and returns if nothing is loaded or
    # either ID is unknown (the first missing ID is the one reported).

    # Local import keeps this command self-contained.
    from rich.markup import escape

    if not _entries_store:
        console.print("[yellow]No entries loaded. Use 'load' command first.[/yellow]")
        return

    analyzer = TrafficAnalyzer(_entries_store)
    entry1 = analyzer.get_entry_by_id(entry_id1)
    entry2 = analyzer.get_entry_by_id(entry_id2)

    # The IDs are typed by the user; escape them so a bracketed sequence is
    # echoed literally instead of being parsed as Rich markup (a stray
    # closing tag would otherwise raise MarkupError).
    if not entry1:
        console.print(f"[red]Entry not found: {escape(entry_id1)}[/red]")
        return
    if not entry2:
        console.print(f"[red]Entry not found: {escape(entry_id2)}[/red]")
        return

    engine = DiffEngine()
    diff_result = engine.diff(entry1, entry2)
    diff_output = engine.unified_diff_output(diff_result)
    # NOTE(review): this is printed with markup enabled. If diff_output is a
    # plain string holding request data, bracketed text in it could be misread
    # as markup - confirm what unified_diff_output returns.
    console.print(diff_output)
@cli.command()
def stats() -> None:
    """Show statistics for loaded entries."""
    # Thin wrapper: delegate to _show_stats when the store holds entries,
    # otherwise point the user at `load`.
    if _entries_store:
        _show_stats(_entries_store)
        return
    console.print("[yellow]No entries loaded. Use 'load' command first.[/yellow]")
def _show_stats(entries: list[HTTPEntry]) -> None:
    """Print a traffic summary for *entries* to the shared console.

    Shows the total request count, the per-method and per-status counts, the
    ten endpoints with the highest counts and, when an average response time
    above zero is reported, the response-time figures.

    Args:
        entries: Parsed entries to summarise; handed to ``StatsGenerator``.
    """
    stats_data = StatsGenerator(entries).to_dict()

    console.print("\n[bold cyan]Traffic Statistics[/bold cyan]")
    console.print(f"Total Requests: {stats_data['total_requests']}")

    console.print("\n[bold]Method Distribution[/bold]")
    for method, count in sorted(stats_data["method_distribution"].items()):
        console.print(f" {method}: {count}")

    console.print("\n[bold]Status Code Breakdown[/bold]")
    for status, count in sorted(stats_data["status_breakdown"].items()):
        console.print(f" {status}: {count}")

    console.print("\n[bold]Top Endpoints[/bold]")
    # Rank explicitly so "top" does not depend on the order the generator
    # happened to insert keys in. The sort is stable, so data that is already
    # ordered by descending count prints exactly as before.
    ranked = sorted(
        stats_data["endpoint_count"].items(),
        key=lambda item: item[1],
        reverse=True,
    )
    for endpoint, count in ranked[:10]:
        # Endpoints come from the log; print them verbatim so a bracketed path
        # segment is not swallowed as a Rich markup tag.
        console.print(f" {endpoint}: {count}", markup=False)

    # Tolerate both a missing key and an explicit None.
    rt = stats_data.get("response_time_stats") or {}
    if rt.get("avg", 0) > 0:
        console.print("\n[bold]Response Times[/bold]")
        figures = (
            ("Min", "min"),
            ("Max", "max"),
            ("Avg", "avg"),
            ("Median", "median"),
            ("P95", "p95"),
            ("P99", "p99"),
        )
        for label, key in figures:
            console.print(f" {label}: {rt.get(key, 0):.2f}ms")
@cli.command("export-json")
@click.argument("output", type=click.Path())
@click.option("--compact", is_flag=True, help="Export compact JSON")
@click.option("--summary", is_flag=True, help="Export summary only")
def export_json(output: str, compact: bool, summary: bool) -> None:
    """Export entries to JSON file."""
    # Serialises the loaded entries with JSONExporter and writes the text to
    # OUTPUT. --summary takes precedence over --compact. Any failure is
    # reported on the console rather than raised.
    # NOTE(review): a failed export still exits with status 0, unlike `load`,
    # which calls sys.exit(1) - confirm whether callers rely on that.

    # Local import keeps this command self-contained.
    from rich.markup import escape

    if not _entries_store:
        console.print("[yellow]No entries loaded. Use 'load' command first.[/yellow]")
        return

    exporter = JSONExporter()
    try:
        if summary:
            content = exporter.export_summary(_entries_store)
        elif compact:
            content = exporter.export_compact(_entries_store)
        else:
            content = exporter.export(_entries_store)

        # Write UTF-8 explicitly: `load` reads logs as UTF-8, and the locale
        # default encoding may be unable to encode non-ASCII text.
        with open(output, "w", encoding="utf-8") as f:
            f.write(content)

        # Escape the path so bracketed text in it is shown literally.
        console.print(f"[green]Exported to {escape(output)}[/green]")
    except Exception as e:
        console.print(f"[red]Export error: {escape(str(e))}[/red]")
@cli.command("export-curl")
@click.argument("output", type=click.Path())
def export_curl(output: str) -> None:
    """Export entries as cURL commands."""
    # Delegates the writing to ``CurlExporter.to_file``. Any failure is
    # reported on the console rather than raised.

    # Local import keeps this command self-contained.
    from rich.markup import escape

    if not _entries_store:
        console.print("[yellow]No entries loaded. Use 'load' command first.[/yellow]")
        return

    exporter = CurlExporter()
    try:
        exporter.to_file(_entries_store, output)
        # Escape the path and error text so a bracketed sequence is printed
        # literally instead of being parsed as Rich markup.
        console.print(f"[green]Exported to {escape(output)}[/green]")
    except Exception as e:
        console.print(f"[red]Export error: {escape(str(e))}[/red]")
@cli.command("export-code")
@click.argument("output", type=click.Path())
@click.option(
    "--language",
    type=click.Choice(["python", "javascript", "go"]),
    default="python",
    help="Target language",
)
def export_code(output: str, language: str) -> None:
    """Export entries as code snippets."""
    # Delegates the writing to ``CodeExporter.to_file`` for the chosen
    # language. Any failure is reported on the console rather than raised.

    # Local import keeps this command self-contained.
    from rich.markup import escape

    if not _entries_store:
        console.print("[yellow]No entries loaded. Use 'load' command first.[/yellow]")
        return

    exporter = CodeExporter()
    try:
        exporter.to_file(_entries_store, output, language)
        # Escape the path and error text so a bracketed sequence is printed
        # literally instead of being parsed as Rich markup.
        console.print(
            f"[green]Exported {len(_entries_store)} snippets to {escape(output)}[/green]"
        )
    except Exception as e:
        console.print(f"[red]Export error: {escape(str(e))}[/red]")
@cli.command("export-openapi")
@click.argument("output", type=click.Path())
@click.option("--title", default="API", help="API title")
@click.option("--version", default="1.0.0", help="API version")
@click.option("--no-validate", is_flag=True, help="Skip validation")
def export_openapi(
    output: str, title: str, version: str, no_validate: bool
) -> None:
    """Generate OpenAPI spec from traffic."""
    # Builds a spec from the loaded entries with OpenAPIGenerator and writes
    # its JSON form to OUTPUT. Validation runs unless --no-validate is given.
    # A ValueError is reported as a validation error, anything else as an
    # export error; neither is raised to the caller.

    # Local import keeps this command self-contained.
    from rich.markup import escape

    if not _entries_store:
        console.print("[yellow]No entries loaded. Use 'load' command first.[/yellow]")
        return

    generator = OpenAPIGenerator(_entries_store)
    try:
        spec = generator.generate(
            title=title,
            version=version,
            validate_spec=not no_validate,
        )
        # Write UTF-8 explicitly: `load` reads logs as UTF-8, and the locale
        # default encoding may be unable to encode non-ASCII text.
        with open(output, "w", encoding="utf-8") as f:
            f.write(generator.to_json(spec))

        # Escape the path and error text so a bracketed sequence is printed
        # literally instead of being parsed as Rich markup.
        console.print(f"[green]OpenAPI spec exported to {escape(output)}[/green]")
    except ValueError as e:
        console.print(f"[red]Validation error: {escape(str(e))}[/red]")
    except Exception as e:
        console.print(f"[red]Export error: {escape(str(e))}[/red]")
@cli.command()
@click.option("--method", multiple=True, help="Filter by method")
@click.option("--status", multiple=True, type=int, help="Filter by status code")
@click.option("--url", help="Filter by URL pattern")
@click.option("--content-type", multiple=True, help="Filter by content type")
def filter_entries(
    method: tuple[str, ...],
    status: tuple[int, ...],
    url: str | None,
    content_type: tuple[str, ...],
) -> None:
    """Filter entries and show results (alias for list with filters)."""
    # Forward the filters unchanged to `list_entries`, with the limit pinned
    # at 50 because this command exposes no --limit option.
    forwarded = {
        "limit": 50,
        "method": method,
        "status": status,
        "url": url,
        "content_type": content_type,
    }
    click.get_current_context().invoke(list_entries, **forwarded)
def main() -> None:
    """Run the ``cli`` command group; this is the program's entry point."""
    cli()


# Allow running this module directly as a script.
if __name__ == "__main__":
    main()