fix: resolve CI/CD issues with proper package structure and imports
src/local_api_docs_search/indexer/openapi.py (new file, 491 additions)
@@ -0,0 +1,491 @@
"""OpenAPI/Swagger specification indexer."""

import hashlib
import json
from pathlib import Path
from typing import Any, Dict, List, Optional

from openapi_spec_validator import validate
from yaml import safe_load

from local_api_docs_search.indexer.base import BaseIndexer
from local_api_docs_search.models.document import Document, SourceType


class OpenAPIIndexer(BaseIndexer):
    """Indexer for OpenAPI/Swagger specifications."""

    source_type = SourceType.OPENAPI

    SUPPORTED_EXTENSIONS = {".yaml", ".yml", ".json"}

    def __init__(self):
        self._documents: List[Document] = []

    def index(
        self, path: Path, recursive: bool = False, batch_size: int = 32
    ) -> List[Document]:
        """Index OpenAPI specifications from the given path.

        Args:
            path: Path to file or directory
            recursive: Whether to search recursively
            batch_size: Documents per batch (for progress tracking)

        Returns:
            List of indexed Document objects
        """
        self._documents = []

        for file_path in self._find_files(path, recursive):
            try:
                docs = self._parse_file(file_path)
                self._documents.extend(docs)
            except Exception as e:
                print(f"Warning: Failed to parse {file_path}: {e}")

        return self._documents

    def _parse_file(self, file_path: Path) -> List[Document]:
        """Parse a single OpenAPI file.

        Args:
            file_path: Path to the OpenAPI file

        Returns:
            List of Document objects
        """
        with open(file_path, "r") as f:
            content = f.read()

        if file_path.suffix == ".json":
            spec = json.loads(content)
        else:
            spec = safe_load(content)

        if spec is None:
            return []

        validation_errors = self._validate_spec(spec, file_path)
        if validation_errors:
            print(f"Warning: Validation errors in {file_path}: {validation_errors}")

        return self._extract_documents(spec, file_path)

    def _validate_spec(
        self, spec: Dict[str, Any], file_path: Path
    ) -> Optional[str]:
        """Validate an OpenAPI specification.

        Args:
            spec: The parsed specification
            file_path: Path to the source file

        Returns:
            None if valid, error message otherwise
        """
        try:
            validate(spec)
            return None
        except Exception as e:
            return str(e)

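    # Illustrative sketch: validate() from openapi_spec_validator raises on schema
    # violations, and _validate_spec converts the exception to a string. A roughly
    # minimal OpenAPI 3.0 document that should come back as valid (None) is:
    #
    #     spec = {
    #         "openapi": "3.0.0",
    #         "info": {"title": "Pet Store", "version": "1.0.0"},
    #         "paths": {},
    #     }
    #     assert OpenAPIIndexer()._validate_spec(spec, Path("inline")) is None
    #
    # A document missing the required "openapi", "info", or "paths" keys is expected
    # to return a non-None error string instead.
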
    def _extract_documents(
        self, spec: Dict[str, Any], file_path: Path
    ) -> List[Document]:
        """Extract searchable documents from an OpenAPI spec.

        Args:
            spec: The parsed OpenAPI specification
            file_path: Path to the source file

        Returns:
            List of Document objects
        """
        documents = []
        spec_info = spec.get("info", {})
        title = spec_info.get("title", file_path.stem)
        version = spec_info.get("version", "unknown")

        doc_id_base = self._generate_id(file_path)

        info_doc = Document(
            id=f"{doc_id_base}_info",
            content=self._format_info_content(spec_info),
            source_type=self.source_type,
            title=f"{title} - API Info",
            file_path=str(file_path),
            metadata={"version": version, "section": "info"},
        )
        documents.append(info_doc)

        for path, path_item in spec.get("paths", {}).items():
            path_docs = self._extract_path_documents(
                path, path_item, spec, file_path, doc_id_base
            )
            documents.extend(path_docs)

        # "tags" is a list of tag objects, each carrying a "name" and an optional
        # "description", so iterate over the objects and read the name explicitly.
        for tag_spec in spec.get("tags", []):
            tag = tag_spec.get("name", "")
            tag_doc = Document(
                id=f"{doc_id_base}_tag_{tag}",
                content=self._format_tag_content(tag, tag_spec),
                source_type=self.source_type,
                title=f"Tag: {tag}",
                file_path=str(file_path),
                metadata={"section": "tags", "tag": tag},
            )
            documents.append(tag_doc)

        for schema_name, schema in spec.get("components", {}).get("schemas", {}).items():
            schema_doc = self._extract_schema_document(
                schema_name, schema, file_path, doc_id_base
            )
            if schema_doc:
                documents.append(schema_doc)

        return documents

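    # Document IDs produced by this indexer follow a single scheme rooted at
    # _generate_id(file_path):
    #   {base}_info              - the API info section
    #   {base}_{operationId}     - one document per operation (see below)
    #   {base}_path_{md5[:8]}    - path-level summary/description, when present
    #   {base}_tag_{name}        - one document per top-level tag
    #   {base}_schema_{name}     - one document per component schema
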
    def _extract_path_documents(
        self,
        path: str,
        path_item: Dict[str, Any],
        spec: Dict[str, Any],
        file_path: Path,
        doc_id_base: str,
    ) -> List[Document]:
        """Extract documents from a path item.

        Args:
            path: The path string
            path_item: The path item specification
            spec: The full OpenAPI specification
            file_path: Path to the source file
            doc_id_base: Base ID for document generation

        Returns:
            List of Document objects
        """
        documents = []
        path_hash = hashlib.md5(path.encode()).hexdigest()[:8]

        methods = ["get", "post", "put", "patch", "delete", "options", "head", "trace"]

        for method in methods:
            if method in path_item:
                operation = path_item[method]
                doc = self._extract_operation_document(
                    method, path, operation, spec, file_path, doc_id_base, path_hash
                )
                documents.append(doc)

        summary = path_item.get("summary", "")
        description = path_item.get("description", "")
        if summary or description:
            path_doc = Document(
                id=f"{doc_id_base}_path_{path_hash}",
                content=f"Path: {path}\nSummary: {summary}\nDescription: {description}",
                source_type=self.source_type,
                title=f"Path: {path}",
                file_path=str(file_path),
                metadata={"section": "path", "path": path},
            )
            documents.append(path_doc)

        return documents

    def _extract_operation_document(
        self,
        method: str,
        path: str,
        operation: Dict[str, Any],
        spec: Dict[str, Any],
        file_path: Path,
        doc_id_base: str,
        path_hash: str,
    ) -> Document:
        """Extract a document from an operation.

        Args:
            method: HTTP method
            path: API path
            operation: The operation specification
            spec: The full OpenAPI specification
            file_path: Path to the source file
            doc_id_base: Base ID for document generation
            path_hash: Hash of the path for ID generation

        Returns:
            Document object
        """
        op_id = operation.get("operationId", f"{method}_{path_hash}")
        summary = operation.get("summary", "")
        description = operation.get("description", "")
        deprecated = operation.get("deprecated", False)

        content_parts = [
            f"Method: {method.upper()}",
            f"Path: {path}",
            f"Operation ID: {op_id}",
            f"Summary: {summary}",
            f"Description: {description}",
        ]

        if deprecated:
            content_parts.append("Status: DEPRECATED")

        tags = operation.get("tags", [])
        if tags:
            content_parts.append(f"Tags: {', '.join(tags)}")

        parameters = operation.get("parameters", [])
        if parameters:
            param_content = self._format_parameters(parameters)
            content_parts.append(f"Parameters:\n{param_content}")

        request_body = operation.get("requestBody", {})
        if request_body:
            rb_content = self._format_request_body(request_body, spec)
            content_parts.append(f"Request Body:\n{rb_content}")

        responses = operation.get("responses", {})
        resp_content = self._format_responses(responses)
        content_parts.append(f"Responses:\n{resp_content}")

        return Document(
            id=f"{doc_id_base}_{op_id}",
            content="\n".join(content_parts),
            source_type=self.source_type,
            title=f"{method.upper()} {path}",
            file_path=str(file_path),
            metadata={
                "section": "operation",
                "method": method,
                "path": path,
                "operation_id": op_id,
                "deprecated": deprecated,
            },
        )

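    # Sketch of the indexed text for a hypothetical "GET /pets" operation with
    # operationId "listPets", one optional query parameter "limit" (integer), and a
    # single 200 response described as "A paged array of pets"; the document content
    # built above would read approximately:
    #
    #     Method: GET
    #     Path: /pets
    #     Operation ID: listPets
    #     Summary: List all pets
    #     Description:
    #     Tags: pets
    #     Parameters:
    #       - limit (query, optional): integer
    #     Responses:
    #       200: A paged array of pets
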
    def _format_parameters(self, parameters: List[Dict[str, Any]]) -> str:
        """Format parameters for display.

        Args:
            parameters: List of parameter specifications

        Returns:
            Formatted parameter string
        """
        lines = []
        for param in parameters:
            name = param.get("name", "unknown")
            in_loc = param.get("in", "unknown")
            required = param.get("required", False)
            description = param.get("description", "")
            param_type = param.get("schema", {}).get("type", "any")

            lines.append(
                f"  - {name} ({in_loc}, {'required' if required else 'optional'}): {param_type}"
            )
            if description:
                lines.append(f"    Description: {description}")

        return "\n".join(lines) if lines else "  No parameters"

    def _format_request_body(
        self, request_body: Dict[str, Any], spec: Dict[str, Any]
    ) -> str:
        """Format request body for display.

        Args:
            request_body: Request body specification
            spec: The full OpenAPI specification

        Returns:
            Formatted request body string
        """
        lines = []
        description = request_body.get("description", "")
        if description:
            lines.append(f"Description: {description}")

        required = request_body.get("required", False)
        lines.append(f"Required: {required}")

        content = request_body.get("content", {})
        for content_type, content_spec in content.items():
            schema = content_spec.get("schema", {})
            schema_ref = schema.get("$ref", "")
            if schema_ref:
                resolved = self._resolve_ref(schema_ref, spec)
                if resolved:
                    schema = resolved
            lines.append(f"Content-Type: {content_type}")
            lines.append(f"Schema: {json.dumps(schema, indent=4)}")

        return "\n".join(lines)

    def _format_responses(self, responses: Dict[str, Any]) -> str:
        """Format responses for display.

        Args:
            responses: Response specifications

        Returns:
            Formatted response string
        """
        lines = []
        for status_code, response in responses.items():
            description = response.get("description", "")
            lines.append(f"  {status_code}: {description}")

            content = response.get("content", {})
            for content_type, content_spec in content.items():
                schema = content_spec.get("schema", {})
                if schema:
                    schema_type = schema.get("type", "unknown")
                    lines.append(f"    Content-Type: {content_type}")
                    lines.append(f"    Schema Type: {schema_type}")

        return "\n".join(lines) if lines else "  No responses defined"

    def _resolve_ref(self, ref: str, spec: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Resolve a $ref reference.

        Args:
            ref: The reference string
            spec: The full OpenAPI specification

        Returns:
            Resolved schema or None
        """
        if not ref.startswith("#/"):
            return None

        parts = ref[2:].split("/")
        current = spec

        for part in parts:
            if isinstance(current, dict):
                current = current.get(part)
            else:
                return None

        return current

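    # Example walk (a sketch): for ref == "#/components/schemas/Pet" the loop splits
    # the pointer into ["components", "schemas", "Pet"] and descends the spec dict,
    # returning spec["components"]["schemas"]["Pet"] when every key exists, or None
    # when any hop is missing or the reference is external (not starting with "#/").
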
    def _extract_schema_document(
        self,
        schema_name: str,
        schema: Dict[str, Any],
        file_path: Path,
        doc_id_base: str,
    ) -> Document:
        """Extract a document from a schema.

        Args:
            schema_name: Name of the schema
            schema: Schema specification
            file_path: Path to the source file
            doc_id_base: Base ID for document generation

        Returns:
            Document object
        """
        content_parts = [
            f"Schema: {schema_name}",
        ]

        schema_type = schema.get("type", "object")
        content_parts.append(f"Type: {schema_type}")

        description = schema.get("description", "")
        if description:
            content_parts.append(f"Description: {description}")

        required_fields = schema.get("required", [])
        if required_fields:
            content_parts.append(f"Required Fields: {', '.join(required_fields)}")

        properties = schema.get("properties", {})
        if properties:
            prop_lines = ["Properties:"]
            for prop_name, prop_spec in properties.items():
                prop_type = prop_spec.get("type", "unknown")
                prop_desc = prop_spec.get("description", "")
                prop_required = prop_name in required_fields
                prop_lines.append(
                    f"  - {prop_name} ({prop_type}, {'required' if prop_required else 'optional'})"
                )
                if prop_desc:
                    prop_lines.append(f"    Description: {prop_desc}")
            content_parts.append("\n".join(prop_lines))

        return Document(
            id=f"{doc_id_base}_schema_{schema_name}",
            content="\n".join(content_parts),
            source_type=self.source_type,
            title=f"Schema: {schema_name}",
            file_path=str(file_path),
            metadata={"section": "schema", "schema_name": schema_name},
        )

    def _format_info_content(self, info: Dict[str, Any]) -> str:
        """Format the API info section.

        Args:
            info: Info object from specification

        Returns:
            Formatted info content
        """
        parts = []
        for key in ["title", "version", "description", "termsOfService", "contact", "license"]:
            if key in info:
                value = info[key]
                if isinstance(value, dict):
                    if "name" in value:
                        parts.append(f"{key}: {value['name']}")
                    if "url" in value:
                        parts.append(f"{key} URL: {value['url']}")
                else:
                    parts.append(f"{key}: {value}")
        return "\n".join(parts)

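    # Sketch of the flattening above for a hypothetical info block where
    # contact = {"name": "API Team", "url": "https://example.com", "email": "x@example.com"}:
    # the output contains "contact: API Team" and "contact URL: https://example.com";
    # dict keys other than "name" and "url" (such as "email") are not included.
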
    def _format_tag_content(self, tag: str, tag_spec: Dict[str, Any]) -> str:
        """Format tag content.

        Args:
            tag: Tag name
            tag_spec: Tag specification

        Returns:
            Formatted tag content
        """
        parts = [f"Tag: {tag}"]
        description = tag_spec.get("description", "")
        if description:
            parts.append(f"Description: {description}")
        external_docs = tag_spec.get("externalDocs", {})
        if external_docs:
            docs_url = external_docs.get("url", "")
            if docs_url:
                parts.append(f"External Docs: {docs_url}")
        return "\n".join(parts)

    def _is_supported_file(self, path: Path) -> bool:
        """Check if the file is a supported OpenAPI file.

        Args:
            path: Path to the file

        Returns:
            True if the file extension is supported
        """
        return path.suffix.lower() in self.SUPPORTED_EXTENSIONS

    def get_documents(self) -> List[Document]:
        """Get all indexed documents.

        Returns:
            List of Document objects
        """
        return self._documents
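
A minimal usage sketch of the new indexer (assuming `BaseIndexer` supplies `_find_files` and `_generate_id`, and that `Document`/`SourceType` are defined in `local_api_docs_search.models.document`; the `specs/` directory is hypothetical):

    from pathlib import Path

    from local_api_docs_search.indexer.openapi import OpenAPIIndexer

    # Index every .yaml/.yml/.json spec found under specs/, recursively.
    indexer = OpenAPIIndexer()
    documents = indexer.index(Path("specs/"), recursive=True)

    for doc in documents:
        print(doc.id, doc.title)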