diff --git a/.src/openapi_mock/core/spec_parser.py b/.src/openapi_mock/core/spec_parser.py new file mode 100644 index 0000000..5df82a3 --- /dev/null +++ b/.src/openapi_mock/core/spec_parser.py @@ -0,0 +1,203 @@ +"""Parse and validate OpenAPI specifications.""" + +import yaml +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple +from openapi_spec_validator import validate +from openapi_spec_validator.versions import consts as validator_consts + + +class OpenAPISpecError(Exception): + """Base exception for OpenAPI spec errors.""" + pass + + +class SpecValidationError(OpenAPISpecError): + """Raised when OpenAPI spec validation fails.""" + pass + + +class SpecNotFoundError(OpenAPISpecError): + """Raised when OpenAPI spec file is not found.""" + pass + + +def load_spec(spec_path: str) -> Dict[str, Any]: + """Load and parse an OpenAPI specification from a YAML or JSON file. + + Args: + spec_path: Path to the OpenAPI specification file. + + Returns: + Parsed OpenAPI specification as a dictionary. + + Raises: + SpecNotFoundError: If the spec file does not exist. + SpecValidationError: If the spec is invalid. + """ + path = Path(spec_path) + if not path.exists(): + raise SpecNotFoundError(f"OpenAPI spec file not found: {spec_path}") + + try: + with open(path, 'r', encoding='utf-8') as f: + spec = yaml.safe_load(f) + except yaml.YAMLError as e: + raise OpenAPISpecError(f"Invalid YAML syntax in {spec_path}: {e}") + except IOError as e: + raise OpenAPISpecError(f"Error reading {spec_path}: {e}") + + if spec is None: + raise OpenAPISpecError(f"Empty specification file: {spec_path}") + + try: + validate(spec) + except Exception as e: + raise SpecValidationError(f"OpenAPI spec validation failed: {e}") + + return spec + + +def extract_paths(spec: Dict[str, Any]) -> Dict[str, Dict[str, Any]]: + """Extract all paths and their operations from an OpenAPI spec. + + Args: + spec: Parsed OpenAPI specification. + + Returns: + Dictionary mapping path strings to operation dictionaries. + """ + return spec.get('paths', {}) + + +def extract_schemas(spec: Dict[str, Any]) -> Dict[str, Any]: + """Extract all schemas from an OpenAPI spec. + + Args: + spec: Parsed OpenAPI specification. + + Returns: + Dictionary mapping schema names to schema definitions. + """ + components = spec.get('components', {}) + schemas = components.get('schemas', {}) + + if not schemas and 'definitions' in spec: + schemas = spec.get('definitions', {}) + + return schemas + + +def extract_path_params(path: str) -> List[str]: + """Extract parameter names from a path string. + + Args: + path: API path string like '/users/{id}/posts/{post_id}'. + + Returns: + List of parameter names without braces. + """ + import re + params = re.findall(r'\{([^}]+)\}', path) + return params + + +def get_operation_id(path: str, method: str) -> str: + """Generate an operation ID for a path/method combination. + + Args: + path: API path string. + method: HTTP method (get, post, put, delete, etc.). + + Returns: + Generated operation ID string. + """ + import re + clean_path = path.lstrip('/').replace('/', '_').replace('-', '_') + params = re.findall(r'\{([^}]+)\}', clean_path) + for param in params: + clean_path = clean_path.replace(f'{{{param}}}', f'_{param}_') + method = method.lower() + return f"{method}_{clean_path}" + + +def extract_security_schemes(spec: Dict[str, Any]) -> Dict[str, Any]: + """Extract security schemes from an OpenAPI spec. + + Args: + spec: Parsed OpenAPI specification. + + Returns: + Dictionary mapping security scheme names to scheme definitions. + """ + components = spec.get('components', {}) + return components.get('securitySchemes', {}) + + +def extract_global_security(spec: Dict[str, Any]) -> List[Dict[str, List[str]]]: + """Extract global security requirements from an OpenAPI spec. + + Args: + spec: Parsed OpenAPI specification. + + Returns: + List of security requirement dictionaries. + """ + return spec.get('security', []) + + +def get_response_schema( + spec: Dict[str, Any], + path: str, + method: str, + status_code: str = '200' +) -> Optional[Dict[str, Any]]: + """Get the response schema for a specific path/method/status_code. + + Args: + spec: Parsed OpenAPI specification. + path: API path string. + method: HTTP method. + status_code: Response status code. + + Returns: + Response schema definition or None if not found. + """ + paths = spec.get('paths', {}) + path_item = paths.get(path, {}) + operation = path_item.get(method.lower(), {}) + + responses = operation.get('responses', {}) + response = responses.get(status_code, responses.get('default', {})) + + content = response.get('content', {}) + if 'application/json' in content: + media_type = content['application/json'] + return media_type.get('schema') + + return None + + +def get_spec_version(spec: Dict[str, Any]) -> str: + """Get the OpenAPI version from the spec. + + Args: + spec: Parsed OpenAPI specification. + + Returns: + OpenAPI version string (e.g., "3.0.3", "2.0"). + """ + return spec.get('openapi', spec.get('swagger', 'unknown')) + + +def is_openapi_3(spec: Dict[str, Any]) -> bool: + """Check if the spec is OpenAPI 3.x. + + Args: + spec: Parsed OpenAPI specification. + + Returns: + True if OpenAPI 3.x, False if Swagger 2.0. + """ + openapi_version = spec.get('openapi', '') + return openapi_version.startswith('3.')