Add indexer modules (base, openapi, readme, code)
Some checks failed
CI / build (push) Has been cancelled
CI / test (push) Has been cancelled

This commit is contained in:
2026-02-03 01:21:33 +00:00
parent 92b9c5a441
commit 42ff56b5d8

492
src/indexer/openapi.py Normal file
View File

@@ -0,0 +1,492 @@
"""OpenAPI/Swagger specification indexer."""
import hashlib
import json
from pathlib import Path
from typing import Any, Dict, List, Optional
from openapi_spec_validator import validate
from openapi_spec_validator.versions import consts as validator_versions
from yaml import safe_load
from src.indexer.base import BaseIndexer
from src.models.document import Document, SourceType
class OpenAPIIndexer(BaseIndexer):
    """Turn OpenAPI/Swagger specification files into Document objects."""

    # Source type stamped on every Document this indexer creates.
    source_type = SourceType.OPENAPI
    # Lower-case file suffixes accepted by ``_is_supported_file``.
    SUPPORTED_EXTENSIONS = {".yaml", ".yml", ".json"}

    def __init__(self):
        """Create an indexer that holds no documents yet."""
        # NOTE(review): the base-class initializer is not invoked here;
        # confirm BaseIndexer needs no setup of its own.
        initial_documents: List[Document] = []
        self._documents = initial_documents
def index(
    self, path: Path, recursive: bool = False, batch_size: int = 32
) -> List[Document]:
    """Collect documents from every OpenAPI file found under ``path``.

    Documents from any earlier call are discarded first. A file that fails
    to parse is reported with a printed warning and skipped, so a single bad
    file does not stop the run.

    Args:
        path: File or directory to index.
        recursive: Forwarded to the file discovery helper.
        batch_size: Accepted for interface compatibility; not used by this
            method.

    Returns:
        The list of documents gathered during this call.
    """
    self._documents = []
    # NOTE(review): _find_files is not defined in this class; presumably it
    # is inherited from BaseIndexer.
    spec_files = self._find_files(path, recursive)
    for spec_file in spec_files:
        try:
            parsed = self._parse_file(spec_file)
            self._documents.extend(parsed)
        except Exception as exc:
            # Best effort: warn and move on to the next file.
            print(f"Warning: Failed to parse {spec_file}: {exc}")
    return self._documents
def _parse_file(self, file_path: Path) -> List[Document]:
    """Parse a single OpenAPI file into documents.

    Files with a ``.json`` suffix (any letter case) are decoded as JSON;
    everything else is loaded as YAML. Validation problems are printed as a
    warning and do not prevent extraction.

    Args:
        file_path: Path to the OpenAPI file.

    Returns:
        Documents extracted from the file, or an empty list when the file
        contains no data.

    Raises:
        ValueError: If the top-level value of the file is not a mapping.
    """
    # Decode explicitly instead of relying on the platform locale;
    # "utf-8-sig" also tolerates a leading byte-order mark.
    with open(file_path, "r", encoding="utf-8-sig") as f:
        content = f.read()
    # Compare the lower-cased suffix so "spec.JSON" is treated the same way
    # _is_supported_file treats it.
    if file_path.suffix.lower() == ".json":
        spec = json.loads(content)
    else:
        spec = safe_load(content)
    if spec is None:
        return []
    if not isinstance(spec, dict):
        # Fail with a clear message rather than an AttributeError from a
        # later ``spec.get`` call.
        raise ValueError(
            f"OpenAPI document root must be a mapping, got {type(spec).__name__}"
        )
    validation_errors = self._validate_spec(spec, file_path)
    if validation_errors:
        print(f"Warning: Validation errors in {file_path}: {validation_errors}")
    return self._extract_documents(spec, file_path)
def _validate_spec(
    self, spec: Dict[str, Any], file_path: Path
) -> Optional[str]:
    """Run the OpenAPI validator over a parsed specification.

    Args:
        spec: The parsed specification.
        file_path: Path to the source file (not used by the check itself).

    Returns:
        None when validation passes, otherwise the text of the exception
        raised by the validator.
    """
    error_message: Optional[str] = None
    try:
        validate(spec)
    except Exception as exc:
        # Any failure is reported as text instead of being propagated.
        error_message = str(exc)
    return error_message
def _extract_documents(
    self, spec: Dict[str, Any], file_path: Path
) -> List[Document]:
    """Extract searchable documents from an OpenAPI spec.

    Produces one document for the ``info`` section, the documents returned
    for each path item, one document per named tag, and one document per
    schema under ``components.schemas``.

    Args:
        spec: The parsed OpenAPI specification.
        file_path: Path to the source file.

    Returns:
        List of Document objects.
    """
    documents = []
    # ``or {}`` also covers sections that are present but empty/null
    # (e.g. a bare ``info:`` line in YAML).
    spec_info = spec.get("info") or {}
    title = spec_info.get("title", file_path.stem)
    version = spec_info.get("version", "unknown")
    # NOTE(review): _generate_id is not defined in this class; presumably it
    # is inherited from BaseIndexer.
    doc_id_base = self._generate_id(file_path)
    info_doc = Document(
        id=f"{doc_id_base}_info",
        content=self._format_info_content(spec_info),
        source_type=self.source_type,
        title=f"{title} - API Info",
        file_path=str(file_path),
        metadata={"version": version, "section": "info"},
    )
    documents.append(info_doc)
    for path, path_item in (spec.get("paths") or {}).items():
        if not isinstance(path_item, dict):
            # An empty path entry has nothing to index.
            continue
        path_docs = self._extract_path_documents(
            path, path_item, spec, file_path, doc_id_base
        )
        documents.extend(path_docs)
    # ``tags`` is a list of Tag Objects; the tag name lives in each object's
    # ``name`` field, so the entries cannot be unpacked as (name, spec) pairs.
    for tag_spec in spec.get("tags") or []:
        if not isinstance(tag_spec, dict):
            continue
        tag = tag_spec.get("name")
        if not tag:
            continue
        tag_doc = Document(
            id=f"{doc_id_base}_tag_{tag}",
            content=self._format_tag_content(tag, tag_spec),
            source_type=self.source_type,
            title=f"Tag: {tag}",
            file_path=str(file_path),
            metadata={"section": "tags", "tag": tag},
        )
        documents.append(tag_doc)
    components = spec.get("components") or {}
    for schema_name, schema in (components.get("schemas") or {}).items():
        if not isinstance(schema, dict):
            # Non-mapping schemas (e.g. a boolean) carry nothing to format.
            continue
        schema_doc = self._extract_schema_document(
            schema_name, schema, file_path, doc_id_base
        )
        if schema_doc:
            documents.append(schema_doc)
    return documents
def _extract_path_documents(
    self,
    path: str,
    path_item: Dict[str, Any],
    spec: Dict[str, Any],
    file_path: Path,
    doc_id_base: str,
) -> List[Document]:
    """Build the documents for one entry of the ``paths`` section.

    Args:
        path: The path string.
        path_item: The path item specification.
        spec: The full OpenAPI specification.
        file_path: Path to the source file.
        doc_id_base: Base ID for document generation.

    Returns:
        One document per HTTP method present on the path item, followed by a
        path-level document when the item has a summary or description.
    """
    # A short digest of the path keeps generated IDs compact.
    path_hash = hashlib.md5(path.encode()).hexdigest()[:8]
    http_methods = ("get", "post", "put", "patch", "delete", "options", "head", "trace")
    documents = [
        self._extract_operation_document(
            method, path, path_item[method], spec, file_path, doc_id_base, path_hash
        )
        for method in http_methods
        if method in path_item
    ]
    summary = path_item.get("summary", "")
    description = path_item.get("description", "")
    if not (summary or description):
        return documents
    documents.append(
        Document(
            id=f"{doc_id_base}_path_{path_hash}",
            content=f"Path: {path}\nSummary: {summary}\nDescription: {description}",
            source_type=self.source_type,
            title=f"Path: {path}",
            file_path=str(file_path),
            metadata={"section": "path", "path": path},
        )
    )
    return documents
def _extract_operation_document(
    self,
    method: str,
    path: str,
    operation: Dict[str, Any],
    spec: Dict[str, Any],
    file_path: Path,
    doc_id_base: str,
    path_hash: str,
) -> Document:
    """Build the document describing one HTTP operation.

    Args:
        method: HTTP method (lower-case key from the path item).
        path: API path.
        operation: The operation specification.
        spec: The full OpenAPI specification.
        file_path: Path to the source file.
        doc_id_base: Base ID for document generation.
        path_hash: Hash of the path, used when no operationId is given.

    Returns:
        Document object.
    """
    # Fall back to "<method>_<path hash>" when the spec gives no operationId.
    op_id = operation.get("operationId", f"{method}_{path_hash}")
    deprecated = operation.get("deprecated", False)
    verb = method.upper()
    sections = [
        f"Method: {verb}",
        f"Path: {path}",
        f"Operation ID: {op_id}",
        f"Summary: {operation.get('summary', '')}",
        f"Description: {operation.get('description', '')}",
    ]
    if deprecated:
        sections.append("Status: DEPRECATED")
    tag_names = operation.get("tags", [])
    if tag_names:
        sections.append(f"Tags: {', '.join(tag_names)}")
    declared_params = operation.get("parameters", [])
    if declared_params:
        sections.append(f"Parameters:\n{self._format_parameters(declared_params)}")
    body_spec = operation.get("requestBody", {})
    if body_spec:
        sections.append(f"Request Body:\n{self._format_request_body(body_spec, spec)}")
    # The responses section is always emitted, even when none are declared.
    response_specs = operation.get("responses", {})
    sections.append(f"Responses:\n{self._format_responses(response_specs)}")
    metadata = {
        "section": "operation",
        "method": method,
        "path": path,
        "operation_id": op_id,
        "deprecated": deprecated,
    }
    return Document(
        id=f"{doc_id_base}_{op_id}",
        content="\n".join(sections),
        source_type=self.source_type,
        title=f"{verb} {path}",
        file_path=str(file_path),
        metadata=metadata,
    )
def _format_parameters(self, parameters: List[Dict[str, Any]]) -> str:
    """Render operation parameters as a bulleted text block.

    Args:
        parameters: List of parameter specifications.

    Returns:
        One bullet line per parameter, each optionally followed by a
        description line, or a placeholder when there are no parameters.
    """
    rendered = []
    for entry in parameters:
        requirement = "required" if entry.get("required", False) else "optional"
        schema_type = entry.get("schema", {}).get("type", "any")
        rendered.append(
            f" - {entry.get('name', 'unknown')} ({entry.get('in', 'unknown')}, {requirement}): {schema_type}"
        )
        detail = entry.get("description", "")
        if detail:
            rendered.append(f" Description: {detail}")
    if not rendered:
        return " No parameters"
    return "\n".join(rendered)
def _format_request_body(
    self, request_body: Dict[str, Any], spec: Dict[str, Any]
) -> str:
    """Format request body for display.

    Args:
        request_body: Request body specification.
        spec: The full OpenAPI specification, used to resolve a schema
            ``$ref``.

    Returns:
        Formatted request body string: optional description, the required
        flag, then a content-type line and JSON-rendered schema per media
        type.
    """
    lines = []
    description = request_body.get("description", "")
    if description:
        lines.append(f"Description: {description}")
    required = request_body.get("required", False)
    lines.append(f"Required: {required}")
    content = request_body.get("content", {})
    for content_type, content_spec in content.items():
        schema = content_spec.get("schema", {})
        schema_ref = schema.get("$ref", "")
        if schema_ref:
            resolved = self._resolve_ref(schema_ref, spec)
            if resolved:
                # Show the referenced schema instead of the bare pointer.
                schema = resolved
        lines.append(f"Content-Type: {content_type}")
        # YAML loading can yield date/datetime objects (e.g. in ``example``
        # values) that json cannot serialize; render those as text so one
        # such value does not abort indexing of the whole file.
        lines.append(f"Schema: {json.dumps(schema, indent=4, default=str)}")
    return "\n".join(lines)
def _format_responses(self, responses: Dict[str, Any]) -> str:
    """Render the responses of an operation as text.

    Args:
        responses: Response specifications keyed by status code.

    Returns:
        One line per status code with its description, followed by the
        content type and schema type of every media type that declares a
        schema; a placeholder when no responses are given.
    """
    rendered = []
    for status_code, response in responses.items():
        rendered.append(f" {status_code}: {response.get('description', '')}")
        media_types = response.get("content", {})
        for content_type, media_spec in media_types.items():
            schema = media_spec.get("schema", {})
            if not schema:
                # Media types without a schema add no lines.
                continue
            rendered.append(f" Content-Type: {content_type}")
            rendered.append(f" Schema Type: {schema.get('type', 'unknown')}")
    if not rendered:
        return " No responses defined"
    return "\n".join(rendered)
def _resolve_ref(self, ref: str, spec: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Resolve a local ``$ref`` reference against the specification.

    Only references into the same document (starting with ``#/``) are
    supported. Each reference token is first tried as a literal key; if that
    key is absent, JSON Pointer escapes are decoded (``~1`` to ``/`` and
    ``~0`` to ``~``) and the lookup is retried, so references into ``paths``
    keys such as ``#/paths/~1pets/get`` resolve.

    Args:
        ref: The reference string.
        spec: The full OpenAPI specification.

    Returns:
        The value found at the referenced location, or None when the
        reference is not local or any segment cannot be followed.
    """
    if not ref.startswith("#/"):
        return None
    current = spec
    for raw_token in ref[2:].split("/"):
        if not isinstance(current, dict):
            return None
        if raw_token in current:
            # Literal match first keeps already-working references unchanged.
            current = current[raw_token]
        else:
            # Decode "~1" before "~0" so that "~01" yields "~1", not "/".
            token = raw_token.replace("~1", "/").replace("~0", "~")
            current = current.get(token)
    return current
def _extract_schema_document(
    self,
    schema_name: str,
    schema: Dict[str, Any],
    file_path: Path,
    doc_id_base: str,
) -> Document:
    """Build the document describing one named schema.

    Args:
        schema_name: Name of the schema.
        schema: Schema specification.
        file_path: Path to the source file.
        doc_id_base: Base ID for document generation.

    Returns:
        Document object whose content lists the schema type, description,
        required fields and properties.
    """
    sections = [
        f"Schema: {schema_name}",
        f"Type: {schema.get('type', 'object')}",
    ]
    description = schema.get("description", "")
    if description:
        sections.append(f"Description: {description}")
    required_fields = schema.get("required", [])
    if required_fields:
        sections.append(f"Required Fields: {', '.join(required_fields)}")
    properties = schema.get("properties", {})
    if properties:
        property_lines = ["Properties:"]
        for prop_name, prop_spec in properties.items():
            requirement = "required" if prop_name in required_fields else "optional"
            property_lines.append(
                f" - {prop_name} ({prop_spec.get('type', 'unknown')}, {requirement})"
            )
            prop_desc = prop_spec.get("description", "")
            if prop_desc:
                property_lines.append(f" Description: {prop_desc}")
        # The property listing is added as a single multi-line section.
        sections.append("\n".join(property_lines))
    return Document(
        id=f"{doc_id_base}_schema_{schema_name}",
        content="\n".join(sections),
        source_type=self.source_type,
        title=f"Schema: {schema_name}",
        file_path=str(file_path),
        metadata={"section": "schema", "schema_name": schema_name},
    )
def _format_info_content(self, info: Dict[str, Any]) -> str:
    """Render selected fields of the ``info`` object as text.

    Args:
        info: Info object from the specification.

    Returns:
        One ``key: value`` line per known field that is present. Mapping
        values (such as contact or license) contribute their ``name`` and
        ``url`` entries instead.
    """
    known_fields = ("title", "version", "description", "termsOfService", "contact", "license")
    rendered = []
    for field in known_fields:
        if field not in info:
            continue
        entry = info[field]
        if not isinstance(entry, dict):
            rendered.append(f"{field}: {entry}")
            continue
        # Nested objects: only their name and url are surfaced.
        if "name" in entry:
            rendered.append(f"{field}: {entry['name']}")
        if "url" in entry:
            rendered.append(f"{field} URL: {entry['url']}")
    return "\n".join(rendered)
def _format_tag_content(self, tag: str, tag_spec: Dict[str, Any]) -> str:
    """Render a tag and its optional details as text.

    Args:
        tag: Tag name.
        tag_spec: Tag specification.

    Returns:
        The tag line, followed by the description and external
        documentation URL when they are present and non-empty.
    """
    rendered = [f"Tag: {tag}"]
    description = tag_spec.get("description", "")
    if description:
        rendered.append(f"Description: {description}")
    external_docs = tag_spec.get("externalDocs", {})
    # Only look for a URL when an externalDocs object is actually given.
    docs_url = external_docs.get("url", "") if external_docs else ""
    if docs_url:
        rendered.append(f"External Docs: {docs_url}")
    return "\n".join(rendered)
def _is_supported_file(self, path: Path) -> bool:
    """Tell whether ``path`` has one of the supported file extensions.

    Args:
        path: Path to the file.

    Returns:
        True if the lower-cased suffix is in ``SUPPORTED_EXTENSIONS``.
    """
    # Lower-casing makes the comparison case-insensitive.
    extension = path.suffix.lower()
    return extension in self.SUPPORTED_EXTENSIONS
def get_documents(self) -> List[Document]:
    """Return the documents held by this indexer.

    Returns:
        The indexer's internal list of Document objects (not a copy).
    """
    documents = self._documents
    return documents