"""OpenAPI/Swagger specification indexer."""

import hashlib
import json
from pathlib import Path
from typing import Any, Dict, List, Optional

from openapi_spec_validator import validate
from yaml import safe_load

from src.indexer.base import BaseIndexer
from src.models.document import Document, SourceType


class OpenAPIIndexer(BaseIndexer):
    """Indexer for OpenAPI/Swagger specifications.

    Each specification file is turned into several ``Document`` objects:
    one for the ``info`` section, one per operation, one per path item that
    carries a summary/description, one per top-level tag and one per schema
    under ``components.schemas``.
    """

    source_type = SourceType.OPENAPI
    SUPPORTED_EXTENSIONS = {".yaml", ".yml", ".json"}

    # Keys of a path item that hold an operation object.
    _HTTP_METHODS = (
        "get",
        "post",
        "put",
        "patch",
        "delete",
        "options",
        "head",
        "trace",
    )

    def __init__(self):
        self._documents: List[Document] = []

    def index(
        self, path: Path, recursive: bool = False, batch_size: int = 32
    ) -> List[Document]:
        """Index OpenAPI specifications from the given path.

        Files that fail to parse are reported with a printed warning and
        skipped; they do not abort the run.

        Args:
            path: Path to file or directory
            recursive: Whether to search recursively
            batch_size: Documents per batch (for progress tracking);
                not used by this implementation

        Returns:
            List of indexed Document objects
        """
        self._documents = []

        # NOTE(review): _find_files is not defined in this class; presumably
        # it is inherited from BaseIndexer.
        for file_path in self._find_files(path, recursive):
            try:
                docs = self._parse_file(file_path)
            except Exception as e:
                # Best-effort boundary: one bad file must not stop the rest.
                print(f"Warning: Failed to parse {file_path}: {e}")
            else:
                self._documents.extend(docs)

        return self._documents

    def _parse_file(self, file_path: Path) -> List[Document]:
        """Parse a single OpenAPI file.

        Args:
            file_path: Path to the OpenAPI file

        Returns:
            List of Document objects (empty when the file has no content)

        Raises:
            ValueError: If the top-level value of the file is not a mapping
        """
        # "utf-8-sig" reads plain UTF-8 and also strips a leading BOM, which
        # json.loads would otherwise reject.
        with open(file_path, "r", encoding="utf-8-sig") as f:
            content = f.read()

        # Lower-cased so the check agrees with _is_supported_file.
        if file_path.suffix.lower() == ".json":
            spec = json.loads(content)
        else:
            spec = safe_load(content)

        if spec is None:
            return []

        if not isinstance(spec, dict):
            raise ValueError(
                f"expected a mapping at the top level, got {type(spec).__name__}"
            )

        validation_errors = self._validate_spec(spec, file_path)
        if validation_errors:
            # Invalid specs are still indexed; the warning is informational.
            print(f"Warning: Validation errors in {file_path}: {validation_errors}")

        return self._extract_documents(spec, file_path)

    def _validate_spec(
        self, spec: Dict[str, Any], file_path: Path
    ) -> Optional[str]:
        """Validate an OpenAPI specification.

        Args:
            spec: The parsed specification
            file_path: Path to the source file (currently unused)

        Returns:
            None if valid, error message otherwise
        """
        try:
            validate(spec)
        except Exception as e:
            return str(e)
        return None

    def _extract_documents(
        self, spec: Dict[str, Any], file_path: Path
    ) -> List[Document]:
        """Extract searchable documents from an OpenAPI spec.

        Args:
            spec: The parsed OpenAPI specification
            file_path: Path to the source file

        Returns:
            List of Document objects
        """
        documents = []

        # "or {}" / "or []" also covers keys that are present but null,
        # e.g. an empty "paths:" entry in YAML.
        spec_info = spec.get("info") or {}
        title = spec_info.get("title", file_path.stem)
        version = spec_info.get("version", "unknown")

        doc_id_base = self._generate_id(file_path)

        info_doc = Document(
            id=f"{doc_id_base}_info",
            content=self._format_info_content(spec_info),
            source_type=self.source_type,
            title=f"{title} - API Info",
            file_path=str(file_path),
            metadata={"version": version, "section": "info"},
        )
        documents.append(info_doc)

        for path, path_item in (spec.get("paths") or {}).items():
            path_docs = self._extract_path_documents(
                path, path_item, spec, file_path, doc_id_base
            )
            documents.extend(path_docs)

        # "tags" is a list of tag objects, each a mapping with a "name" key.
        for tag_spec in spec.get("tags") or []:
            if not isinstance(tag_spec, dict):
                continue
            tag = tag_spec.get("name")
            if not tag:
                continue
            tag_doc = Document(
                id=f"{doc_id_base}_tag_{tag}",
                content=self._format_tag_content(tag, tag_spec),
                source_type=self.source_type,
                title=f"Tag: {tag}",
                file_path=str(file_path),
                metadata={"section": "tags", "tag": tag},
            )
            documents.append(tag_doc)

        components = spec.get("components") or {}
        for schema_name, schema in (components.get("schemas") or {}).items():
            schema_doc = self._extract_schema_document(
                schema_name, schema, file_path, doc_id_base
            )
            if schema_doc:
                documents.append(schema_doc)

        return documents

    def _extract_path_documents(
        self,
        path: str,
        path_item: Dict[str, Any],
        spec: Dict[str, Any],
        file_path: Path,
        doc_id_base: str,
    ) -> List[Document]:
        """Extract documents from a path item.

        Produces one document per HTTP operation found on the path item,
        followed by one document for the path itself when it has a summary
        or description.

        Args:
            path: The path string
            path_item: The path item specification
            spec: The full OpenAPI specification
            file_path: Path to the source file
            doc_id_base: Base ID for document generation

        Returns:
            List of Document objects (empty when path_item is not a mapping)
        """
        if not isinstance(path_item, dict):
            return []

        documents = []

        # Short, stable token so the raw path is not embedded in IDs.
        path_hash = hashlib.md5(path.encode()).hexdigest()[:8]

        for method in self._HTTP_METHODS:
            operation = path_item.get(method)
            if not isinstance(operation, dict):
                continue
            documents.append(
                self._extract_operation_document(
                    method, path, operation, spec, file_path, doc_id_base, path_hash
                )
            )

        summary = path_item.get("summary", "")
        description = path_item.get("description", "")
        if summary or description:
            path_doc = Document(
                id=f"{doc_id_base}_path_{path_hash}",
                content=f"Path: {path}\nSummary: {summary}\nDescription: {description}",
                source_type=self.source_type,
                title=f"Path: {path}",
                file_path=str(file_path),
                metadata={"section": "path", "path": path},
            )
            documents.append(path_doc)

        return documents

    def _extract_operation_document(
        self,
        method: str,
        path: str,
        operation: Dict[str, Any],
        spec: Dict[str, Any],
        file_path: Path,
        doc_id_base: str,
        path_hash: str,
    ) -> Document:
        """Extract a document from an operation.

        Args:
            method: HTTP method
            path: API path
            operation: The operation specification
            spec: The full OpenAPI specification
            file_path: Path to the source file
            doc_id_base: Base ID for document generation
            path_hash: Hash of the path for ID generation

        Returns:
            Document object
        """
        # Operations without an operationId get a method + path-hash ID.
        op_id = operation.get("operationId", f"{method}_{path_hash}")
        summary = operation.get("summary", "")
        description = operation.get("description", "")
        deprecated = operation.get("deprecated", False)

        content_parts = [
            f"Method: {method.upper()}",
            f"Path: {path}",
            f"Operation ID: {op_id}",
            f"Summary: {summary}",
            f"Description: {description}",
        ]

        if deprecated:
            content_parts.append("Status: DEPRECATED")

        tags = operation.get("tags", [])
        if tags:
            content_parts.append(f"Tags: {', '.join(tags)}")

        parameters = operation.get("parameters", [])
        if parameters:
            param_content = self._format_parameters(parameters)
            content_parts.append(f"Parameters:\n{param_content}")

        request_body = operation.get("requestBody", {})
        if request_body:
            rb_content = self._format_request_body(request_body, spec)
            content_parts.append(f"Request Body:\n{rb_content}")

        responses = operation.get("responses") or {}
        resp_content = self._format_responses(responses)
        content_parts.append(f"Responses:\n{resp_content}")

        return Document(
            id=f"{doc_id_base}_{op_id}",
            content="\n".join(content_parts),
            source_type=self.source_type,
            title=f"{method.upper()} {path}",
            file_path=str(file_path),
            metadata={
                "section": "operation",
                "method": method,
                "path": path,
                "operation_id": op_id,
                "deprecated": deprecated,
            },
        )

    def _format_parameters(self, parameters: List[Dict[str, Any]]) -> str:
        """Format parameters for display.

        Args:
            parameters: List of parameter specifications

        Returns:
            Formatted parameter string
        """
        lines = []
        for param in parameters:
            name = param.get("name", "unknown")
            in_loc = param.get("in", "unknown")
            required = param.get("required", False)
            description = param.get("description", "")
            param_type = (param.get("schema") or {}).get("type", "any")

            lines.append(
                f" - {name} ({in_loc}, {'required' if required else 'optional'}): {param_type}"
            )
            if description:
                lines.append(f" Description: {description}")

        return "\n".join(lines) if lines else " No parameters"

    def _format_request_body(
        self, request_body: Dict[str, Any], spec: Dict[str, Any]
    ) -> str:
        """Format request body for display.

        A request body given as a local ``$ref`` is resolved first, as is a
        ``$ref`` used as the schema of any content type.

        Args:
            request_body: Request body specification
            spec: The full OpenAPI specification

        Returns:
            Formatted request body string
        """
        # The request body itself may be a reference object.
        if "$ref" in request_body:
            resolved_body = self._resolve_ref(request_body["$ref"], spec)
            if resolved_body:
                request_body = resolved_body

        lines = []
        description = request_body.get("description", "")
        if description:
            lines.append(f"Description: {description}")

        required = request_body.get("required", False)
        lines.append(f"Required: {required}")

        content = request_body.get("content") or {}
        for content_type, content_spec in content.items():
            schema = content_spec.get("schema", {})
            schema_ref = schema.get("$ref", "") if isinstance(schema, dict) else ""

            if schema_ref:
                resolved = self._resolve_ref(schema_ref, spec)
                if resolved:
                    schema = resolved

            lines.append(f"Content-Type: {content_type}")
            # default=str: YAML loading can yield date/datetime objects
            # (e.g. in examples) that json cannot serialize natively; this
            # output is display text, so stringifying them is acceptable.
            lines.append(f"Schema: {json.dumps(schema, indent=4, default=str)}")

        return "\n".join(lines)

    def _format_responses(self, responses: Dict[str, Any]) -> str:
        """Format responses for display.

        Args:
            responses: Response specifications

        Returns:
            Formatted response string
        """
        lines = []
        for status_code, response in responses.items():
            description = response.get("description", "")
            lines.append(f" {status_code}: {description}")

            content = response.get("content") or {}
            for content_type, content_spec in content.items():
                schema = content_spec.get("schema", {})
                if schema:
                    schema_type = schema.get("type", "unknown")
                    lines.append(f" Content-Type: {content_type}")
                    lines.append(f" Schema Type: {schema_type}")

        return "\n".join(lines) if lines else " No responses defined"

    def _resolve_ref(self, ref: str, spec: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Resolve a $ref reference.

        Only local references (starting with ``#/``) are supported.

        Args:
            ref: The reference string
            spec: The full OpenAPI specification

        Returns:
            Resolved schema or None
        """
        if not isinstance(ref, str) or not ref.startswith("#/"):
            return None

        current = spec
        for raw_part in ref[2:].split("/"):
            if not isinstance(current, dict):
                return None
            # JSON Pointer escapes: "~1" is "/" and "~0" is "~"; "~1" must be
            # decoded first so "~01" becomes "~1" rather than "/".
            part = raw_part.replace("~1", "/").replace("~0", "~")
            current = current.get(part)

        return current if isinstance(current, dict) else None

    def _extract_schema_document(
        self,
        schema_name: str,
        schema: Dict[str, Any],
        file_path: Path,
        doc_id_base: str,
    ) -> Optional[Document]:
        """Extract a document from a schema.

        Args:
            schema_name: Name of the schema
            schema: Schema specification
            file_path: Path to the source file
            doc_id_base: Base ID for document generation

        Returns:
            Document object, or None when the schema is not a mapping
        """
        if not isinstance(schema, dict):
            return None

        content_parts = [
            f"Schema: {schema_name}",
        ]

        schema_type = schema.get("type", "object")
        content_parts.append(f"Type: {schema_type}")

        description = schema.get("description", "")
        if description:
            content_parts.append(f"Description: {description}")

        required_fields = schema.get("required") or []
        if required_fields:
            content_parts.append(f"Required Fields: {', '.join(required_fields)}")

        properties = schema.get("properties") or {}
        if properties:
            prop_lines = ["Properties:"]
            for prop_name, prop_spec in properties.items():
                if not isinstance(prop_spec, dict):
                    prop_spec = {}
                prop_type = prop_spec.get("type", "unknown")
                prop_desc = prop_spec.get("description", "")
                prop_required = prop_name in required_fields
                prop_lines.append(
                    f" - {prop_name} ({prop_type}, {'required' if prop_required else 'optional'})"
                )
                if prop_desc:
                    prop_lines.append(f" Description: {prop_desc}")
            content_parts.append("\n".join(prop_lines))

        return Document(
            id=f"{doc_id_base}_schema_{schema_name}",
            content="\n".join(content_parts),
            source_type=self.source_type,
            title=f"Schema: {schema_name}",
            file_path=str(file_path),
            metadata={"section": "schema", "schema_name": schema_name},
        )

    def _format_info_content(self, info: Dict[str, Any]) -> str:
        """Format the API info section.

        Mapping values (such as contact or license) contribute their
        ``name`` and ``url`` entries; other values are rendered directly.

        Args:
            info: Info object from specification

        Returns:
            Formatted info content
        """
        parts = []
        for key in ["title", "version", "description", "termsOfService", "contact", "license"]:
            if key not in info:
                continue
            value = info[key]
            if isinstance(value, dict):
                if "name" in value:
                    parts.append(f"{key}: {value['name']}")
                if "url" in value:
                    parts.append(f"{key} URL: {value['url']}")
            else:
                parts.append(f"{key}: {value}")
        return "\n".join(parts)

    def _format_tag_content(self, tag: str, tag_spec: Dict[str, Any]) -> str:
        """Format tag content.

        Args:
            tag: Tag name
            tag_spec: Tag specification

        Returns:
            Formatted tag content
        """
        parts = [f"Tag: {tag}"]

        description = tag_spec.get("description", "")
        if description:
            parts.append(f"Description: {description}")

        external_docs = tag_spec.get("externalDocs", {})
        if external_docs:
            docs_url = external_docs.get("url", "")
            if docs_url:
                parts.append(f"External Docs: {docs_url}")

        return "\n".join(parts)

    def _is_supported_file(self, path: Path) -> bool:
        """Check if the file is a supported OpenAPI file.

        Args:
            path: Path to the file

        Returns:
            True if the file extension is supported
        """
        return path.suffix.lower() in self.SUPPORTED_EXTENSIONS

    def get_documents(self) -> List[Document]:
        """Get all indexed documents.

        Returns:
            List of Document objects
        """
        return self._documents