Files
schema2mock/http_log_explorer/parsers/devtools_parser.py

134 lines
5.2 KiB
Python

"""Parser for Chrome DevTools network export format."""
import json
import logging
from datetime import datetime
from typing import Any

from http_log_explorer.models import HTTPEntry, Request, Response
from http_log_explorer.parsers import ParserInterface
class DevToolsParser(ParserInterface):
    """Parser for Chrome DevTools network export JSON.

    Two input shapes are handled: a bare JSON list of entry objects, or a
    JSON object whose ``log`` member holds an ``entries`` list. Every entry
    is expected to carry a ``request`` and a ``response`` object.
    """

    @staticmethod
    def get_parser_name() -> str:
        """Return the display name of this parser."""
        return "DevTools"

    def can_parse(self, content: str | bytes) -> bool:
        """Check if content appears to be DevTools network export.

        Args:
            content: Raw export text; bytes are decoded as UTF-8 with
                undecodable bytes ignored.

        Returns:
            True when the content is a JSON list whose leading entry objects
            all have ``request`` and ``response`` members, or a JSON object
            whose ``log`` object has ``entries`` but no ``creator``.
            False otherwise, including when the content is not valid JSON.
        """
        if isinstance(content, bytes):
            content = content.decode("utf-8", errors="ignore")
        try:
            data = json.loads(content)
        except json.JSONDecodeError:
            return False

        if isinstance(data, list):
            # Only the first three items are sampled. At least one of them
            # must be an object; otherwise all() would be vacuously true and
            # an empty list or a list of scalars would be claimed.
            sample = [item for item in data[:3] if isinstance(item, dict)]
            return bool(sample) and all(
                "request" in item and "response" in item for item in sample
            )

        if isinstance(data, dict):
            log = data.get("log")
            if not isinstance(log, dict):
                # Guards against "log": null (TypeError on ``in``) and
                # "log": "<string>" (accidental substring match).
                return False
            # NOTE(review): a "creator" member presumably marks a full HAR
            # file meant for a different parser -- confirm against the
            # sibling parsers.
            return "entries" in log and "creator" not in log

        return False

    def parse(self, content: str | bytes, source_file: str | None = None) -> list[HTTPEntry]:
        """Parse DevTools network export into HTTPEntry objects.

        Args:
            content: Raw export text; bytes are decoded as UTF-8 with
                undecodable bytes replaced.
            source_file: Optional origin recorded on every produced entry.

        Returns:
            The successfully converted entries, in input order. Entries that
            cannot be converted are skipped (and logged) rather than failing
            the whole parse.

        Raises:
            ValueError: If the content is not valid JSON or does not have a
                recognized top-level structure.
        """
        if isinstance(content, bytes):
            content = content.decode("utf-8", errors="replace")
        try:
            data = json.loads(content)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON format: {e}") from e

        if isinstance(data, dict) and "log" in data:
            log = data["log"]
            entries_data = log.get("entries", []) if isinstance(log, dict) else None
        elif isinstance(data, list):
            entries_data = data
        else:
            raise ValueError("Unrecognized DevTools format")

        if not isinstance(entries_data, list):
            # A malformed "log" or "entries" member is reported with the same
            # ValueError as any other unknown layout, instead of leaking an
            # AttributeError/TypeError to the caller.
            raise ValueError("Unrecognized DevTools format")

        entries: list[HTTPEntry] = []
        for idx, entry_data in enumerate(entries_data):
            try:
                entry = self._convert_entry(entry_data, idx, source_file)
            except Exception as exc:
                # Best effort: one bad entry must not abort the whole file,
                # but the drop is recorded instead of vanishing silently.
                logging.getLogger(__name__).warning(
                    "Skipping DevTools entry %d: %s", idx, exc
                )
                continue
            if entry:
                entries.append(entry)
        return entries

    def _convert_entry(
        self, entry_data: dict[str, Any], idx: int, source_file: str | None
    ) -> HTTPEntry | None:
        """Convert a DevTools entry to our HTTPEntry model.

        Args:
            entry_data: One decoded entry object from the export.
            idx: Position of the entry in the export; used to build its id.
            source_file: Optional origin recorded on the produced entry.

        Returns:
            The converted entry, or None when the entry is not an object or
            lacks a usable ``request`` or ``response`` object.
        """
        if not isinstance(entry_data, dict):
            return None

        request_data = entry_data.get("request", {})
        response_data = entry_data.get("response", {})
        if not request_data or not response_data:
            return None
        if not isinstance(request_data, dict) or not isinstance(response_data, dict):
            return None

        # postData and content are optional; anything that is not an object
        # is treated as absent, not as a reason to discard the entry.
        post_data = request_data.get("postData")
        request_body = post_data.get("text") if isinstance(post_data, dict) else None

        content = response_data.get("content")
        has_content = isinstance(content, dict)

        request = Request(
            method=request_data.get("method", "GET"),
            url=request_data.get("url", ""),
            http_version=request_data.get("httpVersion", "HTTP/1.1"),
            headers=self._parse_headers(request_data.get("headers", {})),
            body=request_body,
            query_params=self._parse_query_params(request_data.get("queryString", [])),
        )
        response = Response(
            status=response_data.get("status", 0),
            status_text=response_data.get("statusText", ""),
            http_version=response_data.get("httpVersion", "HTTP/1.1"),
            headers=self._parse_headers(response_data.get("headers", {})),
            body=content.get("text") if has_content else None,
            content_type=content.get("mimeType") if has_content else None,
            response_time_ms=self._parse_time(entry_data),
        )

        return HTTPEntry(
            id=f"devtools-{idx}",
            request=request,
            response=response,
            timestamp=self._parse_timestamp(entry_data),
            server_ip=entry_data.get("serverIPAddress"),
            connection=entry_data.get("connection"),
            source_file=source_file,
        )

    def _parse_headers(self, headers: dict[str, Any] | list) -> dict[str, str]:
        """Parse headers to dictionary.

        Accepts either a mapping (copied as-is) or a list of
        ``{"name": ..., "value": ...}`` objects. List items that are not
        objects are ignored; when a name repeats, the last value wins.
        Any other input yields an empty dict.
        """
        if isinstance(headers, dict):
            return dict(headers)
        if isinstance(headers, list):
            return {
                h.get("name", ""): h.get("value", "")
                for h in headers
                if isinstance(h, dict)
            }
        return {}

    def _parse_query_params(self, query_string: list[dict[str, Any]]) -> dict[str, str]:
        """Parse query string list to dictionary.

        Expects a list of ``{"name": ..., "value": ...}`` objects. List items
        that are not objects are ignored; when a name repeats, the last value
        wins. Non-list input yields an empty dict.
        """
        if isinstance(query_string, list):
            return {
                p.get("name", ""): p.get("value", "")
                for p in query_string
                if isinstance(p, dict)
            }
        return {}

    def _parse_time(self, entry_data: dict[str, Any]) -> float | None:
        """Parse time from DevTools entry.

        Returns the entry's ``time`` member as a float, or None when it is
        missing, null, or not convertible to a number.
        """
        raw_time = entry_data.get("time")
        if raw_time is None:
            return None
        try:
            return float(raw_time)
        except (TypeError, ValueError):
            # An unusable timing should not cost us the whole entry.
            return None

    def _parse_timestamp(self, entry_data: dict[str, Any]) -> datetime | None:
        """Parse timestamp from DevTools entry.

        Returns the entry's ``startedDateTime`` member as a datetime, or
        None when it is missing, not a string, or not parseable.
        """
        started = entry_data.get("startedDateTime")
        if not isinstance(started, str):
            return None
        try:
            # datetime.fromisoformat() only accepts a trailing "Z" from
            # Python 3.11 on, so it is rewritten as an explicit UTC offset.
            return datetime.fromisoformat(started.replace("Z", "+00:00"))
        except ValueError:
            return None