Add graph, analyzers, and exporters modules
Some checks failed
CI / test (push) Has been cancelled
CI / build (push) Has been cancelled

This commit is contained in:
2026-02-02 02:40:22 +00:00
parent 7cffd10dbc
commit 62facd3d06

279
src/graph/builder.py Normal file
View File

@@ -0,0 +1,279 @@
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Optional
import networkx as nx
from src.parsers.base import Entity, EntityType, ParserResult
class GraphType(Enum):
DIRECTED = "directed"
UNDIRECTED = "undirected"
class NodeType(Enum):
FILE = "file"
FUNCTION = "function"
CLASS = "class"
METHOD = "method"
MODULE = "module"
@dataclass
class GraphNode:
node_id: str
node_type: NodeType
name: str
file_path: Optional[Path] = None
start_line: int = 0
end_line: int = 0
attributes: dict = field(default_factory=dict)
label: str = ""
style: str = "filled"
shape: str = "ellipse"
color: str = "#97c2fc"
def __post_init__(self):
if not self.label:
self.label = self.name
@dataclass
class GraphEdge:
source: str
target: str
edge_type: str = "depends"
label: str = ""
attributes: dict = field(default_factory=dict)
class GraphBuilder:
def __init__(self, graph_type: GraphType = GraphType.DIRECTED):
self.graph = nx.DiGraph() if graph_type == GraphType.DIRECTED else nx.Graph()
self.nodes: dict[str, GraphNode] = {}
self.edges: list[GraphEdge] = []
self.node_counter = 0
def add_node(self, node: GraphNode) -> str:
if node.node_id in self.nodes:
return node.node_id
self.node_counter += 1
if not node.node_id:
node.node_id = f"node_{self.node_counter}"
self.nodes[node.node_id] = node
self.graph.add_node(
node.node_id,
node_type=node.node_type.value,
name=node.name,
label=node.label,
style=node.style,
shape=node.shape,
color=node.color,
**node.attributes,
)
return node.node_id
def add_edge(self, edge: GraphEdge) -> None:
if edge.source not in self.nodes or edge.target not in self.nodes:
return
self.edges.append(edge)
self.graph.add_edge(
edge.source,
edge.target,
edge_type=edge.edge_type,
label=edge.label,
**edge.attributes,
)
def build_from_parser_results(self, results: list[ParserResult]) -> "GraphBuilder":
file_nodes: dict[Path, str] = {}
for result in results:
if result.errors:
continue
file_node_id = self._add_file_node(result.file_path)
file_nodes[result.file_path] = file_node_id
for entity in result.entities:
entity_node_id = self._add_entity_node(entity)
self._connect_entity_to_file(entity_node_id, file_node_id)
for call in entity.calls:
self._add_call_edge(entity_node_id, call, entity.file_path)
self._build_import_edges(results, file_nodes)
return self
def _add_file_node(self, file_path: Path) -> str:
node_id = f"file_{file_path}"
node = GraphNode(
node_id=node_id,
node_type=NodeType.FILE,
name=file_path.name,
file_path=file_path,
label=file_path.name,
shape="box",
color="#fb7e81",
)
return self.add_node(node)
def _add_entity_node(self, entity: Entity) -> str:
node_type = self._entity_type_to_node_type(entity.entity_type)
node_id = f"{entity.entity_type.value}_{entity.name}_{entity.file_path}"
node = GraphNode(
node_id=node_id,
node_type=node_type,
name=entity.name,
file_path=entity.file_path,
start_line=entity.start_line,
end_line=entity.end_line,
label=f"{entity.name}:{entity.start_line}",
color=self._get_color_for_entity_type(entity.entity_type),
)
if entity.entity_type == EntityType.CLASS:
node.shape = "diamond"
elif entity.entity_type == EntityType.FUNCTION:
node.shape = "ellipse"
node.attributes = entity.attributes.copy()
return self.add_node(node)
def _connect_entity_to_file(self, entity_node_id: str, file_node_id: str) -> None:
edge = GraphEdge(
source=entity_node_id,
target=file_node_id,
edge_type="contains",
label="defined in",
)
self.add_edge(edge)
def _add_call_edge(self, from_node_id: str, call_name: str, file_path: Path) -> None:
target_node_id = f"function_{call_name}_{file_path}"
edge = GraphEdge(
source=from_node_id,
target=target_node_id,
edge_type="calls",
label="calls",
)
self.add_edge(edge)
def _build_import_edges(self, results: list[ParserResult], file_nodes: dict[Path, str]) -> None:
for result in results:
if result.file_path not in file_nodes:
continue
source_node_id = file_nodes[result.file_path]
for import_path in result.imports:
target_file = self._resolve_import_to_file(import_path, result.file_path)
if target_file and target_file in file_nodes:
target_node_id = file_nodes[target_file]
edge = GraphEdge(
source=source_node_id,
target=target_node_id,
edge_type="imports",
label=import_path,
)
self.add_edge(edge)
def _resolve_import_to_file(self, import_path: str, source_file: Path) -> Optional[Path]:
pass
def _entity_type_to_node_type(self, entity_type: EntityType) -> NodeType:
mapping = {
EntityType.FILE: NodeType.FILE,
EntityType.FUNCTION: NodeType.FUNCTION,
EntityType.CLASS: NodeType.CLASS,
EntityType.METHOD: NodeType.METHOD,
}
return mapping.get(entity_type, NodeType.FILE)
def _get_color_for_entity_type(self, entity_type: EntityType) -> str:
colors = {
EntityType.FILE: "#fb7e81",
EntityType.FUNCTION: "#97c2fc",
EntityType.CLASS: "#fa7a18",
EntityType.METHOD: "#2a9df4",
EntityType.IMPORT: "#93c47d",
}
return colors.get(entity_type, "#97c2fc")
def get_graph(self) -> nx.Graph:
return self.graph
def get_nodes(self) -> list[GraphNode]:
return list(self.nodes.values())
def get_edges(self) -> list[GraphEdge]:
return self.edges
def get_node_by_id(self, node_id: str) -> Optional[GraphNode]:
return self.nodes.get(node_id)
def get_nodes_by_type(self, node_type: NodeType) -> list[GraphNode]:
return [node for node in self.nodes.values() if node.node_type == node_type]
def get_subgraph(self, node_ids: list[str]) -> nx.Graph:
subgraph = self.graph.subgraph(node_ids).copy()
return subgraph
def serialize(self) -> dict:
return {
"nodes": [
{
"id": node.node_id,
"type": node.node_type.value,
"name": node.name,
"file_path": str(node.file_path) if node.file_path else None,
"start_line": node.start_line,
"end_line": node.end_line,
"label": node.label,
"attributes": node.attributes,
}
for node in self.nodes.values()
],
"edges": [
{
"source": edge.source,
"target": edge.target,
"type": edge.edge_type,
"label": edge.label,
}
for edge in self.edges
],
}
def deserialize(self, data: dict) -> "GraphBuilder":
self.nodes = {}
self.edges = []
for node_data in data.get("nodes", []):
node = GraphNode(
node_id=node_data["id"],
node_type=NodeType(node_data["type"]),
name=node_data["name"],
file_path=Path(node_data["file_path"]) if node_data.get("file_path") else None,
start_line=node_data.get("start_line", 0),
end_line=node_data.get("end_line", 0),
label=node_data.get("label", ""),
attributes=node_data.get("attributes", {}),
)
self.nodes[node.node_id] = node
self.graph.add_node(node.node_id, **node_data)
for edge_data in data.get("edges", []):
edge = GraphEdge(
source=edge_data["source"],
target=edge_data["target"],
edge_type=edge_data.get("type", "depends"),
label=edge_data.get("label", ""),
)
self.edges.append(edge)
self.graph.add_edge(edge.source, edge.target, **edge_data)
return self