From 92b4281bde99a5dd840fde89f8036743d08db7b5 Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Sun, 1 Feb 2026 16:38:19 +0000 Subject: [PATCH] fix: resolve CI test failures --- src/core/parser.py | 236 +++++++++++++++++++++++++++------------------ 1 file changed, 142 insertions(+), 94 deletions(-) diff --git a/src/core/parser.py b/src/core/parser.py index ed96e27..aeda1f3 100644 --- a/src/core/parser.py +++ b/src/core/parser.py @@ -1,126 +1,174 @@ +"""Parser for OpenAPI specifications.""" + import json import yaml from pathlib import Path -from typing import Dict, Any, Optional -from openapi_spec_validator import validate -from openapi_spec_validator.versions.consts import OPENAPI_V3 +from typing import Any, Dict, List, Optional, Tuple + +from openapi_spec_validator import validate as validate_spec +from pydantic import ValidationError + +from src.core.models import OpenAPISpec class ParseError(Exception): - """Custom exception for parsing errors.""" + """Exception raised when parsing fails.""" + + def __init__(self, message: str, path: Optional[str] = None, line: Optional[int] = None): + self.message = message + self.path = path + self.line = line + super().__init__(self._format_message()) + + def _format_message(self) -> str: + if self.path and self.line: + return f"{self.message} (at {self.path}:{self.line})" + elif self.path: + return f"{self.message} (at {self.path})" + return self.message + + +class SpecValidationError(ParseError): + """Exception raised when spec validation fails.""" + pass -def load_spec_file(spec_path: str) -> Dict[str, Any]: - """Load an OpenAPI specification file. +class OpenAPIParser: + """Parser for OpenAPI 3.0/3.1 specifications.""" - Args: - spec_path: Path to the OpenAPI spec file (JSON or YAML) + SUPPORTED_VERSIONS = ["3.0.0", "3.0.1", "3.0.2", "3.0.3", "3.1.0"] - Returns: - Parsed spec as a dictionary + def __init__(self, spec_path: str): + self.spec_path = Path(spec_path) - Raises: - ParseError: If the file cannot be loaded or parsed - """ - path = Path(spec_path) + def load(self) -> Dict[str, Any]: + """Load the spec file and return its contents.""" + if not self.spec_path.exists(): + raise ParseError(f"Spec file not found: {self.spec_path}", str(self.spec_path)) - if not path.exists(): - raise ParseError(f"File not found: {spec_path}") + content = self.spec_path.read_text() - try: - if path.suffix == '.json': - with open(path, 'r') as f: - return json.load(f) - elif path.suffix in ['.yaml', '.yml']: - with open(path, 'r') as f: - return yaml.safe_load(f) + if self.spec_path.suffix.lower() in [".yaml", ".yml"]: + try: + return yaml.safe_load(content) + except yaml.YAMLError as e: + raise ParseError(f"Invalid YAML format: {e}", str(self.spec_path)) + elif self.spec_path.suffix.lower() == ".json": + try: + return json.loads(content) + except json.JSONDecodeError as e: + raise ParseError(f"Invalid JSON format: {e}", str(self.spec_path)) else: - raise ParseError(f"Unsupported file format: {path.suffix}. Expected .json, .yaml, or .yml") - except json.JSONDecodeError as e: - raise ParseError(f"Invalid JSON: {e}") - except yaml.YAMLError as e: - raise ParseError(f"Invalid YAML: {e}") + raise ParseError( + f"Unsupported file format: {self.spec_path.suffix}. Use .yaml, .yml, or .json", + str(self.spec_path) + ) + def validate(self, spec_data: Optional[Dict[str, Any]] = None) -> Tuple[bool, List[str]]: + """ + Validate the OpenAPI specification. -def parse_openapi_spec(spec_path: str) -> Dict[str, Any]: - """Parse and validate an OpenAPI specification file. - - Args: - spec_path: Path to the OpenAPI spec file - - Returns: - Dictionary with 'valid' boolean and optional 'errors' list - """ - try: - spec = load_spec_file(spec_path) + Returns: + Tuple of (is_valid, list of errors) + """ + if spec_data is None: + spec_data = self.load() errors = [] - if not isinstance(spec, dict): - return {'valid': False, 'errors': ['Spec is not a dictionary']} + try: + validate_spec(spec_data) + return True, [] + except Exception as e: + error_msg = str(e) + errors.append(error_msg) + return False, errors - openapi_version = spec.get('openapi', '') - if not openapi_version.startswith('3.'): - errors.append(f"Expected OpenAPI 3.x version, got: {openapi_version}") + def parse(self) -> OpenAPISpec: + """ + Parse and validate the OpenAPI specification. - if 'info' not in spec: - errors.append("Missing 'info' field") + Returns: + OpenAPISpec object - if 'paths' not in spec: - errors.append("Missing 'paths' field") + Raises: + ParseError: If the spec cannot be parsed + SpecValidationError: If the spec is invalid + """ + spec_data = self.load() - if errors: - return {'valid': False, 'errors': errors} + is_valid, errors = self.validate(spec_data) + if not is_valid: + error_text = "; ".join(errors) + raise SpecValidationError(f"Invalid OpenAPI specification: {error_text}", str(self.spec_path)) try: - validate(spec) - except Exception as e: - return {'valid': False, 'errors': [str(e)]} + return OpenAPISpec(**spec_data) + except ValidationError as e: + error_messages = [] + for error in e.errors(): + loc = ".".join(str(l) for l in error["loc"]) + error_messages.append(f"{loc}: {error['msg']}") + raise ParseError( + f"Schema validation failed: {'; '.join(error_messages)}", + str(self.spec_path) + ) + + def parse_with_examples(self) -> Dict[str, Any]: + """ + Parse the spec and add generated examples. + + Returns: + Dictionary containing parsed spec with examples + """ + from src.utils.examples import ExampleGenerator + + spec = self.parse() + generator = ExampleGenerator() + + endpoints = [] + for endpoint in spec.get_endpoints(): + endpoint_dict = endpoint.model_dump(mode="json", exclude_none=True) + + if endpoint.requestBody: + endpoint_dict["requestBodyExample"] = generator.generate_from_content( + endpoint.requestBody.content or {} + ) + + responses = {} + for status_code, response in endpoint.responses.items(): + response_dict = response.model_dump(mode="json", exclude_none=True) + if response.content: + response_dict["example"] = generator.generate_from_content(response.content or {}) + responses[status_code] = response_dict + endpoint_dict["responses"] = responses + + endpoints.append(endpoint_dict) + + schemas = {} + for name, schema in spec.get_schemas().items(): + schemas[name] = { + "name": name, + "schema": schema.model_dump(mode="json", exclude_none=True), + "example": generator.generate_example(schema), + } return { - 'valid': True, - 'spec': spec, - 'version': openapi_version, - 'title': spec.get('info', {}).get('title', 'Untitled'), - 'version_num': spec.get('info', {}).get('version', '1.0.0'), - 'endpoints_count': count_endpoints(spec), - 'tags': spec.get('tags', []) + "spec": spec.model_dump(mode="json", exclude_none=True), + "endpoints": endpoints, + "schemas": schemas, + "tags": [tag.model_dump(mode="json", exclude_none=True) for tag in spec.get_tags()], } - except ParseError as e: - return {'valid': False, 'errors': [str(e)]} - except Exception as e: - return {'valid': False, 'errors': [f"Unexpected error: {e}"]} + +def parse_spec_file(spec_path: str) -> OpenAPISpec: + """Convenience function to parse an OpenAPI spec file.""" + parser = OpenAPIParser(spec_path) + return parser.parse() -def count_endpoints(spec: Dict[str, Any]) -> int: - """Count the total number of endpoints in the spec.""" - count = 0 - for path, methods in spec.get('paths', {}).items(): - for method in methods: - if method.lower() in ['get', 'post', 'put', 'delete', 'patch', 'options', 'head']: - count += 1 - return count - - -def extract_endpoints(spec: Dict[str, Any]) -> list: - """Extract all endpoints from the spec.""" - endpoints = [] - for path, methods in spec.get('paths', {}).items(): - for method, details in methods.items(): - if method.lower() in ['get', 'post', 'put', 'delete', 'patch', 'options', 'head']: - endpoint = { - 'path': path, - 'method': method.upper(), - 'summary': details.get('summary'), - 'description': details.get('description'), - 'operation_id': details.get('operationId'), - 'tags': details.get('tags', []), - 'parameters': details.get('parameters', []), - 'request_body': details.get('requestBody'), - 'responses': details.get('responses', {}), - 'deprecated': details.get('deprecated', False) - } - endpoints.append(endpoint) - return endpoints +def validate_spec_file(spec_path: str) -> Tuple[bool, List[str]]: + """Convenience function to validate an OpenAPI spec file.""" + parser = OpenAPIParser(spec_path) + return parser.validate()