# Reconstructed from a whitespace-collapsed diff that added
# app/localapi-docs/src/core/parser.py as a new file.
"""Turn a raw OpenAPI document (dict, JSON file or YAML file) into model objects."""

import json
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin

import yaml
from openapi_spec_validator import validate
from openapi_spec_validator.versions import consts as validator_consts

from src.core.models import OpenAPISpec, Schema, PathItem, Operation, Parameter, Response


class OpenAPIParser:
    """Walk an OpenAPI document held as a plain dict and build model objects.

    Only local references of the form ``#/components/...`` are resolved;
    any other ``$ref`` is left unexpanded.
    """

    def __init__(self, spec_data: Dict[str, Any]):
        """Store the raw document and initialise the per-parser caches.

        Args:
            spec_data: The already-loaded OpenAPI document.
        """
        self.spec_data = spec_data
        # Memo of ``$ref`` string -> raw target node, filled by _resolve_ref.
        self._resolved_refs: Dict[str, Any] = {}
        self._components_schemas: Dict[str, Schema] = {}
        # Holds the raw ``components.responses`` mapping (not parsed Response
        # objects), so it is annotated as plain data.
        self._components_responses: Dict[str, Any] = {}
        self._components_request_bodies: Dict[str, Any] = {}
        # Schema refs currently being expanded; used to stop cyclic ``$ref``
        # chains from recursing forever in _parse_schema.
        self._active_refs: set[str] = set()

    def validate(self) -> List[str]:
        """Run the third-party validator and collect failures as strings.

        Returns:
            An empty list when the validator raised nothing, otherwise a
            one-element list holding ``str()`` of the raised exception.
        """
        errors = []
        try:
            # Inside a method body the bare name resolves to the module-level
            # ``validate`` imported from openapi_spec_validator, not to this
            # method.
            validate(self.spec_data)
        except Exception as e:
            # Deliberately broad: this method reports problems instead of
            # raising, whatever exception type the validator uses.
            errors.append(str(e))
        return errors

    def parse(self) -> OpenAPISpec:
        """Build the ``OpenAPISpec`` model from the stored document.

        Returns:
            The populated ``OpenAPISpec``; ``openapi`` defaults to
            ``"3.0.0"`` when the document omits it.
        """
        self._extract_components()
        return OpenAPISpec(
            openapi=self.spec_data.get("openapi", "3.0.0"),
            info=self._parse_info(),
            servers=self._parse_servers(),
            paths=self._parse_paths(),
            components=self._parse_components(),
            security=self.spec_data.get("security"),
            tags=self._parse_tags(),
            external_docs=self.spec_data.get("externalDocs"),
        )

    def _extract_components(self) -> None:
        """Cache component schemas (parsed) and responses/request bodies (raw)."""
        # ``or {}`` also covers an explicit null ``components`` entry.
        components = self.spec_data.get("components") or {}
        if "schemas" in components:
            for name, schema_data in components["schemas"].items():
                self._components_schemas[name] = self._parse_schema(schema_data)
        if "responses" in components:
            self._components_responses = components["responses"]
        if "requestBodies" in components:
            self._components_request_bodies = components["requestBodies"]

    def _parse_info(self) -> Dict[str, Any]:
        """Return the ``info`` object as a snake_case dict with defaults.

        ``contact`` and ``license`` are ``None`` when absent or empty.
        """
        info_data = self.spec_data.get("info") or {}
        contact_data = info_data.get("contact", {})
        license_data = info_data.get("license", {})
        return {
            "title": info_data.get("title", "API"),
            "version": info_data.get("version", "1.0.0"),
            "description": info_data.get("description"),
            "terms_of_service": info_data.get("termsOfService"),
            "contact": {
                "name": contact_data.get("name"),
                "url": contact_data.get("url"),
                "email": contact_data.get("email"),
            } if contact_data else None,
            "license": {
                "name": license_data.get("name", ""),
                "url": license_data.get("url"),
            } if license_data else None,
        }

    def _parse_servers(self) -> Optional[List[Dict[str, Any]]]:
        """Return one ``{"url", "description"}`` dict per declared server."""
        servers = self.spec_data.get("servers") or []
        return [{"url": s.get("url", "/"), "description": s.get("description")} for s in servers]

    def _parse_paths(self) -> Dict[str, PathItem]:
        """Parse every ``paths`` entry whose key starts with ``/``.

        Keys that do not start with ``/`` are skipped.
        """
        paths = {}
        for path, path_item in (self.spec_data.get("paths") or {}).items():
            if path.startswith("/"):
                # A null path item is treated as an empty one.
                path_item_data = path_item if path_item else {}
                paths[path] = self._parse_path_item(path_item_data)
        return paths

    def _parse_path_item(self, data: Dict[str, Any]) -> PathItem:
        """Build a ``PathItem`` including one ``Operation`` per HTTP method present."""
        operations = {}
        for method in ["get", "put", "post", "delete", "options", "head", "patch", "trace"]:
            if method in data:
                operations[method] = self._parse_operation(data[method])
        return PathItem(
            ref=data.get("$ref"),
            summary=data.get("summary"),
            description=data.get("description"),
            servers=data.get("servers"),
            parameters=self._parse_parameters(data.get("parameters", [])),
            # Method names are passed straight through as keyword arguments.
            **operations,
        )

    def _parse_operation(self, data: Dict[str, Any]) -> Operation:
        """Build an ``Operation`` with parsed parameters, request body and responses."""
        parameters = data.get("parameters", [])
        request_body = data.get("requestBody")
        responses = {
            status_code: self._parse_response(response_data)
            for status_code, response_data in data.get("responses", {}).items()
        }
        return Operation(
            tags=data.get("tags"),
            summary=data.get("summary"),
            description=data.get("description"),
            external_docs=data.get("externalDocs"),
            operation_id=data.get("operationId"),
            parameters=self._parse_parameters(parameters),
            request_body=self._parse_request_body(request_body) if request_body else None,
            responses=responses,
            deprecated=data.get("deprecated"),
            security=data.get("security"),
            servers=data.get("servers"),
        )

    def _dereference(self, data: Any) -> Any:
        """Follow ``$ref`` on a reference dict to its target under ``#/components/``.

        Returns ``data`` unchanged when it is not a dict carrying ``$ref``,
        or when the reference cannot be resolved to a dict. Chained
        references are followed; a chain that loops back on itself stops at
        the last node reached.
        """
        seen = set()
        while isinstance(data, dict) and "$ref" in data:
            ref = data["$ref"]
            if not isinstance(ref, str) or ref in seen:
                break
            seen.add(ref)
            target = self._resolve_ref(ref)
            if not isinstance(target, dict):
                break
            data = target
        return data

    def _parse_parameters(self, params: List[Dict[str, Any]]) -> List[Parameter]:
        """Build a ``Parameter`` for every entry, resolving ``$ref`` entries first.

        Without dereferencing, a ``{"$ref": ...}`` entry would produce a
        ``Parameter`` with the fallback name ``""`` and location ``"query"``.
        """
        return [
            Parameter(
                name=p.get("name", ""),
                in_=p.get("in", "query"),
                description=p.get("description"),
                required=p.get("required"),
                deprecated=p.get("deprecated"),
                allow_empty_value=p.get("allowEmptyValue"),
                style=p.get("style"),
                explode=p.get("explode"),
                allow_reserved=p.get("allowReserved"),
                schema=self._parse_schema(p.get("schema")) if p.get("schema") else None,
                example=p.get("example"),
                examples=p.get("examples"),
            )
            for p in map(self._dereference, params)
        ]

    def _parse_content(self, content_map: Dict[str, Any]) -> Dict[str, Any]:
        """Map each media type to its parsed schema plus raw example(s).

        A missing or empty ``schema`` yields ``None`` for that media type.
        """
        content = {}
        for content_type, content_data in content_map.items():
            schema = content_data.get("schema")
            content[content_type] = {
                "schema": self._parse_schema(schema) if schema else None,
                "example": content_data.get("example"),
                "examples": content_data.get("examples"),
            }
        return content

    def _parse_response(self, data: Dict[str, Any]) -> Response:
        """Build a ``Response``, resolving a ``$ref`` response first."""
        data = self._dereference(data)
        return Response(
            description=data.get("description", ""),
            content=self._parse_content(data.get("content", {})),
            headers=data.get("headers"),
            links=data.get("links"),
        )

    def _parse_request_body(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Return a request body as a dict, resolving a ``$ref`` body first."""
        data = self._dereference(data)
        return {
            "description": data.get("description"),
            "required": data.get("required"),
            "content": self._parse_content(data.get("content", {})),
        }

    def _parse_schema(self, data: Any) -> Optional[Schema]:
        """Recursively convert a schema dict into a ``Schema``.

        A resolvable ``$ref`` is replaced by its parsed target (sibling keys
        next to ``$ref`` are dropped). A ``$ref`` that cannot be resolved, or
        that points back to a schema already being expanded further up the
        call stack, is passed to ``Schema`` unexpanded with its ``$ref`` key
        intact.

        Returns:
            ``None`` for ``None`` or non-dict input, otherwise a ``Schema``.
        """
        if data is None:
            return None
        if not isinstance(data, dict):
            return None

        if "$ref" in data:
            ref = data["$ref"]
            # Expanding a ref that is already on the stack would recurse
            # without end (self-referential schemas), so such back-references
            # take the same unexpanded path as unresolvable refs below.
            if ref not in self._active_refs:
                resolved = self._resolve_ref(ref)
                if resolved:
                    self._active_refs.add(ref)
                    try:
                        return self._parse_schema(resolved)
                    finally:
                        self._active_refs.discard(ref)

        # Shallow copy so the caller's document is not mutated.
        schema_data = dict(data)
        for key in ["allOf", "anyOf", "oneOf", "not"]:
            if key in schema_data:
                nested = schema_data[key]
                if isinstance(nested, list):
                    schema_data[key] = [
                        self._parse_schema(item) if isinstance(item, dict) else item
                        for item in nested
                    ]
                elif isinstance(nested, dict):
                    schema_data[key] = self._parse_schema(nested)
        if "items" in schema_data and isinstance(schema_data["items"], dict):
            schema_data["items"] = self._parse_schema(schema_data["items"])
        if "properties" in schema_data:
            schema_data["properties"] = {
                k: self._parse_schema(v) if isinstance(v, dict) else v
                for k, v in schema_data["properties"].items()
            }
        if "additionalProperties" in schema_data and isinstance(schema_data["additionalProperties"], dict):
            schema_data["additionalProperties"] = self._parse_schema(schema_data["additionalProperties"])
        return Schema(**schema_data)

    def _resolve_ref(self, ref: str) -> Optional[Dict[str, Any]]:
        """Look up a ``#/components/...`` reference in the raw document.

        Successful lookups are memoised in ``self._resolved_refs``.

        Returns:
            The raw target node, or ``None`` when the reference does not
            start with ``#/components/`` or a path segment is missing.
        """
        if ref in self._resolved_refs:
            return self._resolved_refs[ref]
        if ref.startswith("#/components/"):
            # Drop the leading "#" and "components" segments; the walk below
            # starts at the components object itself.
            parts = ref.split("/")[2:]
            current = self.spec_data.get("components", {})
            for part in parts:
                if isinstance(current, dict) and part in current:
                    current = current[part]
                else:
                    return None
            self._resolved_refs[ref] = current
            return current
        return None

    def _parse_components(self) -> Optional[Dict[str, Any]]:
        """Return the components section as a snake_case dict, or ``None`` if absent.

        Relies on ``_extract_components`` having populated the schema,
        response and request-body caches.
        """
        components = self.spec_data.get("components")
        if not components:
            return None
        security_schemes = {}
        for name, scheme in components.get("securitySchemes", {}).items():
            # NOTE(review): ``name`` and ``in`` are not copied here; confirm
            # whether consumers need them for apiKey schemes.
            security_schemes[name] = {
                "type": scheme.get("type"),
                "scheme": scheme.get("scheme"),
                "bearer_format": scheme.get("bearerFormat"),
                "flows": scheme.get("flows"),
                "open_id_connect_url": scheme.get("openIdConnectUrl"),
                "description": scheme.get("description"),
            }
        return {
            "schemas": self._components_schemas,
            "responses": self._components_responses,
            "parameters": components.get("parameters"),
            "request_bodies": self._components_request_bodies,
            "headers": components.get("headers"),
            "security_schemes": security_schemes,
            "links": components.get("links"),
            "callbacks": components.get("callbacks"),
        }

    def _parse_tags(self) -> Optional[List[Dict[str, Any]]]:
        """Return one ``{"name", "description", "external_docs"}`` dict per tag."""
        tags = self.spec_data.get("tags") or []
        return [{"name": t.get("name"), "description": t.get("description"), "external_docs": t.get("externalDocs")} for t in tags]


def _basic_validate(spec_data: Dict[str, Any]) -> tuple[bool, List[str]]:
    """Check the minimal required structure of an OpenAPI document.

    Checks stop at the first failure, so the error list holds at most one
    message.

    Returns:
        ``(True, [])`` when ``openapi``, ``info``, ``info.title`` and
        ``info.version`` are all present, otherwise ``(False, [message])``.
    """
    errors = []
    if not isinstance(spec_data, dict):
        errors.append("Spec must be a dictionary")
        return False, errors
    if "openapi" not in spec_data:
        errors.append("Missing 'openapi' version")
        return False, errors
    if "info" not in spec_data:
        errors.append("Missing 'info' object")
        return False, errors
    info = spec_data.get("info", {})
    if not isinstance(info, dict):
        errors.append("'info' must be an object")
        return False, errors
    if "title" not in info:
        errors.append("Missing 'info.title'")
        return False, errors
    if "version" not in info:
        errors.append("Missing 'info.version'")
        return False, errors
    return True, []


def parse_openapi_spec(spec_source: str | Path | Dict[str, Any]) -> OpenAPISpec:
    """Load (if needed), validate and parse an OpenAPI document.

    Args:
        spec_source: An already-loaded document dict, or a path (``str`` or
            ``Path``) to a JSON or YAML file.

    Returns:
        The parsed ``OpenAPISpec``.

    Raises:
        ValueError: If ``OpenAPIParser.validate`` reports any error.
    """
    if isinstance(spec_source, dict):
        spec_data = spec_source
    else:
        # Path() accepts both str and an existing Path.
        spec_data = _load_file(Path(spec_source))
    parser = OpenAPIParser(spec_data)
    errors = parser.validate()
    if errors:
        raise ValueError(f"Invalid OpenAPI spec: {errors}")
    return parser.parse()


def _load_file(path: Path) -> Dict[str, Any]:
    """Read a spec file and decode it as YAML or JSON based on its suffix.

    ``.yaml`` / ``.yml`` (any letter case) are parsed with
    ``yaml.safe_load``; every other suffix is parsed as JSON.
    """
    # Decode explicitly as UTF-8 rather than the platform default encoding,
    # so the same file loads identically on every OS.
    content = path.read_text(encoding="utf-8")
    if path.suffix.lower() in {".yaml", ".yml"}:
        return yaml.safe_load(content)
    return json.loads(content)