This commit is contained in:
126
src/core/parser.py
Normal file
126
src/core/parser.py
Normal file
@@ -0,0 +1,126 @@
|
|||||||
|
import json
|
||||||
|
import yaml
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Dict, Any, Optional
|
||||||
|
from openapi_spec_validator import validate
|
||||||
|
from openapi_spec_validator.versions.consts import OPENAPI_V3
|
||||||
|
|
||||||
|
|
||||||
|
class ParseError(Exception):
|
||||||
|
"""Custom exception for parsing errors."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def load_spec_file(spec_path: str) -> Dict[str, Any]:
|
||||||
|
"""Load an OpenAPI specification file.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
spec_path: Path to the OpenAPI spec file (JSON or YAML)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Parsed spec as a dictionary
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ParseError: If the file cannot be loaded or parsed
|
||||||
|
"""
|
||||||
|
path = Path(spec_path)
|
||||||
|
|
||||||
|
if not path.exists():
|
||||||
|
raise ParseError(f"File not found: {spec_path}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
if path.suffix == '.json':
|
||||||
|
with open(path, 'r') as f:
|
||||||
|
return json.load(f)
|
||||||
|
elif path.suffix in ['.yaml', '.yml']:
|
||||||
|
with open(path, 'r') as f:
|
||||||
|
return yaml.safe_load(f)
|
||||||
|
else:
|
||||||
|
raise ParseError(f"Unsupported file format: {path.suffix}. Expected .json, .yaml, or .yml")
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
raise ParseError(f"Invalid JSON: {e}")
|
||||||
|
except yaml.YAMLError as e:
|
||||||
|
raise ParseError(f"Invalid YAML: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
def parse_openapi_spec(spec_path: str) -> Dict[str, Any]:
|
||||||
|
"""Parse and validate an OpenAPI specification file.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
spec_path: Path to the OpenAPI spec file
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with 'valid' boolean and optional 'errors' list
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
spec = load_spec_file(spec_path)
|
||||||
|
|
||||||
|
errors = []
|
||||||
|
|
||||||
|
if not isinstance(spec, dict):
|
||||||
|
return {'valid': False, 'errors': ['Spec is not a dictionary']}
|
||||||
|
|
||||||
|
openapi_version = spec.get('openapi', '')
|
||||||
|
if not openapi_version.startswith('3.'):
|
||||||
|
errors.append(f"Expected OpenAPI 3.x version, got: {openapi_version}")
|
||||||
|
|
||||||
|
if 'info' not in spec:
|
||||||
|
errors.append("Missing 'info' field")
|
||||||
|
|
||||||
|
if 'paths' not in spec:
|
||||||
|
errors.append("Missing 'paths' field")
|
||||||
|
|
||||||
|
if errors:
|
||||||
|
return {'valid': False, 'errors': errors}
|
||||||
|
|
||||||
|
try:
|
||||||
|
validate(spec)
|
||||||
|
except Exception as e:
|
||||||
|
return {'valid': False, 'errors': [str(e)]}
|
||||||
|
|
||||||
|
return {
|
||||||
|
'valid': True,
|
||||||
|
'spec': spec,
|
||||||
|
'version': openapi_version,
|
||||||
|
'title': spec.get('info', {}).get('title', 'Untitled'),
|
||||||
|
'version_num': spec.get('info', {}).get('version', '1.0.0'),
|
||||||
|
'endpoints_count': count_endpoints(spec),
|
||||||
|
'tags': spec.get('tags', [])
|
||||||
|
}
|
||||||
|
|
||||||
|
except ParseError as e:
|
||||||
|
return {'valid': False, 'errors': [str(e)]}
|
||||||
|
except Exception as e:
|
||||||
|
return {'valid': False, 'errors': [f"Unexpected error: {e}"]}
|
||||||
|
|
||||||
|
|
||||||
|
def count_endpoints(spec: Dict[str, Any]) -> int:
|
||||||
|
"""Count the total number of endpoints in the spec."""
|
||||||
|
count = 0
|
||||||
|
for path, methods in spec.get('paths', {}).items():
|
||||||
|
for method in methods:
|
||||||
|
if method.lower() in ['get', 'post', 'put', 'delete', 'patch', 'options', 'head']:
|
||||||
|
count += 1
|
||||||
|
return count
|
||||||
|
|
||||||
|
|
||||||
|
def extract_endpoints(spec: Dict[str, Any]) -> list:
|
||||||
|
"""Extract all endpoints from the spec."""
|
||||||
|
endpoints = []
|
||||||
|
for path, methods in spec.get('paths', {}).items():
|
||||||
|
for method, details in methods.items():
|
||||||
|
if method.lower() in ['get', 'post', 'put', 'delete', 'patch', 'options', 'head']:
|
||||||
|
endpoint = {
|
||||||
|
'path': path,
|
||||||
|
'method': method.upper(),
|
||||||
|
'summary': details.get('summary'),
|
||||||
|
'description': details.get('description'),
|
||||||
|
'operation_id': details.get('operationId'),
|
||||||
|
'tags': details.get('tags', []),
|
||||||
|
'parameters': details.get('parameters', []),
|
||||||
|
'request_body': details.get('requestBody'),
|
||||||
|
'responses': details.get('responses', {}),
|
||||||
|
'deprecated': details.get('deprecated', False)
|
||||||
|
}
|
||||||
|
endpoints.append(endpoint)
|
||||||
|
return endpoints
|
||||||
Reference in New Issue
Block a user