# NOTE: viewer metadata ("651 lines / 23 KiB / Python") was accidentally
# captured into this file during extraction; it is not part of the source.
from pathlib import Path
|
|
from typing import Optional
|
|
import os
|
|
import re
|
|
from codechunk.core.chunking import ParsedChunk, ChunkMetadata
|
|
|
|
|
|
# Maps a lowercase file extension to the language identifier used by the
# parser dispatch in CodeParser.parse_file.  Extensions not listed here are
# ignored by discover_files.
# NOTE(review): ".m" is mapped to MATLAB, but ".m" is also the Objective-C
# extension — confirm which one the project intends.
LANGUAGE_EXTENSIONS = {
    ".py": "python",
    ".js": "javascript",
    ".ts": "typescript",
    ".go": "go",
    ".rs": "rust",
    ".java": "java",
    ".cpp": "cpp",
    ".c": "c",
    ".h": "c",
    ".cs": "csharp",
    ".rb": "ruby",
    ".php": "php",
    ".swift": "swift",
    ".kt": "kotlin",
    ".scala": "scala",
    ".r": "r",
    ".m": "matlab",
    ".lua": "lua",
    ".pl": "perl",
    ".hs": "haskell",
    ".elm": "elm",
    ".ex": "elixir",
    ".erl": "erlang",
    ".ml": "ocaml",
    ".fs": "fsharp",
    ".jl": "julia",
    ".dart": "dart",
    ".vue": "vue",
    ".svelte": "svelte",
}
|
|
|
|
|
|
class CodeParser:
|
|
def __init__(self):
|
|
self.files: list[Path] = []
|
|
self.file_contents: dict[Path, str] = {}
|
|
|
|
def detect_language(self, file_path: Path) -> Optional[str]:
    """Map *file_path*'s extension (case-insensitive) to a language name.

    Returns None when the extension is not in LANGUAGE_EXTENSIONS.
    """
    return LANGUAGE_EXTENSIONS.get(file_path.suffix.lower())
|
|
|
|
def discover_files(self, project_path: Path, include_patterns: list[str],
                   exclude_patterns: list[str]) -> None:
    """Populate self.files with source files under *project_path*.

    A file is kept when it matches at least one include pattern, matches
    no exclude pattern, and has a recognized language extension.  Patterns
    are fnmatch-style and are tried against both the bare file name and
    the project-relative path.
    """
    from fnmatch import fnmatch

    project_path = Path(project_path)
    self.files = []

    def matches_any(name: str, rel: str, patterns: list[str]) -> bool:
        # A pattern may target either the file name or the relative path.
        return any(fnmatch(name, p) or fnmatch(rel, p) for p in patterns)

    for root, _dirs, names in os.walk(project_path):
        base = Path(root)
        for name in names:
            path = base / name
            rel = str(path.relative_to(project_path))

            if not matches_any(name, rel, include_patterns):
                continue
            if matches_any(name, rel, exclude_patterns):
                continue
            # Only keep files whose language we can identify.
            if self.detect_language(path):
                self.files.append(path)
|
|
|
|
def read_file(self, file_path: Path) -> str:
|
|
"""Read file content."""
|
|
if file_path in self.file_contents:
|
|
return self.file_contents[file_path]
|
|
|
|
content = file_path.read_text(encoding='utf-8', errors='replace')
|
|
self.file_contents[file_path] = content
|
|
return content
|
|
|
|
def parse_all(self) -> list[ParsedChunk]:
|
|
"""Parse all discovered files."""
|
|
chunks = []
|
|
for file_path in self.files:
|
|
file_chunks = self.parse_file(file_path)
|
|
chunks.extend(file_chunks)
|
|
return chunks
|
|
|
|
def parse_file(self, file_path: Path) -> list[ParsedChunk]:
    """Parse a single file into chunks, dispatching on detected language.

    Files with an unrecognized extension yield an empty list; languages
    without a dedicated parser fall back to the generic one-chunk parser.
    """
    language = self.detect_language(file_path)
    if language is None:
        return []

    content = self.read_file(file_path)
    lines = content.split('\n')

    # Dispatch table instead of an if/elif chain; JS and TS share a parser.
    handlers = {
        "python": lambda: self._parse_python(file_path, content, lines),
        "javascript": lambda: self._parse_js_like(file_path, content, lines, language),
        "typescript": lambda: self._parse_js_like(file_path, content, lines, language),
        "go": lambda: self._parse_go(file_path, content, lines),
        "rust": lambda: self._parse_rust(file_path, content, lines),
    }
    handler = handlers.get(language)
    if handler is not None:
        return handler()
    return self._parse_generic(file_path, content, lines, language)
|
|
|
|
def _parse_python(self, file_path: Path, content: str, lines: list[str]) -> list[ParsedChunk]:
    """Parse a Python file into class, method, and function chunks.

    Regex/indentation heuristic, not an AST: a def inside a class region is
    a "method" named "Class.func"; defs outside any class are "function".

    BUG FIXES vs. the previous version:
      * The second pass only checked existing chunks with
        chunk_type == "function", so every method was duplicated as a
        top-level "function" chunk; methods are now excluded too.
      * The block-end scan replaced the file's last line with "" and so
        could silently extend a block past its real end.
      * Docstring extraction received the def/class header as its first
        line and therefore always returned None; it now starts at the
        line after the header.
    """
    chunks: list[ParsedChunk] = []
    current_class: Optional[str] = None
    class_start = 0

    imports = self._extract_imports(content, "python")
    func_re = re.compile(r'^\s*def\s+(\w+)\s*\(([^)]*)\)\s*(?:->\s*(\w+))?\s*:')

    def block_end(start: int, indent: int) -> int:
        # Exclusive end index: first subsequent non-blank line whose
        # indentation is at or below *indent*.
        for j in range(start + 1, len(lines)):
            candidate = lines[j]
            if not candidate.strip():
                continue
            if len(candidate) - len(candidate.lstrip()) <= indent:
                return j
        return len(lines)

    def emit_class(name: str, start: int, end: int) -> None:
        chunks.append(ParsedChunk(
            name=name,
            chunk_type="class",
            content='\n'.join(lines[start:end]),
            metadata=ChunkMetadata(
                file_path=file_path,
                file_name=file_path.name,
                language="python",
                start_line=start + 1,
                end_line=end,
                line_count=end - start,
                # Skip the "class X:" header so the docstring is visible.
                docstring=self._extract_docstring(lines[start + 1:end]),
                imports=imports,
            )
        ))

    def emit_func(name: str, chunk_type: str, start: int, end: int,
                  params: list[str], return_type: Optional[str]) -> None:
        body = '\n'.join(lines[start:end])
        chunks.append(ParsedChunk(
            name=name,
            chunk_type=chunk_type,
            content=body,
            metadata=ChunkMetadata(
                file_path=file_path,
                file_name=file_path.name,
                language="python",
                start_line=start + 1,
                end_line=end,
                line_count=end - start,
                # Skip the "def ...:" header so the docstring is visible.
                docstring=self._extract_docstring(lines[start + 1:end]),
                imports=imports,
                parameters=params,
                return_type=return_type,
                complexity_score=self._calculate_complexity(body),
            )
        ))

    # Pass 1: classes and the defs that appear while a class is open.
    for i, line in enumerate(lines):
        class_match = re.match(r'^class\s+(\w+)(?:\([^)]*\))?\s*:', line)
        if class_match:
            if current_class:
                emit_class(current_class, class_start, i)
            current_class = class_match.group(1)
            class_start = i

        func_match = func_re.match(line)
        if func_match and current_class:
            indent = len(line) - len(line.lstrip())
            end = block_end(i, indent)
            emit_func(f"{current_class}.{func_match.group(1)}", "method",
                      i, end, self._parse_params(func_match.group(2)),
                      func_match.group(3))

    if current_class:
        emit_class(current_class, class_start, len(lines))

    # Pass 2: any remaining defs (top-level functions).  Methods already
    # chunked in pass 1 are excluded by start line.
    seen_def_starts = {c.metadata.start_line for c in chunks
                       if c.chunk_type in ("function", "method")}
    for i, line in enumerate(lines):
        func_match = func_re.match(line)
        if func_match and (i + 1) not in seen_def_starts:
            indent = len(line) - len(line.lstrip())
            end = block_end(i, indent)
            emit_func(func_match.group(1), "function", i, end,
                      self._parse_params(func_match.group(2)),
                      func_match.group(3))

    return chunks
|
|
|
|
def _parse_js_like(self, file_path: Path, content: str, lines: list[str],
                   language: str) -> list[ParsedChunk]:
    """Parse a JavaScript/TypeScript file for classes, functions, and
    arrow functions assigned to const/let/var.

    Brace-counting heuristic; braces inside strings or comments will
    confuse it.  BUG FIX vs. the previous version: the line containing the
    closing brace is now included in the chunk (content and end_line were
    previously off by one), and the triplicated brace-scanning loop is
    factored into one helper.
    """
    chunks: list[ParsedChunk] = []
    imports = self._extract_imports(content, language)

    def brace_block_end(start: int) -> int:
        # Exclusive end index of the brace-delimited block opening at or
        # after *start*; the closing-brace line is included.
        depth = 0
        opened = False
        for j in range(start, len(lines)):
            depth += lines[j].count('{') - lines[j].count('}')
            if '{' in lines[j]:
                opened = True
            if opened and depth <= 0:
                return j + 1
        return len(lines)

    def emit(name: str, chunk_type: str, start: int) -> None:
        end = brace_block_end(start)
        chunks.append(ParsedChunk(
            name=name,
            chunk_type=chunk_type,
            content='\n'.join(lines[start:end]),
            metadata=ChunkMetadata(
                file_path=file_path,
                file_name=file_path.name,
                language=language,
                start_line=start + 1,
                end_line=end,
                line_count=end - start,
                imports=imports,
            )
        ))

    for i, line in enumerate(lines):
        class_match = re.match(r'\s*class\s+(\w+)\s*\{?', line)
        if class_match:
            emit(class_match.group(1), "class", i)

        func_match = re.match(r'\s*(?:async\s+)?function\s+(\w+)\s*\(', line)
        if func_match:
            emit(func_match.group(1), "function", i)

        arrow_match = re.match(
            r'\s*(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?\([^)]*\)\s*=>',
            line)
        if arrow_match:
            emit(arrow_match.group(1), "function", i)

    return chunks
|
|
|
|
def _parse_go(self, file_path: Path, content: str, lines: list[str]) -> list[ParsedChunk]:
    """Parse a Go file for top-level functions/methods and struct types.

    Brace-counting heuristic.  BUG FIX vs. the previous version: the
    function branch broke out as soon as brace_count > 0 — i.e. one line
    into the body — truncating every function chunk; both branches now
    scan until the opened braces balance and include the closing line.
    """
    chunks: list[ParsedChunk] = []
    imports = self._extract_imports(content, "go")

    def brace_block_end(start: int) -> int:
        # Exclusive end index of the brace-delimited block opening at or
        # after *start*; the closing-brace line is included.
        depth = 0
        opened = False
        for j in range(start, len(lines)):
            depth += lines[j].count('{') - lines[j].count('}')
            if '{' in lines[j]:
                opened = True
            if opened and depth <= 0:
                return j + 1
        return len(lines)

    def emit(name: str, chunk_type: str, start: int) -> None:
        end = brace_block_end(start)
        chunks.append(ParsedChunk(
            name=name,
            chunk_type=chunk_type,
            content='\n'.join(lines[start:end]),
            metadata=ChunkMetadata(
                file_path=file_path,
                file_name=file_path.name,
                language="go",
                start_line=start + 1,
                end_line=end,
                line_count=end - start,
                imports=imports,
            )
        ))

    for i, line in enumerate(lines):
        # Matches both plain functions and methods with a receiver.
        func_match = re.match(r'\s*func\s+(?:\([^)]+\)\s*)?(\w+)\s*\(', line)
        if func_match:
            emit(func_match.group(1), "function", i)

        struct_match = re.match(r'\s*type\s+(\w+)\s*struct\s*\{', line)
        if struct_match:
            # Structs are recorded with chunk_type "class" for consistency
            # with the other language parsers.
            emit(struct_match.group(1), "class", i)

    return chunks
|
|
|
|
def _parse_rust(self, file_path: Path, content: str, lines: list[str]) -> list[ParsedChunk]:
    """Parse a Rust file for fn items and struct definitions.

    Brace-counting heuristic.  BUG FIXES vs. the previous version:
      * The fn regex required "<" after the name, so only generic
        functions matched; "(" is now accepted as well.
      * The function branch broke as soon as brace_count > 0 (one line
        into the body); both branches now scan until braces balance and
        include the closing line.
      * Body-less items ending in ";" (e.g. "struct Foo;") are chunked as
        a single line instead of swallowing the rest of the file.
    """
    chunks: list[ParsedChunk] = []
    imports = self._extract_imports(content, "rust")

    def item_end(start: int) -> int:
        # Exclusive end index of the item starting at *start*.
        depth = 0
        opened = False
        for j in range(start, len(lines)):
            if not opened and '{' not in lines[j] and lines[j].rstrip().endswith(';'):
                # Declaration with no body (unit struct, trait signature).
                return j + 1
            depth += lines[j].count('{') - lines[j].count('}')
            if '{' in lines[j]:
                opened = True
            if opened and depth <= 0:
                return j + 1
        return len(lines)

    def emit(name: str, chunk_type: str, start: int) -> None:
        end = item_end(start)
        chunks.append(ParsedChunk(
            name=name,
            chunk_type=chunk_type,
            content='\n'.join(lines[start:end]),
            metadata=ChunkMetadata(
                file_path=file_path,
                file_name=file_path.name,
                language="rust",
                start_line=start + 1,
                end_line=end,
                line_count=end - start,
                imports=imports,
            )
        ))

    for i, line in enumerate(lines):
        # "[(<]" accepts both plain and generic functions.
        func_match = re.match(r'\s*(?:pub\s+)?fn\s+(\w+)\s*[(<]', line)
        if func_match:
            emit(func_match.group(1), "function", i)

        struct_match = re.match(r'\s*(?:pub\s+)?struct\s+(\w+)\s*\{?', line)
        if struct_match:
            emit(struct_match.group(1), "class", i)

    return chunks
|
|
|
|
def _parse_generic(self, file_path: Path, content: str, lines: list[str],
                   language: str) -> list[ParsedChunk]:
    """Fallback parser: wrap the entire file in one file-level chunk."""
    metadata = ChunkMetadata(
        file_path=file_path,
        file_name=file_path.name,
        language=language,
        start_line=1,
        end_line=len(lines),
        line_count=len(lines),
        docstring=self._extract_docstring(lines),
        imports=self._extract_imports(content, language),
    )
    whole_file = ParsedChunk(
        name=file_path.stem,
        chunk_type="file",
        content=content,
        metadata=metadata,
    )
    return [whole_file]
|
|
|
|
def _extract_imports(self, content: str, language: str) -> list[str]:
|
|
"""Extract import statements from content."""
|
|
imports = []
|
|
|
|
if language == "python":
|
|
import_patterns = [
|
|
r'^import\s+(\w+(?:\.\w+)*)',
|
|
r'^from\s+(\w+(?:\.\w+)*)\s+import',
|
|
r'^import\s+\w+\s+as\s+\w+',
|
|
r'^from\s+\w+\s+import\s+\w+\s+as\s+\w+',
|
|
]
|
|
elif language in ["javascript", "typescript"]:
|
|
import_patterns = [
|
|
r'^\s*import\s+.*\s+from\s+[\'"]([^\'"]+)[\'"]',
|
|
r'^\s*import\s+[\'"]([^\'"]+)[\'"]',
|
|
r'^\s*require\([\'"]([^\'"]+)[\'"]\)',
|
|
]
|
|
elif language == "go":
|
|
import_patterns = [
|
|
r'^\s*import\s*[\'"]([^\'"]+)[\'"]',
|
|
]
|
|
elif language == "rust":
|
|
import_patterns = [
|
|
r'^\s*use\s+(\w+(?:::\w+)*)',
|
|
]
|
|
else:
|
|
import_patterns = []
|
|
|
|
for pattern in import_patterns:
|
|
matches = re.findall(pattern, content, re.MULTILINE)
|
|
imports.extend(matches)
|
|
|
|
return list(set(imports))
|
|
|
|
def _extract_docstring(self, lines: list[str]) -> Optional[str]:
|
|
"""Extract docstring from lines."""
|
|
if not lines:
|
|
return None
|
|
|
|
first_line = lines[0].strip()
|
|
|
|
triple_quotes = ['"""', "'''", '"""', '"""']
|
|
|
|
for quote in triple_quotes:
|
|
if first_line.startswith(quote) and first_line.endswith(quote):
|
|
return first_line[len(quote):-len(quote)].strip()
|
|
|
|
if first_line.startswith(quote):
|
|
end_quote = None
|
|
for i, line in enumerate(lines[1:], 1):
|
|
if quote in line:
|
|
end_quote = i
|
|
break
|
|
|
|
if end_quote:
|
|
doc_lines = [first_line[len(quote):]]
|
|
for line in lines[1:end_quote]:
|
|
doc_lines.append(line)
|
|
if lines[end_quote].rstrip().endswith(quote):
|
|
doc_lines[-1] = lines[end_quote].rstrip()[:-len(quote)]
|
|
return '\n'.join(doc_lines).strip()
|
|
|
|
return None
|
|
|
|
def _parse_params(self, params_str: str) -> list[str]:
|
|
"""Parse function parameters."""
|
|
if not params_str.strip():
|
|
return []
|
|
|
|
params = []
|
|
for param in params_str.split(','):
|
|
param = param.strip()
|
|
param = re.sub(r'\s+=\s*.+$', '', param)
|
|
param = param.split(':')[0].strip()
|
|
if param and param != 'self' and param != 'cls':
|
|
params.append(param)
|
|
|
|
return params
|
|
|
|
def _calculate_complexity(self, content: str) -> int:
|
|
"""Calculate cyclomatic complexity."""
|
|
complexity = 1
|
|
|
|
keywords = ['if', 'elif', 'for', 'while', 'and', 'or', 'except', 'with', 'assert']
|
|
|
|
for keyword in keywords:
|
|
complexity += content.count(keyword)
|
|
|
|
try_count = content.count('try:')
|
|
except_count = content.count('except:')
|
|
|
|
if try_count > except_count:
|
|
complexity += try_count
|
|
|
|
return complexity
|