Initial upload with CI/CD workflow
This commit is contained in:
161
codesnap/core/dependency_analyzer.py
Normal file
161
codesnap/core/dependency_analyzer.py
Normal file
@@ -0,0 +1,161 @@
|
||||
"""Dependency analysis module using NetworkX."""
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import networkx as nx
|
||||
|
||||
from .parser import CodeParser, ImportStatement, ParsedFile
|
||||
|
||||
|
||||
@dataclass
class DependencyEdge:
    """Represents a dependency between two files.

    One edge is recorded for every import statement that could be resolved
    to another analyzed file.

    Attributes:
        source: Path of the file that contains the import statement.
        target: Path of the file the import was resolved to.
        import_module: Module string exactly as reported by the parser for
            the import statement.
        line_number: Line of the import statement as reported by the parser.
    """

    source: Path
    target: Path
    import_module: str
    line_number: int
|
||||
|
||||
|
||||
@dataclass
class DependencyReport:
    """Complete dependency analysis report.

    Every field defaults to its own empty list, so a freshly constructed
    report describes an empty code base.

    Attributes:
        files: Paths of all analyzed files.
        dependencies: One entry per import statement that was resolved to
            another analyzed file.
        cycles: Circular dependencies, each given as the list of files that
            form the cycle.
        orphaned_files: Files that neither import nor are imported by any
            other analyzed file.
        most_depended: ``(path, count)`` pairs ranked by how many files
            depend on ``path``, highest first.
        most_dependant: ``(path, count)`` pairs ranked by how many files
            ``path`` depends on, highest first.
    """

    files: list[Path] = field(default_factory=list)
    dependencies: list[DependencyEdge] = field(default_factory=list)
    cycles: list[list[Path]] = field(default_factory=list)
    orphaned_files: list[Path] = field(default_factory=list)
    most_depended: list[tuple[Path, int]] = field(default_factory=list)
    most_dependant: list[tuple[Path, int]] = field(default_factory=list)
|
||||
|
||||
|
||||
class DependencyAnalyzer:
    """Analyzes dependencies between files in a codebase.

    Files are the nodes of a directed graph; an edge ``a -> b`` means that an
    import statement found in ``a`` was resolved to ``b``. The graph is rebuilt
    from scratch on every call to :meth:`analyze`.
    """

    # Maximum number of entries returned by the most-depended/dependant rankings.
    _TOP_N = 10

    def __init__(self) -> None:
        """Create the import parser and an empty dependency graph."""
        self.parser = CodeParser()
        self.graph = nx.DiGraph()

    def analyze(
        self, files: list[ParsedFile], base_path: Optional[Path] = None
    ) -> DependencyReport:
        """Analyze dependencies from a list of parsed files.

        Args:
            files: Parsed files to analyze. Each file's ``path`` becomes a node
                of the dependency graph.
            base_path: Currently unused by the resolution heuristic; accepted
                so existing callers that pass it keep working.

        Returns:
            A report with the analyzed paths and resolved dependency edges,
            plus cycles, orphaned files and the two degree rankings when at
            least one file was supplied.
        """
        self.graph.clear()

        file_paths = {f.path: f for f in files}
        # Built once: the resolver needs the same candidate list for every import.
        known_paths = list(file_paths)
        self.graph.add_nodes_from(known_paths)

        dependencies: list[DependencyEdge] = []
        for parsed_file in files:
            for imp in self.parser.extract_imports(parsed_file):
                target_path = self._resolve_import(
                    imp, parsed_file.path, known_paths
                )
                if target_path is None or target_path not in file_paths:
                    continue
                dependencies.append(
                    DependencyEdge(
                        source=parsed_file.path,
                        target=target_path,
                        import_module=imp.module,
                        line_number=imp.line_number,
                    )
                )
                self.graph.add_edge(parsed_file.path, target_path)

        report = DependencyReport(
            files=list(file_paths),
            dependencies=dependencies,
        )

        if self.graph.nodes:
            report.cycles = self._find_cycles()
            report.orphaned_files = self._find_orphaned()
            report.most_depended = self._find_most_depended()
            report.most_dependant = self._find_most_dependant()

        return report

    def _resolve_import(
        self, imp: ImportStatement, source_path: Path, available_files: list[Path]
    ) -> Optional[Path]:
        """Resolve an import statement to a file path.

        Resolution is a name-based heuristic; directories are not consulted.
        Candidates are tried in three tiers, and within a tier the first
        matching entry of ``available_files`` wins:

        1. the first module component equals the file stem (or one of its
           dot-separated pieces),
        2. any later module component equals the file stem, most specific
           (last) component first,
        3. the file name merely starts with one of the module components.

        Args:
            imp: Import statement; only ``imp.module`` is read. It is expected
                to be a dotted or slash-separated module specifier (the exact
                shape is defined by the parser and should be confirmed there).
            source_path: File containing the import; never returned.
            available_files: Candidate files to resolve against.

        Returns:
            The matching path, or ``None`` when nothing matches or the module
            specifier has no non-empty component.
        """
        normalized = imp.module.replace("/", ".").replace("\\", ".")
        # Leading dots of relative imports ("." / ".parser" / "./utils") produce
        # empty components. They must be dropped: ``name.startswith("")`` is
        # always true and would resolve the import to an arbitrary file.
        module_parts = [part for part in normalized.split(".") if part]
        if not module_parts:
            return None

        candidates = [path for path in available_files if path != source_path]

        # Tier 1: top-level module name matches the stem.
        head = module_parts[0]
        for path in candidates:
            if head in path.stem.split("."):
                return path

        # Tier 2: a deeper component names the file exactly (pkg.mod -> mod.py).
        for part in reversed(module_parts[1:]):
            for path in candidates:
                if path.stem == part:
                    return path

        # Tier 3: loose prefix match, kept as a last resort so it can no longer
        # shadow an exact match that appears later in the candidate list.
        prefixes = tuple(module_parts)
        for path in candidates:
            if path.name.startswith(prefixes):
                return path

        return None

    def _find_cycles(self) -> list[list[Path]]:
        """Find circular dependencies in the graph.

        Returns:
            Every simple cycle that involves more than one file.
        """
        # simple_cycles yields nothing for an acyclic graph; NetworkXNoCycle is
        # raised by nx.find_cycle only, so no exception handling is needed.
        return [cycle for cycle in nx.simple_cycles(self.graph) if len(cycle) > 1]

    def _find_orphaned(self) -> list[Path]:
        """Find files that neither import nor are imported by another file."""
        return [
            node
            for node in self.graph.nodes()
            if self.graph.in_degree(node) == 0 and self.graph.out_degree(node) == 0
        ]

    def _find_most_depended(self) -> list[tuple[Path, int]]:
        """Find files that are depended on by most other files.

        Returns:
            Up to ``_TOP_N`` ``(path, in_degree)`` pairs, highest first. Files
            with an in-degree of zero are included when fewer files qualify.
        """
        in_degrees = [(node, self.graph.in_degree(node)) for node in self.graph.nodes()]
        in_degrees.sort(key=lambda item: item[1], reverse=True)
        return in_degrees[: self._TOP_N]

    def _find_most_dependant(self) -> list[tuple[Path, int]]:
        """Find files that depend on most other files.

        Returns:
            Up to ``_TOP_N`` ``(path, out_degree)`` pairs, highest first. Files
            with an out-degree of zero are included when fewer files qualify.
        """
        out_degrees = [
            (node, self.graph.out_degree(node)) for node in self.graph.nodes()
        ]
        out_degrees.sort(key=lambda item: item[1], reverse=True)
        return out_degrees[: self._TOP_N]

    def get_graph_stats(self) -> dict:
        """Get statistics about the dependency graph.

        Returns:
            A dict with the keys ``total_nodes``, ``total_edges``, ``is_dag``
            and ``weakly_connected_components``.
        """
        return {
            "total_nodes": self.graph.number_of_nodes(),
            "total_edges": self.graph.number_of_edges(),
            "is_dag": nx.is_directed_acyclic_graph(self.graph),
            "weakly_connected_components": nx.number_weakly_connected_components(
                self.graph
            ),
        }

    def export_graph(self, format: str = "dot") -> str:
        """Export the dependency graph in specified format.

        Args:
            format: ``"dot"`` for Graphviz DOT text or ``"adjacency"`` for one
                ``node: [successors]`` line per file. The name shadows the
                builtin but is part of the public signature, so it is kept.

        Returns:
            The textual representation of the graph.

        Raises:
            ValueError: If ``format`` is not one of the supported values.
        """
        if format == "dot":
            # Goes through networkx's pydot bridge, which needs the optional
            # pydot package to be installed.
            return nx.drawing.nx_pydot.to_pydot(self.graph).to_string()
        if format == "adjacency":
            return "\n".join(
                f"{node}: {list(self.graph.successors(node))}"
                for node in self.graph.nodes()
            )
        raise ValueError(f"Unsupported format: {format}")
|
||||
Reference in New Issue
Block a user