Initial commit: CodeMap v0.1.0 - CLI tool for code analysis and diagram generation
This commit is contained in:
173
codemap/core/graph_builder.py
Normal file
173
codemap/core/graph_builder.py
Normal file
@@ -0,0 +1,173 @@
|
||||
import networkx as nx
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Set, Tuple
|
||||
from dataclasses import dataclass, field
|
||||
from codemap.parsers import ParsedFile, Dependency
|
||||
|
||||
|
||||
@dataclass
|
||||
class Node:
|
||||
id: str
|
||||
name: str
|
||||
file_path: Path
|
||||
file_type: str
|
||||
module_name: str = ""
|
||||
package: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class Edge:
|
||||
source: str
|
||||
target: str
|
||||
label: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class GraphData:
|
||||
nodes: List[Node] = field(default_factory=list)
|
||||
edges: List[Edge] = field(default_factory=list)
|
||||
packages: Dict[str, List[str]] = field(default_factory=dict)
|
||||
|
||||
|
||||
class GraphBuilder:
|
||||
def __init__(self):
|
||||
self.graph = nx.DiGraph()
|
||||
self.files: Dict[Path, ParsedFile] = {}
|
||||
|
||||
def add_file(self, parsed_file: ParsedFile) -> None:
|
||||
self.files[parsed_file.file_path] = parsed_file
|
||||
node_id = self._get_node_id(parsed_file.file_path)
|
||||
self.graph.add_node(
|
||||
node_id,
|
||||
file_path=str(parsed_file.file_path),
|
||||
module_name=parsed_file.module_name,
|
||||
file_type=parsed_file.file_type,
|
||||
label=parsed_file.module_name
|
||||
)
|
||||
|
||||
for dep in parsed_file.dependencies:
|
||||
target_id = self._get_dep_id(dep)
|
||||
self.graph.add_node(target_id, label=dep.module_name)
|
||||
self.graph.add_edge(node_id, target_id, label=dep.module_name)
|
||||
|
||||
def _get_node_id(self, file_path: Path) -> str:
|
||||
return str(file_path.absolute())
|
||||
|
||||
def _get_dep_id(self, dep: Dependency) -> str:
|
||||
return dep.module_name
|
||||
|
||||
def build_from_files(self, files: List[ParsedFile]) -> None:
|
||||
for parsed_file in files:
|
||||
self.add_file(parsed_file)
|
||||
|
||||
def get_dependencies(self, node_id: str, depth: int = 1) -> Set[str]:
|
||||
try:
|
||||
return nx.descendants(self.graph, node_id)
|
||||
except nx.NetworkXError:
|
||||
return set()
|
||||
|
||||
def get_dependents(self, node_id: str, depth: int = 1) -> Set[str]:
|
||||
try:
|
||||
return nx.ancestors(self.graph, node_id)
|
||||
except nx.NetworkXError:
|
||||
return set()
|
||||
|
||||
def filter_by_depth(self, start_nodes: List[str], max_depth: int) -> nx.DiGraph:
|
||||
filtered = nx.DiGraph()
|
||||
for start in start_nodes:
|
||||
if start not in self.graph:
|
||||
continue
|
||||
for node in self.graph.nodes():
|
||||
try:
|
||||
path_length = nx.shortest_path_length(self.graph, start, node)
|
||||
if path_length <= max_depth:
|
||||
if node in self.graph.nodes():
|
||||
filtered.add_node(node, **self.graph.nodes[node])
|
||||
except (nx.NetworkXError, nx.NetworkXNoPath):
|
||||
continue
|
||||
|
||||
for source, target in self.graph.edges():
|
||||
if source in filtered.nodes() and target in filtered.nodes():
|
||||
filtered.add_edge(source, target, **self.graph.edges[source, target])
|
||||
|
||||
return filtered
|
||||
|
||||
def get_packages(self) -> Dict[str, List[str]]:
|
||||
packages: Dict[str, List[str]] = {}
|
||||
for node_id in self.graph.nodes():
|
||||
if node_id.startswith("/"):
|
||||
path = Path(node_id)
|
||||
parts = path.parts
|
||||
if len(parts) > 1:
|
||||
package = parts[-2]
|
||||
if package not in packages:
|
||||
packages[package] = []
|
||||
packages[package].append(node_id)
|
||||
return packages
|
||||
|
||||
def get_graph_data(self) -> GraphData:
|
||||
nodes = []
|
||||
for node_id in self.graph.nodes():
|
||||
data = self.graph.nodes[node_id]
|
||||
if node_id.startswith("/"):
|
||||
file_path = Path(node_id)
|
||||
node = Node(
|
||||
id=node_id,
|
||||
name=data.get("label", file_path.stem),
|
||||
file_path=file_path,
|
||||
file_type=data.get("file_type", ""),
|
||||
module_name=data.get("module_name", "")
|
||||
)
|
||||
nodes.append(node)
|
||||
|
||||
edges = []
|
||||
for source, target, data in self.graph.edges(data=True):
|
||||
edges.append(Edge(source=source, target=target, label=data.get("label", "")))
|
||||
|
||||
packages = self.get_packages()
|
||||
|
||||
return GraphData(nodes=nodes, edges=edges, packages=packages)
|
||||
|
||||
def _build_graph_data_from_filtered(self, filtered_graph: nx.DiGraph) -> GraphData:
|
||||
nodes = []
|
||||
for node_id in filtered_graph.nodes():
|
||||
data = filtered_graph.nodes[node_id]
|
||||
if node_id.startswith("/"):
|
||||
file_path = Path(node_id)
|
||||
node = Node(
|
||||
id=node_id,
|
||||
name=data.get("label", file_path.stem),
|
||||
file_path=file_path,
|
||||
file_type=data.get("file_type", ""),
|
||||
module_name=data.get("module_name", "")
|
||||
)
|
||||
nodes.append(node)
|
||||
|
||||
edges = []
|
||||
for source, target, data in filtered_graph.edges(data=True):
|
||||
edges.append(Edge(source=source, target=target, label=data.get("label", "")))
|
||||
|
||||
packages = self._get_packages_from_graph(filtered_graph)
|
||||
|
||||
return GraphData(nodes=nodes, edges=edges, packages=packages)
|
||||
|
||||
def _get_packages_from_graph(self, graph: nx.DiGraph) -> Dict[str, List[str]]:
|
||||
packages: Dict[str, List[str]] = {}
|
||||
for node_id in graph.nodes():
|
||||
if node_id.startswith("/"):
|
||||
path = Path(node_id)
|
||||
parts = path.parts
|
||||
if len(parts) > 1:
|
||||
package = parts[-2]
|
||||
if package not in packages:
|
||||
packages[package] = []
|
||||
packages[package].append(node_id)
|
||||
return packages
|
||||
|
||||
def get_stats(self) -> Dict:
|
||||
return {
|
||||
"node_count": self.graph.number_of_nodes(),
|
||||
"edge_count": self.graph.number_of_edges(),
|
||||
"file_count": len(self.files),
|
||||
"is_dag": nx.is_directed_acyclic_graph(self.graph)
|
||||
}
|
||||
Reference in New Issue
Block a user