import random import time from typing import Any from flask import Flask, Response, request from api_snapshot.snapshot import load_snapshot def create_app(snapshot: dict[str, Any]) -> Flask: """Create a Flask application for the mock server.""" app = Flask(__name__) requests_list = snapshot.get("requests", []) metadata = snapshot.get("metadata", {}) latency_mode = metadata.get("latency_mode", "original") custom_latency_ms = metadata.get("custom_latency_ms") def find_matching_response(req_method: str, req_path: str) -> dict[str, Any] | None: """Find a matching response from the snapshot.""" for item in requests_list: recorded_req = item.get("request", {}) recorded_url = recorded_req.get("url", "") if "://" in recorded_url: from urllib.parse import urlparse parsed = urlparse(recorded_url) recorded_path = parsed.path else: recorded_path = recorded_url if recorded_req.get("method") == req_method and recorded_path == req_path: return item.get("response") return None def apply_latency(latency_mode: str, custom_latency_ms: int | None) -> None: """Apply latency to the response.""" if latency_mode == "none": return if latency_mode == "fixed": delay_ms = custom_latency_ms or 100 time.sleep(delay_ms / 1000) elif latency_mode == "random": min_ms = metadata.get("latency_min_ms", 50) max_ms = metadata.get("latency_max_ms", 500) delay_ms = random.randint(min_ms, max_ms) time.sleep(delay_ms / 1000) elif latency_mode == "original": response = find_matching_response(request.method, request.path) if response: delay_ms = response.get("latency_ms", 0) if delay_ms > 0: time.sleep(delay_ms / 1000) @app.route("/", methods=["GET", "POST", "PUT", "DELETE", "PATCH"]) def handle_request(path: str) -> Response: req_method = request.method apply_latency(latency_mode, custom_latency_ms) response = find_matching_response(req_method, f"/{path}") if not response: return Response( response=f"No matching response found for {req_method} /{path}", status=404, mimetype="text/plain", ) status_code = response.get("status_code", 200) headers = response.get("headers", {}) body = response.get("body", "") if isinstance(body, dict): import json body = json.dumps(body) headers["Content-Type"] = headers.get("Content-Type", "application/json") return Response( response=body, status=status_code, headers=headers, mimetype=headers.get("Content-Type", "text/plain"), ) return app