fix: resolve CI/CD issues with proper package structure and imports
This commit is contained in:
254
src/local_api_docs_search/indexer/readme.py
Normal file
254
src/local_api_docs_search/indexer/readme.py
Normal file
@@ -0,0 +1,254 @@
|
||||
"""README/Markdown file indexer."""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import List, Tuple
|
||||
|
||||
|
||||
from local_api_docs_search.indexer.base import BaseIndexer
|
||||
from local_api_docs_search.models.document import Document, SourceType
|
||||
|
||||
|
||||
class READMEIndexer(BaseIndexer):
|
||||
"""Indexer for README and Markdown files."""
|
||||
|
||||
source_type = SourceType.README
|
||||
|
||||
SUPPORTED_EXTENSIONS = {".md", ".markdown", ".txt"}
|
||||
|
||||
def __init__(self):
|
||||
self._documents: List[Document] = []
|
||||
|
||||
def index(
|
||||
self, path: Path, recursive: bool = False, chunk_size: int = 1000
|
||||
) -> List[Document]:
|
||||
"""Index README/Markdown files from the given path.
|
||||
|
||||
Args:
|
||||
path: Path to file or directory
|
||||
recursive: Whether to search recursively
|
||||
chunk_size: Maximum chunk size in characters
|
||||
|
||||
Returns:
|
||||
List of indexed Document objects
|
||||
"""
|
||||
self._documents = []
|
||||
|
||||
for file_path in self._find_files(path, recursive):
|
||||
try:
|
||||
docs = self._parse_file(file_path, chunk_size)
|
||||
self._documents.extend(docs)
|
||||
except Exception as e:
|
||||
print(f"Warning: Failed to parse {file_path}: {e}")
|
||||
|
||||
return self._documents
|
||||
|
||||
def _parse_file(
|
||||
self, file_path: Path, chunk_size: int = 1000
|
||||
) -> List[Document]:
|
||||
"""Parse a single Markdown file.
|
||||
|
||||
Args:
|
||||
file_path: Path to the Markdown file
|
||||
chunk_size: Maximum chunk size
|
||||
|
||||
Returns:
|
||||
List of Document objects
|
||||
"""
|
||||
with open(file_path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
|
||||
title = self._extract_title(content, file_path.stem)
|
||||
sections = self._parse_sections(content)
|
||||
|
||||
documents = []
|
||||
doc_id_base = self._generate_id(file_path)
|
||||
|
||||
if not sections:
|
||||
doc = Document(
|
||||
id=doc_id_base,
|
||||
content=content.strip(),
|
||||
source_type=self.source_type,
|
||||
title=title,
|
||||
file_path=str(file_path),
|
||||
metadata={"section": "root"},
|
||||
)
|
||||
documents.append(doc)
|
||||
else:
|
||||
for i, (section_title, section_content, level) in enumerate(sections):
|
||||
chunks = self._chunk_content(
|
||||
section_content, section_title, chunk_size
|
||||
)
|
||||
for j, chunk in enumerate(chunks):
|
||||
doc_id = f"{doc_id_base}_section_{i}_{j}" if len(chunks) > 1 else f"{doc_id_base}_section_{i}"
|
||||
doc = Document(
|
||||
id=doc_id,
|
||||
content=chunk,
|
||||
source_type=self.source_type,
|
||||
title=f"{title} - {section_title}",
|
||||
file_path=str(file_path),
|
||||
metadata={
|
||||
"section": section_title,
|
||||
"section_level": level,
|
||||
"chunk_index": j,
|
||||
"total_chunks": len(chunks),
|
||||
},
|
||||
)
|
||||
documents.append(doc)
|
||||
|
||||
if len(sections) == 1:
|
||||
full_doc = Document(
|
||||
id=f"{doc_id_base}_full",
|
||||
content=content.strip(),
|
||||
source_type=self.source_type,
|
||||
title=f"{title} (Full)",
|
||||
file_path=str(file_path),
|
||||
metadata={"section": "full_document"},
|
||||
)
|
||||
documents.append(full_doc)
|
||||
|
||||
return documents
|
||||
|
||||
def _extract_title(self, content: str, default: str) -> str:
|
||||
"""Extract the title from Markdown content.
|
||||
|
||||
Args:
|
||||
content: Markdown content
|
||||
default: Default title if none found
|
||||
|
||||
Returns:
|
||||
Extracted title
|
||||
"""
|
||||
for line in content.split("\n"):
|
||||
line = line.strip()
|
||||
if line.startswith("# "):
|
||||
return line[2:].strip()
|
||||
return default
|
||||
|
||||
def _parse_sections(
|
||||
self, content: str
|
||||
) -> List[Tuple[str, str, int]]:
|
||||
"""Parse Markdown content into sections.
|
||||
|
||||
Args:
|
||||
content: Markdown content
|
||||
|
||||
Returns:
|
||||
List of (title, content, level) tuples
|
||||
"""
|
||||
sections = []
|
||||
lines = content.split("\n")
|
||||
current_section = ("", "", 0)
|
||||
current_lines = []
|
||||
|
||||
in_code_block = False
|
||||
code_fence = "```"
|
||||
|
||||
for line in lines:
|
||||
if line.startswith(code_fence):
|
||||
in_code_block = not in_code_block
|
||||
if not in_code_block:
|
||||
current_lines.append(line)
|
||||
continue
|
||||
|
||||
if not in_code_block and line.startswith("#"):
|
||||
if current_section[1]:
|
||||
sections.append(
|
||||
(current_section[0], "\n".join(current_lines), current_section[2])
|
||||
)
|
||||
|
||||
header = line.lstrip("#")
|
||||
level = len(line) - len(header)
|
||||
title = header.strip()
|
||||
current_lines = []
|
||||
current_section = (title, "", level)
|
||||
else:
|
||||
current_lines.append(line)
|
||||
|
||||
if current_section[1]:
|
||||
sections.append(
|
||||
(current_section[0], "\n".join(current_lines), current_section[2])
|
||||
)
|
||||
|
||||
return sections
|
||||
|
||||
def _chunk_content(
|
||||
self, content: str, section_title: str, max_size: int
|
||||
) -> List[str]:
|
||||
"""Chunk content into smaller pieces.
|
||||
|
||||
Args:
|
||||
content: Section content
|
||||
section_title: Section title for context
|
||||
max_size: Maximum chunk size
|
||||
|
||||
Returns:
|
||||
List of content chunks
|
||||
"""
|
||||
if len(content) <= max_size:
|
||||
return [content]
|
||||
|
||||
chunks = []
|
||||
current_chunk = []
|
||||
current_size = 0
|
||||
|
||||
paragraphs = self._split_paragraphs(content)
|
||||
|
||||
for para in paragraphs:
|
||||
para_size = len(para)
|
||||
|
||||
if current_size + para_size > max_size and current_chunk:
|
||||
chunks.append("\n\n".join(current_chunk))
|
||||
current_chunk = []
|
||||
current_size = 0
|
||||
|
||||
current_chunk.append(para)
|
||||
current_size += para_size
|
||||
|
||||
if current_chunk:
|
||||
chunks.append("\n\n".join(current_chunk))
|
||||
|
||||
return chunks
|
||||
|
||||
def _split_paragraphs(self, content: str) -> List[str]:
|
||||
"""Split content into paragraphs.
|
||||
|
||||
Args:
|
||||
content: Section content
|
||||
|
||||
Returns:
|
||||
List of paragraphs
|
||||
"""
|
||||
paragraphs = []
|
||||
current_lines = []
|
||||
|
||||
for line in content.split("\n"):
|
||||
stripped = line.strip()
|
||||
if stripped:
|
||||
current_lines.append(line)
|
||||
elif current_lines:
|
||||
paragraphs.append("\n".join(current_lines))
|
||||
current_lines = []
|
||||
|
||||
if current_lines:
|
||||
paragraphs.append("\n".join(current_lines))
|
||||
|
||||
return paragraphs
|
||||
|
||||
def _is_supported_file(self, path: Path) -> bool:
|
||||
"""Check if the file is a supported Markdown file.
|
||||
|
||||
Args:
|
||||
path: Path to the file
|
||||
|
||||
Returns:
|
||||
True if the file extension is supported
|
||||
"""
|
||||
return path.suffix.lower() in self.SUPPORTED_EXTENSIONS
|
||||
|
||||
def get_documents(self) -> List[Document]:
|
||||
"""Get all indexed documents.
|
||||
|
||||
Returns:
|
||||
List of Document objects
|
||||
"""
|
||||
return self._documents
|
||||
Reference in New Issue
Block a user