Initial commit: Add OpenAPI Mock Server project
Some checks failed
CI / test (push) Has been cancelled
CI / build (push) Has been cancelled

This commit is contained in:
2026-01-30 03:41:44 +00:00
parent daea37577c
commit c39fe2749e

View File

@@ -0,0 +1,198 @@
"""Hot-reload functionality for automatic server restart on spec changes."""
import os
import signal
import subprocess
import sys
import threading
import time
from pathlib import Path
from typing import Optional, Tuple
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
class FileWatcher:
"""Watch OpenAPI spec file and restart server on changes."""
def __init__(
self,
spec_path: str,
port: int = 8080,
host: str = "127.0.0.1",
delay: Optional[Tuple[float, float]] = None,
auth: str = "none",
debounce_seconds: float = 0.5
):
"""Initialize the file watcher.
Args:
spec_path: Path to the OpenAPI spec file.
port: Port to run the server on.
host: Host address to bind to.
delay: Optional response delay range.
auth: Authentication type.
debounce_seconds: Debounce time to avoid multiple reloads.
"""
self.spec_path = Path(spec_path).resolve()
self.port = port
self.host = host
self.delay = delay
self.auth = auth
self.debounce_seconds = debounce_seconds
self.observer: Optional[Observer] = None
self.server_process: Optional[subprocess.Popen] = None
self._stop_event = threading.Event()
self._reload_event = threading.Event()
self._last_modified = 0.0
self._lock = threading.Lock()
def _get_delay_str(self) -> str:
"""Get delay as string for CLI option."""
if self.delay is None:
return ""
if self.delay[0] == self.delay[1]:
return f"--delay {self.delay[0]}"
return f"--delay {self.delay[0]},{self.delay[1]}"
def _start_server(self) -> subprocess.Popen:
"""Start the mock server process.
Returns:
Popen process handle.
"""
cmd = [
sys.executable, "-m", "openapi_mock.cli.cli", "start",
str(self.spec_path),
"--host", self.host,
"--port", str(self.port),
"--no-watch",
]
if self.delay:
cmd.extend(["--delay", self._get_delay_str().replace("--delay ", "")])
if self.auth != "none":
cmd.extend(["--auth", self.auth])
env = os.environ.copy()
env["PYTHONUNBUFFERED"] = "1"
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
env=env,
)
return process
def _read_output(self, process: subprocess.Popen) -> None:
"""Read and print process output.
Args:
process: Popen process to read from.
"""
try:
for line in iter(process.stdout.readline, ""):
if self._stop_event.is_set():
break
if line:
print(line, end="")
except Exception:
pass
def run(self) -> None:
"""Run the file watcher and server."""
self._stop_event.clear()
self._reload_event.clear()
event_handler = FileSystemEventHandler()
event_handler.on_modified = self._on_file_changed
event_handler.on_created = self._on_file_changed
event_handler.on_moved = self._on_file_changed
self.observer = Observer()
self.observer.schedule(
event_handler,
str(self.spec_path.parent),
recursive=False
)
self.observer.start()
print(f"Watching: {self.spec_path}")
self._restart_server()
try:
while not self._stop_event.is_set():
if self._reload_event.is_set():
self._reload_event.clear()
self._restart_server()
time.sleep(0.1)
except Exception as e:
print(f"Watcher error: {e}")
finally:
self.stop()
def _on_file_changed(self, event) -> None:
"""Handle file change events.
Args:
event: File system event.
"""
if event.src_path != str(self.spec_path):
return
current_time = time.time()
with self._lock:
if current_time - self._last_modified < self.debounce_seconds:
return
self._last_modified = current_time
print(f"\nDetected change in {self.spec_path}")
self._reload_event.set()
def _restart_server(self) -> None:
"""Restart the server process."""
self._stop_server()
print("Restarting server...")
time.sleep(0.2)
self.server_process = self._start_server()
output_thread = threading.Thread(
target=self._read_output,
args=(self.server_process,),
daemon=True
)
output_thread.start()
def _stop_server(self) -> None:
"""Stop the server process."""
if self.server_process:
try:
self.server_process.terminate()
self.server_process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.server_process.kill()
except Exception:
pass
self.server_process = None
def stop(self) -> None:
"""Stop the file watcher and server."""
self._stop_event.set()
if self.observer:
try:
self.observer.stop()
self.observer.join(timeout=5)
except Exception:
pass
self.observer = None
self._stop_server()