From 62facd3d0639d2b89cb9156793cc86323aff3746 Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Mon, 2 Feb 2026 02:40:22 +0000 Subject: [PATCH] Add graph, analyzers, and exporters modules --- src/graph/builder.py | 279 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 279 insertions(+) create mode 100644 src/graph/builder.py diff --git a/src/graph/builder.py b/src/graph/builder.py new file mode 100644 index 0000000..8de8eee --- /dev/null +++ b/src/graph/builder.py @@ -0,0 +1,279 @@ +from dataclasses import dataclass, field +from enum import Enum +from pathlib import Path +from typing import Optional +import networkx as nx + +from src.parsers.base import Entity, EntityType, ParserResult + + +class GraphType(Enum): + DIRECTED = "directed" + UNDIRECTED = "undirected" + + +class NodeType(Enum): + FILE = "file" + FUNCTION = "function" + CLASS = "class" + METHOD = "method" + MODULE = "module" + + +@dataclass +class GraphNode: + node_id: str + node_type: NodeType + name: str + file_path: Optional[Path] = None + start_line: int = 0 + end_line: int = 0 + attributes: dict = field(default_factory=dict) + label: str = "" + style: str = "filled" + shape: str = "ellipse" + color: str = "#97c2fc" + + def __post_init__(self): + if not self.label: + self.label = self.name + + +@dataclass +class GraphEdge: + source: str + target: str + edge_type: str = "depends" + label: str = "" + attributes: dict = field(default_factory=dict) + + +class GraphBuilder: + def __init__(self, graph_type: GraphType = GraphType.DIRECTED): + self.graph = nx.DiGraph() if graph_type == GraphType.DIRECTED else nx.Graph() + self.nodes: dict[str, GraphNode] = {} + self.edges: list[GraphEdge] = [] + self.node_counter = 0 + + def add_node(self, node: GraphNode) -> str: + if node.node_id in self.nodes: + return node.node_id + self.node_counter += 1 + if not node.node_id: + node.node_id = f"node_{self.node_counter}" + self.nodes[node.node_id] = node + self.graph.add_node( + node.node_id, + node_type=node.node_type.value, + name=node.name, + label=node.label, + style=node.style, + shape=node.shape, + color=node.color, + **node.attributes, + ) + return node.node_id + + def add_edge(self, edge: GraphEdge) -> None: + if edge.source not in self.nodes or edge.target not in self.nodes: + return + self.edges.append(edge) + self.graph.add_edge( + edge.source, + edge.target, + edge_type=edge.edge_type, + label=edge.label, + **edge.attributes, + ) + + def build_from_parser_results(self, results: list[ParserResult]) -> "GraphBuilder": + file_nodes: dict[Path, str] = {} + + for result in results: + if result.errors: + continue + + file_node_id = self._add_file_node(result.file_path) + file_nodes[result.file_path] = file_node_id + + for entity in result.entities: + entity_node_id = self._add_entity_node(entity) + self._connect_entity_to_file(entity_node_id, file_node_id) + + for call in entity.calls: + self._add_call_edge(entity_node_id, call, entity.file_path) + + self._build_import_edges(results, file_nodes) + + return self + + def _add_file_node(self, file_path: Path) -> str: + node_id = f"file_{file_path}" + node = GraphNode( + node_id=node_id, + node_type=NodeType.FILE, + name=file_path.name, + file_path=file_path, + label=file_path.name, + shape="box", + color="#fb7e81", + ) + return self.add_node(node) + + def _add_entity_node(self, entity: Entity) -> str: + node_type = self._entity_type_to_node_type(entity.entity_type) + node_id = f"{entity.entity_type.value}_{entity.name}_{entity.file_path}" + + node = GraphNode( + node_id=node_id, + node_type=node_type, + name=entity.name, + file_path=entity.file_path, + start_line=entity.start_line, + end_line=entity.end_line, + label=f"{entity.name}:{entity.start_line}", + color=self._get_color_for_entity_type(entity.entity_type), + ) + + if entity.entity_type == EntityType.CLASS: + node.shape = "diamond" + elif entity.entity_type == EntityType.FUNCTION: + node.shape = "ellipse" + + node.attributes = entity.attributes.copy() + + return self.add_node(node) + + def _connect_entity_to_file(self, entity_node_id: str, file_node_id: str) -> None: + edge = GraphEdge( + source=entity_node_id, + target=file_node_id, + edge_type="contains", + label="defined in", + ) + self.add_edge(edge) + + def _add_call_edge(self, from_node_id: str, call_name: str, file_path: Path) -> None: + target_node_id = f"function_{call_name}_{file_path}" + edge = GraphEdge( + source=from_node_id, + target=target_node_id, + edge_type="calls", + label="calls", + ) + self.add_edge(edge) + + def _build_import_edges(self, results: list[ParserResult], file_nodes: dict[Path, str]) -> None: + for result in results: + if result.file_path not in file_nodes: + continue + source_node_id = file_nodes[result.file_path] + + for import_path in result.imports: + target_file = self._resolve_import_to_file(import_path, result.file_path) + if target_file and target_file in file_nodes: + target_node_id = file_nodes[target_file] + edge = GraphEdge( + source=source_node_id, + target=target_node_id, + edge_type="imports", + label=import_path, + ) + self.add_edge(edge) + + def _resolve_import_to_file(self, import_path: str, source_file: Path) -> Optional[Path]: + pass + + def _entity_type_to_node_type(self, entity_type: EntityType) -> NodeType: + mapping = { + EntityType.FILE: NodeType.FILE, + EntityType.FUNCTION: NodeType.FUNCTION, + EntityType.CLASS: NodeType.CLASS, + EntityType.METHOD: NodeType.METHOD, + } + return mapping.get(entity_type, NodeType.FILE) + + def _get_color_for_entity_type(self, entity_type: EntityType) -> str: + colors = { + EntityType.FILE: "#fb7e81", + EntityType.FUNCTION: "#97c2fc", + EntityType.CLASS: "#fa7a18", + EntityType.METHOD: "#2a9df4", + EntityType.IMPORT: "#93c47d", + } + return colors.get(entity_type, "#97c2fc") + + def get_graph(self) -> nx.Graph: + return self.graph + + def get_nodes(self) -> list[GraphNode]: + return list(self.nodes.values()) + + def get_edges(self) -> list[GraphEdge]: + return self.edges + + def get_node_by_id(self, node_id: str) -> Optional[GraphNode]: + return self.nodes.get(node_id) + + def get_nodes_by_type(self, node_type: NodeType) -> list[GraphNode]: + return [node for node in self.nodes.values() if node.node_type == node_type] + + def get_subgraph(self, node_ids: list[str]) -> nx.Graph: + subgraph = self.graph.subgraph(node_ids).copy() + return subgraph + + def serialize(self) -> dict: + return { + "nodes": [ + { + "id": node.node_id, + "type": node.node_type.value, + "name": node.name, + "file_path": str(node.file_path) if node.file_path else None, + "start_line": node.start_line, + "end_line": node.end_line, + "label": node.label, + "attributes": node.attributes, + } + for node in self.nodes.values() + ], + "edges": [ + { + "source": edge.source, + "target": edge.target, + "type": edge.edge_type, + "label": edge.label, + } + for edge in self.edges + ], + } + + def deserialize(self, data: dict) -> "GraphBuilder": + self.nodes = {} + self.edges = [] + + for node_data in data.get("nodes", []): + node = GraphNode( + node_id=node_data["id"], + node_type=NodeType(node_data["type"]), + name=node_data["name"], + file_path=Path(node_data["file_path"]) if node_data.get("file_path") else None, + start_line=node_data.get("start_line", 0), + end_line=node_data.get("end_line", 0), + label=node_data.get("label", ""), + attributes=node_data.get("attributes", {}), + ) + self.nodes[node.node_id] = node + self.graph.add_node(node.node_id, **node_data) + + for edge_data in data.get("edges", []): + edge = GraphEdge( + source=edge_data["source"], + target=edge_data["target"], + edge_type=edge_data.get("type", "depends"), + label=edge_data.get("label", ""), + ) + self.edges.append(edge) + self.graph.add_edge(edge.source, edge.target, **edge_data) + + return self