From 730c2f7ab84cf4b24b08195fbfca14c0a847e82a Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Mon, 2 Feb 2026 19:55:34 +0000 Subject: [PATCH] Add source files: generator, validator, fuzzer modules --- src/core/validator.py | 393 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 393 insertions(+) create mode 100644 src/core/validator.py diff --git a/src/core/validator.py b/src/core/validator.py new file mode 100644 index 0000000..8bcd8c3 --- /dev/null +++ b/src/core/validator.py @@ -0,0 +1,393 @@ +"""Request Validator for validating requests against OpenAPI schemas.""" + +from typing import Any + +from jsonschema import Draft7Validator, ValidationError, validate + + +class RequestValidatorError(Exception): + """Base exception for request validator errors.""" + + pass + + +class ValidationFailure(RequestValidatorError): + """Raised when request validation fails.""" + + def __init__(self, message: str, errors: list[ValidationError]) -> None: + """Initialize validation failure. + + Args: + message: Error message. + errors: List of validation errors. + """ + super().__init__(message) + self.errors = errors + + +class RequestValidator: + """Validates HTTP requests against OpenAPI schema definitions.""" + + def __init__(self, spec: dict[str, Any]) -> None: + """Initialize the validator with an OpenAPI specification. + + Args: + spec: Parsed OpenAPI specification. + """ + self.spec = spec + self.schemas = self._extract_schemas() + self.definitions = self._extract_definitions() + + def _extract_schemas(self) -> dict[str, Any]: + """Extract schemas from components (OpenAPI 3.x). + + Returns: + Dictionary of schema definitions. + """ + components = self.spec.get("components", {}) + return components.get("schemas", {}) + + def _extract_definitions(self) -> dict[str, Any]: + """Extract definitions (Swagger 2.0). + + Returns: + Dictionary of schema definitions. + """ + return self.spec.get("definitions", {}) + + def _resolve_ref(self, ref: str) -> dict[str, Any] | None: + """Resolve a $ref reference. + + Args: + ref: The reference string. + + Returns: + The resolved schema or None. + """ + if ref.startswith("#/components/schemas/"): + schema_name = ref.split("/")[-1] + return self.schemas.get(schema_name) + elif ref.startswith("#/definitions/"): + schema_name = ref.split("/")[-1] + return self.definitions.get(schema_name) + return None + + def _convert_schema(self, schema: dict[str, Any]) -> dict[str, Any]: + """Convert OpenAPI schema to JSON Schema format. + + Args: + schema: OpenAPI schema definition. + + Returns: + JSON Schema compatible dictionary. + """ + if "$ref" in schema: + resolved = self._resolve_ref(schema["$ref"]) + if resolved: + return self._convert_schema(resolved) + return schema + + converted = {} + + for key, value in schema.items(): + if key in {"type", "format", "description", "default", "example"}: + converted[key] = value + elif key == "properties": + converted["properties"] = { + prop_name: self._convert_schema(prop_schema) + for prop_name, prop_schema in value.items() + } + elif key == "items": + converted["items"] = self._convert_schema(value) + elif key == "required": + converted["required"] = value + elif key == "enum": + converted["enum"] = value + elif key in {"minimum", "maximum", "minLength", "maxLength", "minItems", "maxItems"}: + converted[key] = value + elif key == "pattern": + converted["pattern"] = value + elif key == "additionalProperties": + converted["additionalProperties"] = value + + return converted + + def validate_headers( + self, + headers: dict[str, Any], + required_headers: list[str] | None = None, + header_schemas: dict[str, dict[str, Any]] | None = None, + ) -> ValidationFailure | None: + """Validate request headers. + + Args: + headers: Request headers. + required_headers: List of required header names. + header_schemas: Schema definitions for headers. + + Returns: + ValidationFailure if validation fails, None otherwise. + """ + if required_headers: + missing = [h for h in required_headers if h.lower() not in headers] + if missing: + errors = [ + ValidationError( + f"Missing required header: {missing_h}", + validator="required", + validator_value=required_headers, + instance=headers, + ) + for missing_h in missing + ] + return ValidationFailure("Header validation failed", errors) + + if header_schemas: + for header_name, header_schema in header_schemas.items(): + header_value = headers.get(header_name.lower()) + if header_value is not None: + converted_schema = self._convert_schema(header_schema) + try: + validate( + header_value, + converted_schema, + resolver=Draft7Validator, + ) + except ValidationError as e: + return ValidationFailure( + f"Header '{header_name}' validation failed", [e] + ) + + return None + + def validate_query_params( + self, + params: dict[str, Any], + param_schemas: dict[str, dict[str, Any]], + ) -> ValidationFailure | None: + """Validate query parameters. + + Args: + params: Query parameters from request. + param_schemas: Schema definitions for each parameter. + + Returns: + ValidationFailure if validation fails, None otherwise. + """ + for param_name, param_schema in param_schemas.items(): + param_value = params.get(param_name) + required = param_schema.get("required", False) + + if required and param_value is None: + return ValidationFailure( + f"Missing required query parameter: {param_name}", + [ + ValidationError( + f"Missing required parameter: {param_name}", + validator="required", + validator_value=[param_name], + instance=params, + ) + ], + ) + + if param_value is not None: + converted_schema = self._convert_schema(param_schema) + try: + validate(param_value, converted_schema, resolver=Draft7Validator) + except ValidationError as e: + return ValidationFailure( + f"Query parameter '{param_name}' validation failed", [e] + ) + + return None + + def validate_path_params( + self, + params: dict[str, Any], + param_schemas: dict[str, dict[str, Any]], + ) -> ValidationFailure | None: + """Validate path parameters. + + Args: + params: Path parameters extracted from URL. + param_schemas: Schema definitions for each parameter. + + Returns: + ValidationFailure if validation fails, None otherwise. + """ + for param_name, param_schema in param_schemas.items(): + if param_name not in params: + return ValidationFailure( + f"Missing path parameter: {param_name}", + [ + ValidationError( + f"Missing path parameter: {param_name}", + validator="required", + validator_value=[param_name], + instance=params, + ) + ], + ) + + return None + + def validate_body( + self, + body: Any, + schema: dict[str, Any] | None, + ) -> ValidationFailure | None: + """Validate request body against schema. + + Args: + body: Request body. + schema: JSON Schema for validation. + + Returns: + ValidationFailure if validation fails, None otherwise. + """ + if schema is None: + return None + + if body is None or body == "": + if schema.get("required", False): + return ValidationFailure( + "Request body is required", + [ + ValidationError( + "Request body is required", + validator="type", + validator_value=["object", "array", "string"], + instance=None, + ) + ], + ) + return None + + converted_schema = self._convert_schema(schema) + + try: + validate(body, converted_schema, resolver=Draft7Validator) + except ValidationError as e: + return ValidationFailure("Request body validation failed", [e]) + + return None + + def validate_request( + self, + method: str, + path: str, + headers: dict[str, Any], + query_params: dict[str, Any], + path_params: dict[str, Any], + body: Any, + operation_spec: dict[str, Any], + ) -> ValidationFailure | None: + """Validate a complete request. + + Args: + method: HTTP method. + path: Request path. + headers: Request headers. + query_params: Query parameters. + path_params: Path parameters. + body: Request body. + operation_spec: Operation specification from OpenAPI. + + Returns: + ValidationFailure if validation fails, None otherwise. + """ + parameters = operation_spec.get("parameters", []) + + header_schemas: dict[str, dict[str, Any]] = {} + required_headers: list[str] = [] + + query_param_schemas: dict[str, dict[str, Any]] = {} + path_param_schemas: dict[str, dict[str, Any]] = {} + + for param in parameters: + param_name = param.get("name", "") + param_in = param.get("in", "") + param_required = param.get("required", False) + param_schema = param.get("schema", {}) + + if param_in == "header": + header_schemas[param_name] = param_schema + if param_required: + required_headers.append(param_name) + elif param_in == "query": + query_param_schemas[param_name] = param_schema + elif param_in == "path": + path_param_schemas[param_name] = param_schema + + validation_error = self.validate_headers(headers, required_headers, header_schemas) + if validation_error: + return validation_error + + validation_error = self.validate_query_params(query_params, query_param_schemas) + if validation_error: + return validation_error + + validation_error = self.validate_path_params(path_params, path_param_schemas) + if validation_error: + return validation_error + + request_body = operation_spec.get("requestBody", {}) + if request_body: + content = request_body.get("content", {}) + json_content = content.get("application/json", {}) + body_schema = json_content.get("schema") + validation_error = self.validate_body(body, body_schema) + if validation_error: + return validation_error + + return None + + def format_validation_errors( + self, errors: list[ValidationError] + ) -> list[dict[str, Any]]: + """Format validation errors for response. + + Args: + errors: List of validation errors. + + Returns: + List of formatted error details. + """ + formatted = [] + + for error in errors: + error_info: dict[str, Any] = { + "message": error.message, + "path": list(error.absolute_path) if error.absolute_path else [], + "validator": error.validator, + } + + if error.instance is not None: + error_info["instance"] = str(error.instance) + + formatted.append(error_info) + + return formatted + + def create_error_response( + self, failure: ValidationFailure + ) -> dict[str, Any]: + """Create an error response from a validation failure. + + Args: + failure: The validation failure. + + Returns: + Error response dictionary. + """ + return { + "status_code": 400, + "body": { + "error": { + "type": "Validation Error", + "message": str(failure), + "details": self.format_validation_errors(failure.errors), + } + }, + }