"""Request Validator for validating requests against OpenAPI schemas.""" from typing import Any from jsonschema import Draft7Validator, ValidationError, validate class RequestValidatorError(Exception): """Base exception for request validator errors.""" pass class ValidationFailure(RequestValidatorError): """Raised when request validation fails.""" def __init__(self, message: str, errors: list[ValidationError]) -> None: """Initialize validation failure. Args: message: Error message. errors: List of validation errors. """ super().__init__(message) self.errors = errors class RequestValidator: """Validates HTTP requests against OpenAPI schema definitions.""" def __init__(self, spec: dict[str, Any]) -> None: """Initialize the validator with an OpenAPI specification. Args: spec: Parsed OpenAPI specification. """ self.spec = spec self.schemas = self._extract_schemas() self.definitions = self._extract_definitions() def _extract_schemas(self) -> dict[str, Any]: """Extract schemas from components (OpenAPI 3.x). Returns: Dictionary of schema definitions. """ components = self.spec.get("components", {}) return components.get("schemas", {}) def _extract_definitions(self) -> dict[str, Any]: """Extract definitions (Swagger 2.0). Returns: Dictionary of schema definitions. """ return self.spec.get("definitions", {}) def _resolve_ref(self, ref: str) -> dict[str, Any] | None: """Resolve a $ref reference. Args: ref: The reference string. Returns: The resolved schema or None. """ if ref.startswith("#/components/schemas/"): schema_name = ref.split("/")[-1] return self.schemas.get(schema_name) elif ref.startswith("#/definitions/"): schema_name = ref.split("/")[-1] return self.definitions.get(schema_name) return None def _convert_schema(self, schema: dict[str, Any]) -> dict[str, Any]: """Convert OpenAPI schema to JSON Schema format. Args: schema: OpenAPI schema definition. Returns: JSON Schema compatible dictionary. """ if "$ref" in schema: resolved = self._resolve_ref(schema["$ref"]) if resolved: return self._convert_schema(resolved) return schema converted = {} for key, value in schema.items(): if key in {"type", "format", "description", "default", "example"}: converted[key] = value elif key == "properties": converted["properties"] = { prop_name: self._convert_schema(prop_schema) for prop_name, prop_schema in value.items() } elif key == "items": converted["items"] = self._convert_schema(value) elif key == "required": converted["required"] = value elif key == "enum": converted["enum"] = value elif key in {"minimum", "maximum", "minLength", "maxLength", "minItems", "maxItems"}: converted[key] = value elif key == "pattern": converted["pattern"] = value elif key == "additionalProperties": converted["additionalProperties"] = value return converted def validate_headers( self, headers: dict[str, Any], required_headers: list[str] | None = None, header_schemas: dict[str, dict[str, Any]] | None = None, ) -> ValidationFailure | None: """Validate request headers. Args: headers: Request headers. required_headers: List of required header names. header_schemas: Schema definitions for headers. Returns: ValidationFailure if validation fails, None otherwise. """ if required_headers: missing = [h for h in required_headers if h.lower() not in headers] if missing: errors = [ ValidationError( f"Missing required header: {missing_h}", validator="required", validator_value=required_headers, instance=headers, ) for missing_h in missing ] return ValidationFailure("Header validation failed", errors) if header_schemas: for header_name, header_schema in header_schemas.items(): header_value = headers.get(header_name.lower()) if header_value is not None: converted_schema = self._convert_schema(header_schema) try: validate( header_value, converted_schema, resolver=Draft7Validator, ) except ValidationError as e: return ValidationFailure( f"Header '{header_name}' validation failed", [e] ) return None def validate_query_params( self, params: dict[str, Any], param_schemas: dict[str, dict[str, Any]], ) -> ValidationFailure | None: """Validate query parameters. Args: params: Query parameters from request. param_schemas: Schema definitions for each parameter. Returns: ValidationFailure if validation fails, None otherwise. """ for param_name, param_schema in param_schemas.items(): param_value = params.get(param_name) required = param_schema.get("required", False) if required and param_value is None: return ValidationFailure( f"Missing required query parameter: {param_name}", [ ValidationError( f"Missing required parameter: {param_name}", validator="required", validator_value=[param_name], instance=params, ) ], ) if param_value is not None: converted_schema = self._convert_schema(param_schema) try: validate(param_value, converted_schema, resolver=Draft7Validator) except ValidationError as e: return ValidationFailure( f"Query parameter '{param_name}' validation failed", [e] ) return None def validate_path_params( self, params: dict[str, Any], param_schemas: dict[str, dict[str, Any]], ) -> ValidationFailure | None: """Validate path parameters. Args: params: Path parameters extracted from URL. param_schemas: Schema definitions for each parameter. Returns: ValidationFailure if validation fails, None otherwise. """ for param_name, param_schema in param_schemas.items(): if param_name not in params: return ValidationFailure( f"Missing path parameter: {param_name}", [ ValidationError( f"Missing path parameter: {param_name}", validator="required", validator_value=[param_name], instance=params, ) ], ) return None def validate_body( self, body: Any, schema: dict[str, Any] | None, ) -> ValidationFailure | None: """Validate request body against schema. Args: body: Request body. schema: JSON Schema for validation. Returns: ValidationFailure if validation fails, None otherwise. """ if schema is None: return None if body is None or body == "": if schema.get("required", False): return ValidationFailure( "Request body is required", [ ValidationError( "Request body is required", validator="type", validator_value=["object", "array", "string"], instance=None, ) ], ) return None converted_schema = self._convert_schema(schema) try: validate(body, converted_schema, resolver=Draft7Validator) except ValidationError as e: return ValidationFailure("Request body validation failed", [e]) return None def validate_request( self, method: str, path: str, headers: dict[str, Any], query_params: dict[str, Any], path_params: dict[str, Any], body: Any, operation_spec: dict[str, Any], ) -> ValidationFailure | None: """Validate a complete request. Args: method: HTTP method. path: Request path. headers: Request headers. query_params: Query parameters. path_params: Path parameters. body: Request body. operation_spec: Operation specification from OpenAPI. Returns: ValidationFailure if validation fails, None otherwise. """ parameters = operation_spec.get("parameters", []) header_schemas: dict[str, dict[str, Any]] = {} required_headers: list[str] = [] query_param_schemas: dict[str, dict[str, Any]] = {} path_param_schemas: dict[str, dict[str, Any]] = {} for param in parameters: param_name = param.get("name", "") param_in = param.get("in", "") param_required = param.get("required", False) param_schema = param.get("schema", {}) if param_in == "header": header_schemas[param_name] = param_schema if param_required: required_headers.append(param_name) elif param_in == "query": query_param_schemas[param_name] = param_schema elif param_in == "path": path_param_schemas[param_name] = param_schema validation_error = self.validate_headers(headers, required_headers, header_schemas) if validation_error: return validation_error validation_error = self.validate_query_params(query_params, query_param_schemas) if validation_error: return validation_error validation_error = self.validate_path_params(path_params, path_param_schemas) if validation_error: return validation_error request_body = operation_spec.get("requestBody", {}) if request_body: content = request_body.get("content", {}) json_content = content.get("application/json", {}) body_schema = json_content.get("schema") validation_error = self.validate_body(body, body_schema) if validation_error: return validation_error return None def format_validation_errors( self, errors: list[ValidationError] ) -> list[dict[str, Any]]: """Format validation errors for response. Args: errors: List of validation errors. Returns: List of formatted error details. """ formatted = [] for error in errors: error_info: dict[str, Any] = { "message": error.message, "path": list(error.absolute_path) if error.absolute_path else [], "validator": error.validator, } if error.instance is not None: error_info["instance"] = str(error.instance) formatted.append(error_info) return formatted def create_error_response( self, failure: ValidationFailure ) -> dict[str, Any]: """Create an error response from a validation failure. Args: failure: The validation failure. Returns: Error response dictionary. """ return { "status_code": 400, "body": { "error": { "type": "Validation Error", "message": str(failure), "details": self.format_validation_errors(failure.errors), } }, }