Add core modules: CLI, recorder, server, snapshot manager
This commit is contained in:
90
api_snapshot/server/server.py
Normal file
90
api_snapshot/server/server.py
Normal file
@@ -0,0 +1,90 @@
|
||||
import random
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from flask import Flask, Response, request
|
||||
|
||||
from api_snapshot.snapshot import load_snapshot
|
||||
|
||||
|
||||
def create_app(snapshot: dict[str, Any]) -> Flask:
|
||||
"""Create a Flask application for the mock server."""
|
||||
app = Flask(__name__)
|
||||
|
||||
requests_list = snapshot.get("requests", [])
|
||||
metadata = snapshot.get("metadata", {})
|
||||
latency_mode = metadata.get("latency_mode", "original")
|
||||
custom_latency_ms = metadata.get("custom_latency_ms")
|
||||
|
||||
def find_matching_response(req_method: str, req_path: str) -> dict[str, Any] | None:
|
||||
"""Find a matching response from the snapshot."""
|
||||
for item in requests_list:
|
||||
recorded_req = item.get("request", {})
|
||||
recorded_url = recorded_req.get("url", "")
|
||||
|
||||
if "://" in recorded_url:
|
||||
from urllib.parse import urlparse
|
||||
|
||||
parsed = urlparse(recorded_url)
|
||||
recorded_path = parsed.path
|
||||
else:
|
||||
recorded_path = recorded_url
|
||||
|
||||
if recorded_req.get("method") == req_method and recorded_path == req_path:
|
||||
return item.get("response")
|
||||
|
||||
return None
|
||||
|
||||
def apply_latency(latency_mode: str, custom_latency_ms: int | None) -> None:
|
||||
"""Apply latency to the response."""
|
||||
if latency_mode == "none":
|
||||
return
|
||||
|
||||
if latency_mode == "fixed":
|
||||
delay_ms = custom_latency_ms or 100
|
||||
time.sleep(delay_ms / 1000)
|
||||
elif latency_mode == "random":
|
||||
min_ms = metadata.get("latency_min_ms", 50)
|
||||
max_ms = metadata.get("latency_max_ms", 500)
|
||||
delay_ms = random.randint(min_ms, max_ms)
|
||||
time.sleep(delay_ms / 1000)
|
||||
elif latency_mode == "original":
|
||||
response = find_matching_response(request.method, request.path)
|
||||
if response:
|
||||
delay_ms = response.get("latency_ms", 0)
|
||||
if delay_ms > 0:
|
||||
time.sleep(delay_ms / 1000)
|
||||
|
||||
@app.route("/<path:path>", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
|
||||
def handle_request(path: str) -> Response:
|
||||
req_method = request.method
|
||||
|
||||
apply_latency(latency_mode, custom_latency_ms)
|
||||
|
||||
response = find_matching_response(req_method, f"/{path}")
|
||||
|
||||
if not response:
|
||||
return Response(
|
||||
response=f"No matching response found for {req_method} /{path}",
|
||||
status=404,
|
||||
mimetype="text/plain",
|
||||
)
|
||||
|
||||
status_code = response.get("status_code", 200)
|
||||
headers = response.get("headers", {})
|
||||
body = response.get("body", "")
|
||||
|
||||
if isinstance(body, dict):
|
||||
import json
|
||||
|
||||
body = json.dumps(body)
|
||||
headers["Content-Type"] = headers.get("Content-Type", "application/json")
|
||||
|
||||
return Response(
|
||||
response=body,
|
||||
status=status_code,
|
||||
headers=headers,
|
||||
mimetype=headers.get("Content-Type", "text/plain"),
|
||||
)
|
||||
|
||||
return app
|
||||
Reference in New Issue
Block a user