"""Parse and validate OpenAPI specifications.""" import yaml from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from openapi_spec_validator import validate from openapi_spec_validator.versions import consts as validator_consts class OpenAPISpecError(Exception): """Base exception for OpenAPI spec errors.""" pass class SpecValidationError(OpenAPISpecError): """Raised when OpenAPI spec validation fails.""" pass class SpecNotFoundError(OpenAPISpecError): """Raised when OpenAPI spec file is not found.""" pass def load_spec(spec_path: str) -> Dict[str, Any]: """Load and parse an OpenAPI specification from a YAML or JSON file. Args: spec_path: Path to the OpenAPI specification file. Returns: Parsed OpenAPI specification as a dictionary. Raises: SpecNotFoundError: If the spec file does not exist. SpecValidationError: If the spec is invalid. """ path = Path(spec_path) if not path.exists(): raise SpecNotFoundError(f"OpenAPI spec file not found: {spec_path}") try: with open(path, 'r', encoding='utf-8') as f: spec = yaml.safe_load(f) except yaml.YAMLError as e: raise OpenAPISpecError(f"Invalid YAML syntax in {spec_path}: {e}") except IOError as e: raise OpenAPISpecError(f"Error reading {spec_path}: {e}") if spec is None: raise OpenAPISpecError(f"Empty specification file: {spec_path}") try: validate(spec) except Exception as e: raise SpecValidationError(f"OpenAPI spec validation failed: {e}") return spec def extract_paths(spec: Dict[str, Any]) -> Dict[str, Dict[str, Any]]: """Extract all paths and their operations from an OpenAPI spec. Args: spec: Parsed OpenAPI specification. Returns: Dictionary mapping path strings to operation dictionaries. """ return spec.get('paths', {}) def extract_schemas(spec: Dict[str, Any]) -> Dict[str, Any]: """Extract all schemas from an OpenAPI spec. Args: spec: Parsed OpenAPI specification. Returns: Dictionary mapping schema names to schema definitions. """ components = spec.get('components', {}) schemas = components.get('schemas', {}) if not schemas and 'definitions' in spec: schemas = spec.get('definitions', {}) return schemas def extract_path_params(path: str) -> List[str]: """Extract parameter names from a path string. Args: path: API path string like '/users/{id}/posts/{post_id}'. Returns: List of parameter names without braces. """ import re params = re.findall(r'\{([^}]+)\}', path) return params def get_operation_id(path: str, method: str) -> str: """Generate an operation ID for a path/method combination. Args: path: API path string. method: HTTP method (get, post, put, delete, etc.). Returns: Generated operation ID string. """ import re clean_path = path.lstrip('/').replace('/', '_').replace('-', '_') params = re.findall(r'\{([^}]+)\}', clean_path) for param in params: clean_path = clean_path.replace(f'{{{param}}}', f'_{param}_') method = method.lower() return f"{method}_{clean_path}" def extract_security_schemes(spec: Dict[str, Any]) -> Dict[str, Any]: """Extract security schemes from an OpenAPI spec. Args: spec: Parsed OpenAPI specification. Returns: Dictionary mapping security scheme names to scheme definitions. """ components = spec.get('components', {}) return components.get('securitySchemes', {}) def extract_global_security(spec: Dict[str, Any]) -> List[Dict[str, List[str]]]: """Extract global security requirements from an OpenAPI spec. Args: spec: Parsed OpenAPI specification. Returns: List of security requirement dictionaries. """ return spec.get('security', []) def get_response_schema( spec: Dict[str, Any], path: str, method: str, status_code: str = '200' ) -> Optional[Dict[str, Any]]: """Get the response schema for a specific path/method/status_code. Args: spec: Parsed OpenAPI specification. path: API path string. method: HTTP method. status_code: Response status code. Returns: Response schema definition or None if not found. """ paths = spec.get('paths', {}) path_item = paths.get(path, {}) operation = path_item.get(method.lower(), {}) responses = operation.get('responses', {}) response = responses.get(status_code, responses.get('default', {})) content = response.get('content', {}) if 'application/json' in content: media_type = content['application/json'] return media_type.get('schema') return None def get_spec_version(spec: Dict[str, Any]) -> str: """Get the OpenAPI version from the spec. Args: spec: Parsed OpenAPI specification. Returns: OpenAPI version string (e.g., "3.0.3", "2.0"). """ return spec.get('openapi', spec.get('swagger', 'unknown')) def is_openapi_3(spec: Dict[str, Any]) -> bool: """Check if the spec is OpenAPI 3.x. Args: spec: Parsed OpenAPI specification. Returns: True if OpenAPI 3.x, False if Swagger 2.0. """ openapi_version = spec.get('openapi', '') return openapi_version.startswith('3.')