"""Code comment indexer for Python, JavaScript, and TypeScript files."""

import ast
import hashlib
import re
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Tuple

from src.indexer.base import BaseIndexer
from src.models.document import Document, SourceType


class CodeIndexer(BaseIndexer):
    """Indexer that extracts docstrings and JSDoc comments into Documents.

    Python sources are parsed with :mod:`ast`; JavaScript/TypeScript sources
    are scanned with regular expressions for ``/** ... */`` JSDoc blocks.

    NOTE(review): file discovery (``_find_files``) and document-ID generation
    (``_generate_id``) are not defined here and are assumed to come from
    ``BaseIndexer`` -- confirm against the base class.
    """

    source_type = SourceType.CODE

    # Extension -> language handler selected in _parse_file.
    SUPPORTED_EXTENSIONS = {
        ".py": "python",
        ".js": "javascript",
        ".jsx": "javascript",
        ".ts": "typescript",
        ".tsx": "typescript",
    }

    def __init__(self):
        self._documents: List[Document] = []
        # Raw file contents keyed by path string; kept for later inspection.
        self._parsed_files: Dict[str, Any] = {}

    def index(
        self, path: Path, recursive: bool = False, batch_size: int = 32
    ) -> List[Document]:
        """Index code files from the given path.

        Args:
            path: Path to a file or directory.
            recursive: Whether to search directories recursively.
            batch_size: Documents per batch (for progress tracking).

        Returns:
            List of indexed Document objects.
        """
        self._documents = []
        self._parsed_files = {}

        for file_path in self._find_files(path, recursive):
            try:
                self._documents.extend(self._parse_file(file_path))
            except Exception as e:
                # Best-effort: one unparseable file must not abort the run.
                print(f"Warning: Failed to parse {file_path}: {e}")

        return self._documents

    def _parse_file(self, file_path: Path) -> List[Document]:
        """Parse a single code file, dispatching on its extension.

        Args:
            file_path: Path to the code file.

        Returns:
            List of Document objects (empty for unsupported extensions).
        """
        language = self.SUPPORTED_EXTENSIONS.get(file_path.suffix.lower())
        if language is None:
            return []

        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read()

        self._parsed_files[str(file_path)] = content

        if language == "python":
            return self._parse_python(content, file_path)
        if language in ("javascript", "typescript"):
            return self._parse_js_ts(content, file_path, language)
        return []

    def _parse_python(self, content: str, file_path: Path) -> List[Document]:
        """Parse a Python file, emitting one Document per documented item.

        Produces: an optional module-docstring Document, one Document per
        documented function/class, and a trailing index Document summarising
        the file (only when something was documented).

        Args:
            content: Python source text.
            file_path: Path to the file.

        Returns:
            List of Document objects; empty if the file has a syntax error.
        """
        documents: List[Document] = []
        doc_id_base = self._generate_id(file_path)

        try:
            tree = ast.parse(content)
        except SyntaxError:
            # Unparseable Python yields nothing rather than raising.
            return []

        module_doc = self._get_module_docstring(content)
        if module_doc:
            documents.append(
                Document(
                    id=f"{doc_id_base}_module",
                    content=module_doc,
                    source_type=self.source_type,
                    title=f"Module: {file_path.stem}",
                    file_path=str(file_path),
                    metadata={"doc_type": "module"},
                )
            )

        for node in ast.walk(tree):
            # NOTE: duplicate function/class names in one file produce
            # duplicate document IDs -- acceptable for now.
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                doc = self._parse_python_function(node, file_path, doc_id_base)
                if doc:
                    documents.append(doc)
            elif isinstance(node, ast.ClassDef):
                doc = self._parse_python_class(node, file_path, doc_id_base)
                if doc:
                    documents.append(doc)

        if documents:
            documents.append(
                Document(
                    id=f"{doc_id_base}_index",
                    content=self._generate_python_index(tree, file_path),
                    source_type=self.source_type,
                    title=f"Index: {file_path.stem}",
                    file_path=str(file_path),
                    metadata={"doc_type": "index"},
                )
            )

        return documents

    def _get_module_docstring(self, content: str) -> Optional[str]:
        """Extract the raw module docstring, or None.

        Args:
            content: Python source text.

        Returns:
            The module docstring exactly as written (not cleaned), or None
            when absent or when the source does not parse.
        """
        try:
            tree = ast.parse(content)
        except SyntaxError:
            # Hardened: the original raised here when called on bad source.
            return None
        # clean=False matches the previous manual extraction (raw value).
        return ast.get_docstring(tree, clean=False)

    def _parse_python_function(
        self,
        node: "ast.FunctionDef | ast.AsyncFunctionDef",
        file_path: Path,
        doc_id_base: str,
    ) -> Optional[Document]:
        """Build a Document for a documented function, or None.

        Args:
            node: AST function node (sync or async).
            file_path: Path to the file.
            doc_id_base: Base ID for document generation.

        Returns:
            Document or None when the function has no docstring.
        """
        docstring = self._get_docstring(node)
        if not docstring:
            return None

        func_info = self._extract_python_function_info(node)

        content = (
            f"Function: {node.name}\n"
            f"Docstring:\n{docstring}\n"
            f"Parameters: {', '.join(func_info['args'])}\n"
            f"Returns: {func_info['returns']}\n"
            f"Line: {node.lineno}"
        )

        return Document(
            id=f"{doc_id_base}_func_{node.name}",
            content=content,
            source_type=self.source_type,
            title=f"Function: {node.name}",
            file_path=str(file_path),
            metadata={
                "doc_type": "function",
                "function_name": node.name,
                "line": node.lineno,
            },
        )

    def _parse_python_class(
        self, node: ast.ClassDef, file_path: Path, doc_id_base: str
    ) -> Optional[Document]:
        """Build a Document for a documented class, or None.

        Lists public method names and annotated class-level attributes.

        Args:
            node: AST class node.
            file_path: Path to the file.
            doc_id_base: Base ID for document generation.

        Returns:
            Document or None when the class has no docstring.
        """
        docstring = self._get_docstring(node)
        if not docstring:
            return None

        methods: List[str] = []
        attributes: List[str] = []

        for item in node.body:
            if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)):
                # Only public methods are listed.
                if not item.name.startswith("_"):
                    methods.append(item.name)
            elif isinstance(item, ast.AnnAssign) and isinstance(
                item.target, ast.Name
            ):
                attributes.append(item.target.id)

        content = f"Class: {node.name}\n"
        content += f"Docstring:\n{docstring}\n"
        if attributes:
            content += f"Attributes: {', '.join(attributes)}\n"
        if methods:
            content += f"Methods: {', '.join(methods)}\n"
        content += f"Line: {node.lineno}"

        return Document(
            id=f"{doc_id_base}_class_{node.name}",
            content=content,
            source_type=self.source_type,
            title=f"Class: {node.name}",
            file_path=str(file_path),
            metadata={
                "doc_type": "class",
                "class_name": node.name,
                "line": node.lineno,
            },
        )

    def _get_docstring(self, node: ast.AST) -> Optional[str]:
        """Extract a raw docstring from any AST node that carries a body.

        Args:
            node: AST node.

        Returns:
            The raw docstring, or None.
        """
        body = getattr(node, "body", None)
        if body:
            first = body[0]
            if isinstance(first, ast.Expr) and isinstance(
                first.value, ast.Constant
            ):
                value = first.value.value
                if isinstance(value, str):
                    return value
        return None

    def _extract_python_function_info(
        self, node: "ast.FunctionDef | ast.AsyncFunctionDef"
    ) -> Dict[str, Any]:
        """Extract argument names, constant defaults, and return annotation.

        Args:
            node: AST function node.

        Returns:
            Dict with keys ``args`` (names, self/cls excluded), ``defaults``
            (stringified constant defaults only; not aligned with args), and
            ``returns`` (annotation name or "unknown").
        """
        args = [a.arg for a in node.args.args if a.arg not in ("self", "cls")]

        defaults = [
            str(d.value)
            for d in node.args.defaults
            if isinstance(d, ast.Constant)
        ]

        returns = "unknown"
        if node.returns:
            if isinstance(node.returns, ast.Name):
                returns = node.returns.id
            elif isinstance(node.returns, ast.Constant):
                returns = str(node.returns.value)

        return {"args": args, "defaults": defaults, "returns": returns}

    def _generate_python_index(self, tree: ast.AST, file_path: Path) -> str:
        """Generate a plain-text index of documented classes and functions.

        Private functions (leading underscore) are skipped; classes are
        listed whenever they carry a docstring.

        Args:
            tree: Parsed AST tree.
            file_path: Path to the file.

        Returns:
            Index content string.
        """
        functions: List[str] = []
        classes: List[str] = []

        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                if self._get_docstring(node) and not node.name.startswith("_"):
                    functions.append(node.name)
            elif isinstance(node, ast.ClassDef):
                if self._get_docstring(node):
                    classes.append(node.name)

        content = f"File: {file_path.name}\n\n"
        if classes:
            content += "Classes:\n" + "\n".join(f"  - {c}" for c in classes) + "\n\n"
        if functions:
            content += "Functions:\n" + "\n".join(f"  - {f}" for f in functions)

        return content

    def _parse_js_ts(
        self, content: str, file_path: Path, language: str
    ) -> List[Document]:
        """Parse a JavaScript/TypeScript file for JSDoc comments.

        Args:
            content: File content.
            file_path: Path to the file.
            language: Language identifier ("javascript" or "typescript").

        Returns:
            List of Document objects; empty when no JSDoc blocks are found.
        """
        documents: List[Document] = []
        doc_id_base = self._generate_id(file_path)

        jsdocs = self._extract_jsdocs(content)
        if not jsdocs:
            return documents

        module_doc = self._extract_js_module_doc(content)
        if module_doc:
            documents.append(
                Document(
                    id=f"{doc_id_base}_module",
                    content=module_doc,
                    source_type=self.source_type,
                    title=f"Module: {file_path.stem}",
                    file_path=str(file_path),
                    metadata={"doc_type": "module"},
                )
            )

        for i, jsdoc in enumerate(jsdocs):
            documents.append(
                self._create_jsdoc_document(jsdoc, file_path, doc_id_base, i)
            )

        return documents

    def _extract_jsdocs(self, content: str) -> List[Dict[str, Any]]:
        """Extract JSDoc comments attached to declarations.

        Only JSDoc blocks immediately followed by a recognised declaration
        (function/const/let/var/class/interface/type/enum) are captured;
        free-floating comments are ignored.

        Args:
            content: File content.

        Returns:
            List of parsed-JSDoc dictionaries augmented with declaration info.
        """
        jsdocs: List[Dict[str, Any]] = []
        pattern = (
            r"/\*\*([\s\S]*?)\*/\s*(export\s+)?(async\s+)?"
            r"(function|const|let|var|class|interface|type|enum)\s+(\w+)"
        )

        for body, export_kw, async_kw, decl_type, name in re.findall(
            pattern, content, re.MULTILINE
        ):
            parsed = self._parse_jsdoc_comment(f"/**{body}*/")
            parsed.update(
                {
                    "name": name,
                    "type": decl_type,
                    "exported": bool(export_kw),
                    "async": bool(async_kw),
                }
            )
            jsdocs.append(parsed)

        return jsdocs

    def _parse_jsdoc_comment(self, comment: str) -> Dict[str, Any]:
        """Parse a single JSDoc comment into its tags.

        Recognised tags: @param, @returns/@return, @example, @throws, @see.
        Untagged lines accumulate into the description.

        Args:
            comment: JSDoc comment string including ``/**`` and ``*/``.

        Returns:
            Dict with keys description, params, returns, examples, throws, see.
        """
        result: Dict[str, Any] = {
            "description": "",
            "params": [],
            "returns": None,
            "examples": [],
            "throws": [],
            "see": [],
        }

        # Remove the literal delimiters.  The previous strip("/**")/strip("*/")
        # stripped *character sets* of '/' and '*', which can eat legitimate
        # leading/trailing content.
        body = comment
        if body.startswith("/**"):
            body = body[3:]
        if body.endswith("*/"):
            body = body[:-2]

        description_lines: List[str] = []

        for raw in body.split("\n"):
            line = raw.strip().lstrip("*").strip()

            if line.startswith("@param"):
                m = re.match(r"@param\s+\{([^}]+)\}\s+(\w+)(?:\s+-)?\s*(.*)", line)
                if m:
                    result["params"].append(
                        {
                            "type": m.group(1),
                            "name": m.group(2),
                            "description": m.group(3),
                        }
                    )
            elif line.startswith(("@returns", "@return")):
                # \s* between the tag and {type}: standard JSDoc writes
                # "@returns {string} ..." -- the old regex required "{"
                # immediately after the tag and never matched.
                m = re.match(r"@returns?\s*\{([^}]+)\}\s*(.*)", line)
                if m:
                    result["returns"] = {
                        "type": m.group(1),
                        "description": m.group(2),
                    }
            elif line.startswith("@example"):
                result["examples"].append(line[len("@example"):].strip())
            elif line.startswith("@throws"):
                # Same whitespace fix as @returns.
                m = re.match(r"@throws\s*\{([^}]+)\}\s*(.*)", line)
                if m:
                    result["throws"].append(
                        {
                            "type": m.group(1),
                            "description": m.group(2),
                        }
                    )
            elif line.startswith("@see"):
                result["see"].append(line[len("@see"):].strip())
            elif line and not line.startswith("@"):
                description_lines.append(line)

        result["description"] = " ".join(description_lines)
        return result

    def _extract_js_module_doc(self, content: str) -> Optional[str]:
        """Extract module-level documentation from a ``@module`` JSDoc block.

        The @module tag lives *inside* the comment; the previous pattern
        looked for it after the closing ``*/`` and therefore never matched.

        Args:
            content: File content.

        Returns:
            Module doc string ("Module: <name>\\n\\n<body>") or None.
        """
        for block in re.finditer(r"/\*\*([\s\S]*?)\*/", content):
            body = block.group(1)
            mod = re.search(r"@module\s+(\w+)", body)
            if mod:
                return f"Module: {mod.group(1)}\n\n{body.strip()}"
        return None

    def _create_jsdoc_document(
        self,
        jsdoc: Dict[str, Any],
        file_path: Path,
        doc_id_base: str,
        index: int,
    ) -> Document:
        """Create a Document from a parsed JSDoc entry.

        Args:
            jsdoc: Parsed JSDoc dictionary (from _extract_jsdocs).
            file_path: Path to the source file.
            doc_id_base: Base ID for document generation.
            index: Positional index used in the document ID.

        Returns:
            Document object.
        """
        content_parts: List[str] = []

        decl_type = jsdoc.get("type", "unknown")
        name = jsdoc.get("name", "unknown")
        is_async = "async " if jsdoc.get("async") else ""
        is_exported = "export " if jsdoc.get("exported") else ""

        content_parts.append(f"{is_exported}{is_async}{decl_type} {name}")

        if jsdoc.get("description"):
            content_parts.append(f"\nDescription: {jsdoc['description']}")

        if jsdoc.get("params"):
            param_lines = ["\nParameters:"]
            for param in jsdoc["params"]:
                param_lines.append(
                    f"  - {param['name']} ({param['type']}): {param['description']}"
                )
            content_parts.append("\n".join(param_lines))

        if jsdoc.get("returns"):
            ret = jsdoc["returns"]
            content_parts.append(f"\nReturns ({ret['type']}): {ret['description']}")

        if jsdoc.get("examples"):
            examples = "\nExamples:\n" + "\n".join(
                f"  {i+1}. {ex}" for i, ex in enumerate(jsdoc["examples"])
            )
            content_parts.append(examples)

        return Document(
            id=f"{doc_id_base}_jsdoc_{index}",
            content="\n".join(content_parts),
            source_type=self.source_type,
            title=f"{decl_type.capitalize()}: {name}",
            file_path=str(file_path),
            metadata={
                "doc_type": "jsdoc",
                "name": name,
                "jsdoc_type": decl_type,
            },
        )

    def _is_supported_file(self, path: Path) -> bool:
        """Check whether the file extension is supported.

        Args:
            path: Path to the file.

        Returns:
            True if the file extension is supported.
        """
        return path.suffix.lower() in self.SUPPORTED_EXTENSIONS

    def get_documents(self) -> List[Document]:
        """Get all documents from the most recent index() run.

        Returns:
            List of Document objects.
        """
        return self._documents