diff --git a/.src/openapi_mock/cli/cli.py b/.src/openapi_mock/cli/cli.py new file mode 100644 index 0000000..599fc30 --- /dev/null +++ b/.src/openapi_mock/cli/cli.py @@ -0,0 +1,307 @@ +"""CLI interface for OpenAPI Mock Server.""" + +import os +import sys +import signal +import threading +from pathlib import Path +from typing import Optional + +import click +import uvicorn + +from openapi_mock.server.server import create_app +from openapi_mock.core.spec_parser import load_spec, SpecNotFoundError, SpecValidationError + + +def validate_delay(ctx: click.Context, param: click.Parameter, value: str) -> Optional[tuple]: + """Validate and parse delay option.""" + if value is None: + return None + + try: + if "," in value: + parts = value.split(",") + if len(parts) == 2: + return (float(parts[0].strip()), float(parts[1].strip())) + return (float(value), float(value)) + except ValueError: + raise click.BadParameter( + "Invalid delay format. Use --delay 0.5 or --delay 0.1,1.0" + ) + + +def validate_port(ctx: click.Context, param: click.Parameter, value: int) -> int: + """Validate port number.""" + if value < 1 or value > 65535: + raise click.BadParameter("Port must be between 1 and 65535") + return value + + +@click.group() +def main() -> None: + """OpenAPI Mock Server - Generate mock API servers from OpenAPI specifications.""" + pass + + +@click.command() +@click.argument( + "spec", + type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True), +) +@click.option( + "--host", + default="127.0.0.1", + help="Host address to bind to", + show_default=True, +) +@click.option( + "--port", + type=int, + default=8080, + help="Port to run the mock server on", + callback=validate_port, + show_default=True, +) +@click.option( + "--delay", + type=str, + default=None, + help="Response delay in seconds (e.g., 0.5 or 0.1,1.0 for range)", + callback=validate_delay, +) +@click.option( + "--watch/--no-watch", + default=False, + help="Enable hot-reload when spec file changes", +) +@click.option( + "--auth", + type=click.Choice(["none", "bearer", "api_key", "basic"], case_sensitive=False), + default="none", + help="Authentication type to simulate", + show_default=True, +) +@click.option( + "--reload/--no-reload", + default=False, + help="Automatically reload server on changes (for development)", +) +def start( + spec: str, + host: str, + port: int, + delay: Optional[tuple], + watch: bool, + auth: str, + reload: bool, +) -> None: + """Start the mock server from an OpenAPI specification file. + + SPEC is the path to the OpenAPI YAML or JSON specification file. + """ + spec_path = Path(spec).resolve() + + try: + spec_data = load_spec(str(spec_path)) + click.echo(f"Loaded OpenAPI spec: {spec_path}") + click.echo(f"API Title: {spec_data.get('info', {}).get('title', 'Unknown')}") + click.echo(f"Version: {spec_data.get('info', {}).get('version', 'Unknown')}") + except SpecNotFoundError: + click.echo(f"Error: Spec file not found: {spec_path}", err=True) + sys.exit(1) + except SpecValidationError as e: + click.echo(f"Error: Invalid OpenAPI spec: {e}", err=True) + sys.exit(1) + except Exception as e: + click.echo(f"Error loading spec: {e}", err=True) + sys.exit(1) + + if watch: + from openapi_mock.server.hot_reload import FileWatcher + + watcher = FileWatcher(str(spec_path), port, host, delay, auth) + + signal.signal(signal.SIGINT, lambda s, f: watcher.stop()) + signal.signal(signal.SIGTERM, lambda s, f: watcher.stop()) + + click.echo(f"Starting hot-reload server at http://{host}:{port}") + click.echo(f"Watching for changes to: {spec_path}") + click.echo("Press Ctrl+C to stop") + + try: + watcher.run() + except KeyboardInterrupt: + watcher.stop() + return + + if auth != "none": + click.echo(f"Authentication enabled: {auth}") + + if delay: + if delay[0] == delay[1]: + click.echo(f"Response delay: {delay[0]}s") + else: + click.echo(f"Response delay: {delay[0]}s - {delay[1]}s") + + config = uvicorn.Config( + create_app(str(spec_path), delay_range=delay, auth_type=auth), + host=host, + port=port, + reload=reload, + ) + server = uvicorn.Server(config) + + click.echo(f"\nStarting mock server at http://{host}:{port}") + click.echo("Press Ctrl+C to stop\n") + + try: + server.run() + except KeyboardInterrupt: + click.echo("\nShutting down server...") + + +@click.command() +@click.argument( + "spec", + type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True), +) +@click.option( + "--output", + "-o", + type=click.Path(file_okay=True, dir_okay=False, writable=True), + help="Output file for the generated app (default: stdout)", +) +def generate(spec: str, output: Optional[str]) -> None: + """Generate a standalone Python file with the mock server. + + SPEC is the path to the OpenAPI YAML or JSON specification file. + """ + spec_path = Path(spec).resolve() + + try: + spec_data = load_spec(str(spec_path)) + except (SpecNotFoundError, SpecValidationError) as e: + click.echo(f"Error: {e}", err=True) + sys.exit(1) + + title = spec_data.get("info", {}).get("title", "Mock API") + version = spec_data.get("info", {}).get("version", "1.0.0") + + code = f'''\"\"\"Auto-generated mock server from OpenAPI spec: {spec_path.name}\"\"\" + +from fastapi import FastAPI +from openapi_mock.generators.data_generator import DataGenerator + +app = FastAPI( + title="{title}", + version="{version}", +) + +generator = DataGenerator() + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="127.0.0.1", port=8080) +''' + + if output: + Path(output).write_text(code) + click.echo(f"Generated mock server: {output}") + else: + click.echo(code) + + +@click.command() +@click.argument( + "spec", + type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True), +) +def validate(spec: str) -> None: + """Validate an OpenAPI specification file. + + SPEC is the path to the OpenAPI YAML or JSON specification file. + """ + spec_path = Path(spec).resolve() + + try: + spec_data = load_spec(str(spec_path)) + click.echo(f"Valid OpenAPI spec: {spec_path}") + click.echo(f" Title: {spec_data.get('info', {}).get('title', 'Unknown')}") + click.echo(f" Version: {spec_data.get('info', {}).get('version', 'Unknown')}") + + paths = spec_data.get("paths", {}) + click.echo(f" Paths: {len(paths)}") + + schemas = spec_data.get("components", {}).get("schemas", {}) + if not schemas and "definitions" in spec_data: + schemas = spec_data.get("definitions", {}) + click.echo(f" Schemas: {len(schemas)}") + + click.echo("\nSpecification is valid") + except SpecNotFoundError: + click.echo(f"Spec file not found: {spec_path}", err=True) + sys.exit(1) + except SpecValidationError as e: + click.echo(f"Invalid OpenAPI spec: {e}", err=True) + sys.exit(1) + except Exception as e: + click.echo(f"Error: {e}", err=True) + sys.exit(1) + + +@click.command() +@click.argument( + "spec", + type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True), +) +def info(spec: str) -> None: + """Display information about an OpenAPI specification. + + SPEC is the path to the OpenAPI YAML or JSON specification file. + """ + spec_path = Path(spec).resolve() + + try: + spec_data = load_spec(str(spec_path)) + except (SpecNotFoundError, SpecValidationError) as e: + click.echo(f"Error: {e}", err=True) + sys.exit(1) + + info_data = spec_data.get("info", {}) + + click.echo(f"Title: {info_data.get('title', 'Unknown')}") + click.echo(f"Version: {info_data.get('version', 'Unknown')}") + click.echo(f"Description: {info_data.get('description', 'N/A')}") + + if info_data.get("contact"): + contact = info_data["contact"] + click.echo(f"Contact: {contact.get('name', 'N/A')} <{contact.get('email', 'N/A')}>") + + if info_data.get("license"): + license_info = info_data["license"] + click.echo(f"License: {license_info.get('name', 'N/A')}") + + paths = spec_data.get("paths", {}) + click.echo(f"\nPaths ({len(paths)}):") + for path, methods in sorted(paths.items()): + for method in sorted(methods.keys()): + if method.upper() in ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]: + operation = methods.get(method, {}) + summary = operation.get("summary", "") + click.echo(f" {method.upper():7} {path} {'- ' + summary if summary else ''}") + + schemas = spec_data.get("components", {}).get("schemas", {}) + if not schemas and "definitions" in spec_data: + schemas = spec_data.get("definitions", {}) + + if schemas: + click.echo(f"\nSchemas ({len(schemas)}):") + for name in sorted(schemas.keys()): + schema = schemas[name] + props = schema.get("properties", {}) + click.echo(f" {name} ({len(props)} properties)") + + +if __name__ == "__main__": + main()