from pathlib import Path
from typing import Optional
import os
import re

from codechunk.core.chunking import ParsedChunk, ChunkMetadata

LANGUAGE_EXTENSIONS = {
    ".py": "python",
    ".js": "javascript",
    ".ts": "typescript",
    ".go": "go",
    ".rs": "rust",
    ".java": "java",
    ".cpp": "cpp",
    ".c": "c",
    ".h": "c",
    ".cs": "csharp",
    ".rb": "ruby",
    ".php": "php",
    ".swift": "swift",
    ".kt": "kotlin",
    ".scala": "scala",
    ".r": "r",
    ".m": "matlab",
    ".lua": "lua",
    ".pl": "perl",
    ".hs": "haskell",
    ".elm": "elm",
    ".ex": "elixir",
    ".erl": "erlang",
    ".ml": "ocaml",
    ".fs": "fsharp",
    ".jl": "julia",
    ".dart": "dart",
    ".vue": "vue",
    ".svelte": "svelte",
}


class CodeParser:
    def __init__(self):
        self.files: list[Path] = []
        self.file_contents: dict[Path, str] = {}

    def detect_language(self, file_path: Path) -> Optional[str]:
        """Detect programming language from file extension."""
        return LANGUAGE_EXTENSIONS.get(file_path.suffix.lower())

    def discover_files(self, project_path: Path, include_patterns: list[str],
                       exclude_patterns: list[str]) -> None:
        """Discover source files in the project directory."""
        from fnmatch import fnmatch

        self.files = []
        project_path = Path(project_path)
        for root, _dirs, files in os.walk(project_path):
            root_path = Path(root)
            for file_name in files:
                file_path = root_path / file_name
                rel_path_str = str(file_path.relative_to(project_path))
                # Keep a file only if it matches an include pattern,
                # matches no exclude pattern, and has a known language.
                included = any(fnmatch(file_name, p) or fnmatch(rel_path_str, p)
                               for p in include_patterns)
                excluded = any(fnmatch(file_name, p) or fnmatch(rel_path_str, p)
                               for p in exclude_patterns)
                if included and not excluded and self.detect_language(file_path):
                    self.files.append(file_path)

    def read_file(self, file_path: Path) -> str:
        """Read file content, caching it per path."""
        if file_path in self.file_contents:
            return self.file_contents[file_path]
        content = file_path.read_text(encoding='utf-8', errors='replace')
        self.file_contents[file_path] = content
        return content

    def parse_all(self) -> list[ParsedChunk]:
        """Parse all discovered files."""
        chunks = []
        for file_path in self.files:
            chunks.extend(self.parse_file(file_path))
        return chunks

    def parse_file(self, file_path: Path) -> list[ParsedChunk]:
        """Parse a single file and extract chunks."""
        language = self.detect_language(file_path)
        if not language:
            return []
        content = self.read_file(file_path)
        lines = content.split('\n')
        if language == "python":
            return self._parse_python(file_path, content, lines)
        elif language in ("javascript", "typescript"):
            return self._parse_js_like(file_path, content, lines, language)
        elif language == "go":
            return self._parse_go(file_path, content, lines)
        elif language == "rust":
            return self._parse_rust(file_path, content, lines)
        else:
            return self._parse_generic(file_path, content, lines, language)
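    # Example of the dispatch above (hypothetical file names): in a project
    # containing src/app.py, web/index.ts, and README.md, app.py is routed
    # to _parse_python, index.ts to _parse_js_like, and README.md is never
    # discovered because ".md" has no entry in LANGUAGE_EXTENSIONS.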
    @staticmethod
    def _python_block_end(lines: list[str], start: int, indent: int) -> int:
        """Return the exclusive end index of the block opened at ``start``.

        The block ends at the first non-blank line after ``start`` whose
        indentation is at or below ``indent``.
        """
        for j in range(start + 1, len(lines)):
            if not lines[j].strip():
                continue
            if len(lines[j]) - len(lines[j].lstrip()) <= indent:
                return j
        return len(lines)

    def _parse_python(self, file_path: Path, content: str, lines: list[str]) -> list[ParsedChunk]:
        """Parse a Python file for classes, methods, and functions."""
        chunks = []
        current_class = None
        class_start = 0
        imports = self._extract_imports(content, "python")
        func_re = re.compile(r'^\s*def\s+(\w+)\s*\(([^)]*)\)\s*(?:->\s*(\w+))?\s*:')

        for i, line in enumerate(lines):
            class_match = re.match(r'^class\s+(\w+)(?:\([^)]*\))?\s*:', line)
            if class_match:
                if current_class:
                    # Close out the previous class at the line before this one.
                    docstring = self._extract_docstring(lines[class_start + 1:i])
                    chunks.append(ParsedChunk(
                        name=current_class,
                        chunk_type="class",
                        content='\n'.join(lines[class_start:i]),
                        metadata=ChunkMetadata(
                            file_path=file_path,
                            file_name=file_path.name,
                            language="python",
                            start_line=class_start + 1,
                            end_line=i,
                            line_count=i - class_start,
                            docstring=docstring,
                            imports=imports,
                        ),
                    ))
                current_class = class_match.group(1)
                class_start = i

            func_match = func_re.match(line)
            if func_match and current_class:
                indent = len(line) - len(line.lstrip())
                if indent == 0:
                    # A top-level def is not a method; the second pass
                    # below records it as a plain function.
                    continue
                func_name = func_match.group(1)
                params = self._parse_params(func_match.group(2))
                return_type = func_match.group(3)
                j = self._python_block_end(lines, i, indent)
                func_content = '\n'.join(lines[i:j])
                chunks.append(ParsedChunk(
                    name=f"{current_class}.{func_name}",
                    chunk_type="method",
                    content=func_content,
                    metadata=ChunkMetadata(
                        file_path=file_path,
                        file_name=file_path.name,
                        language="python",
                        start_line=i + 1,
                        end_line=j,
                        line_count=j - i,
                        docstring=self._extract_docstring(lines[i + 1:j]),
                        imports=imports,
                        parameters=params,
                        return_type=return_type,
                        complexity_score=self._calculate_complexity(func_content),
                    ),
                ))

        if current_class:
            # Close out the final class at end of file.
            docstring = self._extract_docstring(lines[class_start + 1:])
            chunks.append(ParsedChunk(
                name=current_class,
                chunk_type="class",
                content='\n'.join(lines[class_start:]),
                metadata=ChunkMetadata(
                    file_path=file_path,
                    file_name=file_path.name,
                    language="python",
                    start_line=class_start + 1,
                    end_line=len(lines),
                    line_count=len(lines) - class_start,
                    docstring=docstring,
                    imports=imports,
                ),
            ))

        # Second pass: functions not already captured as methods.
        taken = {c.metadata.start_line for c in chunks
                 if c.chunk_type in ("function", "method")}
        for i, line in enumerate(lines):
            func_match = func_re.match(line)
            if not func_match or i + 1 in taken:
                continue
            func_name = func_match.group(1)
            params = self._parse_params(func_match.group(2))
            return_type = func_match.group(3)
            indent = len(line) - len(line.lstrip())
            j = self._python_block_end(lines, i, indent)
            func_content = '\n'.join(lines[i:j])
            chunks.append(ParsedChunk(
                name=func_name,
                chunk_type="function",
                content=func_content,
                metadata=ChunkMetadata(
                    file_path=file_path,
                    file_name=file_path.name,
                    language="python",
                    start_line=i + 1,
                    end_line=j,
                    line_count=j - i,
                    docstring=self._extract_docstring(lines[i + 1:j]),
                    imports=imports,
                    parameters=params,
                    return_type=return_type,
                    complexity_score=self._calculate_complexity(func_content),
                ),
            ))
        return chunks
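    # Illustration of _python_block_end on a hypothetical input:
    #
    #     0: def f():
    #     1:     return 1
    #     2:
    #     3: x = 2
    #
    # The scan skips blank line 2 and stops at line 3, whose indent (0) is
    # at or below the def's indent, so the block is lines[0:3].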
    @staticmethod
    def _brace_block_end(lines: list[str], start: int) -> int:
        """Return the exclusive end index of a ``{}``-delimited block.

        Braces are counted from ``start`` until the block balances; the
        line holding the closing brace is included. Braces inside strings
        or comments are not tokenized, so this is a heuristic.
        """
        brace_count = 0
        found_brace = False
        for j in range(start, len(lines)):
            brace_count += lines[j].count('{') - lines[j].count('}')
            if '{' in lines[j]:
                found_brace = True
            if found_brace and brace_count == 0:
                return j + 1
        return len(lines)

    def _parse_js_like(self, file_path: Path, content: str, lines: list[str], language: str) -> list[ParsedChunk]:
        """Parse a JavaScript/TypeScript file."""
        chunks = []
        imports = self._extract_imports(content, language)
        # Classes, function declarations, and arrow functions bound to a
        # const/let/var all become chunks.
        patterns = (
            (r'\s*class\s+(\w+)', "class"),
            (r'\s*(?:async\s+)?function\s+(\w+)\s*\(', "function"),
            (r'\s*(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?\([^)]*\)\s*=>', "function"),
        )
        for i, line in enumerate(lines):
            for pattern, chunk_type in patterns:
                match = re.match(pattern, line)
                if not match:
                    continue
                j = self._brace_block_end(lines, i)
                chunks.append(ParsedChunk(
                    name=match.group(1),
                    chunk_type=chunk_type,
                    content='\n'.join(lines[i:j]),
                    metadata=ChunkMetadata(
                        file_path=file_path,
                        file_name=file_path.name,
                        language=language,
                        start_line=i + 1,
                        end_line=j,
                        line_count=j - i,
                        imports=imports,
                    ),
                ))
                break
        return chunks

    def _parse_go(self, file_path: Path, content: str, lines: list[str]) -> list[ParsedChunk]:
        """Parse a Go file."""
        chunks = []
        imports = self._extract_imports(content, "go")
        for i, line in enumerate(lines):
            # Matches plain functions and methods with a receiver.
            func_match = re.match(r'\s*func\s+(?:\([^)]+\)\s*)?(\w+)\s*\(', line)
            if func_match:
                j = self._brace_block_end(lines, i)
                chunks.append(ParsedChunk(
                    name=func_match.group(1),
                    chunk_type="function",
                    content='\n'.join(lines[i:j]),
                    metadata=ChunkMetadata(
                        file_path=file_path,
                        file_name=file_path.name,
                        language="go",
                        start_line=i + 1,
                        end_line=j,
                        line_count=j - i,
                        imports=imports,
                    ),
                ))
            struct_match = re.match(r'\s*type\s+(\w+)\s+struct\s*\{', line)
            if struct_match:
                j = self._brace_block_end(lines, i)
                chunks.append(ParsedChunk(
                    name=struct_match.group(1),
                    chunk_type="class",
                    content='\n'.join(lines[i:j]),
                    metadata=ChunkMetadata(
                        file_path=file_path,
                        file_name=file_path.name,
                        language="go",
                        start_line=i + 1,
                        end_line=j,
                        line_count=j - i,
                        imports=imports,
                    ),
                ))
        return chunks
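    # Illustration of _brace_block_end on a hypothetical input:
    #
    #     0: func Add(a, b int) int {
    #     1:     return a + b
    #     2: }
    #
    # The count goes +1 on line 0 and back to 0 on line 2, so the helper
    # returns 3 and lines[0:3] includes the closing brace. Braces inside
    # string literals or comments will skew the count; the helper is a
    # deliberate heuristic, not a tokenizer.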
    def _parse_rust(self, file_path: Path, content: str, lines: list[str]) -> list[ParsedChunk]:
        """Parse a Rust file."""
        chunks = []
        imports = self._extract_imports(content, "rust")
        for i, line in enumerate(lines):
            # Matches plain and generic functions: fn name(...) and fn name<...>(...).
            func_match = re.match(r'\s*(?:pub\s+)?fn\s+(\w+)\s*[(<]', line)
            if func_match:
                j = self._brace_block_end(lines, i)
                chunks.append(ParsedChunk(
                    name=func_match.group(1),
                    chunk_type="function",
                    content='\n'.join(lines[i:j]),
                    metadata=ChunkMetadata(
                        file_path=file_path,
                        file_name=file_path.name,
                        language="rust",
                        start_line=i + 1,
                        end_line=j,
                        line_count=j - i,
                        imports=imports,
                    ),
                ))
            # Only brace-delimited structs are chunked; unit and tuple
            # structs have no block to delimit.
            struct_match = re.match(r'\s*(?:pub\s+)?struct\s+(\w+)\s*\{', line)
            if struct_match:
                j = self._brace_block_end(lines, i)
                chunks.append(ParsedChunk(
                    name=struct_match.group(1),
                    chunk_type="class",
                    content='\n'.join(lines[i:j]),
                    metadata=ChunkMetadata(
                        file_path=file_path,
                        file_name=file_path.name,
                        language="rust",
                        start_line=i + 1,
                        end_line=j,
                        line_count=j - i,
                        imports=imports,
                    ),
                ))
        return chunks

    def _parse_generic(self, file_path: Path, content: str, lines: list[str], language: str) -> list[ParsedChunk]:
        """Fallback parser: emit the whole file as a single chunk."""
        imports = self._extract_imports(content, language)
        docstring = self._extract_docstring(lines)
        return [ParsedChunk(
            name=file_path.stem,
            chunk_type="file",
            content=content,
            metadata=ChunkMetadata(
                file_path=file_path,
                file_name=file_path.name,
                language=language,
                start_line=1,
                end_line=len(lines),
                line_count=len(lines),
                docstring=docstring,
                imports=imports,
            ),
        )]

    def _extract_imports(self, content: str, language: str) -> list[str]:
        """Extract imported module names from content."""
        if language == "python":
            import_patterns = [
                r'^import\s+(\w+(?:\.\w+)*)',
                r'^from\s+(\w+(?:\.\w+)*)\s+import',
            ]
        elif language in ("javascript", "typescript"):
            import_patterns = [
                r'^\s*import\s+.*\s+from\s+[\'"]([^\'"]+)[\'"]',
                r'^\s*import\s+[\'"]([^\'"]+)[\'"]',
                r'require\([\'"]([^\'"]+)[\'"]\)',
            ]
        elif language == "go":
            # Single-line imports only; grouped import ( ... ) blocks are
            # not handled by this pattern.
            import_patterns = [
                r'^\s*import\s*[\'"]([^\'"]+)[\'"]',
            ]
        elif language == "rust":
            import_patterns = [
                r'^\s*use\s+(\w+(?:::\w+)*)',
            ]
        else:
            import_patterns = []

        imports = []
        for pattern in import_patterns:
            imports.extend(re.findall(pattern, content, re.MULTILINE))
        return sorted(set(imports))

    def _extract_docstring(self, lines: list[str]) -> Optional[str]:
        """Extract a leading docstring from lines."""
        if not lines:
            return None
        first_line = lines[0].strip()
        for quote in ('"""', "'''"):
            if not first_line.startswith(quote):
                continue
            # Single-line docstring: opening and closing quotes together.
            if first_line.endswith(quote) and len(first_line) >= 2 * len(quote):
                return first_line[len(quote):-len(quote)].strip()
            # Multi-line docstring: scan ahead for the closing quotes.
            for i, line in enumerate(lines[1:], 1):
                if quote in line:
                    doc_lines = [first_line[len(quote):]]
                    doc_lines.extend(lines[1:i])
                    doc_lines.append(line.split(quote)[0])
                    return '\n'.join(doc_lines).strip()
            return None
        return None

    def _parse_params(self, params_str: str) -> list[str]:
        """Parse parameter names from a function signature."""
        if not params_str.strip():
            return []
        params = []
        for param in params_str.split(','):
            param = param.strip()
            param = re.sub(r'\s*=\s*.+$', '', param)  # drop default value
            param = param.split(':')[0].strip()       # drop type annotation
            if param and param not in ('self', 'cls'):
                params.append(param)
        return params
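    # Example (hypothetical signature): _parse_params("self, x: int, y=0")
    # drops 'self', strips the annotation from 'x' and the default from
    # 'y', and returns ['x', 'y'].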
    def _calculate_complexity(self, content: str) -> int:
        """Approximate cyclomatic complexity by counting branch keywords."""
        complexity = 1
        keywords = ['if', 'elif', 'for', 'while', 'and', 'or',
                    'except', 'with', 'assert']
        for keyword in keywords:
            # Word boundaries keep 'if' from matching inside identifiers
            # such as 'diff' or 'notify'.
            complexity += len(re.findall(rf'\b{keyword}\b', content))
        return complexity
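

# A minimal usage sketch, assuming the codechunk package is importable.
# The path and patterns below are illustrative placeholders, not part of
# the parser itself.
if __name__ == "__main__":
    parser = CodeParser()
    parser.discover_files(
        Path("."),                    # project root (placeholder)
        include_patterns=["*.py"],    # what to index
        exclude_patterns=["*test*"],  # what to skip
    )
    for chunk in parser.parse_all():
        meta = chunk.metadata
        print(f"{meta.file_name}:{meta.start_line}-{meta.end_line} "
              f"[{chunk.chunk_type}] {chunk.name}")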