fix: resolve CI/CD test, lint, and type-check failures
This commit is contained in:
305
app/api_snapshot/recorder/recorder.py
Normal file
305
app/api_snapshot/recorder/recorder.py
Normal file
@@ -0,0 +1,305 @@
|
||||
"""HTTP traffic recording module."""
|
||||
|
||||
import json
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from typing import Any, Callable, Dict, List, Optional, Union
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import requests
|
||||
from requests import Response
|
||||
|
||||
|
||||
@dataclass
|
||||
class RecordedRequest:
|
||||
"""Represents a recorded HTTP request."""
|
||||
method: str
|
||||
url: str
|
||||
headers: Dict[str, str]
|
||||
body: Optional[str]
|
||||
timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary."""
|
||||
return {
|
||||
"method": self.method,
|
||||
"url": self.url,
|
||||
"headers": self.headers,
|
||||
"body": self.body,
|
||||
"timestamp": self.timestamp
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "RecordedRequest":
|
||||
"""Create from dictionary."""
|
||||
return cls(
|
||||
method=data["method"],
|
||||
url=data["url"],
|
||||
headers=data.get("headers", {}),
|
||||
body=data.get("body"),
|
||||
timestamp=data.get("timestamp", datetime.utcnow().isoformat())
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class RecordedResponse:
|
||||
"""Represents a recorded HTTP response."""
|
||||
status_code: int
|
||||
headers: Dict[str, str]
|
||||
body: Optional[str]
|
||||
latency_ms: int
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary."""
|
||||
return {
|
||||
"status_code": self.status_code,
|
||||
"headers": self.headers,
|
||||
"body": self.body,
|
||||
"latency_ms": self.latency_ms
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "RecordedResponse":
|
||||
"""Create from dictionary."""
|
||||
return cls(
|
||||
status_code=data["status_code"],
|
||||
headers=data.get("headers", {}),
|
||||
body=data.get("body"),
|
||||
latency_ms=data.get("latency_ms", 0)
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class RequestResponsePair:
|
||||
"""A request-response pair for snapshot storage."""
|
||||
request: RecordedRequest
|
||||
response: RecordedResponse
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary."""
|
||||
return {
|
||||
"request": self.request.to_dict(),
|
||||
"response": self.response.to_dict()
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "RequestResponsePair":
|
||||
"""Create from dictionary."""
|
||||
return cls(
|
||||
request=RecordedRequest.from_dict(data["request"]),
|
||||
response=RecordedResponse.from_dict(data["response"])
|
||||
)
|
||||
|
||||
|
||||
class RecordingSession:
|
||||
"""Session for recording HTTP traffic."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_url: Optional[str] = None,
|
||||
on_request: Optional[Callable[[RequestResponsePair], None]] = None
|
||||
):
|
||||
"""Initialize the recording session.
|
||||
|
||||
Args:
|
||||
base_url: Optional base URL to prepend to relative URLs
|
||||
on_request: Optional callback called after each request
|
||||
"""
|
||||
self.base_url = base_url
|
||||
self.on_request = on_request
|
||||
self.recordings: List[RequestResponsePair] = []
|
||||
self.session = requests.Session()
|
||||
|
||||
def _build_url(self, url: str) -> str:
|
||||
"""Build full URL from relative or absolute URL."""
|
||||
if url.startswith(("http://", "https://")):
|
||||
return url
|
||||
if self.base_url:
|
||||
parsed_base = urlparse(self.base_url)
|
||||
if not url.startswith("/"):
|
||||
url = "/" + url
|
||||
return f"{parsed_base.scheme}://{parsed_base.netloc}{url}"
|
||||
return url
|
||||
|
||||
def _extract_headers(self, response: Response) -> Dict[str, str]:
|
||||
"""Extract headers from response."""
|
||||
return dict(response.headers)
|
||||
|
||||
def _extract_body(self, response: Response) -> Optional[str]:
|
||||
"""Extract body from response as string."""
|
||||
try:
|
||||
return response.text
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def record_request(
|
||||
self,
|
||||
method: str,
|
||||
url: str,
|
||||
headers: Optional[Dict[str, str]] = None,
|
||||
body: Optional[Union[str, Dict]] = None,
|
||||
**kwargs: Any
|
||||
) -> Response:
|
||||
"""Make a request and record it.
|
||||
|
||||
Args:
|
||||
method: HTTP method
|
||||
url: URL (relative or absolute)
|
||||
headers: Optional request headers
|
||||
body: Optional request body (string or dict)
|
||||
**kwargs: Additional arguments passed to requests.request
|
||||
|
||||
Returns:
|
||||
The response from the request
|
||||
"""
|
||||
full_url = self._build_url(url)
|
||||
|
||||
if isinstance(body, dict):
|
||||
body = json.dumps(body)
|
||||
|
||||
start_time = time.time()
|
||||
response = self.session.request(
|
||||
method=method,
|
||||
url=full_url,
|
||||
headers=headers,
|
||||
data=body,
|
||||
**kwargs
|
||||
)
|
||||
end_time = time.time()
|
||||
|
||||
latency_ms = int((end_time - start_time) * 1000)
|
||||
|
||||
request_data = RecordedRequest(
|
||||
method=method,
|
||||
url=full_url,
|
||||
headers=dict(headers) if headers else {},
|
||||
body=body
|
||||
)
|
||||
|
||||
response_data = RecordedResponse(
|
||||
status_code=response.status_code,
|
||||
headers=self._extract_headers(response),
|
||||
body=self._extract_body(response),
|
||||
latency_ms=latency_ms
|
||||
)
|
||||
|
||||
pair = RequestResponsePair(request=request_data, response=response_data)
|
||||
self.recordings.append(pair)
|
||||
|
||||
if self.on_request:
|
||||
self.on_request(pair)
|
||||
|
||||
return response
|
||||
|
||||
def get(self, url: str, **kwargs: Any) -> Response:
|
||||
"""Record a GET request."""
|
||||
return self.record_request("GET", url, **kwargs)
|
||||
|
||||
def post(self, url: str, **kwargs: Any) -> Response:
|
||||
"""Record a POST request."""
|
||||
return self.record_request("POST", url, **kwargs)
|
||||
|
||||
def put(self, url: str, **kwargs: Any) -> Response:
|
||||
"""Record a PUT request."""
|
||||
return self.record_request("PUT", url, **kwargs)
|
||||
|
||||
def patch(self, url: str, **kwargs: Any) -> Response:
|
||||
"""Record a PATCH request."""
|
||||
return self.record_request("PATCH", url, **kwargs)
|
||||
|
||||
def delete(self, url: str, **kwargs: Any) -> Response:
|
||||
"""Record a DELETE request."""
|
||||
return self.record_request("DELETE", url, **kwargs)
|
||||
|
||||
def head(self, url: str, **kwargs: Any) -> Response:
|
||||
"""Record a HEAD request."""
|
||||
return self.record_request("HEAD", url, **kwargs)
|
||||
|
||||
def options(self, url: str, **kwargs: Any) -> Response:
|
||||
"""Record an OPTIONS request."""
|
||||
return self.record_request("OPTIONS", url, **kwargs)
|
||||
|
||||
def get_recordings(self) -> List[RequestResponsePair]:
|
||||
"""Get all recorded request-response pairs."""
|
||||
return self.recordings
|
||||
|
||||
def clear(self) -> None:
|
||||
"""Clear all recordings."""
|
||||
self.recordings.clear()
|
||||
|
||||
|
||||
def record_session(
|
||||
url: str,
|
||||
method: str = "GET",
|
||||
headers: Optional[Dict[str, str]] = None,
|
||||
body: Optional[str] = None,
|
||||
base_url: Optional[str] = None,
|
||||
on_record: Optional[Callable[[RequestResponsePair], None]] = None
|
||||
) -> List[RequestResponsePair]:
|
||||
"""Record a single HTTP request.
|
||||
|
||||
Args:
|
||||
url: The URL to request
|
||||
method: HTTP method (default: GET)
|
||||
headers: Optional request headers
|
||||
body: Optional request body
|
||||
base_url: Optional base URL for relative URLs
|
||||
on_record: Optional callback for each recording
|
||||
|
||||
Returns:
|
||||
List of recorded request-response pairs
|
||||
"""
|
||||
session = RecordingSession(base_url=base_url)
|
||||
|
||||
if on_record:
|
||||
session.on_request = on_record
|
||||
|
||||
session.record_request(method=method, url=url, headers=headers, body=body)
|
||||
|
||||
return session.get_recordings()
|
||||
|
||||
|
||||
def record_multiple(
|
||||
requests_config: List[Dict[str, Any]],
|
||||
base_url: Optional[str] = None,
|
||||
on_record: Optional[Callable[[RequestResponsePair], None]] = None
|
||||
) -> List[RequestResponsePair]:
|
||||
"""Record multiple HTTP requests.
|
||||
|
||||
Args:
|
||||
requests_config: List of request configurations with keys:
|
||||
- method: HTTP method
|
||||
- url: URL to request
|
||||
- headers: Optional headers
|
||||
- body: Optional body
|
||||
base_url: Optional base URL for relative URLs
|
||||
on_record: Optional callback for each recording
|
||||
|
||||
Returns:
|
||||
List of recorded request-response pairs
|
||||
"""
|
||||
session = RecordingSession(base_url=base_url)
|
||||
|
||||
if on_record:
|
||||
session.on_request = on_record
|
||||
|
||||
for config in requests_config:
|
||||
method = config.get("method", "GET")
|
||||
url = config.get("url")
|
||||
if not url:
|
||||
continue
|
||||
headers = config.get("headers")
|
||||
body = config.get("body")
|
||||
extra_kwargs = {k: v for k, v in config.items()
|
||||
if k not in ("method", "url", "headers", "body")}
|
||||
|
||||
session.record_request(
|
||||
method=method,
|
||||
url=url,
|
||||
headers=headers,
|
||||
body=body,
|
||||
**extra_kwargs
|
||||
)
|
||||
|
||||
return session.get_recordings()
|
||||
Reference in New Issue
Block a user