diff --git a/.src/openapi_mock/server/hot_reload.py b/.src/openapi_mock/server/hot_reload.py new file mode 100644 index 0000000..af5b09d --- /dev/null +++ b/.src/openapi_mock/server/hot_reload.py @@ -0,0 +1,198 @@ +"""Hot-reload functionality for automatic server restart on spec changes.""" + +import os +import signal +import subprocess +import sys +import threading +import time +from pathlib import Path +from typing import Optional, Tuple + +from watchdog.events import FileSystemEventHandler +from watchdog.observers import Observer + + +class FileWatcher: + """Watch OpenAPI spec file and restart server on changes.""" + + def __init__( + self, + spec_path: str, + port: int = 8080, + host: str = "127.0.0.1", + delay: Optional[Tuple[float, float]] = None, + auth: str = "none", + debounce_seconds: float = 0.5 + ): + """Initialize the file watcher. + + Args: + spec_path: Path to the OpenAPI spec file. + port: Port to run the server on. + host: Host address to bind to. + delay: Optional response delay range. + auth: Authentication type. + debounce_seconds: Debounce time to avoid multiple reloads. + """ + self.spec_path = Path(spec_path).resolve() + self.port = port + self.host = host + self.delay = delay + self.auth = auth + self.debounce_seconds = debounce_seconds + + self.observer: Optional[Observer] = None + self.server_process: Optional[subprocess.Popen] = None + self._stop_event = threading.Event() + self._reload_event = threading.Event() + self._last_modified = 0.0 + self._lock = threading.Lock() + + def _get_delay_str(self) -> str: + """Get delay as string for CLI option.""" + if self.delay is None: + return "" + if self.delay[0] == self.delay[1]: + return f"--delay {self.delay[0]}" + return f"--delay {self.delay[0]},{self.delay[1]}" + + def _start_server(self) -> subprocess.Popen: + """Start the mock server process. + + Returns: + Popen process handle. + """ + cmd = [ + sys.executable, "-m", "openapi_mock.cli.cli", "start", + str(self.spec_path), + "--host", self.host, + "--port", str(self.port), + "--no-watch", + ] + + if self.delay: + cmd.extend(["--delay", self._get_delay_str().replace("--delay ", "")]) + + if self.auth != "none": + cmd.extend(["--auth", self.auth]) + + env = os.environ.copy() + env["PYTHONUNBUFFERED"] = "1" + + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + env=env, + ) + + return process + + def _read_output(self, process: subprocess.Popen) -> None: + """Read and print process output. + + Args: + process: Popen process to read from. + """ + try: + for line in iter(process.stdout.readline, ""): + if self._stop_event.is_set(): + break + if line: + print(line, end="") + except Exception: + pass + + def run(self) -> None: + """Run the file watcher and server.""" + self._stop_event.clear() + self._reload_event.clear() + + event_handler = FileSystemEventHandler() + event_handler.on_modified = self._on_file_changed + event_handler.on_created = self._on_file_changed + event_handler.on_moved = self._on_file_changed + + self.observer = Observer() + self.observer.schedule( + event_handler, + str(self.spec_path.parent), + recursive=False + ) + self.observer.start() + + print(f"Watching: {self.spec_path}") + + self._restart_server() + + try: + while not self._stop_event.is_set(): + if self._reload_event.is_set(): + self._reload_event.clear() + self._restart_server() + time.sleep(0.1) + except Exception as e: + print(f"Watcher error: {e}") + finally: + self.stop() + + def _on_file_changed(self, event) -> None: + """Handle file change events. + + Args: + event: File system event. + """ + if event.src_path != str(self.spec_path): + return + + current_time = time.time() + with self._lock: + if current_time - self._last_modified < self.debounce_seconds: + return + self._last_modified = current_time + + print(f"\nDetected change in {self.spec_path}") + self._reload_event.set() + + def _restart_server(self) -> None: + """Restart the server process.""" + self._stop_server() + + print("Restarting server...") + time.sleep(0.2) + + self.server_process = self._start_server() + output_thread = threading.Thread( + target=self._read_output, + args=(self.server_process,), + daemon=True + ) + output_thread.start() + + def _stop_server(self) -> None: + """Stop the server process.""" + if self.server_process: + try: + self.server_process.terminate() + self.server_process.wait(timeout=5) + except subprocess.TimeoutExpired: + self.server_process.kill() + except Exception: + pass + self.server_process = None + + def stop(self) -> None: + """Stop the file watcher and server.""" + self._stop_event.set() + + if self.observer: + try: + self.observer.stop() + self.observer.join(timeout=5) + except Exception: + pass + self.observer = None + + self._stop_server()