Files
codechunk-cli/codechunk/core/parser.py
7000pctAUTO 31c89214ce
Some checks failed
CI / test (push) Has been cancelled
CI / build (push) Has been cancelled
fix: resolve CI/CD issues - Poetry setup, type annotations, MyPy errors
2026-02-02 00:08:14 +00:00

651 lines
23 KiB
Python

from pathlib import Path
from typing import Optional
import os
import re
from codechunk.core.chunking import ParsedChunk, ChunkMetadata
# Maps file extensions (lowercased, with leading dot) to the canonical
# language identifier used throughout the parser.  Files whose extension is
# not listed here are treated as unsupported and skipped during discovery.
LANGUAGE_EXTENSIONS = {
    ".py": "python",
    ".js": "javascript",
    ".ts": "typescript",
    ".go": "go",
    ".rs": "rust",
    ".java": "java",
    ".cpp": "cpp",
    ".c": "c",
    ".h": "c",  # headers are classified as C, even in C++ projects
    ".cs": "csharp",
    ".rb": "ruby",
    ".php": "php",
    ".swift": "swift",
    ".kt": "kotlin",
    ".scala": "scala",
    ".r": "r",
    ".m": "matlab",  # NOTE(review): ".m" is also Objective-C — confirm intent
    ".lua": "lua",
    ".pl": "perl",
    ".hs": "haskell",
    ".elm": "elm",
    ".ex": "elixir",
    ".erl": "erlang",
    ".ml": "ocaml",
    ".fs": "fsharp",
    ".jl": "julia",
    ".dart": "dart",
    ".vue": "vue",
    ".svelte": "svelte",
}
class CodeParser:
    """Discovers source files in a project and parses them into code chunks.

    Language detection is purely extension-based (see LANGUAGE_EXTENSIONS);
    parsing is regex/heuristic-based rather than AST-based.
    """

    def __init__(self):
        # Files selected by discover_files(), in os.walk order.
        self.files: list[Path] = []
        # Lazy cache of file text keyed by path, filled by read_file().
        self.file_contents: dict[Path, str] = {}
def detect_language(self, file_path: Path) -> Optional[str]:
"""Detect programming language from file extension."""
ext = file_path.suffix.lower()
return LANGUAGE_EXTENSIONS.get(ext)
def discover_files(self, project_path: Path, include_patterns: list[str],
exclude_patterns: list[str]) -> None:
"""Discover source files in project directory."""
from fnmatch import fnmatch
self.files = []
project_path = Path(project_path)
for root, _dirs, files in os.walk(project_path):
root_path = Path(root)
for file_name in files:
file_path = root_path / file_name
rel_path = file_path.relative_to(project_path)
rel_path_str = str(rel_path)
include = False
for pattern in include_patterns:
if fnmatch(file_name, pattern) or fnmatch(rel_path_str, pattern):
include = True
break
if not include:
continue
exclude = False
for pattern in exclude_patterns:
if fnmatch(file_name, pattern) or fnmatch(rel_path_str, pattern):
exclude = True
break
if exclude:
continue
if self.detect_language(file_path):
self.files.append(file_path)
def read_file(self, file_path: Path) -> str:
"""Read file content."""
if file_path in self.file_contents:
return self.file_contents[file_path]
content = file_path.read_text(encoding='utf-8', errors='replace')
self.file_contents[file_path] = content
return content
def parse_all(self) -> list[ParsedChunk]:
"""Parse all discovered files."""
chunks = []
for file_path in self.files:
file_chunks = self.parse_file(file_path)
chunks.extend(file_chunks)
return chunks
def parse_file(self, file_path: Path) -> list[ParsedChunk]:
"""Parse a single file and extract chunks."""
language = self.detect_language(file_path)
if not language:
return []
content = self.read_file(file_path)
lines = content.split('\n')
if language == "python":
return self._parse_python(file_path, content, lines)
elif language in ["javascript", "typescript"]:
return self._parse_js_like(file_path, content, lines, language)
elif language == "go":
return self._parse_go(file_path, content, lines)
elif language == "rust":
return self._parse_rust(file_path, content, lines)
else:
return self._parse_generic(file_path, content, lines, language)
def _parse_python(self, file_path: Path, content: str, lines: list[str]) -> list[ParsedChunk]:
"""Parse Python file for classes and functions."""
chunks = []
current_class = None
class_start = 0
imports = self._extract_imports(content, "python")
for i, line in enumerate(lines):
class_match = re.match(r'^class\s+(\w+)(?:\([^)]*\))?\s*:', line)
if class_match:
if current_class:
class_content = '\n'.join(lines[class_start:i])
class_lines = i - class_start
docstring = self._extract_docstring(lines[class_start:])
chunks.append(ParsedChunk(
name=current_class,
chunk_type="class",
content=class_content,
metadata=ChunkMetadata(
file_path=file_path,
file_name=file_path.name,
language="python",
start_line=class_start + 1,
end_line=i,
line_count=class_lines,
docstring=docstring,
imports=imports
)
))
current_class = class_match.group(1)
class_start = i
func_match = re.match(r'^\s*def\s+(\w+)\s*\(([^)]*)\)\s*(?:->\s*(\w+))?\s*:', line)
if func_match and current_class:
func_name = func_match.group(1)
full_name = f"{current_class}.{func_name}"
params = self._parse_params(func_match.group(2))
return_type = func_match.group(3)
indent = len(line) - len(line.lstrip())
func_start = i
for j in range(i + 1, len(lines)):
if j == len(lines) - 1:
next_line = ""
else:
next_line = lines[j]
if not next_line.strip():
continue
next_indent = len(next_line) - len(next_line.lstrip())
if next_indent <= indent and next_line.strip():
break
else:
j = len(lines)
func_content = '\n'.join(lines[func_start:j])
func_lines = j - func_start
docstring = self._extract_docstring(lines[func_start:])
complexity = self._calculate_complexity('\n'.join(lines[func_start:j]))
chunks.append(ParsedChunk(
name=full_name,
chunk_type="method",
content=func_content,
metadata=ChunkMetadata(
file_path=file_path,
file_name=file_path.name,
language="python",
start_line=func_start + 1,
end_line=j,
line_count=func_lines,
docstring=docstring,
imports=imports,
parameters=params,
return_type=return_type,
complexity_score=complexity
)
))
if current_class:
class_content = '\n'.join(lines[class_start:])
class_lines = len(lines) - class_start
docstring = self._extract_docstring(lines[class_start:])
chunks.append(ParsedChunk(
name=current_class,
chunk_type="class",
content=class_content,
metadata=ChunkMetadata(
file_path=file_path,
file_name=file_path.name,
language="python",
start_line=class_start + 1,
end_line=len(lines),
line_count=class_lines,
docstring=docstring,
imports=imports
)
))
for i, line in enumerate(lines):
func_match = re.match(r'^\s*def\s+(\w+)\s*\(([^)]*)\)\s*(?:->\s*(\w+))?\s*:', line)
if func_match and not any(c.metadata.start_line == i + 1 for c in chunks if c.chunk_type == "function"):
func_name = func_match.group(1)
params = self._parse_params(func_match.group(2))
return_type = func_match.group(3)
indent = len(line) - len(line.lstrip())
func_start = i
for j in range(i + 1, len(lines)):
if j == len(lines) - 1:
next_line = ""
else:
next_line = lines[j]
if not next_line.strip():
continue
next_indent = len(next_line) - len(next_line.lstrip())
if next_indent <= indent and next_line.strip():
break
else:
j = len(lines)
func_content = '\n'.join(lines[func_start:j])
func_lines = j - func_start
docstring = self._extract_docstring(lines[func_start:])
complexity = self._calculate_complexity('\n'.join(lines[func_start:j]))
chunks.append(ParsedChunk(
name=func_name,
chunk_type="function",
content=func_content,
metadata=ChunkMetadata(
file_path=file_path,
file_name=file_path.name,
language="python",
start_line=func_start + 1,
end_line=j,
line_count=func_lines,
docstring=docstring,
imports=imports,
parameters=params,
return_type=return_type,
complexity_score=complexity
)
))
return chunks
def _parse_js_like(self, file_path: Path, content: str, lines: list[str],
language: str) -> list[ParsedChunk]:
"""Parse JavaScript/TypeScript file."""
chunks = []
imports = self._extract_imports(content, language)
for i, line in enumerate(lines):
class_match = re.match(r'\s*class\s+(\w+)\s*\{?', line)
if class_match:
class_name = class_match.group(1)
class_start = i
brace_count = 0
found_brace = False
for j in range(i, len(lines)):
brace_count += lines[j].count('{') - lines[j].count('}')
if '{' in lines[j]:
found_brace = True
if found_brace and brace_count == 0:
break
else:
j = len(lines)
class_content = '\n'.join(lines[class_start:j])
chunks.append(ParsedChunk(
name=class_name,
chunk_type="class",
content=class_content,
metadata=ChunkMetadata(
file_path=file_path,
file_name=file_path.name,
language=language,
start_line=class_start + 1,
end_line=j,
line_count=j - class_start,
imports=imports
)
))
func_match = re.match(r'\s*(?:async\s+)?function\s+(\w+)\s*\(', line)
if func_match:
func_name = func_match.group(1)
func_start = i
brace_count = 0
found_brace = False
for j in range(i, len(lines)):
brace_count += lines[j].count('{') - lines[j].count('}')
if '{' in lines[j]:
found_brace = True
if found_brace and brace_count == 0:
break
else:
j = len(lines)
func_content = '\n'.join(lines[func_start:j])
chunks.append(ParsedChunk(
name=func_name,
chunk_type="function",
content=func_content,
metadata=ChunkMetadata(
file_path=file_path,
file_name=file_path.name,
language=language,
start_line=func_start + 1,
end_line=j,
line_count=j - func_start,
imports=imports
)
))
arrow_match = re.match(r'\s*(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?\([^)]*\)\s*=>', line)
if arrow_match:
func_name = arrow_match.group(1)
func_start = i
brace_count = 0
found_brace = False
for j in range(i, len(lines)):
brace_count += lines[j].count('{') - lines[j].count('}')
if '{' in lines[j]:
found_brace = True
if found_brace and brace_count == 0:
break
else:
j = len(lines)
func_content = '\n'.join(lines[func_start:j])
chunks.append(ParsedChunk(
name=func_name,
chunk_type="function",
content=func_content,
metadata=ChunkMetadata(
file_path=file_path,
file_name=file_path.name,
language=language,
start_line=func_start + 1,
end_line=j,
line_count=j - func_start,
imports=imports
)
))
return chunks
def _parse_go(self, file_path: Path, content: str, lines: list[str]) -> list[ParsedChunk]:
"""Parse Go file."""
chunks = []
imports = self._extract_imports(content, "go")
for i, line in enumerate(lines):
func_match = re.match(r'\s*func\s+(?:\([^)]+\)\s*)?(\w+)\s*\(', line)
if func_match:
func_name = func_match.group(1)
func_start = i
brace_count = 0
for j in range(i, len(lines)):
brace_count += lines[j].count('{') - lines[j].count('}')
if brace_count > 0 and j > i:
break
else:
j = len(lines)
func_content = '\n'.join(lines[func_start:j])
chunks.append(ParsedChunk(
name=func_name,
chunk_type="function",
content=func_content,
metadata=ChunkMetadata(
file_path=file_path,
file_name=file_path.name,
language="go",
start_line=func_start + 1,
end_line=j,
line_count=j - func_start,
imports=imports
)
))
struct_match = re.match(r'\s*type\s+(\w+)\s*struct\s*\{', line)
if struct_match:
struct_name = struct_match.group(1)
struct_start = i
brace_count = 0
for j in range(i, len(lines)):
brace_count += lines[j].count('{') - lines[j].count('}')
if brace_count == 0 and j > i:
break
else:
j = len(lines)
struct_content = '\n'.join(lines[struct_start:j])
chunks.append(ParsedChunk(
name=struct_name,
chunk_type="class",
content=struct_content,
metadata=ChunkMetadata(
file_path=file_path,
file_name=file_path.name,
language="go",
start_line=struct_start + 1,
end_line=j,
line_count=j - struct_start,
imports=imports
)
))
return chunks
def _parse_rust(self, file_path: Path, content: str, lines: list[str]) -> list[ParsedChunk]:
"""Parse Rust file."""
chunks = []
imports = self._extract_imports(content, "rust")
for i, line in enumerate(lines):
func_match = re.match(r'\s*(?:pub\s+)?fn\s+(\w+)\s*<', line)
if func_match:
func_name = func_match.group(1)
func_start = i
brace_count = 0
for j in range(i, len(lines)):
brace_count += lines[j].count('{') - lines[j].count('}')
if brace_count > 0 and j > i:
break
else:
j = len(lines)
func_content = '\n'.join(lines[func_start:j])
chunks.append(ParsedChunk(
name=func_name,
chunk_type="function",
content=func_content,
metadata=ChunkMetadata(
file_path=file_path,
file_name=file_path.name,
language="rust",
start_line=func_start + 1,
end_line=j,
line_count=j - func_start,
imports=imports
)
))
struct_match = re.match(r'\s*(?:pub\s+)?struct\s+(\w+)\s*\{?', line)
if struct_match:
struct_name = struct_match.group(1)
struct_start = i
brace_count = 0
for j in range(i, len(lines)):
brace_count += lines[j].count('{') - lines[j].count('}')
if brace_count == 0 and j > i:
break
else:
j = len(lines)
struct_content = '\n'.join(lines[struct_start:j])
chunks.append(ParsedChunk(
name=struct_name,
chunk_type="class",
content=struct_content,
metadata=ChunkMetadata(
file_path=file_path,
file_name=file_path.name,
language="rust",
start_line=struct_start + 1,
end_line=j,
line_count=j - struct_start,
imports=imports
)
))
return chunks
def _parse_generic(self, file_path: Path, content: str, lines: list[str],
language: str) -> list[ParsedChunk]:
"""Generic parser for unknown languages."""
chunks = []
imports = self._extract_imports(content, language)
docstring = self._extract_docstring(lines)
chunks.append(ParsedChunk(
name=file_path.stem,
chunk_type="file",
content=content,
metadata=ChunkMetadata(
file_path=file_path,
file_name=file_path.name,
language=language,
start_line=1,
end_line=len(lines),
line_count=len(lines),
docstring=docstring,
imports=imports
)
))
return chunks
def _extract_imports(self, content: str, language: str) -> list[str]:
"""Extract import statements from content."""
imports = []
if language == "python":
import_patterns = [
r'^import\s+(\w+(?:\.\w+)*)',
r'^from\s+(\w+(?:\.\w+)*)\s+import',
r'^import\s+\w+\s+as\s+\w+',
r'^from\s+\w+\s+import\s+\w+\s+as\s+\w+',
]
elif language in ["javascript", "typescript"]:
import_patterns = [
r'^\s*import\s+.*\s+from\s+[\'"]([^\'"]+)[\'"]',
r'^\s*import\s+[\'"]([^\'"]+)[\'"]',
r'^\s*require\([\'"]([^\'"]+)[\'"]\)',
]
elif language == "go":
import_patterns = [
r'^\s*import\s*[\'"]([^\'"]+)[\'"]',
]
elif language == "rust":
import_patterns = [
r'^\s*use\s+(\w+(?:::\w+)*)',
]
else:
import_patterns = []
for pattern in import_patterns:
matches = re.findall(pattern, content, re.MULTILINE)
imports.extend(matches)
return list(set(imports))
def _extract_docstring(self, lines: list[str]) -> Optional[str]:
"""Extract docstring from lines."""
if not lines:
return None
first_line = lines[0].strip()
triple_quotes = ['"""', "'''", '"""', '"""']
for quote in triple_quotes:
if first_line.startswith(quote) and first_line.endswith(quote):
return first_line[len(quote):-len(quote)].strip()
if first_line.startswith(quote):
end_quote = None
for i, line in enumerate(lines[1:], 1):
if quote in line:
end_quote = i
break
if end_quote:
doc_lines = [first_line[len(quote):]]
for line in lines[1:end_quote]:
doc_lines.append(line)
if lines[end_quote].rstrip().endswith(quote):
doc_lines[-1] = lines[end_quote].rstrip()[:-len(quote)]
return '\n'.join(doc_lines).strip()
return None
def _parse_params(self, params_str: str) -> list[str]:
"""Parse function parameters."""
if not params_str.strip():
return []
params = []
for param in params_str.split(','):
param = param.strip()
param = re.sub(r'\s+=\s*.+$', '', param)
param = param.split(':')[0].strip()
if param and param != 'self' and param != 'cls':
params.append(param)
return params
def _calculate_complexity(self, content: str) -> int:
"""Calculate cyclomatic complexity."""
complexity = 1
keywords = ['if', 'elif', 'for', 'while', 'and', 'or', 'except', 'with', 'assert']
for keyword in keywords:
complexity += content.count(keyword)
try_count = content.count('try:')
except_count = content.count('except:')
if try_count > except_count:
complexity += try_count
return complexity