diff --git a/src/core/parser.py b/src/core/parser.py new file mode 100644 index 0000000..ed96e27 --- /dev/null +++ b/src/core/parser.py @@ -0,0 +1,126 @@ +import json +import yaml +from pathlib import Path +from typing import Dict, Any, Optional +from openapi_spec_validator import validate +from openapi_spec_validator.versions.consts import OPENAPI_V3 + + +class ParseError(Exception): + """Custom exception for parsing errors.""" + pass + + +def load_spec_file(spec_path: str) -> Dict[str, Any]: + """Load an OpenAPI specification file. + + Args: + spec_path: Path to the OpenAPI spec file (JSON or YAML) + + Returns: + Parsed spec as a dictionary + + Raises: + ParseError: If the file cannot be loaded or parsed + """ + path = Path(spec_path) + + if not path.exists(): + raise ParseError(f"File not found: {spec_path}") + + try: + if path.suffix == '.json': + with open(path, 'r') as f: + return json.load(f) + elif path.suffix in ['.yaml', '.yml']: + with open(path, 'r') as f: + return yaml.safe_load(f) + else: + raise ParseError(f"Unsupported file format: {path.suffix}. Expected .json, .yaml, or .yml") + except json.JSONDecodeError as e: + raise ParseError(f"Invalid JSON: {e}") + except yaml.YAMLError as e: + raise ParseError(f"Invalid YAML: {e}") + + +def parse_openapi_spec(spec_path: str) -> Dict[str, Any]: + """Parse and validate an OpenAPI specification file. + + Args: + spec_path: Path to the OpenAPI spec file + + Returns: + Dictionary with 'valid' boolean and optional 'errors' list + """ + try: + spec = load_spec_file(spec_path) + + errors = [] + + if not isinstance(spec, dict): + return {'valid': False, 'errors': ['Spec is not a dictionary']} + + openapi_version = spec.get('openapi', '') + if not openapi_version.startswith('3.'): + errors.append(f"Expected OpenAPI 3.x version, got: {openapi_version}") + + if 'info' not in spec: + errors.append("Missing 'info' field") + + if 'paths' not in spec: + errors.append("Missing 'paths' field") + + if errors: + return {'valid': False, 'errors': errors} + + try: + validate(spec) + except Exception as e: + return {'valid': False, 'errors': [str(e)]} + + return { + 'valid': True, + 'spec': spec, + 'version': openapi_version, + 'title': spec.get('info', {}).get('title', 'Untitled'), + 'version_num': spec.get('info', {}).get('version', '1.0.0'), + 'endpoints_count': count_endpoints(spec), + 'tags': spec.get('tags', []) + } + + except ParseError as e: + return {'valid': False, 'errors': [str(e)]} + except Exception as e: + return {'valid': False, 'errors': [f"Unexpected error: {e}"]} + + +def count_endpoints(spec: Dict[str, Any]) -> int: + """Count the total number of endpoints in the spec.""" + count = 0 + for path, methods in spec.get('paths', {}).items(): + for method in methods: + if method.lower() in ['get', 'post', 'put', 'delete', 'patch', 'options', 'head']: + count += 1 + return count + + +def extract_endpoints(spec: Dict[str, Any]) -> list: + """Extract all endpoints from the spec.""" + endpoints = [] + for path, methods in spec.get('paths', {}).items(): + for method, details in methods.items(): + if method.lower() in ['get', 'post', 'put', 'delete', 'patch', 'options', 'head']: + endpoint = { + 'path': path, + 'method': method.upper(), + 'summary': details.get('summary'), + 'description': details.get('description'), + 'operation_id': details.get('operationId'), + 'tags': details.get('tags', []), + 'parameters': details.get('parameters', []), + 'request_body': details.get('requestBody'), + 'responses': details.get('responses', {}), + 'deprecated': details.get('deprecated', False) + } + endpoints.append(endpoint) + return endpoints