"""Statistics generator for HTTP traffic analytics."""

import re
import statistics
from collections import Counter, defaultdict
from dataclasses import dataclass
from typing import Any

from rich.table import Table

from http_log_explorer.models import HTTPEntry


@dataclass
class TrafficStats:
    """Container for traffic statistics.

    A plain record of aggregate values; it performs no computation or
    validation itself.
    """

    # Number of entries the statistics were computed from.
    total_requests: int
    # Request count keyed by endpoint string.
    endpoint_count: dict[str, int]
    # Request count keyed by HTTP method.
    method_distribution: dict[str, int]
    # Request count keyed by numeric response status code.
    status_breakdown: dict[int, int]
    # Request count keyed by content type.
    content_type_distribution: dict[str, int]
    # Named response-time aggregates (e.g. "avg", "p95") mapped to floats.
    response_time_stats: dict[str, float]
    # Request count keyed by host.
    hosts: dict[str, int]


class StatsGenerator:
    """Generate statistics from HTTP entries.

    Every statistic is computed on demand from ``self.entries``; nothing is
    cached between calls.
    """

    # Lookahead marking the end of a path segment: the next "/", the start
    # of a query string or fragment, or the end of the string.
    _SEGMENT_END = r"(?=[/?#]|$)"
    _UUID_BODY = r"[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}"

    # Compiled once at class level instead of on every helper call.
    _UUID_RE = re.compile(_UUID_BODY, re.IGNORECASE)
    _HASH_RE = re.compile(r"[a-f0-9]{32,}", re.IGNORECASE)
    _UUID_SEGMENT_RE = re.compile("/" + _UUID_BODY + _SEGMENT_END, re.IGNORECASE)
    _ID_SEGMENT_RE = re.compile(r"/\d+" + _SEGMENT_END)
    _VERSION_SEGMENT_RE = re.compile(r"/v\d+(?:\.\d+)?" + _SEGMENT_END)

    def __init__(self, entries: list[HTTPEntry]) -> None:
        """Initialize with HTTP entries.

        Args:
            entries: List of HTTPEntry objects
        """
        self.entries = entries

    def generate(self) -> TrafficStats:
        """Generate all statistics.

        Returns:
            TrafficStats object with all computed statistics
        """
        return TrafficStats(
            total_requests=len(self.entries),
            endpoint_count=self.endpoint_count(),
            method_distribution=self.method_distribution(),
            status_breakdown=self.status_breakdown(),
            content_type_distribution=self.content_type_distribution(),
            response_time_stats=self.response_time_stats(),
            hosts=self.hosts(),
        )

    def endpoint_count(self) -> dict[str, int]:
        """Count requests per normalized endpoint.

        Returns:
            Dictionary mapping normalized endpoints to counts, most
            frequent first
        """
        counter: Counter[str] = Counter(
            self._normalize_endpoint(entry.endpoint) for entry in self.entries
        )
        return dict(counter.most_common())

    def method_distribution(self) -> dict[str, int]:
        """Get distribution of HTTP methods.

        Returns:
            Dictionary mapping methods to counts, in first-seen order
        """
        counter = Counter(e.request.method for e in self.entries)
        return dict(counter)

    def status_breakdown(self) -> dict[int, int]:
        """Get breakdown of status codes.

        Returns:
            Dictionary mapping status codes to counts, ordered by
            ascending status code
        """
        counter = Counter(e.response.status for e in self.entries)
        return dict(sorted(counter.items()))

    def content_type_distribution(self) -> dict[str, int]:
        """Get distribution of content types.

        Only the part before the first ``;`` is counted, so parameters
        such as ``charset`` do not split the buckets. Entries with a
        missing or empty content type are counted as ``"unknown"``.

        Returns:
            Dictionary mapping content types to counts, most frequent first
        """
        counter: Counter[str] = Counter()
        for entry in self.entries:
            ct = entry.content_type or "unknown"
            # A value like "; charset=utf-8" has an empty main type; treat
            # it as unknown rather than counting it under "".
            main_type = ct.split(";", 1)[0].strip() or "unknown"
            counter[main_type] += 1
        return dict(counter.most_common())

    def response_time_stats(self) -> dict[str, float]:
        """Calculate response time statistics.

        Entries whose ``duration_ms`` is ``None`` are skipped. If no entry
        has a duration, every statistic is reported as ``0.0``.

        Returns:
            Dictionary with "min", "max", "avg", "median", "p95" and "p99"
            response times, taken from ``duration_ms``
        """
        times = [e.duration_ms for e in self.entries if e.duration_ms is not None]
        if not times:
            return {"min": 0.0, "max": 0.0, "avg": 0.0, "median": 0.0, "p95": 0.0, "p99": 0.0}

        sorted_times = sorted(times)
        n = len(sorted_times)

        # Percentile indexes are clamped to the last element so small
        # samples never index past the end.
        p95_idx = min(int(n * 0.95), n - 1)
        p99_idx = min(int(n * 0.99), n - 1)

        return {
            "min": float(sorted_times[0]),
            "max": float(sorted_times[-1]),
            "avg": float(sum(sorted_times) / n),
            # statistics.median averages the two middle values when the
            # sample size is even, instead of picking the upper one.
            "median": float(statistics.median(sorted_times)),
            "p95": float(sorted_times[p95_idx]),
            "p99": float(sorted_times[p99_idx]),
        }

    def hosts(self) -> dict[str, int]:
        """Get request count per host.

        Returns:
            Dictionary mapping hosts to counts, most frequent first
        """
        counter = Counter(e.host for e in self.entries)
        return dict(counter.most_common())

    def status_code_categories(self) -> dict[str, int]:
        """Get counts by status code category.

        Statuses outside the 100-599 range are not counted in any
        category.

        Returns:
            Dictionary with 1xx, 2xx, 3xx, 4xx, 5xx counts
        """
        labels = {
            1: "1xx informational",
            2: "2xx success",
            3: "3xx redirection",
            4: "4xx client error",
            5: "5xx server error",
        }
        categories: dict[str, int] = {label: 0 for label in labels.values()}

        for entry in self.entries:
            # The hundreds digit selects the category.
            label = labels.get(entry.response.status // 100)
            if label is not None:
                categories[label] += 1

        return categories

    def endpoint_patterns(self) -> dict[str, int]:
        """Extract common endpoint patterns with path parameters.

        Returns:
            Dictionary mapping patterns to counts, most frequent first
        """
        patterns: dict[str, int] = defaultdict(int)

        for entry in self.entries:
            patterns[self._extract_pattern(entry.endpoint)] += 1

        return dict(sorted(patterns.items(), key=lambda x: x[1], reverse=True))

    def _normalize_endpoint(self, endpoint: str) -> str:
        """Normalize endpoint by replacing IDs and removing versions.

        Whole path segments are rewritten: UUIDs become ``{uuid}``,
        all-digit segments become ``{id}``, and version segments such as
        ``v1`` or ``v2.1`` are dropped. A segment that merely starts with
        digits or a version (``/123abc``, ``/v2ray``) is left untouched.

        Args:
            endpoint: Request path to normalize

        Returns:
            The normalized path, or "/" if nothing is left after
            normalization
        """
        # UUIDs go first: one starting with digits would otherwise be
        # partly rewritten by the numeric-ID rule and no longer match.
        cleaned = self._UUID_SEGMENT_RE.sub("/{uuid}", endpoint)
        cleaned = self._ID_SEGMENT_RE.sub("/{id}", cleaned)
        cleaned = self._VERSION_SEGMENT_RE.sub("", cleaned)
        # A bare version path such as "/v1" would otherwise become "".
        return cleaned or "/"

    def _extract_pattern(self, endpoint: str) -> str:
        """Extract endpoint pattern with parameter placeholders.

        Args:
            endpoint: Request path to convert

        Returns:
            The path with all-digit, UUID-like and hash-like segments
            replaced by "{id}", "{uuid}" and "{hash}"; other segments,
            including empty ones, are kept as they are
        """
        normalized_parts = []

        for part in endpoint.split("/"):
            if not part:
                normalized_parts.append("")
            elif part.isdigit():
                normalized_parts.append("{id}")
            elif self._is_uuid(part):
                normalized_parts.append("{uuid}")
            elif self._is_hash(part):
                normalized_parts.append("{hash}")
            else:
                normalized_parts.append(part)

        return "/".join(normalized_parts)

    def _is_uuid(self, s: str) -> bool:
        """Check if string looks like a UUID (8-4-4-4-12 hex, any case)."""
        return self._UUID_RE.fullmatch(s) is not None

    def _is_hash(self, s: str) -> bool:
        """Check if string looks like a hash (32 or more hex characters)."""
        return self._HASH_RE.fullmatch(s) is not None

    def render_table(self, stats: TrafficStats | None = None) -> Table:
        """Render statistics as a Rich table.

        The response-time rows are only added when the average is
        greater than zero.

        Args:
            stats: Pre-generated stats, or None to generate new

        Returns:
            Rich Table object
        """
        if stats is None:
            stats = self.generate()

        table = Table(title="Traffic Statistics")
        table.add_column("Metric", style="cyan")
        table.add_column("Value", style="green")

        table.add_row("Total Requests", str(stats.total_requests))

        method_rows = [f"{m}: {c}" for m, c in sorted(stats.method_distribution.items())]
        table.add_row("Methods", ", ".join(method_rows) if method_rows else "N/A")

        status_rows = [f"{s}: {c}" for s, c in sorted(stats.status_breakdown.items())]
        table.add_row("Status Codes", ", ".join(status_rows) if status_rows else "N/A")

        rt = stats.response_time_stats
        if rt["avg"] > 0:
            table.add_row("Response Time (avg)", f"{rt['avg']:.2f}ms")
            table.add_row("Response Time (p95)", f"{rt['p95']:.2f}ms")

        # Only the first five endpoints, in the order stored in the stats.
        top_endpoints = list(stats.endpoint_count.items())[:5]
        endpoint_rows = [f"{e}: {c}" for e, c in top_endpoints]
        table.add_row("Top Endpoints", ", ".join(endpoint_rows) if endpoint_rows else "N/A")

        return table

    def to_dict(self, stats: TrafficStats | None = None) -> dict[str, Any]:
        """Convert stats to dictionary.

        "status_code_categories" is always computed from this generator's
        own entries, even when ``stats`` is supplied by the caller.

        Args:
            stats: Pre-generated stats, or None to generate new

        Returns:
            Dictionary representation of stats
        """
        if stats is None:
            stats = self.generate()

        return {
            "total_requests": stats.total_requests,
            "endpoint_count": stats.endpoint_count,
            "method_distribution": stats.method_distribution,
            "status_breakdown": stats.status_breakdown,
            "content_type_distribution": stats.content_type_distribution,
            "response_time_stats": stats.response_time_stats,
            "hosts": stats.hosts,
            "status_code_categories": self.status_code_categories(),
        }