"""Dependency graph implementation using NetworkX."""

from pathlib import Path
from typing import Optional

# NOTE(review): `nx` (presumably `import networkx as nx`) and
# `parse_dependencies` are used below but are imported in a part of the file
# that is not visible here -- keep those imports in place.


class DependencyGraph:
    """Directed graph of source files and the files they depend on.

    Nodes are ``Path`` objects relative to ``project_root``. An edge
    ``a -> b`` means that file ``a`` depends on file ``b``.
    """

    # Extensions probed when resolving a dependency name and, by default,
    # when scanning a directory.
    _DEFAULT_EXTENSIONS: tuple[str, ...] = (
        ".py",
        ".js",
        ".jsx",
        ".ts",
        ".tsx",
        ".go",
    )

    # Node attribute set once a file's own dependencies have been parsed.
    # Nodes created implicitly as edge targets do not carry it.
    _SCANNED_ATTR = "scanned"

    def __init__(self, project_root: Path):
        """Create an empty graph rooted at ``project_root``.

        Args:
            project_root: Directory that all node paths are relative to.
                It is resolved to an absolute path.
        """
        self.project_root = Path(project_root).resolve()
        self._graph: nx.DiGraph = nx.DiGraph()
        # Parsed dependency names, keyed by absolute file path.
        self._file_cache: dict[Path, list[str]] = {}

    def _to_rel(self, path: Path) -> Path:
        """Return ``path`` relative to the project root when possible.

        Paths that cannot be made relative (already relative, or outside
        the root) are returned unchanged.
        """
        path = Path(path)
        try:
            return path.relative_to(self.project_root)
        except ValueError:
            return path

    def add_file(self, file_path: Path) -> None:
        """Add a file and its resolvable dependencies to the graph.

        Args:
            file_path: Path of the file, absolute or relative to the
                project root.

        Raises:
            ValueError: If the resolved file lies outside the project root.
        """
        abs_path = (self.project_root / file_path).resolve()
        rel_path = abs_path.relative_to(self.project_root)

        # Nodes are keyed by *relative* path. A node may already exist only
        # because another file depends on it, so membership alone does not
        # mean its own dependencies were processed; check the marker.
        if rel_path in self._graph and self._graph.nodes[rel_path].get(
            self._SCANNED_ATTR
        ):
            return

        self._graph.add_node(rel_path)

        for dep in self._get_dependencies(abs_path):
            dep_rel = self._resolve_dep_to_file(dep, abs_path)
            if dep_rel is not None:
                self._graph.add_edge(rel_path, dep_rel)

        # Mark only after parsing succeeded so a failed parse can be retried.
        self._graph.nodes[rel_path][self._SCANNED_ATTR] = True

    def _get_dependencies(self, file_path: Path) -> list[str]:
        """Return the dependency names parsed from ``file_path``, cached."""
        try:
            return self._file_cache[file_path]
        except KeyError:
            deps = parse_dependencies(file_path)
            self._file_cache[file_path] = deps
            return deps

    def _resolve_dep_to_file(
        self, dep: str, from_file: Path
    ) -> Optional[Path]:
        """Resolve a dependency name to a file path relative to project root.

        For each search directory (the importing file's directory, then the
        project root) and each known extension, three forms are tried in
        order: ``<dep><ext>``, the dotted name as nested directories
        (``a.b`` -> ``a/b<ext>``), and a package (``a/b/__init__<ext>``).

        Args:
            dep: Dependency name as found in the source file.
            from_file: Path of the file that declares the dependency.

        Returns:
            The first existing candidate, relative to the project root, or
            ``None`` when nothing matches.
        """
        # Drop empty components so names such as "." or ".utils" neither
        # collapse to the filesystem root nor become absolute paths.
        parts = [part for part in dep.split(".") if part]
        if not parts:
            return None
        joined = "/".join(parts)

        for base in (from_file.parent, self.project_root):
            for ext in self._DEFAULT_EXTENSIONS:
                candidates = (
                    base / f"{dep}{ext}",
                    base / f"{joined}{ext}",
                    base / joined / f"__init__{ext}",
                )
                for candidate in candidates:
                    if not candidate.is_file():
                        continue
                    try:
                        return candidate.resolve().relative_to(
                            self.project_root
                        )
                    except ValueError:
                        # Resolves outside the project root (e.g. through a
                        # symlink); try the next candidate form.
                        continue

        return None

    def build_from_directory(
        self,
        directory: Optional[Path] = None,
        extensions: Optional[list[str]] = None,
        max_depth: Optional[int] = None,
    ) -> None:
        """Build the dependency graph by scanning a directory.

        Directories whose name starts with ``"."`` are not descended into.
        A ``PermissionError`` raised while processing a directory abandons
        the rest of that directory and the scan continues elsewhere.

        Args:
            directory: Directory to scan; a relative path is interpreted
                relative to the project root. Defaults to the project root.
            extensions: File suffixes to include. Defaults to the built-in
                list of source extensions.
            max_depth: Maximum directory depth to descend, where the scan
                root is depth 0. ``None`` means unlimited.

        Raises:
            ValueError: If ``directory`` is outside the project root.
        """
        if directory is None:
            root = self.project_root
        else:
            root = (self.project_root / directory).resolve()
            try:
                root.relative_to(self.project_root)
            except ValueError:
                raise ValueError(
                    f"{root} is not inside project root {self.project_root}"
                ) from None

        wanted = frozenset(
            self._DEFAULT_EXTENSIONS if extensions is None else extensions
        )
        seen_files: set[Path] = set()
        seen_dirs: set[Path] = set()

        # Explicit stack instead of recursion: deep trees cannot exhaust the
        # interpreter's recursion limit.
        pending: list[tuple[Path, int]] = [(root, 0)]
        while pending:
            current, depth = pending.pop()
            if max_depth is not None and depth > max_depth:
                continue

            try:
                real_dir = current.resolve()
                if real_dir in seen_dirs:
                    # Already visited via another path (symlinked directory).
                    continue
                seen_dirs.add(real_dir)

                subdirs: list[Path] = []
                # Sorted so node insertion order does not depend on the OS.
                for item in sorted(current.iterdir()):
                    if item.is_file() and item.suffix in wanted:
                        resolved = item.resolve()
                        if resolved in seen_files:
                            continue
                        seen_files.add(resolved)
                        try:
                            rel = resolved.relative_to(self.project_root)
                        except ValueError:
                            # Symlink pointing outside the project root.
                            continue
                        self.add_file(rel)
                    elif item.is_dir() and not item.name.startswith("."):
                        subdirs.append(item)
                # Reversed so the stack pops sub-directories in sorted order.
                pending.extend((sub, depth + 1) for sub in reversed(subdirs))
            except PermissionError:
                continue

    def get_nodes(self) -> list[Path]:
        """Get all nodes (files) in the graph."""
        return [Path(node) for node in self._graph.nodes()]

    def get_edges(self) -> list[tuple[Path, Path]]:
        """Get all edges (dependencies) as ``(dependent, dependency)``."""
        return [(Path(u), Path(v)) for u, v in self._graph.edges()]

    def get_node_count(self) -> int:
        """Get the number of nodes in the graph."""
        return self._graph.number_of_nodes()

    def get_edge_count(self) -> int:
        """Get the number of edges in the graph."""
        return self._graph.number_of_edges()

    def get_dependents(self, file_path: Path) -> list[Path]:
        """Get all files that depend on the given file.

        Returns an empty list when the file is not in the graph.
        """
        rel_path = self._to_rel(file_path)
        if rel_path not in self._graph:
            return []
        return [Path(p) for p in self._graph.predecessors(rel_path)]

    def get_dependencies(self, file_path: Path) -> list[Path]:
        """Get all files that the given file depends on.

        Returns an empty list when the file is not in the graph.
        """
        rel_path = self._to_rel(file_path)
        if rel_path not in self._graph:
            return []
        return [Path(s) for s in self._graph.successors(rel_path)]

    def get_reachability(self, source: Path, target: Path) -> bool:
        """Check if target is reachable from source.

        Returns ``False`` when either file is not in the graph.
        """
        source_rel = self._to_rel(source)
        target_rel = self._to_rel(target)
        # nx.has_path raises NodeNotFound for unknown nodes, so test first.
        if source_rel not in self._graph or target_rel not in self._graph:
            return False
        return nx.has_path(self._graph, source_rel, target_rel)

    def get_shortest_path(
        self, source: Path, target: Path
    ) -> list[Path]:
        """Get the shortest path from source to target.

        Returns an empty list when no path exists or either file is not in
        the graph.
        """
        source_rel = self._to_rel(source)
        target_rel = self._to_rel(target)
        try:
            path = nx.shortest_path(self._graph, source_rel, target_rel)
        except (nx.NetworkXNoPath, nx.NodeNotFound):
            return []
        return [Path(p) for p in path]

    def detect_cycles(self) -> list[list[Path]]:
        """Detect all simple cycles in the dependency graph.

        Returns:
            One list of nodes per cycle; empty when the graph is acyclic.
        """
        # Errors are deliberately not swallowed: an internal failure must
        # not be reported as "no cycles".
        return [
            [Path(node) for node in cycle]
            for cycle in nx.simple_cycles(self._graph)
        ]

    def to_undirected(self) -> "nx.Graph":
        """Return an undirected version of the graph."""
        return self._graph.to_undirected()

    def get_connected_components(self) -> list[list[Path]]:
        """Get connected components of the undirected graph."""
        undirected = self.to_undirected()
        return [
            [Path(node) for node in component]
            for component in nx.connected_components(undirected)
        ]

    def get_subgraph(
        self, nodes: list[Path]
    ) -> "DependencyGraph":
        """Get a subgraph containing only the specified nodes.

        Args:
            nodes: Files to keep, absolute or relative to the project root.
                Files that are not in the graph are ignored.

        Returns:
            A new ``DependencyGraph`` holding an independent copy of the
            induced subgraph.
        """
        new_graph = DependencyGraph(self.project_root)
        # Nodes are Path objects, so select with normalized paths, not str.
        keep = [self._to_rel(node) for node in nodes]
        new_graph._graph = self._graph.subgraph(keep).copy()
        return new_graph

    def get_degree(self, file_path: Path) -> tuple[int, int]:
        """Get the in-degree and out-degree of a node.

        Returns:
            ``(in_degree, out_degree)``; ``(0, 0)`` when the file is not in
            the graph.
        """
        rel_path = self._to_rel(file_path)
        if rel_path not in self._graph:
            return 0, 0
        return (
            int(self._graph.in_degree(rel_path)),
            int(self._graph.out_degree(rel_path)),
        )

    def get_topological_order(self) -> list[Path]:
        """Get a topological ordering of nodes.

        Returns an empty list when the graph contains a cycle.
        """
        try:
            order = list(nx.topological_sort(self._graph))
        except nx.NetworkXUnfeasible:
            return []
        return [Path(node) for node in order]

    def get_node_by_name(self, name: str) -> Optional[Path]:
        """Find a node by its name (partial or full path match).

        Matching is case-insensitive. An exact path match wins; otherwise
        the first node whose path contains ``name`` is returned.

        Returns:
            The matching node, or ``None`` when ``name`` is empty or
            nothing matches.
        """
        needle = name.lower()
        if not needle:
            return None

        first_partial: Optional[Path] = None
        for node in self._graph.nodes():
            text = str(node).lower()
            if text == needle:
                return Path(node)
            if first_partial is None and needle in text:
                first_partial = Path(node)
        return first_partial