This commit is contained in:
@@ -1,126 +1,174 @@
|
|||||||
|
"""Parser for OpenAPI specifications."""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
import yaml
|
import yaml
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Dict, Any, Optional
|
from typing import Any, Dict, List, Optional, Tuple
|
||||||
from openapi_spec_validator import validate
|
|
||||||
from openapi_spec_validator.versions.consts import OPENAPI_V3
|
from openapi_spec_validator import validate as validate_spec
|
||||||
|
from pydantic import ValidationError
|
||||||
|
|
||||||
|
from src.core.models import OpenAPISpec
|
||||||
|
|
||||||
|
|
||||||
class ParseError(Exception):
|
class ParseError(Exception):
|
||||||
"""Custom exception for parsing errors."""
|
"""Exception raised when parsing fails."""
|
||||||
|
|
||||||
|
def __init__(self, message: str, path: Optional[str] = None, line: Optional[int] = None):
|
||||||
|
self.message = message
|
||||||
|
self.path = path
|
||||||
|
self.line = line
|
||||||
|
super().__init__(self._format_message())
|
||||||
|
|
||||||
|
def _format_message(self) -> str:
|
||||||
|
if self.path and self.line:
|
||||||
|
return f"{self.message} (at {self.path}:{self.line})"
|
||||||
|
elif self.path:
|
||||||
|
return f"{self.message} (at {self.path})"
|
||||||
|
return self.message
|
||||||
|
|
||||||
|
|
||||||
|
class SpecValidationError(ParseError):
|
||||||
|
"""Exception raised when spec validation fails."""
|
||||||
|
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def load_spec_file(spec_path: str) -> Dict[str, Any]:
|
class OpenAPIParser:
|
||||||
"""Load an OpenAPI specification file.
|
"""Parser for OpenAPI 3.0/3.1 specifications."""
|
||||||
|
|
||||||
Args:
|
SUPPORTED_VERSIONS = ["3.0.0", "3.0.1", "3.0.2", "3.0.3", "3.1.0"]
|
||||||
spec_path: Path to the OpenAPI spec file (JSON or YAML)
|
|
||||||
|
|
||||||
Returns:
|
def __init__(self, spec_path: str):
|
||||||
Parsed spec as a dictionary
|
self.spec_path = Path(spec_path)
|
||||||
|
|
||||||
Raises:
|
def load(self) -> Dict[str, Any]:
|
||||||
ParseError: If the file cannot be loaded or parsed
|
"""Load the spec file and return its contents."""
|
||||||
"""
|
if not self.spec_path.exists():
|
||||||
path = Path(spec_path)
|
raise ParseError(f"Spec file not found: {self.spec_path}", str(self.spec_path))
|
||||||
|
|
||||||
if not path.exists():
|
content = self.spec_path.read_text()
|
||||||
raise ParseError(f"File not found: {spec_path}")
|
|
||||||
|
|
||||||
try:
|
if self.spec_path.suffix.lower() in [".yaml", ".yml"]:
|
||||||
if path.suffix == '.json':
|
try:
|
||||||
with open(path, 'r') as f:
|
return yaml.safe_load(content)
|
||||||
return json.load(f)
|
except yaml.YAMLError as e:
|
||||||
elif path.suffix in ['.yaml', '.yml']:
|
raise ParseError(f"Invalid YAML format: {e}", str(self.spec_path))
|
||||||
with open(path, 'r') as f:
|
elif self.spec_path.suffix.lower() == ".json":
|
||||||
return yaml.safe_load(f)
|
try:
|
||||||
|
return json.loads(content)
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
raise ParseError(f"Invalid JSON format: {e}", str(self.spec_path))
|
||||||
else:
|
else:
|
||||||
raise ParseError(f"Unsupported file format: {path.suffix}. Expected .json, .yaml, or .yml")
|
raise ParseError(
|
||||||
except json.JSONDecodeError as e:
|
f"Unsupported file format: {self.spec_path.suffix}. Use .yaml, .yml, or .json",
|
||||||
raise ParseError(f"Invalid JSON: {e}")
|
str(self.spec_path)
|
||||||
except yaml.YAMLError as e:
|
)
|
||||||
raise ParseError(f"Invalid YAML: {e}")
|
|
||||||
|
|
||||||
|
def validate(self, spec_data: Optional[Dict[str, Any]] = None) -> Tuple[bool, List[str]]:
|
||||||
|
"""
|
||||||
|
Validate the OpenAPI specification.
|
||||||
|
|
||||||
def parse_openapi_spec(spec_path: str) -> Dict[str, Any]:
|
Returns:
|
||||||
"""Parse and validate an OpenAPI specification file.
|
Tuple of (is_valid, list of errors)
|
||||||
|
"""
|
||||||
Args:
|
if spec_data is None:
|
||||||
spec_path: Path to the OpenAPI spec file
|
spec_data = self.load()
|
||||||
|
|
||||||
Returns:
|
|
||||||
Dictionary with 'valid' boolean and optional 'errors' list
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
spec = load_spec_file(spec_path)
|
|
||||||
|
|
||||||
errors = []
|
errors = []
|
||||||
|
|
||||||
if not isinstance(spec, dict):
|
try:
|
||||||
return {'valid': False, 'errors': ['Spec is not a dictionary']}
|
validate_spec(spec_data)
|
||||||
|
return True, []
|
||||||
|
except Exception as e:
|
||||||
|
error_msg = str(e)
|
||||||
|
errors.append(error_msg)
|
||||||
|
return False, errors
|
||||||
|
|
||||||
openapi_version = spec.get('openapi', '')
|
def parse(self) -> OpenAPISpec:
|
||||||
if not openapi_version.startswith('3.'):
|
"""
|
||||||
errors.append(f"Expected OpenAPI 3.x version, got: {openapi_version}")
|
Parse and validate the OpenAPI specification.
|
||||||
|
|
||||||
if 'info' not in spec:
|
Returns:
|
||||||
errors.append("Missing 'info' field")
|
OpenAPISpec object
|
||||||
|
|
||||||
if 'paths' not in spec:
|
Raises:
|
||||||
errors.append("Missing 'paths' field")
|
ParseError: If the spec cannot be parsed
|
||||||
|
SpecValidationError: If the spec is invalid
|
||||||
|
"""
|
||||||
|
spec_data = self.load()
|
||||||
|
|
||||||
if errors:
|
is_valid, errors = self.validate(spec_data)
|
||||||
return {'valid': False, 'errors': errors}
|
if not is_valid:
|
||||||
|
error_text = "; ".join(errors)
|
||||||
|
raise SpecValidationError(f"Invalid OpenAPI specification: {error_text}", str(self.spec_path))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
validate(spec)
|
return OpenAPISpec(**spec_data)
|
||||||
except Exception as e:
|
except ValidationError as e:
|
||||||
return {'valid': False, 'errors': [str(e)]}
|
error_messages = []
|
||||||
|
for error in e.errors():
|
||||||
|
loc = ".".join(str(l) for l in error["loc"])
|
||||||
|
error_messages.append(f"{loc}: {error['msg']}")
|
||||||
|
raise ParseError(
|
||||||
|
f"Schema validation failed: {'; '.join(error_messages)}",
|
||||||
|
str(self.spec_path)
|
||||||
|
)
|
||||||
|
|
||||||
|
def parse_with_examples(self) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Parse the spec and add generated examples.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary containing parsed spec with examples
|
||||||
|
"""
|
||||||
|
from src.utils.examples import ExampleGenerator
|
||||||
|
|
||||||
|
spec = self.parse()
|
||||||
|
generator = ExampleGenerator()
|
||||||
|
|
||||||
|
endpoints = []
|
||||||
|
for endpoint in spec.get_endpoints():
|
||||||
|
endpoint_dict = endpoint.model_dump(mode="json", exclude_none=True)
|
||||||
|
|
||||||
|
if endpoint.requestBody:
|
||||||
|
endpoint_dict["requestBodyExample"] = generator.generate_from_content(
|
||||||
|
endpoint.requestBody.content or {}
|
||||||
|
)
|
||||||
|
|
||||||
|
responses = {}
|
||||||
|
for status_code, response in endpoint.responses.items():
|
||||||
|
response_dict = response.model_dump(mode="json", exclude_none=True)
|
||||||
|
if response.content:
|
||||||
|
response_dict["example"] = generator.generate_from_content(response.content or {})
|
||||||
|
responses[status_code] = response_dict
|
||||||
|
endpoint_dict["responses"] = responses
|
||||||
|
|
||||||
|
endpoints.append(endpoint_dict)
|
||||||
|
|
||||||
|
schemas = {}
|
||||||
|
for name, schema in spec.get_schemas().items():
|
||||||
|
schemas[name] = {
|
||||||
|
"name": name,
|
||||||
|
"schema": schema.model_dump(mode="json", exclude_none=True),
|
||||||
|
"example": generator.generate_example(schema),
|
||||||
|
}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
'valid': True,
|
"spec": spec.model_dump(mode="json", exclude_none=True),
|
||||||
'spec': spec,
|
"endpoints": endpoints,
|
||||||
'version': openapi_version,
|
"schemas": schemas,
|
||||||
'title': spec.get('info', {}).get('title', 'Untitled'),
|
"tags": [tag.model_dump(mode="json", exclude_none=True) for tag in spec.get_tags()],
|
||||||
'version_num': spec.get('info', {}).get('version', '1.0.0'),
|
|
||||||
'endpoints_count': count_endpoints(spec),
|
|
||||||
'tags': spec.get('tags', [])
|
|
||||||
}
|
}
|
||||||
|
|
||||||
except ParseError as e:
|
|
||||||
return {'valid': False, 'errors': [str(e)]}
|
def parse_spec_file(spec_path: str) -> OpenAPISpec:
|
||||||
except Exception as e:
|
"""Convenience function to parse an OpenAPI spec file."""
|
||||||
return {'valid': False, 'errors': [f"Unexpected error: {e}"]}
|
parser = OpenAPIParser(spec_path)
|
||||||
|
return parser.parse()
|
||||||
|
|
||||||
|
|
||||||
def count_endpoints(spec: Dict[str, Any]) -> int:
|
def validate_spec_file(spec_path: str) -> Tuple[bool, List[str]]:
|
||||||
"""Count the total number of endpoints in the spec."""
|
"""Convenience function to validate an OpenAPI spec file."""
|
||||||
count = 0
|
parser = OpenAPIParser(spec_path)
|
||||||
for path, methods in spec.get('paths', {}).items():
|
return parser.validate()
|
||||||
for method in methods:
|
|
||||||
if method.lower() in ['get', 'post', 'put', 'delete', 'patch', 'options', 'head']:
|
|
||||||
count += 1
|
|
||||||
return count
|
|
||||||
|
|
||||||
|
|
||||||
def extract_endpoints(spec: Dict[str, Any]) -> list:
|
|
||||||
"""Extract all endpoints from the spec."""
|
|
||||||
endpoints = []
|
|
||||||
for path, methods in spec.get('paths', {}).items():
|
|
||||||
for method, details in methods.items():
|
|
||||||
if method.lower() in ['get', 'post', 'put', 'delete', 'patch', 'options', 'head']:
|
|
||||||
endpoint = {
|
|
||||||
'path': path,
|
|
||||||
'method': method.upper(),
|
|
||||||
'summary': details.get('summary'),
|
|
||||||
'description': details.get('description'),
|
|
||||||
'operation_id': details.get('operationId'),
|
|
||||||
'tags': details.get('tags', []),
|
|
||||||
'parameters': details.get('parameters', []),
|
|
||||||
'request_body': details.get('requestBody'),
|
|
||||||
'responses': details.get('responses', {}),
|
|
||||||
'deprecated': details.get('deprecated', False)
|
|
||||||
}
|
|
||||||
endpoints.append(endpoint)
|
|
||||||
return endpoints
|
|
||||||
|
|||||||
Reference in New Issue
Block a user