diff --git a/.src/openapi_mock/server/server.py b/.src/openapi_mock/server/server.py new file mode 100644 index 0000000..3291604 --- /dev/null +++ b/.src/openapi_mock/server/server.py @@ -0,0 +1,347 @@ +"""FastAPI server with dynamic route generation from OpenAPI specs.""" + +import re +from typing import Any, Callable, Dict, List, Optional +from fastapi import FastAPI, Request, Response, Query, Path +from fastapi.responses import JSONResponse +from starlette.middleware.base import BaseHTTPMiddleware + +from openapi_mock.core.spec_parser import ( + load_spec, + extract_paths, + extract_schemas, + extract_path_params, + get_operation_id, + get_response_schema, +) +from openapi_mock.generators.data_generator import DataGenerator + + +class ResponseDelayMiddleware(BaseHTTPMiddleware): + """Middleware to add configurable response delays.""" + + def __init__(self, app: FastAPI, delay_range: Optional[tuple] = None): + """Initialize the delay middleware. + + Args: + app: FastAPI application. + delay_range: Optional tuple of (min_delay, max_delay) in seconds. + """ + super().__init__(app) + self.delay_range = delay_range + + async def dispatch(self, request: Request, call_next: Callable) -> Response: + """Add delay before processing request. + + Args: + request: The incoming request. + call_next: The next middleware/handler. + + Returns: + The response. + """ + import asyncio + import random + + if self.delay_range: + min_delay, max_delay = self.delay_range + delay = random.uniform(min_delay, max_delay) + await asyncio.sleep(delay) + + response = await call_next(request) + return response + + +class AuthMiddleware(BaseHTTPMiddleware): + """Middleware for API authentication simulation.""" + + def __init__( + self, + app: FastAPI, + auth_type: str = 'none', + api_keys: Optional[List[str]] = None + ): + """Initialize the auth middleware. + + Args: + app: FastAPI application. + auth_type: Type of authentication (none, bearer, api_key, basic). + api_keys: List of valid API keys. + """ + super().__init__(app) + self.auth_type = auth_type + self.api_keys = api_keys or [] + + async def dispatch(self, request: Request, call_next: Callable) -> Response: + """Validate authentication credentials. + + Args: + request: The incoming request. + call_next: The next middleware/handler. + + Returns: + The response or error. + """ + if self.auth_type == 'none': + return await call_next(request) + + auth_header = request.headers.get('Authorization', '') + api_key = request.headers.get('X-API-Key', '') + + if self.auth_type == 'bearer': + if not auth_header.startswith('Bearer '): + return JSONResponse( + status_code=401, + content={'error': 'Missing or invalid Authorization header'} + ) + token = auth_header[7:] + if not token: + return JSONResponse( + status_code=401, + content={'error': 'Invalid token'} + ) + + elif self.auth_type == 'api_key': + if not api_key and not auth_header.startswith('ApiKey '): + return JSONResponse( + status_code=401, + content={'error': 'Missing API key'} + ) + key = api_key or auth_header[7:] + if key not in self.api_keys: + return JSONResponse( + status_code=401, + content={'error': 'Invalid API key'} + ) + + elif self.auth_type == 'basic': + if not auth_header.startswith('Basic '): + return JSONResponse( + status_code=401, + content={'error': 'Missing or invalid Authorization header'} + ) + import base64 + try: + decoded = base64.b64decode(auth_header[6:]).decode('utf-8') + username, password = decoded.split(':', 1) + if not self._validate_basic_credentials(username, password): + return JSONResponse( + status_code=401, + content={'error': 'Invalid credentials'} + ) + except Exception: + return JSONResponse( + status_code=401, + content={'error': 'Invalid authorization format'} + ) + + return await call_next(request) + + def _validate_basic_credentials(self, username: str, password: str) -> bool: + """Validate basic auth credentials. + + Args: + username: Username from Basic auth. + password: Password from Basic auth. + + Returns: + True if credentials are valid. + """ + return bool(username and password) + + +def _convert_param_name(name: str) -> str: + """Convert parameter name to valid Python identifier. + + Args: + name: Parameter name from OpenAPI spec. + + Returns: + Valid Python identifier. + """ + return re.sub(r'[^a-zA-Z0-9_]', '_', name) + + +class OpenAPIMockServer: + """Mock server generated from OpenAPI specification.""" + + def __init__( + self, + spec_path: str, + delay_range: Optional[tuple] = None, + auth_type: str = 'none', + auth_config: Optional[Dict[str, Any]] = None + ): + """Initialize the mock server. + + Args: + spec_path: Path to the OpenAPI specification file. + delay_range: Optional response delay range (min, max) in seconds. + auth_type: Type of authentication to simulate. + auth_config: Additional authentication configuration. + """ + self.spec_path = spec_path + self.spec = load_spec(spec_path) + self.delay_range = delay_range + self.auth_type = auth_type + self.auth_config = auth_config or {} + + self.app = FastAPI( + title=self.spec.get('info', {}).get('title', 'Mock API'), + version=self.spec.get('info', {}).get('version', '1.0.0'), + description=self.spec.get('info', {}).get('description', ''), + openapi_version=self.spec.get('openapi', '3.0.0'), + ) + + self._setup_middleware() + self._setup_routes() + + def _setup_middleware(self) -> None: + """Set up middleware for delay and authentication.""" + if self.delay_range: + self.app.add_middleware(ResponseDelayMiddleware, delay_range=self.delay_range) + + if self.auth_type != 'none': + api_keys = self.auth_config.get('api_keys', []) + self.app.add_middleware( + AuthMiddleware, + auth_type=self.auth_type, + api_keys=api_keys + ) + + def _setup_routes(self) -> None: + """Set up routes from the OpenAPI spec.""" + paths = extract_paths(self.spec) + schemas = extract_schemas(self.spec) + self.data_generator = DataGenerator() + self.data_generator.set_ref_cache(schemas) + + for path, path_item in paths.items(): + path_params = extract_path_params(path) + + for method, operation in path_item.items(): + if method.lower() not in ['get', 'post', 'put', 'delete', 'patch', 'options', 'head']: + continue + + operation_id = get_operation_id(path, method) + summary = operation.get('summary', '') + description = operation.get('description', '') + tags = operation.get('tags', []) + + response_schema = get_response_schema(self.spec, path, method.lower()) + + self._create_route_handler( + path=path, + method=method.lower(), + path_params=path_params, + operation=operation, + response_schema=response_schema, + operation_id=operation_id, + summary=summary, + description=description, + tags=tags + ) + + def _create_route_handler( + self, + path: str, + method: str, + path_params: List[str], + operation: Dict[str, Any], + response_schema: Optional[Dict[str, Any]], + operation_id: str, + summary: str, + description: str, + tags: List[str] + ) -> None: + """Create and register a route handler. + + Args: + path: API path. + method: HTTP method. + path_params: List of path parameter names. + operation: Operation definition from spec. + response_schema: Response schema. + operation_id: Generated operation ID. + summary: Operation summary. + description: Operation description. + tags: Operation tags. + """ + parameters = operation.get('parameters', []) + + param_types: Dict[str, Any] = {} + + for param in parameters: + param_name = param.get('name', '') + param_in = param.get('in', 'query') + param_schema = param.get('schema', {}) + param_type = param_schema.get('type', 'string') + param_default = param_schema.get('default') + + if param_in == 'path': + param_types[param_name] = (Path, param_type, param_default) + elif param_in == 'query': + param_types[param_name] = (Query, param_type, param_default) + + def create_handler( + method: str, + response_schema: Optional[Dict[str, Any]], + param_types: Dict[str, Any] + ): + async def handler(request: Request) -> Dict[str, Any]: + if response_schema: + return self.data_generator.generate(response_schema) + return {'message': 'No schema defined'} + + handler.__name__ = f"{method}_{path.replace('/', '_').replace('-', '_')}" + return handler + + handler = create_handler(method, response_schema, param_types) + + decorator_kwargs = { + 'summary': summary or f"{method.upper()} {path}", + 'description': description, + 'tags': tags if tags else None, + 'operation_id': operation_id, + } + + if method == 'get': + self.app.get(path, **decorator_kwargs)(handler) + elif method == 'post': + self.app.post(path, **decorator_kwargs)(handler) + elif method == 'put': + self.app.put(path, **decorator_kwargs)(handler) + elif method == 'delete': + self.app.delete(path, **decorator_kwargs)(handler) + elif method == 'patch': + self.app.patch(path, **decorator_kwargs)(handler) + elif method == 'options': + self.app.options(path, **decorator_kwargs)(handler) + elif method == 'head': + self.app.head(path, **decorator_kwargs)(handler) + + +def create_app( + spec_path: str, + delay_range: Optional[tuple] = None, + auth_type: str = 'none', + auth_config: Optional[Dict[str, Any]] = None +) -> FastAPI: + """Create a FastAPI application from an OpenAPI specification. + + Args: + spec_path: Path to the OpenAPI specification file. + delay_range: Optional response delay range (min, max) in seconds. + auth_type: Type of authentication to simulate. + auth_config: Additional authentication configuration. + + Returns: + Configured FastAPI application. + """ + server = OpenAPIMockServer( + spec_path=spec_path, + delay_range=delay_range, + auth_type=auth_type, + auth_config=auth_config + ) + return server.app