From ae2526ccdc334187346274205dfb8163bb79f2c7 Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Fri, 30 Jan 2026 12:18:57 +0000 Subject: [PATCH] Initial commit: CodeMap v0.1.0 - CLI tool for code analysis and diagram generation --- codemap/core/graph_builder.py | 173 ++++++++++++++++++++++++++++++++++ 1 file changed, 173 insertions(+) create mode 100644 codemap/core/graph_builder.py diff --git a/codemap/core/graph_builder.py b/codemap/core/graph_builder.py new file mode 100644 index 0000000..54a584f --- /dev/null +++ b/codemap/core/graph_builder.py @@ -0,0 +1,173 @@ +import networkx as nx +from pathlib import Path +from typing import Dict, List, Optional, Set, Tuple +from dataclasses import dataclass, field +from codemap.parsers import ParsedFile, Dependency + + +@dataclass +class Node: + id: str + name: str + file_path: Path + file_type: str + module_name: str = "" + package: str = "" + + +@dataclass +class Edge: + source: str + target: str + label: str = "" + + +@dataclass +class GraphData: + nodes: List[Node] = field(default_factory=list) + edges: List[Edge] = field(default_factory=list) + packages: Dict[str, List[str]] = field(default_factory=dict) + + +class GraphBuilder: + def __init__(self): + self.graph = nx.DiGraph() + self.files: Dict[Path, ParsedFile] = {} + + def add_file(self, parsed_file: ParsedFile) -> None: + self.files[parsed_file.file_path] = parsed_file + node_id = self._get_node_id(parsed_file.file_path) + self.graph.add_node( + node_id, + file_path=str(parsed_file.file_path), + module_name=parsed_file.module_name, + file_type=parsed_file.file_type, + label=parsed_file.module_name + ) + + for dep in parsed_file.dependencies: + target_id = self._get_dep_id(dep) + self.graph.add_node(target_id, label=dep.module_name) + self.graph.add_edge(node_id, target_id, label=dep.module_name) + + def _get_node_id(self, file_path: Path) -> str: + return str(file_path.absolute()) + + def _get_dep_id(self, dep: Dependency) -> str: + return dep.module_name + + def build_from_files(self, files: List[ParsedFile]) -> None: + for parsed_file in files: + self.add_file(parsed_file) + + def get_dependencies(self, node_id: str, depth: int = 1) -> Set[str]: + try: + return nx.descendants(self.graph, node_id) + except nx.NetworkXError: + return set() + + def get_dependents(self, node_id: str, depth: int = 1) -> Set[str]: + try: + return nx.ancestors(self.graph, node_id) + except nx.NetworkXError: + return set() + + def filter_by_depth(self, start_nodes: List[str], max_depth: int) -> nx.DiGraph: + filtered = nx.DiGraph() + for start in start_nodes: + if start not in self.graph: + continue + for node in self.graph.nodes(): + try: + path_length = nx.shortest_path_length(self.graph, start, node) + if path_length <= max_depth: + if node in self.graph.nodes(): + filtered.add_node(node, **self.graph.nodes[node]) + except (nx.NetworkXError, nx.NetworkXNoPath): + continue + + for source, target in self.graph.edges(): + if source in filtered.nodes() and target in filtered.nodes(): + filtered.add_edge(source, target, **self.graph.edges[source, target]) + + return filtered + + def get_packages(self) -> Dict[str, List[str]]: + packages: Dict[str, List[str]] = {} + for node_id in self.graph.nodes(): + if node_id.startswith("/"): + path = Path(node_id) + parts = path.parts + if len(parts) > 1: + package = parts[-2] + if package not in packages: + packages[package] = [] + packages[package].append(node_id) + return packages + + def get_graph_data(self) -> GraphData: + nodes = [] + for node_id in self.graph.nodes(): + data = self.graph.nodes[node_id] + if node_id.startswith("/"): + file_path = Path(node_id) + node = Node( + id=node_id, + name=data.get("label", file_path.stem), + file_path=file_path, + file_type=data.get("file_type", ""), + module_name=data.get("module_name", "") + ) + nodes.append(node) + + edges = [] + for source, target, data in self.graph.edges(data=True): + edges.append(Edge(source=source, target=target, label=data.get("label", ""))) + + packages = self.get_packages() + + return GraphData(nodes=nodes, edges=edges, packages=packages) + + def _build_graph_data_from_filtered(self, filtered_graph: nx.DiGraph) -> GraphData: + nodes = [] + for node_id in filtered_graph.nodes(): + data = filtered_graph.nodes[node_id] + if node_id.startswith("/"): + file_path = Path(node_id) + node = Node( + id=node_id, + name=data.get("label", file_path.stem), + file_path=file_path, + file_type=data.get("file_type", ""), + module_name=data.get("module_name", "") + ) + nodes.append(node) + + edges = [] + for source, target, data in filtered_graph.edges(data=True): + edges.append(Edge(source=source, target=target, label=data.get("label", ""))) + + packages = self._get_packages_from_graph(filtered_graph) + + return GraphData(nodes=nodes, edges=edges, packages=packages) + + def _get_packages_from_graph(self, graph: nx.DiGraph) -> Dict[str, List[str]]: + packages: Dict[str, List[str]] = {} + for node_id in graph.nodes(): + if node_id.startswith("/"): + path = Path(node_id) + parts = path.parts + if len(parts) > 1: + package = parts[-2] + if package not in packages: + packages[package] = [] + packages[package].append(node_id) + return packages + + def get_stats(self) -> Dict: + return { + "node_count": self.graph.number_of_nodes(), + "edge_count": self.graph.number_of_edges(), + "file_count": len(self.files), + "is_dag": nx.is_directed_acyclic_graph(self.graph) + }