Files
openapi-mock-server/.src/openapi_mock/core/spec_parser.py
7000pctAUTO a35ca70982
Some checks failed
CI / test (push) Has been cancelled
CI / build (push) Has been cancelled
Initial commit: Add OpenAPI Mock Server project
2026-01-30 03:41:41 +00:00

204 lines
5.3 KiB
Python

"""Parse and validate OpenAPI specifications."""
import yaml
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from openapi_spec_validator import validate
from openapi_spec_validator.versions import consts as validator_consts
class OpenAPISpecError(Exception):
"""Base exception for OpenAPI spec errors."""
pass
class SpecValidationError(OpenAPISpecError):
"""Raised when OpenAPI spec validation fails."""
pass
class SpecNotFoundError(OpenAPISpecError):
"""Raised when OpenAPI spec file is not found."""
pass
def load_spec(spec_path: str) -> Dict[str, Any]:
"""Load and parse an OpenAPI specification from a YAML or JSON file.
Args:
spec_path: Path to the OpenAPI specification file.
Returns:
Parsed OpenAPI specification as a dictionary.
Raises:
SpecNotFoundError: If the spec file does not exist.
SpecValidationError: If the spec is invalid.
"""
path = Path(spec_path)
if not path.exists():
raise SpecNotFoundError(f"OpenAPI spec file not found: {spec_path}")
try:
with open(path, 'r', encoding='utf-8') as f:
spec = yaml.safe_load(f)
except yaml.YAMLError as e:
raise OpenAPISpecError(f"Invalid YAML syntax in {spec_path}: {e}")
except IOError as e:
raise OpenAPISpecError(f"Error reading {spec_path}: {e}")
if spec is None:
raise OpenAPISpecError(f"Empty specification file: {spec_path}")
try:
validate(spec)
except Exception as e:
raise SpecValidationError(f"OpenAPI spec validation failed: {e}")
return spec
def extract_paths(spec: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
"""Extract all paths and their operations from an OpenAPI spec.
Args:
spec: Parsed OpenAPI specification.
Returns:
Dictionary mapping path strings to operation dictionaries.
"""
return spec.get('paths', {})
def extract_schemas(spec: Dict[str, Any]) -> Dict[str, Any]:
"""Extract all schemas from an OpenAPI spec.
Args:
spec: Parsed OpenAPI specification.
Returns:
Dictionary mapping schema names to schema definitions.
"""
components = spec.get('components', {})
schemas = components.get('schemas', {})
if not schemas and 'definitions' in spec:
schemas = spec.get('definitions', {})
return schemas
def extract_path_params(path: str) -> List[str]:
"""Extract parameter names from a path string.
Args:
path: API path string like '/users/{id}/posts/{post_id}'.
Returns:
List of parameter names without braces.
"""
import re
params = re.findall(r'\{([^}]+)\}', path)
return params
def get_operation_id(path: str, method: str) -> str:
"""Generate an operation ID for a path/method combination.
Args:
path: API path string.
method: HTTP method (get, post, put, delete, etc.).
Returns:
Generated operation ID string.
"""
import re
clean_path = path.lstrip('/').replace('/', '_').replace('-', '_')
params = re.findall(r'\{([^}]+)\}', clean_path)
for param in params:
clean_path = clean_path.replace(f'{{{param}}}', f'_{param}_')
method = method.lower()
return f"{method}_{clean_path}"
def extract_security_schemes(spec: Dict[str, Any]) -> Dict[str, Any]:
"""Extract security schemes from an OpenAPI spec.
Args:
spec: Parsed OpenAPI specification.
Returns:
Dictionary mapping security scheme names to scheme definitions.
"""
components = spec.get('components', {})
return components.get('securitySchemes', {})
def extract_global_security(spec: Dict[str, Any]) -> List[Dict[str, List[str]]]:
"""Extract global security requirements from an OpenAPI spec.
Args:
spec: Parsed OpenAPI specification.
Returns:
List of security requirement dictionaries.
"""
return spec.get('security', [])
def get_response_schema(
spec: Dict[str, Any],
path: str,
method: str,
status_code: str = '200'
) -> Optional[Dict[str, Any]]:
"""Get the response schema for a specific path/method/status_code.
Args:
spec: Parsed OpenAPI specification.
path: API path string.
method: HTTP method.
status_code: Response status code.
Returns:
Response schema definition or None if not found.
"""
paths = spec.get('paths', {})
path_item = paths.get(path, {})
operation = path_item.get(method.lower(), {})
responses = operation.get('responses', {})
response = responses.get(status_code, responses.get('default', {}))
content = response.get('content', {})
if 'application/json' in content:
media_type = content['application/json']
return media_type.get('schema')
return None
def get_spec_version(spec: Dict[str, Any]) -> str:
"""Get the OpenAPI version from the spec.
Args:
spec: Parsed OpenAPI specification.
Returns:
OpenAPI version string (e.g., "3.0.3", "2.0").
"""
return spec.get('openapi', spec.get('swagger', 'unknown'))
def is_openapi_3(spec: Dict[str, Any]) -> bool:
"""Check if the spec is OpenAPI 3.x.
Args:
spec: Parsed OpenAPI specification.
Returns:
True if OpenAPI 3.x, False if Swagger 2.0.
"""
openapi_version = spec.get('openapi', '')
return openapi_version.startswith('3.')