Files
testdata-cli/http_log_explorer/parsers/har_parser.py

147 lines
5.4 KiB
Python

"""HAR file parser using haralyzer."""
import json
import logging
from datetime import datetime
from typing import Any

from haralyzer import HarParser

from http_log_explorer.models import HTTPEntry, Request, Response
from http_log_explorer.parsers import ParserInterface
class HARParser(ParserInterface):
    """Parser for HAR (HTTP Archive) files.

    The raw content is decoded as JSON, handed to haralyzer's ``HarParser``,
    and every item of the ``entries`` list is converted into an
    ``HTTPEntry``.  Entries are handled as plain dictionaries.
    """

    @staticmethod
    def get_parser_name() -> str:
        """Return the short display name of this parser."""
        return "HAR"

    def can_parse(self, content: str | bytes) -> bool:
        """Check if content appears to be a HAR file.

        Args:
            content: Raw file content; bytes are decoded as UTF-8 with
                undecodable bytes dropped.

        Returns:
            True when the content is a JSON object whose ``log`` member is
            an object containing both ``entries`` and ``creator``; False for
            anything else, including text that is not valid JSON.
        """
        if isinstance(content, bytes):
            content = content.decode("utf-8", errors="ignore")
        try:
            data = json.loads(content)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass.
            return False

        # Valid JSON is not necessarily an object ("123", "null", "[]");
        # membership tests on such values would raise instead of answering.
        if not isinstance(data, dict):
            return False
        log = data.get("log")
        if not isinstance(log, dict):
            return False
        return "entries" in log and "creator" in log

    def parse(self, content: str | bytes, source_file: str | None = None) -> list[HTTPEntry]:
        """Parse HAR content into HTTPEntry objects.

        Args:
            content: Raw HAR document; bytes are decoded as UTF-8 with
                undecodable bytes replaced.
            source_file: Optional origin path stored on every entry.

        Returns:
            The converted entries in document order.  Entries that lack a
            request or response, or that fail to convert, are left out.

        Raises:
            ValueError: If the content is not valid JSON, is rejected by
                ``HarParser``, or its ``log`` member is not an object.
        """
        if isinstance(content, bytes):
            content = content.decode("utf-8", errors="replace")
        try:
            data = json.loads(content)
            har_parser = HarParser(data)
        except Exception as e:
            # Any failure here (bad JSON or a structure HarParser refuses)
            # is reported uniformly as an invalid HAR document.
            raise ValueError(f"Invalid HAR format: {e}") from e

        log = har_parser.har_data
        if not isinstance(log, dict):
            raise ValueError("Invalid HAR format: 'log' is not a JSON object")

        har_entries = log.get("entries")
        if not isinstance(har_entries, list):
            # A missing, null or mis-typed "entries" yields no entries
            # rather than a crash while iterating.
            har_entries = []

        entries: list[HTTPEntry] = []
        for idx, har_entry in enumerate(har_entries):
            try:
                entry = self._convert_har_entry(har_entry, idx, source_file)
            except Exception as exc:
                # Best effort: one malformed entry must not abort the whole
                # file, but the skip should leave a trace.
                logging.getLogger(__name__).warning(
                    "Skipping malformed HAR entry %d: %s", idx, exc
                )
                continue
            if entry:
                entries.append(entry)
        return entries

    def _convert_har_entry(
        self, har_entry: Any, idx: int, source_file: str | None
    ) -> HTTPEntry | None:
        """Convert one HAR entry into our HTTPEntry model.

        Args:
            har_entry: A single item of the HAR ``entries`` list.
            idx: Position of the entry in the list; used to build its id.
            source_file: Optional origin path stored on the entry.

        Returns:
            The converted entry, or None when the request or the response
            section is missing or empty.
        """
        request_data = har_entry.get("request")
        response_data = har_entry.get("response")
        if not request_data or not response_data:
            return None

        request = Request(
            method=request_data.get("method", "GET"),
            url=self._build_url(request_data),
            http_version=request_data.get("httpVersion", "HTTP/1.1"),
            headers=self._parse_headers(request_data.get("headers", [])),
            body=self._get_request_body(request_data),
            query_params=self._parse_query_params(request_data.get("queryString", [])),
        )
        response = Response(
            status=response_data.get("status", 0),
            status_text=response_data.get("statusText", ""),
            http_version=response_data.get("httpVersion", "HTTP/1.1"),
            headers=self._parse_headers(response_data.get("headers", [])),
            body=self._get_response_body(response_data),
            content_type=self._get_content_type(response_data.get("content", {})),
            response_time_ms=har_entry.get("time", None),
        )
        return HTTPEntry(
            id=f"har-{idx}",
            request=request,
            response=response,
            timestamp=self._parse_timestamp(har_entry),
            server_ip=har_entry.get("serverIPAddress", None),
            connection=har_entry.get("connection", None),
            source_file=source_file,
        )

    def _build_url(self, request_data: dict[str, Any]) -> str:
        """Build full URL from request data.

        Returns the request's ``url`` when it is non-empty.  Otherwise an
        ``http://<host>/`` URL is built from the first ``Host`` header
        (matched case-insensitively); the host part is empty when no such
        header exists.
        """
        url = request_data.get("url", "")
        if url:
            return url
        host = next(
            (
                header.get("value", "")
                for header in request_data.get("headers", [])
                if header.get("name", "").lower() == "host"
            ),
            "",
        )
        return f"http://{host}/"

    def _parse_headers(self, headers: list[dict[str, Any]]) -> dict[str, str]:
        """Parse headers list to dictionary.

        When a header name occurs more than once, the last value wins.
        """
        return {h.get("name", ""): h.get("value", "") for h in headers}

    def _parse_query_params(self, query_string: list[dict[str, Any]]) -> dict[str, str]:
        """Parse query string list to dictionary.

        When a parameter name occurs more than once, the last value wins.
        """
        return {p.get("name", ""): p.get("value", "") for p in query_string}

    def _get_request_body(self, request_data: dict[str, Any]) -> str | None:
        """Extract request body.

        Returns the ``text`` of a dict-shaped ``postData``, the string form
        of any other non-empty ``postData``, or None when it is absent or
        empty.
        """
        post_data = request_data.get("postData", {})
        if not post_data:
            return None
        if isinstance(post_data, dict):
            return post_data.get("text", None)
        return str(post_data)

    def _get_response_body(self, response_data: dict[str, Any]) -> str | None:
        """Extract response body (``content.text``), or None if unavailable."""
        content = response_data.get("content", {})
        if isinstance(content, dict):
            return content.get("text", None)
        return None

    def _get_content_type(self, content: dict[str, Any]) -> str | None:
        """Extract content type (``mimeType``) from content dict, or None."""
        if isinstance(content, dict):
            return content.get("mimeType", None)
        return None

    def _parse_timestamp(self, har_entry: Any) -> datetime | None:
        """Parse timestamp from HAR entry.

        Dictionary entries are read through their ``startedDateTime``
        string.  A trailing ``Z`` is rewritten to ``+00:00`` because
        ``datetime.fromisoformat`` only accepts it from Python 3.11 on;
        older versions also reject fractional seconds that are not exactly
        3 or 6 digits, in which case no timestamp is produced.

        For non-dict entries, or when the string cannot be parsed, a truthy
        ``started_datetime`` attribute on the entry is returned instead.

        Returns:
            The entry's start time, or None when none can be determined.
        """
        raw = har_entry.get("startedDateTime") if isinstance(har_entry, dict) else None
        if isinstance(raw, str):
            text = raw.strip()
            if text.endswith(("Z", "z")):
                text = f"{text[:-1]}+00:00"
            if text:
                try:
                    return datetime.fromisoformat(text)
                except ValueError:
                    pass  # unparseable value: fall through to the attribute lookup

        return getattr(har_entry, "started_datetime", None) or None