Add core modules: CLI, recorder, server, snapshot manager
Some checks failed
CI / test (push) Failing after 16s
CI / lint (push) Failing after 6s
CI / type-check (push) Failing after 10s

This commit is contained in:
2026-02-04 13:34:46 +00:00
parent 17e68a0f05
commit 33b639c4b6

View File

@@ -0,0 +1,87 @@
import json
import os
from typing import Any
SNAPSHOTS_DIR = "snapshots"
def get_snapshot_path(name: str) -> str:
"""Get the file path for a snapshot."""
os.makedirs(SNAPSHOTS_DIR, exist_ok=True)
return os.path.join(SNAPSHOTS_DIR, f"{name}.json")
def list_snapshots() -> list[dict[str, Any]]:
"""List all available snapshots."""
os.makedirs(SNAPSHOTS_DIR, exist_ok=True)
snapshots = []
for filename in os.listdir(SNAPSHOTS_DIR):
if filename.endswith(".json"):
path = os.path.join(SNAPSHOTS_DIR, filename)
with open(path, "r") as f:
data = json.load(f)
snapshots.append(
{
"name": filename[:-5],
"metadata": data.get("metadata", {}),
"requests_count": len(data.get("requests", [])),
}
)
return snapshots
def load_snapshot(name: str) -> dict[str, Any]:
"""Load a snapshot from file."""
path = get_snapshot_path(name)
if not os.path.exists(path):
raise FileNotFoundError(f"Snapshot '{name}' not found")
with open(path, "r") as f:
return json.load(f)
def save_snapshot(
name: str,
session_data: dict[str, Any],
description: str | None = None,
tags: list[str] | None = None,
source_url: str | None = None,
) -> str:
"""Save a snapshot to file."""
path = get_snapshot_path(name)
import time
metadata = {
"version": "1.0",
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"description": description or "",
"source_url": source_url or "",
"tags": tags or [],
"latency_mode": "original",
"custom_latency_ms": None,
}
snapshot = {
"metadata": metadata,
"requests": [session_data],
}
with open(path, "w") as f:
json.dump(snapshot, f, indent=2)
return path
def delete_snapshot(name: str) -> bool:
"""Delete a snapshot."""
path = get_snapshot_path(name)
if not os.path.exists(path):
return False
os.remove(path)
return True