"""OpenAPI/Swagger specification indexer."""

import hashlib
import json
from pathlib import Path
from typing import Any, Dict, List, Optional

from openapi_spec_validator import validate
from yaml import safe_load

from local_api_docs_search.indexer.base import BaseIndexer
from local_api_docs_search.models.document import Document, SourceType


class OpenAPIIndexer(BaseIndexer):
    """Indexer for OpenAPI/Swagger specifications.

    Flattens each spec file into searchable :class:`Document` objects:
    one for the ``info`` block, one per operation, one per path-level
    summary/description, one per tag, and one per component schema.
    """

    source_type = SourceType.OPENAPI

    # Spec files are matched by extension only; content is validated later.
    SUPPORTED_EXTENSIONS = {".yaml", ".yml", ".json"}

    def __init__(self):
        self._documents: List[Document] = []

    def index(
        self, path: Path, recursive: bool = False, batch_size: int = 32
    ) -> List[Document]:
        """Index OpenAPI specifications from the given path.

        Args:
            path: Path to file or directory.
            recursive: Whether to search recursively.
            batch_size: Documents per batch (for progress tracking).

        Returns:
            List of indexed Document objects.
        """
        self._documents = []

        for file_path in self._find_files(path, recursive):
            try:
                self._documents.extend(self._parse_file(file_path))
            except Exception as e:
                # Deliberate best-effort: one bad spec must not abort the
                # whole indexing run, so report and continue.
                print(f"Warning: Failed to parse {file_path}: {e}")

        return self._documents

    def _parse_file(self, file_path: Path) -> List[Document]:
        """Parse a single OpenAPI file into documents.

        Args:
            file_path: Path to the OpenAPI file.

        Returns:
            List of Document objects (empty for an empty/blank file).
        """
        # Specs are expected to be UTF-8; do not depend on the locale.
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read()

        # Lowercase so ".JSON" is treated like ".json" (matches the
        # case-insensitive check in _is_supported_file).
        if file_path.suffix.lower() == ".json":
            spec = json.loads(content)
        else:
            # safe_load only: never execute arbitrary YAML tags.
            spec = safe_load(content)

        if spec is None:
            return []

        validation_errors = self._validate_spec(spec, file_path)
        if validation_errors:
            # Validation failures are non-fatal: still index what we can.
            print(f"Warning: Validation errors in {file_path}: {validation_errors}")

        return self._extract_documents(spec, file_path)

    def _validate_spec(
        self, spec: Dict[str, Any], file_path: Path
    ) -> Optional[str]:
        """Validate an OpenAPI specification.

        Args:
            spec: The parsed specification.
            file_path: Path to the source file (for caller context only).

        Returns:
            None if valid, error message otherwise.
        """
        try:
            validate(spec)
            return None
        except Exception as e:
            # Broad catch on purpose: any validator failure is reduced to a
            # warning string; indexing proceeds regardless.
            return str(e)

    def _extract_documents(
        self, spec: Dict[str, Any], file_path: Path
    ) -> List[Document]:
        """Extract searchable documents from an OpenAPI spec.

        Args:
            spec: The parsed OpenAPI specification.
            file_path: Path to the source file.

        Returns:
            List of Document objects.
        """
        documents: List[Document] = []
        spec_info = spec.get("info", {})
        title = spec_info.get("title", file_path.stem)
        version = spec_info.get("version", "unknown")

        doc_id_base = self._generate_id(file_path)

        documents.append(
            Document(
                id=f"{doc_id_base}_info",
                content=self._format_info_content(spec_info),
                source_type=self.source_type,
                title=f"{title} - API Info",
                file_path=str(file_path),
                metadata={"version": version, "section": "info"},
            )
        )

        for path, path_item in spec.get("paths", {}).items():
            documents.extend(
                self._extract_path_documents(
                    path, path_item, spec, file_path, doc_id_base
                )
            )

        # BUGFIX: the OpenAPI root "tags" field is a *list* of Tag Objects
        # ({"name": ..., "description": ...}), not a mapping. The previous
        # ``for tag, tag_spec in spec.get("tags", [])`` tuple-unpacked each
        # list element, which raises ValueError (or yields dict keys) for
        # any spec that declares tags.
        for tag_spec in spec.get("tags", []):
            if not isinstance(tag_spec, dict):
                continue  # tolerate malformed entries instead of crashing
            tag = tag_spec.get("name", "")
            if not tag:
                continue  # a Tag Object without a name cannot be indexed
            documents.append(
                Document(
                    id=f"{doc_id_base}_tag_{tag}",
                    content=self._format_tag_content(tag, tag_spec),
                    source_type=self.source_type,
                    title=f"Tag: {tag}",
                    file_path=str(file_path),
                    metadata={"section": "tags", "tag": tag},
                )
            )

        for schema_name, schema in (
            spec.get("components", {}).get("schemas", {}).items()
        ):
            schema_doc = self._extract_schema_document(
                schema_name, schema, file_path, doc_id_base
            )
            if schema_doc:
                documents.append(schema_doc)

        return documents

    def _extract_path_documents(
        self,
        path: str,
        path_item: Dict[str, Any],
        spec: Dict[str, Any],
        file_path: Path,
        doc_id_base: str,
    ) -> List[Document]:
        """Extract documents from a path item.

        Args:
            path: The path string (e.g. ``/users/{id}``).
            path_item: The path item specification.
            spec: The full OpenAPI specification.
            file_path: Path to the source file.
            doc_id_base: Base ID for document generation.

        Returns:
            List of Document objects (one per operation, plus one for a
            path-level summary/description when present).
        """
        documents: List[Document] = []
        # md5 is used purely to derive a short, stable document ID — not
        # for any security purpose.
        path_hash = hashlib.md5(path.encode()).hexdigest()[:8]

        methods = ["get", "post", "put", "patch", "delete", "options", "head", "trace"]

        for method in methods:
            if method in path_item:
                documents.append(
                    self._extract_operation_document(
                        method,
                        path,
                        path_item[method],
                        spec,
                        file_path,
                        doc_id_base,
                        path_hash,
                    )
                )

        summary = path_item.get("summary", "")
        description = path_item.get("description", "")
        if summary or description:
            documents.append(
                Document(
                    id=f"{doc_id_base}_path_{path_hash}",
                    content=f"Path: {path}\nSummary: {summary}\nDescription: {description}",
                    source_type=self.source_type,
                    title=f"Path: {path}",
                    file_path=str(file_path),
                    metadata={"section": "path", "path": path},
                )
            )

        return documents

    def _extract_operation_document(
        self,
        method: str,
        path: str,
        operation: Dict[str, Any],
        spec: Dict[str, Any],
        file_path: Path,
        doc_id_base: str,
        path_hash: str,
    ) -> Document:
        """Extract a document from a single operation.

        Args:
            method: HTTP method (lowercase).
            path: API path.
            operation: The operation specification.
            spec: The full OpenAPI specification (for $ref resolution).
            file_path: Path to the source file.
            doc_id_base: Base ID for document generation.
            path_hash: Hash of the path for ID generation when the
                operation has no ``operationId``.

        Returns:
            Document object.
        """
        # Fall back to method+path hash so the document ID stays unique
        # even when the spec omits operationId.
        op_id = operation.get("operationId", f"{method}_{path_hash}")
        summary = operation.get("summary", "")
        description = operation.get("description", "")
        deprecated = operation.get("deprecated", False)

        content_parts = [
            f"Method: {method.upper()}",
            f"Path: {path}",
            f"Operation ID: {op_id}",
            f"Summary: {summary}",
            f"Description: {description}",
        ]

        if deprecated:
            content_parts.append("Status: DEPRECATED")

        tags = operation.get("tags", [])
        if tags:
            content_parts.append(f"Tags: {', '.join(tags)}")

        parameters = operation.get("parameters", [])
        if parameters:
            content_parts.append(f"Parameters:\n{self._format_parameters(parameters)}")

        request_body = operation.get("requestBody", {})
        if request_body:
            content_parts.append(
                f"Request Body:\n{self._format_request_body(request_body, spec)}"
            )

        # Responses are always included, even when empty, so every
        # operation document has a Responses section.
        content_parts.append(
            f"Responses:\n{self._format_responses(operation.get('responses', {}))}"
        )

        return Document(
            id=f"{doc_id_base}_{op_id}",
            content="\n".join(content_parts),
            source_type=self.source_type,
            title=f"{method.upper()} {path}",
            file_path=str(file_path),
            metadata={
                "section": "operation",
                "method": method,
                "path": path,
                "operation_id": op_id,
                "deprecated": deprecated,
            },
        )

    def _format_parameters(self, parameters: List[Dict[str, Any]]) -> str:
        """Format operation parameters for display.

        Args:
            parameters: List of parameter specifications.

        Returns:
            Formatted parameter string (or a "No parameters" placeholder).
        """
        lines: List[str] = []
        for param in parameters:
            name = param.get("name", "unknown")
            in_loc = param.get("in", "unknown")
            required = param.get("required", False)
            description = param.get("description", "")
            param_type = param.get("schema", {}).get("type", "any")

            lines.append(
                f"  - {name} ({in_loc}, {'required' if required else 'optional'}): {param_type}"
            )
            if description:
                lines.append(f"    Description: {description}")

        return "\n".join(lines) if lines else "  No parameters"

    def _format_request_body(
        self, request_body: Dict[str, Any], spec: Dict[str, Any]
    ) -> str:
        """Format a request body for display.

        Args:
            request_body: Request body specification.
            spec: The full OpenAPI specification (for $ref resolution).

        Returns:
            Formatted request body string.
        """
        lines: List[str] = []
        description = request_body.get("description", "")
        if description:
            lines.append(f"Description: {description}")

        lines.append(f"Required: {request_body.get('required', False)}")

        for content_type, content_spec in request_body.get("content", {}).items():
            schema = content_spec.get("schema", {})
            schema_ref = schema.get("$ref", "")
            if schema_ref:
                # Inline the referenced schema (single level) so the
                # document is searchable without following $refs.
                resolved = self._resolve_ref(schema_ref, spec)
                if resolved:
                    schema = resolved
            lines.append(f"Content-Type: {content_type}")
            lines.append(f"Schema: {json.dumps(schema, indent=4)}")

        return "\n".join(lines)

    def _format_responses(self, responses: Dict[str, Any]) -> str:
        """Format operation responses for display.

        Args:
            responses: Response specifications keyed by status code.

        Returns:
            Formatted response string (or a placeholder when empty).
        """
        lines: List[str] = []
        for status_code, response in responses.items():
            lines.append(f"  {status_code}: {response.get('description', '')}")

            for content_type, content_spec in response.get("content", {}).items():
                schema = content_spec.get("schema", {})
                if schema:
                    lines.append(f"    Content-Type: {content_type}")
                    lines.append(f"    Schema Type: {schema.get('type', 'unknown')}")

        return "\n".join(lines) if lines else "  No responses defined"

    def _resolve_ref(self, ref: str, spec: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Resolve a local ``$ref`` reference.

        Only document-local references (``#/...``) are supported; external
        file/URL refs return None.

        Args:
            ref: The reference string.
            spec: The full OpenAPI specification.

        Returns:
            Resolved schema or None.
        """
        if not ref.startswith("#/"):
            return None

        current: Any = spec
        for part in ref[2:].split("/"):
            if isinstance(current, dict):
                current = current.get(part)
            else:
                return None

        return current

    def _extract_schema_document(
        self,
        schema_name: str,
        schema: Dict[str, Any],
        file_path: Path,
        doc_id_base: str,
    ) -> Document:
        """Extract a document from a component schema.

        Args:
            schema_name: Name of the schema.
            schema: Schema specification.
            file_path: Path to the source file.
            doc_id_base: Base ID for document generation.

        Returns:
            Document object.
        """
        content_parts = [f"Schema: {schema_name}"]

        content_parts.append(f"Type: {schema.get('type', 'object')}")

        description = schema.get("description", "")
        if description:
            content_parts.append(f"Description: {description}")

        required_fields = schema.get("required", [])
        if required_fields:
            content_parts.append(f"Required Fields: {', '.join(required_fields)}")

        properties = schema.get("properties", {})
        if properties:
            prop_lines = ["Properties:"]
            for prop_name, prop_spec in properties.items():
                prop_type = prop_spec.get("type", "unknown")
                prop_desc = prop_spec.get("description", "")
                flag = "required" if prop_name in required_fields else "optional"
                prop_lines.append(f"  - {prop_name} ({prop_type}, {flag})")
                if prop_desc:
                    prop_lines.append(f"    Description: {prop_desc}")
            content_parts.append("\n".join(prop_lines))

        return Document(
            id=f"{doc_id_base}_schema_{schema_name}",
            content="\n".join(content_parts),
            source_type=self.source_type,
            title=f"Schema: {schema_name}",
            file_path=str(file_path),
            metadata={"section": "schema", "schema_name": schema_name},
        )

    def _format_info_content(self, info: Dict[str, Any]) -> str:
        """Format the API ``info`` section.

        Args:
            info: Info object from the specification.

        Returns:
            Formatted info content, one ``key: value`` line per field.
        """
        parts: List[str] = []
        for key in ["title", "version", "description", "termsOfService", "contact", "license"]:
            if key in info:
                value = info[key]
                if isinstance(value, dict):
                    # contact/license are objects; surface name and URL only.
                    if "name" in value:
                        parts.append(f"{key}: {value['name']}")
                    if "url" in value:
                        parts.append(f"{key} URL: {value['url']}")
                else:
                    parts.append(f"{key}: {value}")
        return "\n".join(parts)

    def _format_tag_content(self, tag: str, tag_spec: Dict[str, Any]) -> str:
        """Format tag content.

        Args:
            tag: Tag name.
            tag_spec: Tag Object from the spec's ``tags`` list.

        Returns:
            Formatted tag content.
        """
        parts = [f"Tag: {tag}"]
        description = tag_spec.get("description", "")
        if description:
            parts.append(f"Description: {description}")
        docs_url = tag_spec.get("externalDocs", {}).get("url", "")
        if docs_url:
            parts.append(f"External Docs: {docs_url}")
        return "\n".join(parts)

    def _is_supported_file(self, path: Path) -> bool:
        """Check whether the file is a supported OpenAPI file.

        Args:
            path: Path to the file.

        Returns:
            True if the file extension is supported (case-insensitive).
        """
        return path.suffix.lower() in self.SUPPORTED_EXTENSIONS

    def get_documents(self) -> List[Document]:
        """Get all documents from the most recent :meth:`index` call.

        Returns:
            List of Document objects.
        """
        return self._documents