340 lines
10 KiB
Python
340 lines
10 KiB
Python
"""CLI commands for HTTP Log Explorer."""
|
|
|
|
import sys
|
|
|
|
import click
|
|
from rich.console import Console
|
|
|
|
from http_log_explorer.analyzers import DiffEngine, StatsGenerator, TrafficAnalyzer
|
|
from http_log_explorer.cli.formatter import Formatter
|
|
from http_log_explorer.exporters import CodeExporter, CurlExporter, JSONExporter
|
|
from http_log_explorer.generators import OpenAPIGenerator
|
|
from http_log_explorer.models import FilterCriteria, HTTPEntry
|
|
from http_log_explorer.parsers import get_parser
|
|
|
|
console = Console()
|
|
formatter = Formatter()
|
|
|
|
_entries_store: list[HTTPEntry] = []
|
|
|
|
|
|
def reset_entries() -> None:
|
|
"""Reset the global entries store. Used for testing."""
|
|
global _entries_store
|
|
_entries_store = []
|
|
|
|
|
|
@click.group()
|
|
@click.version_option(version="0.1.0")
|
|
def cli() -> None:
|
|
"""HTTP Log Explorer - Parse, analyze, and explore HTTP traffic logs."""
|
|
pass
|
|
|
|
|
|
@cli.command()
|
|
@click.argument("file", type=click.Path(exists=True))
|
|
@click.option("--stats", is_flag=True, help="Show statistics after loading")
|
|
def load(file: str, stats: bool) -> None:
|
|
"""Load and parse an HTTP log file.
|
|
|
|
Supports HAR files, curl -v output, and Chrome DevTools exports.
|
|
"""
|
|
global _entries_store
|
|
|
|
try:
|
|
with open(file, encoding="utf-8", errors="replace") as f:
|
|
content = f.read()
|
|
except Exception as e:
|
|
console.print(f"[red]Error reading file: {e}[/red]")
|
|
sys.exit(1)
|
|
|
|
if not content.strip():
|
|
console.print("[red]Error: File is empty[/red]")
|
|
sys.exit(1)
|
|
|
|
try:
|
|
parser = get_parser(content)
|
|
console.print(f"[green]Using parser: {parser.get_parser_name()}[/green]")
|
|
entries = parser.parse(content, source_file=file)
|
|
except ValueError as e:
|
|
console.print(f"[red]Parse error: {e}[/red]")
|
|
console.print("[yellow]Supported formats:[/yellow]")
|
|
console.print(" - HAR files (HTTP Archive format)")
|
|
console.print(" - curl -v output")
|
|
console.print(" - Chrome DevTools network exports")
|
|
sys.exit(1)
|
|
|
|
_entries_store = entries
|
|
console.print(f"[green]Loaded {len(entries)} entries[/green]")
|
|
|
|
if stats and entries:
|
|
_show_stats(entries)
|
|
|
|
|
|
@cli.command()
|
|
@click.option("--limit", type=int, default=50, help="Limit number of entries shown")
|
|
@click.option("--method", multiple=True, help="Filter by method (e.g., GET, POST)")
|
|
@click.option("--status", multiple=True, type=int, help="Filter by status code")
|
|
@click.option("--url", help="Filter by URL pattern (regex)")
|
|
@click.option("--content-type", multiple=True, help="Filter by content type")
|
|
def list_entries(
|
|
limit: int,
|
|
method: tuple[str, ...],
|
|
status: tuple[int, ...],
|
|
url: str | None,
|
|
content_type: tuple[str, ...],
|
|
) -> None:
|
|
"""List loaded HTTP entries with optional filtering."""
|
|
global _entries_store
|
|
|
|
if not _entries_store:
|
|
console.print("[yellow]No entries loaded. Use 'load' command first.[/yellow]")
|
|
return
|
|
|
|
entries = list(_entries_store)
|
|
|
|
criteria = FilterCriteria(
|
|
methods=list(method) if method else None,
|
|
status_codes=list(status) if status else None,
|
|
url_pattern=url,
|
|
content_types=list(content_type) if content_type else None,
|
|
)
|
|
|
|
analyzer = TrafficAnalyzer(entries)
|
|
filtered = analyzer.filter(criteria)
|
|
|
|
table = formatter.format_entry_table(filtered, limit=limit)
|
|
console.print(table)
|
|
console.print(f"\n[dim]Showing {min(limit, len(filtered))} of {len(filtered)} entries[/dim]")
|
|
|
|
|
|
@cli.command()
|
|
@click.argument("query")
|
|
@click.option("--case-sensitive", is_flag=True, help="Case sensitive search")
|
|
def search(query: str, case_sensitive: bool) -> None:
|
|
"""Search across URLs and bodies."""
|
|
global _entries_store
|
|
|
|
if not _entries_store:
|
|
console.print("[yellow]No entries loaded. Use 'load' command first.[/yellow]")
|
|
return
|
|
|
|
analyzer = TrafficAnalyzer(_entries_store)
|
|
results = analyzer.search(query, case_sensitive=case_sensitive)
|
|
|
|
table = formatter.format_entry_table(results, limit=50)
|
|
console.print(table)
|
|
console.print(f"\n[dim]Found {len(results)} matching entries[/dim]")
|
|
|
|
|
|
@cli.command()
|
|
@click.argument("entry_id1")
|
|
@click.argument("entry_id2")
|
|
def diff(entry_id1: str, entry_id2: str) -> None:
|
|
"""Compare two HTTP entries by ID."""
|
|
global _entries_store
|
|
|
|
if not _entries_store:
|
|
console.print("[yellow]No entries loaded. Use 'load' command first.[/yellow]")
|
|
return
|
|
|
|
analyzer = TrafficAnalyzer(_entries_store)
|
|
entry1 = analyzer.get_entry_by_id(entry_id1)
|
|
entry2 = analyzer.get_entry_by_id(entry_id2)
|
|
|
|
if not entry1:
|
|
console.print(f"[red]Entry not found: {entry_id1}[/red]")
|
|
return
|
|
if not entry2:
|
|
console.print(f"[red]Entry not found: {entry_id2}[/red]")
|
|
return
|
|
|
|
engine = DiffEngine()
|
|
diff_result = engine.diff(entry1, entry2)
|
|
diff_output = engine.unified_diff_output(diff_result)
|
|
|
|
console.print(diff_output)
|
|
|
|
|
|
@cli.command()
|
|
def stats() -> None:
|
|
"""Show statistics for loaded entries."""
|
|
global _entries_store
|
|
|
|
if not _entries_store:
|
|
console.print("[yellow]No entries loaded. Use 'load' command first.[/yellow]")
|
|
return
|
|
|
|
_show_stats(_entries_store)
|
|
|
|
|
|
def _show_stats(entries: list[HTTPEntry]) -> None:
|
|
"""Show statistics for entries."""
|
|
generator = StatsGenerator(entries)
|
|
stats_data = generator.to_dict()
|
|
|
|
console.print("\n[bold cyan]Traffic Statistics[/bold cyan]")
|
|
console.print(f"Total Requests: {stats_data['total_requests']}")
|
|
|
|
console.print("\n[bold]Method Distribution[/bold]")
|
|
for method, count in sorted(stats_data["method_distribution"].items()):
|
|
console.print(f" {method}: {count}")
|
|
|
|
console.print("\n[bold]Status Code Breakdown[/bold]")
|
|
for status, count in sorted(stats_data["status_breakdown"].items()):
|
|
console.print(f" {status}: {count}")
|
|
|
|
console.print("\n[bold]Top Endpoints[/bold]")
|
|
for endpoint, count in list(stats_data["endpoint_count"].items())[:10]:
|
|
console.print(f" {endpoint}: {count}")
|
|
|
|
rt = stats_data.get("response_time_stats", {})
|
|
if rt.get("avg", 0) > 0:
|
|
console.print("\n[bold]Response Times[/bold]")
|
|
console.print(f" Min: {rt.get('min', 0):.2f}ms")
|
|
console.print(f" Max: {rt.get('max', 0):.2f}ms")
|
|
console.print(f" Avg: {rt.get('avg', 0):.2f}ms")
|
|
console.print(f" Median: {rt.get('median', 0):.2f}ms")
|
|
console.print(f" P95: {rt.get('p95', 0):.2f}ms")
|
|
console.print(f" P99: {rt.get('p99', 0):.2f}ms")
|
|
|
|
|
|
@cli.command("export-json")
|
|
@click.argument("output", type=click.Path())
|
|
@click.option("--compact", is_flag=True, help="Export compact JSON")
|
|
@click.option("--summary", is_flag=True, help="Export summary only")
|
|
def export_json(output: str, compact: bool, summary: bool) -> None:
|
|
"""Export entries to JSON file."""
|
|
global _entries_store
|
|
|
|
if not _entries_store:
|
|
console.print("[yellow]No entries loaded. Use 'load' command first.[/yellow]")
|
|
return
|
|
|
|
exporter = JSONExporter()
|
|
|
|
try:
|
|
if summary:
|
|
content = exporter.export_summary(_entries_store)
|
|
elif compact:
|
|
content = exporter.export_compact(_entries_store)
|
|
else:
|
|
content = exporter.export(_entries_store)
|
|
|
|
with open(output, "w") as f:
|
|
f.write(content)
|
|
|
|
console.print(f"[green]Exported to {output}[/green]")
|
|
except Exception as e:
|
|
console.print(f"[red]Export error: {e}[/red]")
|
|
|
|
|
|
@cli.command("export-curl")
|
|
@click.argument("output", type=click.Path())
|
|
def export_curl(output: str) -> None:
|
|
"""Export entries as cURL commands."""
|
|
global _entries_store
|
|
|
|
if not _entries_store:
|
|
console.print("[yellow]No entries loaded. Use 'load' command first.[/yellow]")
|
|
return
|
|
|
|
exporter = CurlExporter()
|
|
|
|
try:
|
|
exporter.to_file(_entries_store, output)
|
|
console.print(f"[green]Exported to {output}[/green]")
|
|
except Exception as e:
|
|
console.print(f"[red]Export error: {e}[/red]")
|
|
|
|
|
|
@cli.command("export-code")
|
|
@click.argument("output", type=click.Path())
|
|
@click.option(
|
|
"--language",
|
|
type=click.Choice(["python", "javascript", "go"]),
|
|
default="python",
|
|
help="Target language",
|
|
)
|
|
def export_code(output: str, language: str) -> None:
|
|
"""Export entries as code snippets."""
|
|
global _entries_store
|
|
|
|
if not _entries_store:
|
|
console.print("[yellow]No entries loaded. Use 'load' command first.[/yellow]")
|
|
return
|
|
|
|
exporter = CodeExporter()
|
|
|
|
try:
|
|
exporter.to_file(_entries_store, output, language)
|
|
console.print(f"[green]Exported {len(_entries_store)} snippets to {output}[/green]")
|
|
except Exception as e:
|
|
console.print(f"[red]Export error: {e}[/red]")
|
|
|
|
|
|
@cli.command("export-openapi")
|
|
@click.argument("output", type=click.Path())
|
|
@click.option("--title", default="API", help="API title")
|
|
@click.option("--version", default="1.0.0", help="API version")
|
|
@click.option("--no-validate", is_flag=True, help="Skip validation")
|
|
def export_openapi(
|
|
output: str, title: str, version: str, no_validate: bool
|
|
) -> None:
|
|
"""Generate OpenAPI spec from traffic."""
|
|
global _entries_store
|
|
|
|
if not _entries_store:
|
|
console.print("[yellow]No entries loaded. Use 'load' command first.[/yellow]")
|
|
return
|
|
|
|
generator = OpenAPIGenerator(_entries_store)
|
|
|
|
try:
|
|
spec = generator.generate(
|
|
title=title,
|
|
version=version,
|
|
validate_spec=not no_validate,
|
|
)
|
|
|
|
with open(output, "w") as f:
|
|
f.write(generator.to_json(spec))
|
|
|
|
console.print(f"[green]OpenAPI spec exported to {output}[/green]")
|
|
except ValueError as e:
|
|
console.print(f"[red]Validation error: {e}[/red]")
|
|
except Exception as e:
|
|
console.print(f"[red]Export error: {e}[/red]")
|
|
|
|
|
|
@cli.command()
|
|
@click.option("--method", multiple=True, help="Filter by method")
|
|
@click.option("--status", multiple=True, type=int, help="Filter by status code")
|
|
@click.option("--url", help="Filter by URL pattern")
|
|
@click.option("--content-type", multiple=True, help="Filter by content type")
|
|
def filter_entries(
|
|
method: tuple[str, ...],
|
|
status: tuple[int, ...],
|
|
url: str | None,
|
|
content_type: tuple[str, ...],
|
|
) -> None:
|
|
"""Filter entries and show results (alias for list with filters)."""
|
|
ctx = click.get_current_context()
|
|
ctx.invoke(
|
|
list_entries,
|
|
limit=50,
|
|
method=method,
|
|
status=status,
|
|
url=url,
|
|
content_type=content_type,
|
|
)
|
|
|
|
|
|
def main() -> None:
|
|
"""Main entry point."""
|
|
cli()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|