From 5b04166ca68026470835650582885d0bbb141a16 Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Wed, 4 Feb 2026 13:34:44 +0000 Subject: [PATCH] Add core modules: CLI, recorder, server, snapshot manager --- api_snapshot/server/server.py | 90 +++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 api_snapshot/server/server.py diff --git a/api_snapshot/server/server.py b/api_snapshot/server/server.py new file mode 100644 index 0000000..02ded15 --- /dev/null +++ b/api_snapshot/server/server.py @@ -0,0 +1,90 @@ +import random +import time +from typing import Any + +from flask import Flask, Response, request + +from api_snapshot.snapshot import load_snapshot + + +def create_app(snapshot: dict[str, Any]) -> Flask: + """Create a Flask application for the mock server.""" + app = Flask(__name__) + + requests_list = snapshot.get("requests", []) + metadata = snapshot.get("metadata", {}) + latency_mode = metadata.get("latency_mode", "original") + custom_latency_ms = metadata.get("custom_latency_ms") + + def find_matching_response(req_method: str, req_path: str) -> dict[str, Any] | None: + """Find a matching response from the snapshot.""" + for item in requests_list: + recorded_req = item.get("request", {}) + recorded_url = recorded_req.get("url", "") + + if "://" in recorded_url: + from urllib.parse import urlparse + + parsed = urlparse(recorded_url) + recorded_path = parsed.path + else: + recorded_path = recorded_url + + if recorded_req.get("method") == req_method and recorded_path == req_path: + return item.get("response") + + return None + + def apply_latency(latency_mode: str, custom_latency_ms: int | None) -> None: + """Apply latency to the response.""" + if latency_mode == "none": + return + + if latency_mode == "fixed": + delay_ms = custom_latency_ms or 100 + time.sleep(delay_ms / 1000) + elif latency_mode == "random": + min_ms = metadata.get("latency_min_ms", 50) + max_ms = metadata.get("latency_max_ms", 500) + delay_ms = random.randint(min_ms, max_ms) + time.sleep(delay_ms / 1000) + elif latency_mode == "original": + response = find_matching_response(request.method, request.path) + if response: + delay_ms = response.get("latency_ms", 0) + if delay_ms > 0: + time.sleep(delay_ms / 1000) + + @app.route("/", methods=["GET", "POST", "PUT", "DELETE", "PATCH"]) + def handle_request(path: str) -> Response: + req_method = request.method + + apply_latency(latency_mode, custom_latency_ms) + + response = find_matching_response(req_method, f"/{path}") + + if not response: + return Response( + response=f"No matching response found for {req_method} /{path}", + status=404, + mimetype="text/plain", + ) + + status_code = response.get("status_code", 200) + headers = response.get("headers", {}) + body = response.get("body", "") + + if isinstance(body, dict): + import json + + body = json.dumps(body) + headers["Content-Type"] = headers.get("Content-Type", "application/json") + + return Response( + response=body, + status=status_code, + headers=headers, + mimetype=headers.get("Content-Type", "text/plain"), + ) + + return app