"""Hot-reload functionality for automatic server restart on spec changes.""" import os import signal import subprocess import sys import threading import time from pathlib import Path from typing import Optional, Tuple from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer class FileWatcher: """Watch OpenAPI spec file and restart server on changes.""" def __init__( self, spec_path: str, port: int = 8080, host: str = "127.0.0.1", delay: Optional[Tuple[float, float]] = None, auth: str = "none", debounce_seconds: float = 0.5 ): """Initialize the file watcher. Args: spec_path: Path to the OpenAPI spec file. port: Port to run the server on. host: Host address to bind to. delay: Optional response delay range. auth: Authentication type. debounce_seconds: Debounce time to avoid multiple reloads. """ self.spec_path = Path(spec_path).resolve() self.port = port self.host = host self.delay = delay self.auth = auth self.debounce_seconds = debounce_seconds self.observer: Optional[Observer] = None self.server_process: Optional[subprocess.Popen] = None self._stop_event = threading.Event() self._reload_event = threading.Event() self._last_modified = 0.0 self._lock = threading.Lock() def _get_delay_str(self) -> str: """Get delay as string for CLI option.""" if self.delay is None: return "" if self.delay[0] == self.delay[1]: return f"--delay {self.delay[0]}" return f"--delay {self.delay[0]},{self.delay[1]}" def _start_server(self) -> subprocess.Popen: """Start the mock server process. Returns: Popen process handle. """ cmd = [ sys.executable, "-m", "openapi_mock.cli.cli", "start", str(self.spec_path), "--host", self.host, "--port", str(self.port), "--no-watch", ] if self.delay: cmd.extend(["--delay", self._get_delay_str().replace("--delay ", "")]) if self.auth != "none": cmd.extend(["--auth", self.auth]) env = os.environ.copy() env["PYTHONUNBUFFERED"] = "1" process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=env, ) return process def _read_output(self, process: subprocess.Popen) -> None: """Read and print process output. Args: process: Popen process to read from. """ try: for line in iter(process.stdout.readline, ""): if self._stop_event.is_set(): break if line: print(line, end="") except Exception: pass def run(self) -> None: """Run the file watcher and server.""" self._stop_event.clear() self._reload_event.clear() event_handler = FileSystemEventHandler() event_handler.on_modified = self._on_file_changed event_handler.on_created = self._on_file_changed event_handler.on_moved = self._on_file_changed self.observer = Observer() self.observer.schedule( event_handler, str(self.spec_path.parent), recursive=False ) self.observer.start() print(f"Watching: {self.spec_path}") self._restart_server() try: while not self._stop_event.is_set(): if self._reload_event.is_set(): self._reload_event.clear() self._restart_server() time.sleep(0.1) except Exception as e: print(f"Watcher error: {e}") finally: self.stop() def _on_file_changed(self, event) -> None: """Handle file change events. Args: event: File system event. """ if event.src_path != str(self.spec_path): return current_time = time.time() with self._lock: if current_time - self._last_modified < self.debounce_seconds: return self._last_modified = current_time print(f"\nDetected change in {self.spec_path}") self._reload_event.set() def _restart_server(self) -> None: """Restart the server process.""" self._stop_server() print("Restarting server...") time.sleep(0.2) self.server_process = self._start_server() output_thread = threading.Thread( target=self._read_output, args=(self.server_process,), daemon=True ) output_thread.start() def _stop_server(self) -> None: """Stop the server process.""" if self.server_process: try: self.server_process.terminate() self.server_process.wait(timeout=5) except subprocess.TimeoutExpired: self.server_process.kill() except Exception: pass self.server_process = None def stop(self) -> None: """Stop the file watcher and server.""" self._stop_event.set() if self.observer: try: self.observer.stop() self.observer.join(timeout=5) except Exception: pass self.observer = None self._stop_server()