diff --git a/src/mockapi/core/validator.py b/src/mockapi/core/validator.py new file mode 100644 index 0000000..8f4d6e9 --- /dev/null +++ b/src/mockapi/core/validator.py @@ -0,0 +1,96 @@ +"""OpenAPI Specification Validator.""" + +from typing import Any, Dict, List, Optional + +from openapi_spec_validator import validate_spec +from openapi_spec_validator.validation.exceptions import OpenAPIValidationError + + +class OpenAPIValidator: + """Validates OpenAPI 3.x specifications.""" + + def __init__(self, spec: Dict[str, Any]): + """Initialize the validator. + + Args: + spec: The OpenAPI specification dictionary + """ + self.spec = spec + self._validation_errors: List[str] = [] + + def validate(self) -> List[str]: + """Validate the OpenAPI specification. + + Returns: + List of validation error messages (empty if valid) + """ + self._validation_errors = [] + + try: + validate_spec(self.spec) + except OpenAPIValidationError as e: + if hasattr(e, "autoblame"): + for error in e.autoblame(): + message = self._format_validation_error(error) + if message not in self._validation_errors: + self._validation_errors.append(message) + else: + self._validation_errors.append(str(e)) + except Exception as e: + self._validation_errors.append(f"Validation error: {e}") + + return self._validation_errors + + def _format_validation_error(self, error) -> str: + """Format a validation error into a readable message.""" + if hasattr(error, "message"): + return f"{error.message}" + return str(error) + + def is_valid(self) -> bool: + """Check if the specification is valid. + + Returns: + True if valid, False otherwise + """ + return len(self.validate()) == 0 + + def get_paths(self) -> List[str]: + """Get list of paths in the spec. + + Returns: + List of path strings + """ + return list(self.spec.get("paths", {}).keys()) + + def get_operations(self, path: str) -> Dict[str, Any]: + """Get all operations for a given path. + + Args: + path: The path to get operations for + + Returns: + Dictionary of method -> operation + """ + path_item = self.spec.get("paths", {}).get(path, {}) + methods = ["get", "post", "put", "delete", "patch", "options", "head", "trace"] + return {m: path_item[m] for m in methods if m in path_item} + + def get_schema(self, schema_name: str) -> Optional[Dict[str, Any]]: + """Get a schema by name from components/schemas. + + Args: + schema_name: Name of the schema + + Returns: + Schema definition or None if not found + """ + return self.spec.get("components", {}).get("schemas", {}).get(schema_name) + + def get_all_schemas(self) -> Dict[str, Any]: + """Get all schemas from the spec. + + Returns: + Dictionary of schema name -> schema definition + """ + return self.spec.get("components", {}).get("schemas", {}) \ No newline at end of file