diff --git a/app/api_snapshot/server/server.py b/app/api_snapshot/server/server.py new file mode 100644 index 0000000..c61fde8 --- /dev/null +++ b/app/api_snapshot/server/server.py @@ -0,0 +1,242 @@ +"""Flask-based mock server module.""" + +import random +import re +import time +from typing import Any, Dict, List, Optional, Tuple +from urllib.parse import parse_qs, urlparse + +from flask import Flask, Response, jsonify +from flask import request as flask_request + +from api_snapshot.snapshot.manager import Snapshot, SnapshotManager + + +def parse_path_parameters(path: str, recorded_path: str) -> Dict[str, str]: + """Extract path parameters from a matched path.""" + params = {} + + path_parts = path.strip("/").split("/") + recorded_parts = recorded_path.strip("/").split("/") + + for path_part, recorded_part in zip(path_parts, recorded_parts): + if recorded_part.startswith(":") and path_part: + param_name = recorded_part[1:] + params[param_name] = path_part + elif recorded_part.startswith("{") and recorded_part.endswith("}") and path_part: + param_name = recorded_part[1:-1] + params[param_name] = path_part + + return params + + +class MockServer: + """Flask-based mock server for replaying snapshots.""" + + def __init__( + self, + snapshot: Snapshot, + host: str = "127.0.0.1", + port: int = 8080, + latency_mode: str = "original", + fixed_latency_ms: Optional[int] = None, + random_latency_range: Optional[Tuple[int, int]] = None + ): + """Initialize the mock server. + + Args: + snapshot: The Snapshot to replay + host: Host to bind to + port: Port to listen on + latency_mode: "original", "fixed", "random", or "none" + fixed_latency_ms: Fixed latency in milliseconds (for "fixed" mode) + random_latency_range: Tuple of (min, max) ms for random latency + """ + self.snapshot = snapshot + self.host = host + self.port = port + self.latency_mode = latency_mode + self.fixed_latency_ms = fixed_latency_ms + self.random_latency_range = random_latency_range + + self.app = Flask(__name__) + self._setup_routes() + + def _get_latency_ms(self, original_latency: int = 0) -> int: + """Get the latency to apply based on configuration.""" + if self.latency_mode == "none": + return 0 + elif self.latency_mode == "fixed" and self.fixed_latency_ms is not None: + return self.fixed_latency_ms + elif self.latency_mode == "random" and self.random_latency_range: + return random.randint(*self.random_latency_range) + else: + return original_latency + + def _parse_query_params(self, url: str) -> Dict[str, List[str]]: + """Parse query parameters from URL.""" + parsed = urlparse(url) + return parse_qs(parsed.query) + + def _get_request_path(self, url: str) -> str: + """Extract path from URL for matching.""" + if url.startswith("http"): + parsed = urlparse(url) + return parsed.path + return url + + def _match_request( + self, + method: str, + path: str, + query_params: Dict[str, List[str]] + ) -> Optional[Any]: + """Match an incoming request to a recorded request.""" + request_path = path + + for pair in self.snapshot.requests: + recorded_method = pair.request.method.upper() + recorded_url = pair.request.url + + recorded_path = self._get_request_path(recorded_url) + + if recorded_method != method: + continue + + if recorded_path == request_path: + query_match = True + recorded_query = self._parse_query_params(recorded_url) + for key, values in query_params.items(): + if key not in recorded_query: + query_match = False + break + recorded_values = recorded_query.get(key, []) + if set(values) != set(recorded_values): + query_match = False + break + + if query_match: + return pair + + elif ":" in recorded_path or "{" in recorded_path: + regex = "^" + re.sub(r":(\w+)", r"(?P<\1>[^/]+)", recorded_path) + "$" + regex = regex.replace("{", "").replace("}", "") + try: + match = re.match(regex, request_path) + if match: + return pair + except re.error: + pass + + return None + + def _setup_routes(self) -> None: + """Set up Flask routes for the mock server.""" + + @self.app.route("/__snapshot-info", methods=["GET"]) + def snapshot_info(): + info = { + "name": "mock-server", + "description": self.snapshot.metadata.description, + "endpoints": len(self.snapshot.requests), + "latency_mode": self.latency_mode + } + return jsonify(info) + + @self.app.route( + "/", + methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"] + ) + def handle_request(subpath): + method = flask_request.method + path = "/" + subpath + query_params = {k: v for k, v in parse_qs(flask_request.query_string).items()} + + pair = self._match_request(method, path, query_params) + + if pair is None: + available_routes = [] + for p in self.snapshot.requests: + url = p.request.url + if url.startswith("http"): + parsed = urlparse(url) + path_only = parsed.path + else: + path_only = url + available_routes.append(f"{p.request.method} {path_only}") + + unique_routes = sorted(set(available_routes)) + + error_response = { + "error": "Route not found", + "method": method, + "path": path, + "available_routes": unique_routes + } + + return jsonify(error_response), 404 + + latency = self._get_latency_ms(pair.response.latency_ms) + + if latency > 0: + time.sleep(latency / 1000) + + headers = {} + for key, value in pair.response.headers.items(): + if key.lower() not in ("transfer-encoding", "content-encoding"): + headers[key] = value + + content_type = headers.get("Content-Type", "application/json") + + body = pair.response.body or "" + + response = Response(body, status=pair.response.status_code, headers=headers) + response.headers["Content-Type"] = content_type + + return response + + def run(self, debug: bool = False) -> None: + """Run the mock server.""" + print(f"Starting mock server on {self.host}:{self.port}") + print(f"Latency mode: {self.latency_mode}") + print(f"Endpoints: {len(self.snapshot.requests)}") + print("Press Ctrl+C to stop") + print("-" * 40) + + self.app.run(host=self.host, port=self.port, debug=debug, threaded=True) + + +def create_app_from_snapshot( + snapshot_path: str, + host: str = "127.0.0.1", + port: int = 8080, + latency_mode: str = "original", + fixed_latency_ms: Optional[int] = None, + random_latency_range: Optional[Tuple[int, int]] = None +) -> Tuple[Flask, "MockServer"]: + """Create a Flask app from a snapshot file. + + Args: + snapshot_path: Path to the snapshot file + host: Host to bind to + port: Port to listen on + latency_mode: Latency mode for replay + fixed_latency_ms: Fixed latency in milliseconds + random_latency_range: Random latency range (min, max) + + Returns: + Tuple of (Flask app, MockServer instance) + """ + manager = SnapshotManager() + snapshot = manager.load_snapshot(snapshot_path) + + server = MockServer( + snapshot=snapshot, + host=host, + port=port, + latency_mode=latency_mode, + fixed_latency_ms=fixed_latency_ms, + random_latency_range=random_latency_range + ) + + return server.app, server