# File-viewer header captured with the source: 141 lines, 5.0 KiB, Python.

"""Parser for curl -v output."""
import logging
import re
from datetime import datetime
from typing import Any

from http_log_explorer.models import HTTPEntry, Request, Response
from http_log_explorer.parsers import ParserInterface
class CurlParser(ParserInterface):
    """Parser for ``curl -v`` verbose output.

    In verbose mode curl prefixes the lines it sends with ``> ``, the header
    lines it receives with ``< `` and its own informational messages with
    ``* ``.  The response body is printed without a prefix.  The parser
    groups these lines into one block per request line and converts each
    block that also has a response into an ``HTTPEntry``.
    """

    # ``\r?`` before ``$`` lets the patterns match CRLF-terminated captures,
    # both per line and when searching the whole text in ``can_parse``.
    REQUEST_LINE_RE = re.compile(r"^> (\w+) (\S+) (HTTP/[\d.]+)\r?$", re.MULTILINE)
    # The reason phrase is optional: HTTP/2 status lines carry none.
    RESPONSE_LINE_RE = re.compile(
        r"^< (HTTP/[\d.]+) (\d+)(?: (.*?))?[ \t]*\r?$", re.MULTILINE
    )
    # Group 1 is the bare direction marker (``>`` sent, ``<`` received),
    # group 2 the field name (no whitespace or colon), group 3 the value.
    HEADER_RE = re.compile(r"^([<>]) ([^\s:]+):[ \t]*(.*?)[ \t]*\r?$")
    # Not referenced by any method of this class; kept for compatibility.
    TIMING_RE = re.compile(r"^\* time_conditional check:.*$")
    # curl's own output that can be interleaved with the body: ``* ...``
    # informational messages and ``{ [N bytes data]`` transfer markers.
    _NOISE_RE = re.compile(r"^(?:\* .*|[{}] \[\d+ bytes data\])$")

    @staticmethod
    def get_parser_name() -> str:
        """Return the identifier of this parser (``"curl"``)."""
        return "curl"

    def can_parse(self, content: str | bytes) -> bool:
        """Check if content appears to be curl -v output.

        Args:
            content: Raw text, or bytes that are decoded as UTF-8 with
                undecodable bytes dropped.

        Returns:
            True when the content contains at least one ``> METHOD URL
            HTTP/x`` request line and one ``< HTTP/x STATUS`` response line.
        """
        if isinstance(content, bytes):
            content = content.decode("utf-8", errors="ignore")
        has_request = self.REQUEST_LINE_RE.search(content) is not None
        has_response = self.RESPONSE_LINE_RE.search(content) is not None
        return has_request and has_response

    def parse(self, content: str | bytes, source_file: str | None = None) -> list[HTTPEntry]:
        """Parse curl -v output into HTTPEntry objects.

        Args:
            content: Raw text, or bytes that are decoded as UTF-8 with
                undecodable bytes replaced.
            source_file: Optional origin of the content, stored on each entry.

        Returns:
            One entry per request/response pair, in order of appearance.
            Blocks that cannot be converted are skipped (best effort).
        """
        if isinstance(content, bytes):
            content = content.decode("utf-8", errors="replace")

        entries: list[HTTPEntry] = []
        for idx, block in enumerate(self._split_blocks(content)):
            try:
                entry = self._parse_block(block, idx, source_file)
            except Exception:
                # Parsing stays best effort, but the failure is no longer
                # invisible: it is reported at debug level with a traceback.
                logging.getLogger(__name__).debug(
                    "Skipping curl block %d: could not build entry",
                    idx,
                    exc_info=True,
                )
                continue
            if entry:
                entries.append(entry)
        return entries

    @staticmethod
    def _finish_block(block: dict[str, Any], body_lines: list[str]) -> dict[str, Any]:
        """Attach the collected body lines to the block's response, if any."""
        if block["response"] is not None and body_lines:
            block["response"]["body"] = "\n".join(body_lines) + "\n"
        return block

    def _split_blocks(self, content: str) -> list[dict[str, Any]]:
        """Split curl output into request/response blocks.

        Args:
            content: Decoded curl output.

        Returns:
            One dict per request line with the keys ``request`` (method, url,
            http_version), ``headers`` (request headers as ``(name, value)``
            pairs), ``body`` (always None; no request body is extracted) and
            ``response`` (None, or a dict with http_version, status,
            status_text, headers and body).  Everything before the first
            request line is ignored.
        """
        blocks: list[dict[str, Any]] = []
        current: dict[str, Any] | None = None
        body_lines: list[str] = []
        headers_done = False  # True once the bare "<" header terminator was seen

        for raw_line in content.split("\n"):
            # Captures often keep the CRLF line endings of the HTTP headers.
            line = raw_line.rstrip("\r")

            request_match = self.REQUEST_LINE_RE.match(line)
            if request_match:
                if current is not None:
                    blocks.append(self._finish_block(current, body_lines))
                current = {
                    "request": {
                        "method": request_match.group(1),
                        "url": request_match.group(2),
                        "http_version": request_match.group(3),
                    },
                    "headers": [],
                    "body": None,
                    "response": None,
                }
                body_lines = []
                headers_done = False
                continue

            if current is None:
                continue

            response_match = self.RESPONSE_LINE_RE.match(line)
            if response_match:
                # A later status line in the same block (e.g. after an
                # interim response) replaces the earlier one.
                current["response"] = {
                    "http_version": response_match.group(1),
                    "status": int(response_match.group(2)),
                    "status_text": response_match.group(3) or "",
                    "headers": [],
                    "body": None,
                }
                body_lines = []
                headers_done = False
                continue

            response = current["response"]

            if not headers_done:
                header_match = self.HEADER_RE.match(line)
                if header_match:
                    direction, name, value = header_match.groups()
                    if direction == ">":
                        current["headers"].append((name, value))
                    elif response is not None:
                        response["headers"].append((name, value))
                    continue
                if response is not None and line.strip() == "<":
                    # Empty received line: end of the response headers.  It
                    # must not leak into the body.
                    headers_done = True
                    continue

            if response is None or not line.strip():
                continue
            if self._NOISE_RE.match(line):
                # curl chatter such as "* Connection #0 ... left intact".
                # Trade-off: a body line that itself starts with "* " is
                # dropped as well.
                continue
            body_lines.append(line)

        if current is not None:
            blocks.append(self._finish_block(current, body_lines))
        return blocks

    @staticmethod
    def _header_value(headers: dict[str, str], name: str) -> str | None:
        """Return the value of header *name*, compared case-insensitively."""
        wanted = name.lower()
        for key, value in headers.items():
            if key.lower() == wanted:
                return value or None
        return None

    def _parse_block(
        self, block: dict[str, Any], idx: int, source_file: str | None
    ) -> HTTPEntry | None:
        """Parse a single request/response block.

        Args:
            block: A dict as produced by ``_split_blocks``.
            idx: Position of the block; used to build the id ``curl-<idx>``.
            source_file: Optional origin of the content, stored on the entry.

        Returns:
            The entry, or None when the block lacks a request or a response.
            The timestamp is the (naive, local) time of parsing.
        """
        req_data = block.get("request")
        resp_data = block.get("response")
        if not req_data or not resp_data:
            return None

        # Repeated header names collapse to the last value seen.
        request_headers = dict(block.get("headers") or [])
        response_headers = dict(resp_data.get("headers") or [])

        request = Request(
            method=req_data.get("method", "GET"),
            url=req_data.get("url", "/"),
            http_version=req_data.get("http_version", "HTTP/1.1"),
            headers=request_headers,
            body=block.get("body"),
        )

        response_body = (resp_data.get("body") or "").strip()
        response = Response(
            status=resp_data.get("status", 0),
            status_text=resp_data.get("status_text", ""),
            http_version=resp_data.get("http_version", "HTTP/1.1"),
            headers=response_headers,
            body=response_body or None,
            # The response's content type comes from the response headers;
            # HTTP/2 captures spell the name in lower case.
            content_type=self._header_value(response_headers, "Content-Type"),
        )

        return HTTPEntry(
            id=f"curl-{idx}",
            request=request,
            response=response,
            timestamp=datetime.now(),
            source_file=source_file,
        )