# File listing header: 203 lines, 7.2 KiB, Python
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

from codechunk.core.chunking import ParsedChunk
@dataclass
class DependencyNode:
    """One entry of the dependency graph, describing a single code chunk.

    Instances are created by ``DependencyAnalyzer.analyze_dependencies``;
    the two sets and the flag are filled in while the graph is built.
    """

    # Name of the chunk; the analyzer uses it as the graph key.
    chunk_name: str
    # File the chunk was taken from.
    file_path: Path
    # Dotted module name derived from ``file_path``.
    module_name: str
    # Resolved import targets of this chunk (a fresh set per instance).
    dependencies: set[str] = field(default_factory=set)
    # Names of chunks that depend on this one (a fresh set per instance).
    dependents: set[str] = field(default_factory=set)
    # Set to True by the analyzer's circular-dependency pass.
    is_circular: bool = field(default=False)
class DependencyAnalyzer:
    """Build and query an import-based dependency graph over code chunks.

    ``dependency_graph`` maps chunk name -> ``DependencyNode`` and
    ``module_to_chunks`` maps dotted module name -> names of the chunks
    defined in that module. Both are rebuilt from scratch by
    ``analyze_dependencies``.

    A node's ``dependencies`` hold resolved import targets. A target may be
    a chunk name or a module name; a module name stands for every chunk
    recorded for that module in ``module_to_chunks``.
    """

    # Top-level packages treated as external: imports rooted at one of these
    # are never resolved to a project module.
    _EXTERNAL_PACKAGES = frozenset({
        'os', 'sys', 're', 'json', 'yaml', 'typing', 'collections',
        'itertools', 'functools', 'abc', 'dataclasses', 'enum',
        'pathlib', 'threading', 'multiprocessing', 'asyncio',
        'requests', 'flask', 'django', 'fastapi', 'numpy', 'pandas',
        'torch', 'tensorflow', 'matplotlib', 'scipy', 'sklearn',
    })

    def __init__(self):
        self.dependency_graph: dict[str, DependencyNode] = {}
        self.module_to_chunks: dict[str, list[str]] = {}

    def analyze_dependencies(self, chunks: "list[ParsedChunk]",
                             project_files: list[Path]) -> "dict[str, DependencyNode]":
        """Rebuild the dependency graph for ``chunks``.

        Args:
            chunks: Parsed chunks; each is read for ``name``,
                ``metadata.file_path`` and ``metadata.imports``.
            project_files: Paths of the project's files, used to derive
                dotted module names and to resolve imports.

        Returns:
            The rebuilt ``dependency_graph`` (chunk name -> node). A later
            chunk with the same name replaces an earlier one.
        """
        self.dependency_graph = {}
        self.module_to_chunks = {}

        project_files_set = set(project_files)
        module_cache = self._build_module_cache(project_files)

        for chunk in chunks:
            file_path = chunk.metadata.file_path
            # Reuse the cached name when the chunk's file is a project file.
            module_name = module_cache.get(file_path)
            if module_name is None:
                module_name = self._get_module_name(file_path, project_files_set)

            node = DependencyNode(
                chunk_name=chunk.name,
                file_path=file_path,
                module_name=module_name,
            )
            for imp in chunk.metadata.imports:
                resolved = self._resolve_import(imp, file_path,
                                                project_files_set, module_cache)
                if resolved:
                    node.dependencies.add(resolved)

            self.dependency_graph[chunk.name] = node
            self.module_to_chunks.setdefault(module_name, []).append(chunk.name)

        self._build_dependency_links()
        self._detect_circular_dependencies()
        return self.dependency_graph

    def _build_module_cache(self, project_files: list[Path]) -> dict[Path, str]:
        """Map every project file to its dotted module name.

        The common root is computed once for the whole list instead of once
        per file.
        """
        try:
            root = self._common_root(set(project_files))
        except ValueError:
            # No common root exists (e.g. absolute and relative paths mixed).
            return {file_path: file_path.stem for file_path in project_files}
        return {
            file_path: self._module_name_from_root(file_path, root)
            for file_path in project_files
        }

    def _get_module_name(self, file_path: Path, project_root: set[Path]) -> str:
        """Return the dotted module name of ``file_path``.

        Args:
            file_path: File to name.
            project_root: Set of project paths; their deepest common
                ancestor is used as the root that names are relative to.

        Returns:
            E.g. ``"pkg.mod"`` for ``<root>/pkg/mod.py`` and ``"pkg"`` for
            ``<root>/pkg/__init__.py``. Falls back to ``file_path.stem``
            when the file is the root itself, lies outside the root, or no
            common root exists.
        """
        try:
            root = self._common_root(project_root)
        except ValueError:
            return file_path.stem
        return self._module_name_from_root(file_path, root)

    @staticmethod
    def _common_root(paths: set[Path]) -> Optional[Path]:
        """Return the deepest common ancestor of ``paths``, or None if empty.

        Raises:
            ValueError: Propagated from ``os.path.commonpath`` when the
                paths have no common ancestor (mixed absolute/relative
                paths, or different drives).
        """
        entries = [str(path) for path in paths]
        if not entries:
            return None
        return Path(os.path.commonpath(entries))

    @staticmethod
    def _module_name_from_root(file_path: Path, root: Optional[Path]) -> str:
        """Turn ``file_path`` into a dotted name relative to ``root``."""
        if root is None:
            rel_path = file_path
        else:
            try:
                rel_path = file_path.relative_to(root)
            except ValueError:
                # The file does not live under the project root.
                return file_path.stem

        parts = list(rel_path.parts)
        if rel_path.anchor and parts:
            # Drop the drive/root component ("/" or "C:\\") of absolute paths
            # so it does not leak into the module name.
            parts = parts[1:]
        if not parts:
            return file_path.stem

        if parts[-1] == "__init__.py":
            parts = parts[:-1]
            if not parts:
                # Package initialiser directly at the root: name it after its
                # directory rather than returning an empty module name.
                return file_path.parent.name or file_path.stem
        else:
            parts[-1] = rel_path.stem

        return '.'.join(parts)

    @staticmethod
    def _module_matches(module_name: str, target: str) -> bool:
        """True if ``module_name`` equals ``target`` or ends with ``.target``.

        Matching on a dotted boundary keeps ``"kg"`` from matching ``"pkg"``.
        """
        return module_name == target or module_name.endswith('.' + target)

    def _resolve_import(self, import_str: str, current_file: Path,
                        project_root: set[Path], module_cache: dict[Path, str]) -> Optional[str]:
        """Resolve an import string to a project module name.

        Args:
            import_str: Dotted import path as recorded for a chunk.
            current_file: File containing the import.
            project_root: Set of project paths (see ``_get_module_name``).
            module_cache: File -> module name map of the project.

        Returns:
            * ``None`` for an empty string or an import rooted at one of
              ``_EXTERNAL_PACKAGES``;
            * the current file's module name for ``self...`` and for
              relative imports (leading dot);
            * otherwise the first cached module matching, in order of
              specificity, the full import path, the path without its last
              component, or the top-level name;
            * the stripped import string itself when nothing matches.
        """
        clean_import = import_str.strip()
        if not clean_import:
            return None

        parts = clean_import.split('.')
        base_module = parts[0]

        # A leading dot makes the first split component empty, so '' (not
        # '.') is what identifies a relative import here.
        if base_module in ('self', ''):
            return self._get_module_name(current_file, project_root)

        if base_module in self._EXTERNAL_PACKAGES:
            return None

        # Most specific candidate first; dict.fromkeys de-duplicates while
        # keeping that order.
        candidates = [clean_import]
        if len(parts) > 1:
            candidates.append('.'.join(parts[:-1]))
        candidates.append(base_module)

        for candidate in dict.fromkeys(candidates):
            if not candidate:
                continue
            for module_name in module_cache.values():
                if self._module_matches(module_name, candidate):
                    return module_name

        return clean_import

    def _chunks_for_dependency(self, dep: str, owner: str) -> list[str]:
        """Return the graph keys that dependency ``dep`` of ``owner`` refers to.

        ``dep`` may directly be a chunk name, and/or a module name that
        stands for the chunks listed in ``module_to_chunks``. ``owner`` is
        left out of the module expansion so that a chunk importing its own
        module does not become its own dependency.
        """
        targets: dict[str, None] = {}
        if dep in self.dependency_graph:
            targets[dep] = None
        for name in self.module_to_chunks.get(dep, ()):
            if name != owner and name in self.dependency_graph:
                targets[name] = None
        return list(targets)

    def _build_dependency_links(self) -> None:
        """Fill each node's ``dependents`` from the other nodes' dependencies."""
        for node in self.dependency_graph.values():
            for dep in node.dependencies:
                for target in self._chunks_for_dependency(dep, node.chunk_name):
                    self.dependency_graph[target].dependents.add(node.chunk_name)

    def _detect_circular_dependencies(self) -> None:
        """Set ``is_circular`` on every node that takes part in a cycle.

        Uses an iterative form of Tarjan's strongly-connected-components
        algorithm: a node is circular when its component has more than one
        member or when it depends on itself. Flags are only ever set to
        True; existing flags are not cleared.
        """
        graph = self.dependency_graph
        edges = {
            name: [
                target
                for dep in node.dependencies
                for target in self._chunks_for_dependency(dep, name)
            ]
            for name, node in graph.items()
        }

        index_of: dict[str, int] = {}
        lowlink: dict[str, int] = {}
        component_stack: list[str] = []
        on_stack: set[str] = set()
        counter = 0

        for root in graph:
            if root in index_of:
                continue
            index_of[root] = lowlink[root] = counter
            counter += 1
            component_stack.append(root)
            on_stack.add(root)
            # Explicit work stack instead of recursion, so deep graphs cannot
            # hit the interpreter's recursion limit.
            work = [(root, iter(edges[root]))]

            while work:
                current, pending = work[-1]
                for succ in pending:
                    if succ not in index_of:
                        index_of[succ] = lowlink[succ] = counter
                        counter += 1
                        component_stack.append(succ)
                        on_stack.add(succ)
                        work.append((succ, iter(edges[succ])))
                        break
                    if succ in on_stack:
                        lowlink[current] = min(lowlink[current], index_of[succ])
                else:
                    # Every successor of `current` has been explored.
                    work.pop()
                    if work:
                        parent = work[-1][0]
                        lowlink[parent] = min(lowlink[parent], lowlink[current])
                    if lowlink[current] == index_of[current]:
                        # `current` is the root of a finished component.
                        component = []
                        while True:
                            member = component_stack.pop()
                            on_stack.discard(member)
                            component.append(member)
                            if member == current:
                                break
                        if len(component) > 1 or current in edges[current]:
                            for member in component:
                                graph[member].is_circular = True

    def get_essential_chunks(self, selected_chunks: list[str]) -> list[str]:
        """Return ``selected_chunks`` plus everything they transitively need.

        The result contains the selected names, every dependency string
        reached (including ones with no node in the graph) and, for
        module-level dependencies, the chunks of that module.

        Returns:
            The collected names, sorted.
        """
        essential = set(selected_chunks)
        to_process = list(selected_chunks)

        while to_process:
            chunk_name = to_process.pop()
            node = self.dependency_graph.get(chunk_name)
            if node is None:
                continue
            for dep in node.dependencies:
                for name in (dep, *self._chunks_for_dependency(dep, chunk_name)):
                    if name not in essential:
                        essential.add(name)
                        to_process.append(name)

        return sorted(essential)

    def get_impacted_chunks(self, modified_chunks: list[str]) -> list[str]:
        """Return ``modified_chunks`` plus every chunk that transitively depends on them.

        Follows the ``dependents`` sets of the graph.

        Returns:
            The collected names, sorted.
        """
        impacted = set(modified_chunks)
        to_process = list(modified_chunks)

        while to_process:
            chunk_name = to_process.pop()
            node = self.dependency_graph.get(chunk_name)
            if node is None:
                continue
            for dependent in node.dependents:
                if dependent not in impacted:
                    impacted.add(dependent)
                    to_process.append(dependent)

        return sorted(impacted)

    def get_dependency_stats(self) -> dict[str, int]:
        """Summarise the current graph.

        Returns:
            A dict with the keys ``total_nodes``, ``nodes_with_deps``,
            ``nodes_with_dependents``, ``circular_deps`` (number of nodes
            flagged circular), ``total_edges`` (sum of all dependency-set
            sizes), ``max_dependencies`` and ``max_dependents``.
        """
        stats = {
            "total_nodes": len(self.dependency_graph),
            "nodes_with_deps": 0,
            "nodes_with_dependents": 0,
            "circular_deps": 0,
            "total_edges": 0,
            "max_dependencies": 0,
            "max_dependents": 0,
        }

        for node in self.dependency_graph.values():
            dep_count = len(node.dependencies)
            dependent_count = len(node.dependents)

            stats["total_edges"] += dep_count
            if dep_count > 0:
                stats["nodes_with_deps"] += 1
            if dependent_count > 0:
                stats["nodes_with_dependents"] += 1
            if node.is_circular:
                stats["circular_deps"] += 1
            stats["max_dependencies"] = max(stats["max_dependencies"], dep_count)
            stats["max_dependents"] = max(stats["max_dependents"], dependent_count)

        return stats