Add graph, analyzers, and exporters modules
This commit is contained in:
144
src/analyzers/dependencies.py
Normal file
144
src/analyzers/dependencies.py
Normal file
@@ -0,0 +1,144 @@
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
import networkx as nx
|
||||
|
||||
from src.graph.builder import GraphBuilder, NodeType
|
||||
|
||||
|
||||
@dataclass
|
||||
class DependencyReport:
|
||||
total_files: int = 0
|
||||
total_functions: int = 0
|
||||
total_classes: int = 0
|
||||
circular_dependencies: list[list[str]] = field(default_factory=list)
|
||||
orphan_files: list[str] = field(default_factory=list)
|
||||
dependency_cycles: list[list[Path]] = field(default_factory=list)
|
||||
modules: dict[str, list[str]] = field(default_factory=dict)
|
||||
warnings: list[str] = field(default_factory=list)
|
||||
|
||||
|
||||
class DependencyAnalyzer:
|
||||
def __init__(self, graph_builder: GraphBuilder):
|
||||
self.graph_builder = graph_builder
|
||||
self.graph = graph_builder.get_graph()
|
||||
self.report = DependencyReport()
|
||||
|
||||
def analyze(self) -> DependencyReport:
|
||||
self.report = DependencyReport()
|
||||
self._count_entities()
|
||||
self._detect_circular_dependencies()
|
||||
self._find_orphan_files()
|
||||
self._identify_modules()
|
||||
return self.report
|
||||
|
||||
def _count_entities(self) -> None:
|
||||
nodes = self.graph_builder.get_nodes()
|
||||
for node in nodes:
|
||||
if node.node_type == NodeType.FILE:
|
||||
self.report.total_files += 1
|
||||
elif node.node_type == NodeType.FUNCTION:
|
||||
self.report.total_functions += 1
|
||||
elif node.node_type == NodeType.CLASS:
|
||||
self.report.total_classes += 1
|
||||
|
||||
def _detect_circular_dependencies(self) -> None:
|
||||
try:
|
||||
cycles = list(nx.simple_cycles(self.graph))
|
||||
for cycle in cycles:
|
||||
if len(cycle) > 1:
|
||||
cycle_paths = []
|
||||
for node_id in cycle:
|
||||
node = self.graph_builder.get_node_by_id(node_id)
|
||||
if node and node.file_path:
|
||||
cycle_paths.append(node.file_path)
|
||||
if cycle_paths:
|
||||
self.report.circular_dependencies.append(cycle)
|
||||
self.report.dependency_cycles.append(cycle_paths)
|
||||
|
||||
if cycles:
|
||||
self.report.warnings.append(f"Found {len(cycles)} circular dependency cycle(s)")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _find_orphan_files(self) -> None:
|
||||
nodes = self.graph_builder.get_nodes()
|
||||
for node in nodes:
|
||||
if node.node_type == NodeType.FILE:
|
||||
predecessors = list(self.graph.predecessors(node.node_id))
|
||||
|
||||
import_edges = [
|
||||
e for e in self.graph_builder.edges
|
||||
if e.source == node.node_id and e.edge_type == "imports"
|
||||
]
|
||||
|
||||
if not import_edges and not predecessors:
|
||||
if node.file_path:
|
||||
self.report.orphan_files.append(str(node.file_path))
|
||||
|
||||
def _identify_modules(self) -> None:
|
||||
nodes = self.graph_builder.get_nodes()
|
||||
for node in nodes:
|
||||
if node.node_type == NodeType.FILE and node.file_path:
|
||||
parts = node.file_path.parts
|
||||
module_name = parts[0] if parts else "root"
|
||||
if module_name not in self.report.modules:
|
||||
self.report.modules[module_name] = []
|
||||
self.report.modules[module_name].append(str(node.file_path))
|
||||
|
||||
def get_file_dependencies(self, file_path: Path) -> list[tuple[str, str]]:
|
||||
dependencies = []
|
||||
nodes = self.graph_builder.get_nodes()
|
||||
|
||||
file_node_id = f"file_{file_path}"
|
||||
for node in nodes:
|
||||
if node.node_id == file_node_id:
|
||||
for edge in self.graph_builder.edges:
|
||||
if edge.source == file_node_id and edge.edge_type == "imports":
|
||||
target_node = self.graph_builder.get_node_by_id(edge.target)
|
||||
if target_node:
|
||||
dependencies.append((target_node.name, edge.label))
|
||||
break
|
||||
|
||||
return dependencies
|
||||
|
||||
def get_file_dependents(self, file_path: Path) -> list[str]:
|
||||
dependents = []
|
||||
file_node_id = f"file_{file_path}"
|
||||
|
||||
for edge in self.graph_builder.edges:
|
||||
if edge.target == file_node_id and edge.edge_type == "imports":
|
||||
source_node = self.graph_builder.get_node_by_id(edge.source)
|
||||
if source_node:
|
||||
dependents.append(source_node.name)
|
||||
|
||||
return dependents
|
||||
|
||||
def get_call_graph(self) -> nx.Graph:
|
||||
call_graph = nx.DiGraph()
|
||||
|
||||
for edge in self.graph_builder.edges:
|
||||
if edge.edge_type == "calls":
|
||||
call_graph.add_edge(edge.source, edge.target)
|
||||
|
||||
return call_graph
|
||||
|
||||
def get_architecture_layers(self) -> dict[str, list[str]]:
|
||||
layers = {"presentation": [], "business": [], "data": [], "other": []}
|
||||
|
||||
layer_keywords = {
|
||||
"presentation": ["ui", "view", "controller", "handler", "route", "api"],
|
||||
"business": ["service", "business", "logic", "manager", "facade"],
|
||||
"data": ["repository", "dao", "model", "entity", "database", "db"],
|
||||
}
|
||||
|
||||
for node in self.graph_builder.get_nodes():
|
||||
if node.node_type == NodeType.FILE:
|
||||
path_lower = str(node.file_path).lower() if node.file_path else ""
|
||||
for layer, keywords in layer_keywords.items():
|
||||
if any(kw in path_lower for kw in keywords):
|
||||
layers[layer].append(node.name)
|
||||
break
|
||||
else:
|
||||
layers["other"].append(node.name)
|
||||
|
||||
return {k: v for k, v in layers.items() if v}
|
||||
Reference in New Issue
Block a user