"""README/Markdown file indexer."""

import hashlib
from pathlib import Path
from typing import Generator, List, Tuple

# NOTE(review): hashlib, yaml and markdown are not referenced anywhere in
# this module — confirm whether they are needed before removing.
import yaml
from markdown import markdown

from src.indexer.base import BaseIndexer
from src.models.document import Document, SourceType


class READMEIndexer(BaseIndexer):
    """Indexer for README and Markdown files.

    Splits each file into ATX-header-delimited sections, chunks oversized
    sections on paragraph boundaries, and wraps every chunk in a Document.
    """

    source_type = SourceType.README

    # Extensions accepted by _is_supported_file (presumably consulted by the
    # base class when walking directories — confirm against BaseIndexer).
    SUPPORTED_EXTENSIONS = {".md", ".markdown", ".txt"}

    def __init__(self):
        # Documents produced by the most recent index() call.
        self._documents: List[Document] = []

    def index(
        self, path: Path, recursive: bool = False, chunk_size: int = 1000
    ) -> List[Document]:
        """Index README/Markdown files from the given path.

        Args:
            path: Path to file or directory
            recursive: Whether to search recursively
            chunk_size: Maximum chunk size in characters

        Returns:
            List of indexed Document objects
        """
        self._documents = []

        for file_path in self._find_files(path, recursive):
            try:
                self._documents.extend(self._parse_file(file_path, chunk_size))
            except Exception as e:
                # Best effort: one unreadable file must not abort the run.
                print(f"Warning: Failed to parse {file_path}: {e}")

        return self._documents

    def _parse_file(
        self, file_path: Path, chunk_size: int = 1000
    ) -> List[Document]:
        """Parse a single Markdown file into Documents.

        A file with no header-delimited sections becomes one "root"
        document; otherwise one document is emitted per section chunk.

        Args:
            file_path: Path to the Markdown file
            chunk_size: Maximum chunk size

        Returns:
            List of Document objects
        """
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read()

        title = self._extract_title(content, file_path.stem)
        sections = self._parse_sections(content)

        documents: List[Document] = []
        doc_id_base = self._generate_id(file_path)  # provided by BaseIndexer

        if not sections:
            documents.append(
                Document(
                    id=doc_id_base,
                    content=content.strip(),
                    source_type=self.source_type,
                    title=title,
                    file_path=str(file_path),
                    metadata={"section": "root"},
                )
            )
        else:
            for i, (section_title, section_content, level) in enumerate(sections):
                chunks = self._chunk_content(
                    section_content, section_title, chunk_size
                )
                for j, chunk in enumerate(chunks):
                    # Suffix the chunk index only when the section was split,
                    # so ids of unsplit sections stay stable.
                    doc_id = (
                        f"{doc_id_base}_section_{i}_{j}"
                        if len(chunks) > 1
                        else f"{doc_id_base}_section_{i}"
                    )
                    documents.append(
                        Document(
                            id=doc_id,
                            content=chunk,
                            source_type=self.source_type,
                            title=f"{title} - {section_title}",
                            file_path=str(file_path),
                            metadata={
                                "section": section_title,
                                "section_level": level,
                                "chunk_index": j,
                                "total_chunks": len(chunks),
                            },
                        )
                    )

            # NOTE(review): a full-document copy is added only when there is
            # exactly ONE section; `len(sections) > 1` looks like it may have
            # been intended here — confirm before changing.
            if len(sections) == 1:
                documents.append(
                    Document(
                        id=f"{doc_id_base}_full",
                        content=content.strip(),
                        source_type=self.source_type,
                        title=f"{title} (Full)",
                        file_path=str(file_path),
                        metadata={"section": "full_document"},
                    )
                )

        return documents

    def _extract_title(self, content: str, default: str) -> str:
        """Return the first level-1 ATX heading, or *default* if none.

        NOTE: fenced code blocks are not skipped, so a ``# `` line inside
        one would be picked up as the title.

        Args:
            content: Markdown content
            default: Default title if none found

        Returns:
            Extracted title
        """
        for line in content.split("\n"):
            line = line.strip()
            if line.startswith("# "):
                return line[2:].strip()
        return default

    def _parse_sections(
        self, content: str
    ) -> List[Tuple[str, str, int]]:
        """Parse Markdown content into header-delimited sections.

        ATX headers (``#``…) start a new section; headers inside fenced
        code blocks are ignored. Sections with a blank body are dropped,
        while an untitled preamble before the first header is kept.

        Bug fixes vs. the original: sections were never emitted because the
        flush guard tested the (always-empty) content slot of the tracking
        tuple, and the opening code-fence line was dropped from content.

        Args:
            content: Markdown content

        Returns:
            List of (title, content, level) tuples
        """
        sections: List[Tuple[str, str, int]] = []
        current_title = ""
        current_level = 0
        current_lines: List[str] = []
        in_code_block = False

        def _flush() -> None:
            # Emit the accumulated section only if it has real content.
            body = "\n".join(current_lines)
            if body.strip():
                sections.append((current_title, body, current_level))

        for line in content.split("\n"):
            if line.startswith("```"):
                # Toggle fence state and keep the fence line itself so code
                # blocks survive intact in the section body.
                in_code_block = not in_code_block
                current_lines.append(line)
                continue

            if not in_code_block and line.startswith("#"):
                _flush()
                header = line.lstrip("#")
                # Header level = number of leading '#' characters.
                current_level = len(line) - len(header)
                current_title = header.strip()
                current_lines = []
            else:
                current_lines.append(line)

        _flush()
        return sections

    def _chunk_content(
        self, content: str, section_title: str, max_size: int
    ) -> List[str]:
        """Chunk content into pieces of at most roughly *max_size* chars.

        Packs whole paragraphs greedily, so a single paragraph longer than
        *max_size* is emitted as one oversized chunk rather than cut.

        Args:
            content: Section content
            section_title: Section title for context (currently unused;
                kept for interface stability)
            max_size: Maximum chunk size

        Returns:
            List of content chunks
        """
        if len(content) <= max_size:
            return [content]

        chunks: List[str] = []
        current_chunk: List[str] = []
        current_size = 0

        for para in self._split_paragraphs(content):
            para_size = len(para)
            # Close the current chunk when this paragraph would overflow it.
            if current_size + para_size > max_size and current_chunk:
                chunks.append("\n\n".join(current_chunk))
                current_chunk = []
                current_size = 0
            current_chunk.append(para)
            current_size += para_size

        if current_chunk:
            chunks.append("\n\n".join(current_chunk))

        return chunks

    def _split_paragraphs(self, content: str) -> List[str]:
        """Split content into paragraphs at blank lines.

        Args:
            content: Section content

        Returns:
            List of paragraphs
        """
        paragraphs: List[str] = []
        current_lines: List[str] = []

        for line in content.split("\n"):
            if line.strip():
                current_lines.append(line)
            elif current_lines:
                paragraphs.append("\n".join(current_lines))
                current_lines = []

        if current_lines:
            paragraphs.append("\n".join(current_lines))

        return paragraphs

    def _is_supported_file(self, path: Path) -> bool:
        """Check if the file is a supported Markdown file.

        Args:
            path: Path to the file

        Returns:
            True if the file extension is supported
        """
        return path.suffix.lower() in self.SUPPORTED_EXTENSIONS

    def get_documents(self) -> List[Document]:
        """Get all indexed documents.

        Returns:
            List of Document objects
        """
        return self._documents