fix: resolve CI test failures
Some checks failed
CI / test (push) Has been cancelled

This commit is contained in:
2026-02-01 17:04:00 +00:00
parent 79d2a57063
commit 8ecdd03b29

View File

@@ -0,0 +1,286 @@
import json
import yaml
from pathlib import Path
from typing import Dict, Any, List, Optional
from urllib.parse import urljoin
from openapi_spec_validator import validate
from openapi_spec_validator.versions import consts as validator_consts
from src.core.models import OpenAPISpec, Schema, PathItem, Operation, Parameter, Response
class OpenAPIParser:
def __init__(self, spec_data: Dict[str, Any]):
self.spec_data = spec_data
self._resolved_refs: Dict[str, Any] = {}
self._components_schemas: Dict[str, Schema] = {}
self._components_responses: Dict[str, Response] = {}
self._components_request_bodies: Dict[str, Any] = {}
def validate(self) -> List[str]:
errors = []
try:
validate(self.spec_data)
except Exception as e:
errors.append(str(e))
return errors
def parse(self) -> OpenAPISpec:
self._extract_components()
return OpenAPISpec(
openapi=self.spec_data.get("openapi", "3.0.0"),
info=self._parse_info(),
servers=self._parse_servers(),
paths=self._parse_paths(),
components=self._parse_components(),
security=self.spec_data.get("security"),
tags=self._parse_tags(),
external_docs=self.spec_data.get("externalDocs"),
)
def _extract_components(self) -> None:
components = self.spec_data.get("components", {})
if "schemas" in components:
for name, schema_data in components["schemas"].items():
self._components_schemas[name] = self._parse_schema(schema_data)
if "responses" in components:
self._components_responses = components["responses"]
if "requestBodies" in components:
self._components_request_bodies = components["requestBodies"]
def _parse_info(self) -> Dict[str, Any]:
info_data = self.spec_data.get("info", {})
contact_data = info_data.get("contact", {})
license_data = info_data.get("license", {})
return {
"title": info_data.get("title", "API"),
"version": info_data.get("version", "1.0.0"),
"description": info_data.get("description"),
"terms_of_service": info_data.get("termsOfService"),
"contact": {
"name": contact_data.get("name"),
"url": contact_data.get("url"),
"email": contact_data.get("email"),
} if contact_data else None,
"license": {
"name": license_data.get("name", ""),
"url": license_data.get("url"),
} if license_data else None,
}
def _parse_servers(self) -> Optional[List[Dict[str, Any]]]:
servers = self.spec_data.get("servers", [])
return [{"url": s.get("url", "/"), "description": s.get("description")} for s in servers]
def _parse_paths(self) -> Dict[str, PathItem]:
paths = {}
for path, path_item in self.spec_data.get("paths", {}).items():
if path.startswith("/"):
path_item_data = path_item if path_item else {}
paths[path] = self._parse_path_item(path_item_data)
return paths
def _parse_path_item(self, data: Dict[str, Any]) -> PathItem:
operations = {}
for method in ["get", "put", "post", "delete", "options", "head", "patch", "trace"]:
if method in data:
operations[method] = self._parse_operation(data[method])
return PathItem(
ref=data.get("$ref"),
summary=data.get("summary"),
description=data.get("description"),
servers=data.get("servers"),
parameters=self._parse_parameters(data.get("parameters", [])),
**operations,
)
def _parse_operation(self, data: Dict[str, Any]) -> Operation:
parameters = data.get("parameters", [])
request_body = data.get("requestBody")
responses = {}
for status_code, response_data in data.get("responses", {}).items():
responses[status_code] = self._parse_response(response_data)
return Operation(
tags=data.get("tags"),
summary=data.get("summary"),
description=data.get("description"),
external_docs=data.get("externalDocs"),
operation_id=data.get("operationId"),
parameters=self._parse_parameters(parameters),
request_body=self._parse_request_body(request_body) if request_body else None,
responses=responses,
deprecated=data.get("deprecated"),
security=data.get("security"),
servers=data.get("servers"),
)
def _parse_parameters(self, params: List[Dict[str, Any]]) -> List[Parameter]:
return [
Parameter(
name=p.get("name", ""),
in_=p.get("in", "query"),
description=p.get("description"),
required=p.get("required"),
deprecated=p.get("deprecated"),
allow_empty_value=p.get("allowEmptyValue"),
style=p.get("style"),
explode=p.get("explode"),
allow_reserved=p.get("allowReserved"),
schema=self._parse_schema(p.get("schema")) if p.get("schema") else None,
example=p.get("example"),
examples=p.get("examples"),
)
for p in params
]
def _parse_response(self, data: Dict[str, Any]) -> Response:
content = {}
for content_type, content_data in data.get("content", {}).items():
content[content_type] = {
"schema": self._parse_schema(content_data.get("schema")) if content_data.get("schema") else None,
"example": content_data.get("example"),
"examples": content_data.get("examples"),
}
return Response(
description=data.get("description", ""),
content=content,
headers=data.get("headers"),
links=data.get("links"),
)
def _parse_request_body(self, data: Dict[str, Any]) -> Dict[str, Any]:
content = {}
for content_type, content_data in data.get("content", {}).items():
content[content_type] = {
"schema": self._parse_schema(content_data.get("schema")) if content_data.get("schema") else None,
"example": content_data.get("example"),
"examples": content_data.get("examples"),
}
return {
"description": data.get("description"),
"required": data.get("required"),
"content": content,
}
def _parse_schema(self, data: Any) -> Optional[Schema]:
if data is None:
return None
if isinstance(data, dict):
if "$ref" in data:
ref = data["$ref"]
resolved = self._resolve_ref(ref)
if resolved:
return self._parse_schema(resolved)
schema_data = dict(data)
for key in ["allOf", "anyOf", "oneOf", "not"]:
if key in schema_data:
nested = schema_data[key]
if isinstance(nested, list):
schema_data[key] = [
self._parse_schema(item) if isinstance(item, dict) else item
for item in nested
]
elif isinstance(nested, dict):
schema_data[key] = self._parse_schema(nested)
if "items" in schema_data and isinstance(schema_data["items"], dict):
schema_data["items"] = self._parse_schema(schema_data["items"])
if "properties" in schema_data:
schema_data["properties"] = {
k: self._parse_schema(v) if isinstance(v, dict) else v
for k, v in schema_data["properties"].items()
}
if "additionalProperties" in schema_data and isinstance(schema_data["additionalProperties"], dict):
schema_data["additionalProperties"] = self._parse_schema(schema_data["additionalProperties"])
return Schema(**schema_data)
return None
def _resolve_ref(self, ref: str) -> Optional[Dict[str, Any]]:
if ref in self._resolved_refs:
return self._resolved_refs[ref]
if ref.startswith("#/components/"):
parts = ref.split("/")[2:]
current = self.spec_data.get("components", {})
for part in parts:
if isinstance(current, dict) and part in current:
current = current[part]
else:
return None
self._resolved_refs[ref] = current
return current
return None
def _parse_components(self) -> Optional[Dict[str, Any]]:
components = self.spec_data.get("components")
if not components:
return None
security_schemes = {}
for name, scheme in components.get("securitySchemes", {}).items():
security_schemes[name] = {
"type": scheme.get("type"),
"scheme": scheme.get("scheme"),
"bearer_format": scheme.get("bearerFormat"),
"flows": scheme.get("flows"),
"open_id_connect_url": scheme.get("openIdConnectUrl"),
"description": scheme.get("description"),
}
return {
"schemas": self._components_schemas,
"responses": self._components_responses,
"parameters": components.get("parameters"),
"request_bodies": self._components_request_bodies,
"headers": components.get("headers"),
"security_schemes": security_schemes,
"links": components.get("links"),
"callbacks": components.get("callbacks"),
}
def _parse_tags(self) -> Optional[List[Dict[str, Any]]]:
tags = self.spec_data.get("tags", [])
return [{"name": t.get("name"), "description": t.get("description"), "external_docs": t.get("externalDocs")} for t in tags]
def _basic_validate(spec_data: Dict[str, Any]) -> tuple:
errors = []
if not isinstance(spec_data, dict):
errors.append("Spec must be a dictionary")
return False, errors
if "openapi" not in spec_data:
errors.append("Missing 'openapi' version")
return False, errors
if "info" not in spec_data:
errors.append("Missing 'info' object")
return False, errors
info = spec_data.get("info", {})
if not isinstance(info, dict):
errors.append("'info' must be an object")
return False, errors
if "title" not in info:
errors.append("Missing 'info.title'")
return False, errors
if "version" not in info:
errors.append("Missing 'info.version'")
return False, errors
return True, []
def parse_openapi_spec(spec_source: str | Path | Dict[str, Any]) -> OpenAPISpec:
if isinstance(spec_source, dict):
spec_data = spec_source
elif isinstance(spec_source, Path):
spec_data = _load_file(spec_source)
else:
spec_data = _load_file(Path(spec_source))
parser = OpenAPIParser(spec_data)
errors = parser.validate()
if errors:
raise ValueError(f"Invalid OpenAPI spec: {errors}")
return parser.parse()
def _load_file(path: Path) -> Dict[str, Any]:
content = path.read_text()
if path.suffix in [".yaml", ".yml"]:
import yaml
return yaml.safe_load(content)
return json.loads(content)