308 lines
8.9 KiB
Python
308 lines
8.9 KiB
Python
"""CLI interface for OpenAPI Mock Server."""
|
|
|
|
import os
|
|
import sys
|
|
import signal
|
|
import threading
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import click
|
|
import uvicorn
|
|
|
|
from openapi_mock.server.server import create_app
|
|
from openapi_mock.core.spec_parser import load_spec, SpecNotFoundError, SpecValidationError
|
|
|
|
|
|
def validate_delay(
    ctx: click.Context, param: click.Parameter, value: Optional[str]
) -> Optional[tuple]:
    """Parse the ``--delay`` option into a ``(min, max)`` tuple of seconds.

    A single number such as ``"0.5"`` yields ``(0.5, 0.5)``; a comma-separated
    pair such as ``"0.1,1.0"`` yields ``(0.1, 1.0)``.

    Args:
        ctx: Click context; unused, present because Click passes it to callbacks.
        param: The parameter being validated; unused.
        value: Raw option text, or ``None`` when ``--delay`` was not supplied.

    Returns:
        ``None`` when no delay was requested, otherwise a 2-tuple of floats.

    Raises:
        click.BadParameter: If the text is not one or two numbers, if a value
            is negative or not finite, or if the range minimum exceeds its
            maximum.
    """
    if value is None:
        return None

    format_error = "Invalid delay format. Use --delay 0.5 or --delay 0.1,1.0"

    pieces = value.split(",")
    if len(pieces) > 2:
        raise click.BadParameter(format_error)

    try:
        bounds = [float(piece.strip()) for piece in pieces]
    except ValueError:
        # The float() traceback adds nothing to the user-facing message.
        raise click.BadParameter(format_error) from None

    lower, upper = bounds[0], bounds[-1]

    # float() happily parses "nan", "inf" and negative numbers; none of them
    # is a usable sleep duration. NaN fails every comparison, so this single
    # chained test rejects all three cases.
    if not all(0.0 <= bound < float("inf") for bound in (lower, upper)):
        raise click.BadParameter(
            "Delay values must be finite, non-negative numbers of seconds"
        )

    if lower > upper:
        raise click.BadParameter(
            "Delay range minimum must not be greater than its maximum"
        )

    return (lower, upper)
|
|
|
|
|
|
def validate_port(ctx: click.Context, param: click.Parameter, value: int) -> int:
    """Click callback that accepts only port numbers in the range 1-65535.

    Args:
        ctx: Click context; unused, present because Click passes it to callbacks.
        param: The parameter being validated; unused.
        value: The port number already converted to ``int`` by Click.

    Returns:
        ``value`` unchanged when it is within range.

    Raises:
        click.BadParameter: If ``value`` is below 1 or above 65535.
    """
    if 1 <= value <= 65535:
        return value

    raise click.BadParameter("Port must be between 1 and 65535")
|
|
|
|
|
|
@click.group()
def main() -> None:
    """OpenAPI Mock Server - Generate mock API servers from OpenAPI specifications."""
    # The group callback does no work of its own; the docstring above is the
    # text Click prints as the top-level --help description.
|
|
|
|
|
|
# Registered on the `main` group (rather than as a free-standing command) so
# that invoking `main()` can actually dispatch to `start`.
@main.command()
@click.argument(
    "spec",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True),
)
@click.option(
    "--host",
    default="127.0.0.1",
    help="Host address to bind to",
    show_default=True,
)
@click.option(
    "--port",
    type=int,
    default=8080,
    help="Port to run the mock server on",
    callback=validate_port,
    show_default=True,
)
@click.option(
    "--delay",
    type=str,
    default=None,
    help="Response delay in seconds (e.g., 0.5 or 0.1,1.0 for range)",
    callback=validate_delay,
)
@click.option(
    "--watch/--no-watch",
    default=False,
    help="Enable hot-reload when spec file changes",
)
@click.option(
    "--auth",
    type=click.Choice(["none", "bearer", "api_key", "basic"], case_sensitive=False),
    default="none",
    help="Authentication type to simulate",
    show_default=True,
)
@click.option(
    "--reload/--no-reload",
    default=False,
    help="Automatically reload server on changes (for development)",
)
def start(
    spec: str,
    host: str,
    port: int,
    delay: Optional[tuple],
    watch: bool,
    auth: str,
    reload: bool,
) -> None:
    """Start the mock server from an OpenAPI specification file.

    SPEC is the path to the OpenAPI YAML or JSON specification file.
    """
    # The docstring above is user-facing --help text, so maintainer notes live
    # in comments. Flow: load the spec once up front (fail fast with exit
    # status 1), then either hand control to the FileWatcher (--watch) or run
    # a single uvicorn server in the foreground.
    spec_path = Path(spec).resolve()

    try:
        spec_data = load_spec(str(spec_path))
        api_info = spec_data.get("info") or {}
        click.echo(f"Loaded OpenAPI spec: {spec_path}")
        click.echo(f"API Title: {api_info.get('title', 'Unknown')}")
        click.echo(f"Version: {api_info.get('version', 'Unknown')}")
    except SpecNotFoundError:
        click.echo(f"Error: Spec file not found: {spec_path}", err=True)
        sys.exit(1)
    except SpecValidationError as e:
        click.echo(f"Error: Invalid OpenAPI spec: {e}", err=True)
        sys.exit(1)
    except Exception as e:
        # CLI boundary: report any other loader failure instead of a traceback.
        click.echo(f"Error loading spec: {e}", err=True)
        sys.exit(1)

    if watch:
        # Imported lazily so the watcher module is only loaded when requested.
        from openapi_mock.server.hot_reload import FileWatcher

        watcher = FileWatcher(str(spec_path), port, host, delay, auth)

        # Route both Ctrl+C and termination requests to the watcher's stop().
        signal.signal(signal.SIGINT, lambda s, f: watcher.stop())
        signal.signal(signal.SIGTERM, lambda s, f: watcher.stop())

        click.echo(f"Starting hot-reload server at http://{host}:{port}")
        click.echo(f"Watching for changes to: {spec_path}")
        click.echo("Press Ctrl+C to stop")

        try:
            watcher.run()
        except KeyboardInterrupt:
            watcher.stop()
        return

    if auth != "none":
        click.echo(f"Authentication enabled: {auth}")

    if delay:
        low, high = delay[0], delay[1]
        if low == high:
            click.echo(f"Response delay: {low}s")
        else:
            click.echo(f"Response delay: {low}s - {high}s")

    # NOTE(review): uvicorn documents that reload needs the application as an
    # import string; an app object is passed here and the server is driven
    # through Server.run(), so --reload is presumably a no-op — confirm.
    config = uvicorn.Config(
        create_app(str(spec_path), delay_range=delay, auth_type=auth),
        host=host,
        port=port,
        reload=reload,
    )
    server = uvicorn.Server(config)

    click.echo(f"\nStarting mock server at http://{host}:{port}")
    click.echo("Press Ctrl+C to stop\n")

    try:
        server.run()
    except KeyboardInterrupt:
        click.echo("\nShutting down server...")
|
|
|
|
|
|
# Registered on the `main` group (rather than as a free-standing command) so
# that invoking `main()` can actually dispatch to `generate`.
@main.command()
@click.argument(
    "spec",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True),
)
@click.option(
    "--output",
    "-o",
    type=click.Path(file_okay=True, dir_okay=False, writable=True),
    help="Output file for the generated app (default: stdout)",
)
def generate(spec: str, output: Optional[str]) -> None:
    """Generate a standalone Python file with the mock server.

    SPEC is the path to the OpenAPI YAML or JSON specification file.
    """
    # The docstring above is user-facing --help text, so maintainer notes live
    # in comments. The command loads the spec, renders a small FastAPI module
    # as text, and writes it to --output or echoes it to stdout.
    spec_path = Path(spec).resolve()

    try:
        spec_data = load_spec(str(spec_path))
    except (SpecNotFoundError, SpecValidationError) as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)

    api_info = spec_data.get("info") or {}
    # Coerce to str: a YAML spec may carry e.g. `version: 1.0` as a float.
    title = str(api_info.get("title", "Mock API"))
    version = str(api_info.get("version", "1.0.0"))
    module_doc = f"Auto-generated mock server from OpenAPI spec: {spec_path.name}"

    # Every spec-derived value is emitted with !r so it becomes a correctly
    # escaped Python string literal. Splicing raw text between quotes would
    # let a quote, backslash or newline in the spec break (or inject code
    # into) the generated module.
    code = f'''{module_doc!r}

from fastapi import FastAPI
from openapi_mock.generators.data_generator import DataGenerator

app = FastAPI(
    title={title!r},
    version={version!r},
)

generator = DataGenerator()

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8080)
'''

    if output:
        # Explicit UTF-8: repr() leaves printable non-ASCII characters as-is,
        # and UTF-8 is the encoding Python assumes for source files.
        Path(output).write_text(code, encoding="utf-8")
        click.echo(f"Generated mock server: {output}")
    else:
        click.echo(code)
|
|
|
|
|
|
# Registered on the `main` group (rather than as a free-standing command) so
# that invoking `main()` can actually dispatch to `validate`.
@main.command()
@click.argument(
    "spec",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True),
)
def validate(spec: str) -> None:
    """Validate an OpenAPI specification file.

    SPEC is the path to the OpenAPI YAML or JSON specification file.
    """
    # The docstring above is user-facing --help text, so maintainer notes live
    # in comments. Validation itself is delegated to load_spec(); on success a
    # short summary is printed, on failure the process exits with status 1.
    spec_path = Path(spec).resolve()

    try:
        spec_data = load_spec(str(spec_path))
        # `or {}` tolerates sections that are present but null in the file,
        # which would otherwise surface as a confusing generic error below.
        api_info = spec_data.get("info") or {}
        click.echo(f"Valid OpenAPI spec: {spec_path}")
        click.echo(f" Title: {api_info.get('title', 'Unknown')}")
        click.echo(f" Version: {api_info.get('version', 'Unknown')}")

        paths = spec_data.get("paths") or {}
        click.echo(f" Paths: {len(paths)}")

        schemas = (spec_data.get("components") or {}).get("schemas") or {}
        if not schemas and "definitions" in spec_data:
            # Fall back to the top-level "definitions" section when
            # components/schemas is absent or empty.
            schemas = spec_data.get("definitions") or {}
        click.echo(f" Schemas: {len(schemas)}")

        click.echo("\nSpecification is valid")
    except SpecNotFoundError:
        click.echo(f"Spec file not found: {spec_path}", err=True)
        sys.exit(1)
    except SpecValidationError as e:
        click.echo(f"Invalid OpenAPI spec: {e}", err=True)
        sys.exit(1)
    except Exception as e:
        # CLI boundary: report any other failure instead of a traceback.
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
|
|
|
|
|
|
# Registered on the `main` group (rather than as a free-standing command) so
# that invoking `main()` can actually dispatch to `info`.
@main.command()
@click.argument(
    "spec",
    type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True),
)
def info(spec: str) -> None:
    """Display information about an OpenAPI specification.

    SPEC is the path to the OpenAPI YAML or JSON specification file.
    """
    # The docstring above is user-facing --help text, so maintainer notes live
    # in comments. Prints the info section, then every operation grouped by
    # path, then each named schema with its property count.
    spec_path = Path(spec).resolve()

    try:
        spec_data = load_spec(str(spec_path))
    except (SpecNotFoundError, SpecValidationError) as e:
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)

    # `or {}` throughout: a section that is present but null in the spec file
    # must not crash a read-only summary command.
    info_data = spec_data.get("info") or {}

    click.echo(f"Title: {info_data.get('title', 'Unknown')}")
    click.echo(f"Version: {info_data.get('version', 'Unknown')}")
    click.echo(f"Description: {info_data.get('description', 'N/A')}")

    contact = info_data.get("contact")
    if contact:
        click.echo(f"Contact: {contact.get('name', 'N/A')} <{contact.get('email', 'N/A')}>")

    license_info = info_data.get("license")
    if license_info:
        click.echo(f"License: {license_info.get('name', 'N/A')}")

    # Keys of a path item that denote operations; other keys (for example
    # "parameters" or "summary") are skipped. TRACE is a valid OpenAPI 3
    # operation and is listed alongside the rest.
    http_methods = frozenset(
        ("GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD", "TRACE")
    )

    paths = spec_data.get("paths") or {}
    click.echo(f"\nPaths ({len(paths)}):")
    for path, methods in sorted(paths.items()):
        if not isinstance(methods, dict):
            # Null or malformed path item: nothing to list for this path.
            continue
        for method in sorted(methods):
            if method.upper() not in http_methods:
                continue
            operation = methods[method]
            summary = operation.get("summary", "") if isinstance(operation, dict) else ""
            click.echo(f" {method.upper():7} {path} {'- ' + summary if summary else ''}")

    schemas = (spec_data.get("components") or {}).get("schemas") or {}
    if not schemas and "definitions" in spec_data:
        # Fall back to the top-level "definitions" section when
        # components/schemas is absent or empty.
        schemas = spec_data.get("definitions") or {}

    if schemas:
        click.echo(f"\nSchemas ({len(schemas)}):")
        for name in sorted(schemas):
            schema = schemas[name]
            # A schema need not be a mapping (it may be null or a boolean),
            # in which case it simply has no properties to count.
            props = (schema.get("properties") or {}) if isinstance(schema, dict) else {}
            click.echo(f" {name} ({len(props)} properties)")
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the CLI only when this file is executed as a script, not on import.
    main()
|