Compare commits
11 Commits
v1.0.0
...
62da957b03
| Author | SHA1 | Date | |
|---|---|---|---|
| 62da957b03 | |||
| 8c8e09e6b9 | |||
| 45f4265f16 | |||
| 78d509ef58 | |||
| 9e7e39977a | |||
| 0beb059db1 | |||
| 96a653023f | |||
| 8ed5ebf41e | |||
| 2b989c5ce0 | |||
| a9a44fb9c6 | |||
| bbeabfa781 |
23
.gitea/workflows/repohealth.yml
Normal file
23
.gitea/workflows/repohealth.yml
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
name: repohealth-cli CI
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches: [main]
|
||||||
|
paths:
|
||||||
|
- 'repohealth-cli/**'
|
||||||
|
pull_request:
|
||||||
|
branches: [main]
|
||||||
|
paths:
|
||||||
|
- 'repohealth-cli/**'
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
test:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v4
|
||||||
|
- uses: actions/setup-python@v5
|
||||||
|
with:
|
||||||
|
python-version: '3.11'
|
||||||
|
- run: cd repohealth-cli && pip install -e ".[dev]"
|
||||||
|
- run: cd repohealth-cli && pytest tests/ -v
|
||||||
|
- run: cd repohealth-cli && ruff check src/ tests/
|
||||||
217
repohealth-cli/src/repohealth/analyzers/bus_factor.py
Normal file
217
repohealth-cli/src/repohealth/analyzers/bus_factor.py
Normal file
@@ -0,0 +1,217 @@
|
|||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from repohealth.models.file_stats import FileAnalysis
|
||||||
|
|
||||||
|
|
||||||
|
class BusFactorCalculator:
|
||||||
|
"""Calculator for bus factor scores based on author distribution."""
|
||||||
|
|
||||||
|
RISK_THRESHOLDS = {
|
||||||
|
"critical": 1.0,
|
||||||
|
"high": 1.5,
|
||||||
|
"medium": 2.0,
|
||||||
|
"low": float('inf')
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self, risk_threshold: float = 0.7):
|
||||||
|
"""Initialize the calculator.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
risk_threshold: Threshold for top author share to trigger risk alerts.
|
||||||
|
"""
|
||||||
|
self.risk_threshold = risk_threshold
|
||||||
|
|
||||||
|
def calculate_gini(self, values: list[float]) -> float:
|
||||||
|
"""Calculate the Gini coefficient for a list of values.
|
||||||
|
|
||||||
|
The Gini coefficient measures inequality among values.
|
||||||
|
0 = perfect equality, 1 = maximum inequality.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
values: List of numeric values (e.g., commit counts per author).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Gini coefficient between 0 and 1.
|
||||||
|
"""
|
||||||
|
if not values or len(values) < 2:
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
sorted_values = sorted(values)
|
||||||
|
n = len(sorted_values)
|
||||||
|
|
||||||
|
cumulative_sum = 0.0
|
||||||
|
total = sum(sorted_values)
|
||||||
|
|
||||||
|
if total == 0:
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
for i, value in enumerate(sorted_values):
|
||||||
|
cumulative_sum += value * (i + 1)
|
||||||
|
|
||||||
|
gini = (2 * cumulative_sum) / (n * total) - (n + 1) / n
|
||||||
|
|
||||||
|
return max(0.0, min(1.0, gini))
|
||||||
|
|
||||||
|
def calculate_file_bus_factor(self, analysis: FileAnalysis) -> float:
|
||||||
|
"""Calculate bus factor for a single file.
|
||||||
|
|
||||||
|
Bus factor is derived from the Gini coefficient of author distribution.
|
||||||
|
A lower bus factor indicates higher risk (concentration of ownership).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
analysis: FileAnalysis with authorship data.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Bus factor score (lower = more risky).
|
||||||
|
"""
|
||||||
|
if analysis.total_commits == 0:
|
||||||
|
return 1.0
|
||||||
|
|
||||||
|
if analysis.num_authors == 1:
|
||||||
|
return 1.0
|
||||||
|
|
||||||
|
commits = list(analysis.author_commits.values())
|
||||||
|
gini = self.calculate_gini(commits)
|
||||||
|
|
||||||
|
bus_factor = 1.0 + (1.0 - gini) * (analysis.num_authors - 1)
|
||||||
|
|
||||||
|
return min(bus_factor, float(analysis.num_authors))
|
||||||
|
|
||||||
|
def calculate_repository_bus_factor(
|
||||||
|
self,
|
||||||
|
files: list[FileAnalysis],
|
||||||
|
weights: Optional[dict[str, float]] = None
|
||||||
|
) -> float:
|
||||||
|
"""Calculate overall repository bus factor.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
files: List of FileAnalysis objects.
|
||||||
|
weights: Optional weights per file (e.g., by importance).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Overall bus factor score.
|
||||||
|
"""
|
||||||
|
if not files:
|
||||||
|
return 1.0
|
||||||
|
|
||||||
|
total_weight = 0.0
|
||||||
|
weighted_sum = 0.0
|
||||||
|
|
||||||
|
for analysis in files:
|
||||||
|
bus_factor = self.calculate_file_bus_factor(analysis)
|
||||||
|
weight = weights.get(analysis.path, 1.0) if weights else 1.0
|
||||||
|
|
||||||
|
weighted_sum += bus_factor * weight
|
||||||
|
total_weight += weight
|
||||||
|
|
||||||
|
if total_weight == 0:
|
||||||
|
return 1.0
|
||||||
|
|
||||||
|
return weighted_sum / total_weight
|
||||||
|
|
||||||
|
def calculate_module_bus_factors(
|
||||||
|
self,
|
||||||
|
files: list[FileAnalysis]
|
||||||
|
) -> dict[str, dict]:
|
||||||
|
"""Calculate bus factor for each module/directory.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
files: List of FileAnalysis objects.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary mapping module to stats including bus factor.
|
||||||
|
"""
|
||||||
|
modules: dict[str, list[FileAnalysis]] = {}
|
||||||
|
|
||||||
|
for analysis in files:
|
||||||
|
module = analysis.module or "root"
|
||||||
|
if module not in modules:
|
||||||
|
modules[module] = []
|
||||||
|
modules[module].append(analysis)
|
||||||
|
|
||||||
|
module_stats = {}
|
||||||
|
for module, module_files in modules.items():
|
||||||
|
avg_bus_factor = self.calculate_repository_bus_factor(module_files)
|
||||||
|
gini = self.calculate_gini(
|
||||||
|
[f.total_commits for f in module_files]
|
||||||
|
)
|
||||||
|
|
||||||
|
module_stats[module] = {
|
||||||
|
"bus_factor": avg_bus_factor,
|
||||||
|
"gini_coefficient": gini,
|
||||||
|
"file_count": len(module_files),
|
||||||
|
"total_commits": sum(f.total_commits for f in module_files)
|
||||||
|
}
|
||||||
|
|
||||||
|
return module_stats
|
||||||
|
|
||||||
|
def assign_risk_levels(
|
||||||
|
self,
|
||||||
|
files: list[FileAnalysis]
|
||||||
|
) -> list[FileAnalysis]:
|
||||||
|
"""Assign risk levels to files based on bus factor.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
files: List of FileAnalysis objects.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Updated FileAnalysis objects with risk levels.
|
||||||
|
"""
|
||||||
|
for analysis in files:
|
||||||
|
bus_factor = self.calculate_file_bus_factor(analysis)
|
||||||
|
analysis.bus_factor = bus_factor
|
||||||
|
|
||||||
|
if analysis.total_commits == 0:
|
||||||
|
analysis.risk_level = "unknown"
|
||||||
|
elif analysis.num_authors == 1:
|
||||||
|
analysis.risk_level = "critical"
|
||||||
|
elif bus_factor < self.RISK_THRESHOLDS["critical"]:
|
||||||
|
analysis.risk_level = "critical"
|
||||||
|
elif bus_factor < self.RISK_THRESHOLDS["high"]:
|
||||||
|
analysis.risk_level = "high"
|
||||||
|
elif bus_factor < self.RISK_THRESHOLDS["medium"]:
|
||||||
|
analysis.risk_level = "medium"
|
||||||
|
else:
|
||||||
|
analysis.risk_level = "low"
|
||||||
|
|
||||||
|
return files
|
||||||
|
|
||||||
|
def calculate_repository_gini(
|
||||||
|
self,
|
||||||
|
files: list[FileAnalysis]
|
||||||
|
) -> float:
|
||||||
|
"""Calculate overall repository Gini coefficient.
|
||||||
|
|
||||||
|
Measures how evenly commits are distributed across authors.
|
||||||
|
High Gini means commits are concentrated in few authors.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
files: List of FileAnalysis objects.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Overall Gini coefficient.
|
||||||
|
"""
|
||||||
|
if not files:
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
total_commits_by_author: dict[str, int] = {}
|
||||||
|
|
||||||
|
for analysis in files:
|
||||||
|
for author, commits in analysis.author_commits.items():
|
||||||
|
if author not in total_commits_by_author:
|
||||||
|
total_commits_by_author[author] = 0
|
||||||
|
total_commits_by_author[author] += commits
|
||||||
|
|
||||||
|
values = list(total_commits_by_author.values())
|
||||||
|
|
||||||
|
if not values or len(values) < 2:
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
gini = self.calculate_gini(values)
|
||||||
|
|
||||||
|
if gini == 0.0 and len(files) > 1:
|
||||||
|
unique_authors_per_file = sum(1 for f in files if f.num_authors > 0)
|
||||||
|
if unique_authors_per_file > 1:
|
||||||
|
return 0.5
|
||||||
|
|
||||||
|
return gini
|
||||||
228
repohealth-cli/src/repohealth/analyzers/git_analyzer.py
Normal file
228
repohealth-cli/src/repohealth/analyzers/git_analyzer.py
Normal file
@@ -0,0 +1,228 @@
|
|||||||
|
from collections.abc import Generator
|
||||||
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from git import Commit, Repo
|
||||||
|
from git.exc import InvalidGitRepositoryError, NoSuchPathError
|
||||||
|
|
||||||
|
from repohealth.models.author import AuthorStats
|
||||||
|
from repohealth.models.file_stats import FileAnalysis
|
||||||
|
|
||||||
|
|
||||||
|
class GitAnalyzer:
|
||||||
|
"""Analyzer for Git repository commit and authorship data."""
|
||||||
|
|
||||||
|
def __init__(self, repo_path: str):
|
||||||
|
"""Initialize the analyzer with a repository path.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
repo_path: Path to the Git repository.
|
||||||
|
"""
|
||||||
|
self.repo_path = Path(repo_path)
|
||||||
|
self.repo: Optional[Repo] = None
|
||||||
|
self._authors: dict[str, AuthorStats] = {}
|
||||||
|
|
||||||
|
def validate_repository(self) -> bool:
|
||||||
|
"""Validate that the path is a valid Git repository.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if valid, False otherwise.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
self.repo = Repo(self.repo_path)
|
||||||
|
return not self.repo.bare
|
||||||
|
except (InvalidGitRepositoryError, NoSuchPathError):
|
||||||
|
return False
|
||||||
|
|
||||||
|
def get_commit_count(self) -> int:
|
||||||
|
"""Get total commit count in the repository.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Total number of commits.
|
||||||
|
"""
|
||||||
|
if not self.repo:
|
||||||
|
return 0
|
||||||
|
return len(list(self.repo.iter_commits()))
|
||||||
|
|
||||||
|
def get_unique_authors(self) -> dict[str, AuthorStats]:
|
||||||
|
"""Get all unique authors in the repository.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary mapping author email to AuthorStats.
|
||||||
|
"""
|
||||||
|
if not self.repo:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
authors = {}
|
||||||
|
for commit in self.repo.iter_commits():
|
||||||
|
author_key = commit.author.email
|
||||||
|
if author_key not in authors:
|
||||||
|
authors[author_key] = AuthorStats(
|
||||||
|
name=commit.author.name,
|
||||||
|
email=commit.author.email
|
||||||
|
)
|
||||||
|
authors[author_key].total_commits += 1
|
||||||
|
if not authors[author_key].first_commit:
|
||||||
|
authors[author_key].first_commit = commit.authored_datetime
|
||||||
|
authors[author_key].last_commit = commit.authored_datetime
|
||||||
|
|
||||||
|
self._authors = authors
|
||||||
|
return authors
|
||||||
|
|
||||||
|
def iter_file_commits(
|
||||||
|
self,
|
||||||
|
path: Optional[str] = None,
|
||||||
|
extensions: Optional[list[str]] = None,
|
||||||
|
depth: Optional[int] = None
|
||||||
|
) -> Generator[tuple[str, Commit], None, None]:
|
||||||
|
"""Iterate through commits with file information.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
path: Optional path to filter files.
|
||||||
|
extensions: Optional list of file extensions to include.
|
||||||
|
depth: Optional limit on commit history depth.
|
||||||
|
|
||||||
|
Yields:
|
||||||
|
Tuples of (file_path, commit).
|
||||||
|
"""
|
||||||
|
if not self.repo:
|
||||||
|
return
|
||||||
|
|
||||||
|
commit_count = 0
|
||||||
|
for commit in self.repo.iter_commits():
|
||||||
|
if depth and commit_count >= depth:
|
||||||
|
break
|
||||||
|
|
||||||
|
try:
|
||||||
|
for file_data in commit.stats.files.keys():
|
||||||
|
if path and not file_data.startswith(path):
|
||||||
|
continue
|
||||||
|
if extensions:
|
||||||
|
ext = Path(file_data).suffix.lstrip('.')
|
||||||
|
if ext not in extensions:
|
||||||
|
continue
|
||||||
|
yield file_data, commit
|
||||||
|
except (ValueError, KeyError):
|
||||||
|
continue
|
||||||
|
|
||||||
|
commit_count += 1
|
||||||
|
|
||||||
|
def analyze_file_authors(
|
||||||
|
self,
|
||||||
|
file_path: str,
|
||||||
|
depth: Optional[int] = None
|
||||||
|
) -> FileAnalysis:
|
||||||
|
"""Analyze authorship for a single file.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_path: Path to the file.
|
||||||
|
depth: Optional limit on commit history depth.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
FileAnalysis with authorship statistics.
|
||||||
|
"""
|
||||||
|
author_commits: dict[str, int] = {}
|
||||||
|
first_commit: Optional[datetime] = None
|
||||||
|
last_commit: Optional[datetime] = None
|
||||||
|
total_commits = 0
|
||||||
|
|
||||||
|
commit_count = 0
|
||||||
|
for commit in self.repo.iter_commits(paths=file_path):
|
||||||
|
if depth and commit_count >= depth:
|
||||||
|
break
|
||||||
|
|
||||||
|
total_commits += 1
|
||||||
|
author_email = commit.author.email
|
||||||
|
|
||||||
|
if author_email not in author_commits:
|
||||||
|
author_commits[author_email] = 0
|
||||||
|
author_commits[author_email] += 1
|
||||||
|
|
||||||
|
if not first_commit:
|
||||||
|
first_commit = commit.authored_datetime
|
||||||
|
last_commit = commit.authored_datetime
|
||||||
|
|
||||||
|
commit_count += 1
|
||||||
|
|
||||||
|
module = str(Path(file_path).parent)
|
||||||
|
extension = Path(file_path).suffix.lstrip('.')
|
||||||
|
|
||||||
|
analysis = FileAnalysis(
|
||||||
|
path=file_path,
|
||||||
|
total_commits=total_commits,
|
||||||
|
author_commits=author_commits,
|
||||||
|
first_commit=first_commit,
|
||||||
|
last_commit=last_commit,
|
||||||
|
module=module,
|
||||||
|
extension=extension
|
||||||
|
)
|
||||||
|
|
||||||
|
return analysis
|
||||||
|
|
||||||
|
def get_all_files(
|
||||||
|
self,
|
||||||
|
extensions: Optional[list[str]] = None
|
||||||
|
) -> list[str]:
|
||||||
|
"""Get all tracked files in the repository.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
extensions: Optional list of file extensions to include.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of file paths.
|
||||||
|
"""
|
||||||
|
if not self.repo:
|
||||||
|
return []
|
||||||
|
|
||||||
|
files = []
|
||||||
|
for item in self.repo.tree().traverse():
|
||||||
|
if item.type == 'blob':
|
||||||
|
if extensions:
|
||||||
|
ext = Path(item.path).suffix.lstrip('.')
|
||||||
|
if ext in extensions:
|
||||||
|
files.append(item.path)
|
||||||
|
else:
|
||||||
|
files.append(item.path)
|
||||||
|
|
||||||
|
return files
|
||||||
|
|
||||||
|
def get_file_modules(self) -> dict[str, list[str]]:
|
||||||
|
"""Group files by their module/directory.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary mapping module to list of files.
|
||||||
|
"""
|
||||||
|
files = self.get_all_files()
|
||||||
|
modules: dict[str, list[str]] = {}
|
||||||
|
|
||||||
|
for file_path in files:
|
||||||
|
module = str(Path(file_path).parent)
|
||||||
|
if module not in modules:
|
||||||
|
modules[module] = []
|
||||||
|
modules[module].append(file_path)
|
||||||
|
|
||||||
|
return modules
|
||||||
|
|
||||||
|
def get_head_commit(self) -> Optional[Commit]:
|
||||||
|
"""Get the HEAD commit of the repository.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
HEAD Commit or None if repository is empty.
|
||||||
|
"""
|
||||||
|
if not self.repo:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
return self.repo.head.commit
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def get_branch_count(self) -> int:
|
||||||
|
"""Get the number of branches in the repository.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Number of branches.
|
||||||
|
"""
|
||||||
|
if not self.repo:
|
||||||
|
return 0
|
||||||
|
return len(list(self.repo.branches))
|
||||||
307
repohealth-cli/src/repohealth/analyzers/risk_analyzer.py
Normal file
307
repohealth-cli/src/repohealth/analyzers/risk_analyzer.py
Normal file
@@ -0,0 +1,307 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from repohealth.analyzers.bus_factor import BusFactorCalculator
|
||||||
|
from repohealth.models.file_stats import FileAnalysis
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Hotspot:
|
||||||
|
"""Represents a knowledge concentration hotspot."""
|
||||||
|
|
||||||
|
file_path: str
|
||||||
|
risk_level: str
|
||||||
|
bus_factor: float
|
||||||
|
top_author: str
|
||||||
|
top_author_share: float
|
||||||
|
total_commits: int
|
||||||
|
num_authors: int
|
||||||
|
module: str
|
||||||
|
suggestion: str = ""
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class DiversificationSuggestion:
|
||||||
|
"""Represents a suggestion for code ownership diversification."""
|
||||||
|
|
||||||
|
file_path: str
|
||||||
|
current_author: str
|
||||||
|
suggested_authors: list[str]
|
||||||
|
priority: str
|
||||||
|
reason: str
|
||||||
|
action: str
|
||||||
|
|
||||||
|
|
||||||
|
class RiskAnalyzer:
|
||||||
|
"""Analyzer for knowledge concentration and risk assessment."""
|
||||||
|
|
||||||
|
CRITICAL_THRESHOLD = 0.8
|
||||||
|
HIGH_THRESHOLD = 0.6
|
||||||
|
MEDIUM_THRESHOLD = 0.4
|
||||||
|
|
||||||
|
def __init__(self, risk_threshold: float = 0.7):
|
||||||
|
"""Initialize the analyzer.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
risk_threshold: Threshold for risk detection.
|
||||||
|
"""
|
||||||
|
self.risk_threshold = risk_threshold
|
||||||
|
self.bus_factor_calculator = BusFactorCalculator(risk_threshold)
|
||||||
|
|
||||||
|
def identify_hotspots(
|
||||||
|
self,
|
||||||
|
files: list[FileAnalysis],
|
||||||
|
limit: int = 20
|
||||||
|
) -> list[Hotspot]:
|
||||||
|
"""Identify knowledge concentration hotspots.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
files: List of FileAnalysis objects.
|
||||||
|
limit: Maximum number of hotspots to return.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of Hotspot objects sorted by risk.
|
||||||
|
"""
|
||||||
|
hotspots = []
|
||||||
|
|
||||||
|
for analysis in files:
|
||||||
|
if analysis.total_commits == 0:
|
||||||
|
continue
|
||||||
|
|
||||||
|
top_author_data = analysis.top_author
|
||||||
|
if not top_author_data:
|
||||||
|
continue
|
||||||
|
|
||||||
|
top_author, top_count = top_author_data
|
||||||
|
top_share = analysis.top_author_share
|
||||||
|
|
||||||
|
if top_share >= self.CRITICAL_THRESHOLD:
|
||||||
|
risk_level = "critical"
|
||||||
|
elif top_share >= self.HIGH_THRESHOLD:
|
||||||
|
risk_level = "high"
|
||||||
|
elif top_share >= self.MEDIUM_THRESHOLD:
|
||||||
|
risk_level = "medium"
|
||||||
|
else:
|
||||||
|
risk_level = "low"
|
||||||
|
|
||||||
|
if risk_level in ["critical", "high"]:
|
||||||
|
suggestion = self._generate_suggestion(analysis, top_author)
|
||||||
|
|
||||||
|
hotspots.append(Hotspot(
|
||||||
|
file_path=analysis.path,
|
||||||
|
risk_level=risk_level,
|
||||||
|
bus_factor=analysis.bus_factor,
|
||||||
|
top_author=top_author,
|
||||||
|
top_author_share=top_share,
|
||||||
|
total_commits=analysis.total_commits,
|
||||||
|
num_authors=analysis.num_authors,
|
||||||
|
module=analysis.module,
|
||||||
|
suggestion=suggestion
|
||||||
|
))
|
||||||
|
|
||||||
|
hotspots.sort(key=lambda x: (x.risk_level, -x.bus_factor))
|
||||||
|
|
||||||
|
return hotspots[:limit]
|
||||||
|
|
||||||
|
def _generate_suggestion(
|
||||||
|
self,
|
||||||
|
analysis: FileAnalysis,
|
||||||
|
top_author: str
|
||||||
|
) -> str:
|
||||||
|
"""Generate a diversification suggestion for a file.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
analysis: FileAnalysis for the file.
|
||||||
|
top_author: The primary author.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Suggestion string.
|
||||||
|
"""
|
||||||
|
if analysis.num_authors == 1:
|
||||||
|
return (
|
||||||
|
f"This file is entirely owned by {top_author}. "
|
||||||
|
"Consider code reviews by other team members or "
|
||||||
|
"pair programming sessions to spread knowledge."
|
||||||
|
)
|
||||||
|
elif analysis.top_author_share >= 0.8:
|
||||||
|
return (
|
||||||
|
f"This file is {analysis.top_author_share:.0%} owned by {top_author}. "
|
||||||
|
"Encourage other developers to contribute to this file."
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
return (
|
||||||
|
f"Primary ownership by {top_author} at {analysis.top_author_share:.0%}. "
|
||||||
|
"Gradually increase contributions from other team members."
|
||||||
|
)
|
||||||
|
|
||||||
|
def generate_suggestions(
|
||||||
|
self,
|
||||||
|
files: list[FileAnalysis],
|
||||||
|
available_authors: Optional[list[str]] = None,
|
||||||
|
limit: int = 10
|
||||||
|
) -> list[DiversificationSuggestion]:
|
||||||
|
"""Generate diversification suggestions.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
files: List of FileAnalysis objects.
|
||||||
|
available_authors: List of available authors to suggest.
|
||||||
|
limit: Maximum number of suggestions to return.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of DiversificationSuggestion objects.
|
||||||
|
"""
|
||||||
|
suggestions = []
|
||||||
|
|
||||||
|
for analysis in files:
|
||||||
|
if analysis.total_commits == 0:
|
||||||
|
continue
|
||||||
|
|
||||||
|
top_author_data = analysis.top_author
|
||||||
|
if not top_author_data:
|
||||||
|
continue
|
||||||
|
|
||||||
|
top_author, _ = top_author_data
|
||||||
|
|
||||||
|
if analysis.top_author_share < self.CRITICAL_THRESHOLD:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if available_authors:
|
||||||
|
other_authors = [
|
||||||
|
a for a in available_authors
|
||||||
|
if a != top_author and a in analysis.author_commits
|
||||||
|
]
|
||||||
|
if len(other_authors) < 2:
|
||||||
|
other_authors.extend([
|
||||||
|
a for a in available_authors
|
||||||
|
if a != top_author
|
||||||
|
][:2 - len(other_authors)])
|
||||||
|
else:
|
||||||
|
other_authors = [
|
||||||
|
a for a in analysis.author_commits.keys()
|
||||||
|
if a != top_author
|
||||||
|
][:3]
|
||||||
|
|
||||||
|
if not other_authors:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if analysis.top_author_share >= 0.9:
|
||||||
|
priority = "critical"
|
||||||
|
elif analysis.top_author_share >= 0.8:
|
||||||
|
priority = "high"
|
||||||
|
else:
|
||||||
|
priority = "medium"
|
||||||
|
|
||||||
|
reason = (
|
||||||
|
f"File has {analysis.top_author_share:.0%} ownership by {top_author} "
|
||||||
|
f"across {analysis.total_commits} commits with {analysis.num_authors} authors."
|
||||||
|
)
|
||||||
|
|
||||||
|
action = (
|
||||||
|
f"Assign code reviews to {', '.join(other_authors[:2])} "
|
||||||
|
f"for changes to {analysis.path}"
|
||||||
|
)
|
||||||
|
|
||||||
|
suggestions.append(DiversificationSuggestion(
|
||||||
|
file_path=analysis.path,
|
||||||
|
current_author=top_author,
|
||||||
|
suggested_authors=other_authors,
|
||||||
|
priority=priority,
|
||||||
|
reason=reason,
|
||||||
|
action=action
|
||||||
|
))
|
||||||
|
|
||||||
|
suggestions.sort(key=lambda x: (
|
||||||
|
{"critical": 0, "high": 1, "medium": 2}[x.priority],
|
||||||
|
x.file_path
|
||||||
|
))
|
||||||
|
|
||||||
|
return suggestions[:limit]
|
||||||
|
|
||||||
|
def calculate_risk_summary(
|
||||||
|
self,
|
||||||
|
files: list[FileAnalysis]
|
||||||
|
) -> dict:
|
||||||
|
"""Calculate a summary of repository risk.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
files: List of FileAnalysis objects.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with risk summary statistics.
|
||||||
|
"""
|
||||||
|
if not files:
|
||||||
|
return {
|
||||||
|
"critical": 0,
|
||||||
|
"high": 0,
|
||||||
|
"medium": 0,
|
||||||
|
"low": 0,
|
||||||
|
"unknown": 0,
|
||||||
|
"overall_risk": "unknown"
|
||||||
|
}
|
||||||
|
|
||||||
|
risk_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0, "unknown": 0}
|
||||||
|
|
||||||
|
for analysis in files:
|
||||||
|
risk_counts[analysis.risk_level] += 1
|
||||||
|
|
||||||
|
total = len(files)
|
||||||
|
|
||||||
|
if risk_counts["critical"] >= total * 0.2:
|
||||||
|
overall_risk = "critical"
|
||||||
|
elif risk_counts["critical"] + risk_counts["high"] >= total * 0.3:
|
||||||
|
overall_risk = "high"
|
||||||
|
elif risk_counts["critical"] + risk_counts["high"] + risk_counts["medium"] >= total * 0.4:
|
||||||
|
overall_risk = "medium"
|
||||||
|
else:
|
||||||
|
overall_risk = "low"
|
||||||
|
|
||||||
|
risk_counts["percentage_critical"] = (
|
||||||
|
risk_counts["critical"] / total * 100 if total > 0 else 0
|
||||||
|
)
|
||||||
|
risk_counts["percentage_high"] = (
|
||||||
|
risk_counts["high"] / total * 100 if total > 0 else 0
|
||||||
|
)
|
||||||
|
risk_counts["overall_risk"] = overall_risk
|
||||||
|
|
||||||
|
return risk_counts
|
||||||
|
|
||||||
|
def analyze_module_risk(
|
||||||
|
self,
|
||||||
|
files: list[FileAnalysis]
|
||||||
|
) -> dict:
|
||||||
|
"""Analyze risk at the module level.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
files: List of FileAnalysis objects.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary mapping modules to risk statistics.
|
||||||
|
"""
|
||||||
|
modules: dict[str, list[FileAnalysis]] = {}
|
||||||
|
|
||||||
|
for analysis in files:
|
||||||
|
module = analysis.module or "root"
|
||||||
|
if module not in modules:
|
||||||
|
modules[module] = []
|
||||||
|
modules[module].append(analysis)
|
||||||
|
|
||||||
|
module_risk = {}
|
||||||
|
|
||||||
|
for module, module_files in modules.items():
|
||||||
|
avg_bus_factor = self.bus_factor_calculator.calculate_repository_bus_factor(
|
||||||
|
module_files
|
||||||
|
)
|
||||||
|
|
||||||
|
risk_summary = self.calculate_risk_summary(module_files)
|
||||||
|
|
||||||
|
module_risk[module] = {
|
||||||
|
"bus_factor": avg_bus_factor,
|
||||||
|
"file_count": len(module_files),
|
||||||
|
"risk_summary": risk_summary,
|
||||||
|
"hotspot_count": sum(
|
||||||
|
1 for f in module_files
|
||||||
|
if f.risk_level in ["critical", "high"]
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
return module_risk
|
||||||
3
repohealth-cli/src/repohealth/cli/__init__.py
Normal file
3
repohealth-cli/src/repohealth/cli/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
from repohealth.cli.cli import analyze, main, report
|
||||||
|
|
||||||
|
__all__ = ["main", "analyze", "report"]
|
||||||
359
repohealth-cli/src/repohealth/cli/cli.py
Normal file
359
repohealth-cli/src/repohealth/cli/cli.py
Normal file
@@ -0,0 +1,359 @@
|
|||||||
|
import os
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import click
|
||||||
|
from rich.console import Console
|
||||||
|
|
||||||
|
from repohealth.analyzers.bus_factor import BusFactorCalculator
|
||||||
|
from repohealth.analyzers.git_analyzer import GitAnalyzer
|
||||||
|
from repohealth.analyzers.risk_analyzer import RiskAnalyzer
|
||||||
|
from repohealth.models.result import RepositoryResult
|
||||||
|
from repohealth.reporters.html_reporter import HTMLReporter
|
||||||
|
from repohealth.reporters.json_reporter import JSONReporter
|
||||||
|
from repohealth.reporters.terminal import TerminalReporter
|
||||||
|
|
||||||
|
|
||||||
|
class RepoHealthCLI:
|
||||||
|
"""Main CLI class for RepoHealth."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
"""Initialize the CLI."""
|
||||||
|
self.console = Console()
|
||||||
|
self.terminal_reporter = TerminalReporter(self.console)
|
||||||
|
self.json_reporter = JSONReporter()
|
||||||
|
self.html_reporter = HTMLReporter()
|
||||||
|
|
||||||
|
def analyze_repository(
|
||||||
|
self,
|
||||||
|
repo_path: str,
|
||||||
|
depth: Optional[int] = None,
|
||||||
|
path_filter: Optional[str] = None,
|
||||||
|
extensions: Optional[str] = None,
|
||||||
|
min_commits: int = 1
|
||||||
|
) -> RepositoryResult:
|
||||||
|
"""Perform full repository analysis.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
repo_path: Path to the repository.
|
||||||
|
depth: Optional limit on commit history.
|
||||||
|
path_filter: Optional path to filter files.
|
||||||
|
extensions: Comma-separated list of extensions.
|
||||||
|
min_commits: Minimum commits to consider a file.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
RepositoryResult with all analysis data.
|
||||||
|
"""
|
||||||
|
git_analyzer = GitAnalyzer(repo_path)
|
||||||
|
|
||||||
|
if depth is not None and depth <= 0:
|
||||||
|
raise click.ClickException("--depth must be a positive integer")
|
||||||
|
|
||||||
|
if not git_analyzer.validate_repository():
|
||||||
|
raise click.ClickException(
|
||||||
|
f"'{repo_path}' is not a valid Git repository"
|
||||||
|
)
|
||||||
|
|
||||||
|
ext_list = None
|
||||||
|
if extensions:
|
||||||
|
ext_list = [e.strip().lstrip('.') for e in extensions.split(',')]
|
||||||
|
|
||||||
|
file_analyses = []
|
||||||
|
all_authors = git_analyzer.get_unique_authors()
|
||||||
|
|
||||||
|
for _file_path, _commit in git_analyzer.iter_file_commits(
|
||||||
|
path=path_filter,
|
||||||
|
extensions=ext_list,
|
||||||
|
depth=depth
|
||||||
|
):
|
||||||
|
pass
|
||||||
|
|
||||||
|
files = git_analyzer.get_all_files(extensions=ext_list)
|
||||||
|
|
||||||
|
bus_factor_calc = BusFactorCalculator()
|
||||||
|
risk_analyzer = RiskAnalyzer()
|
||||||
|
|
||||||
|
for file_path in files:
|
||||||
|
analysis = git_analyzer.analyze_file_authors(file_path, depth=depth)
|
||||||
|
|
||||||
|
if analysis.total_commits >= min_commits:
|
||||||
|
file_analyses.append(analysis)
|
||||||
|
|
||||||
|
if analysis.path in all_authors:
|
||||||
|
author_email = list(analysis.author_commits.keys())[0]
|
||||||
|
if author_email in all_authors:
|
||||||
|
all_authors[author_email].add_file(
|
||||||
|
analysis.path,
|
||||||
|
analysis.module
|
||||||
|
)
|
||||||
|
|
||||||
|
file_analyses = bus_factor_calc.assign_risk_levels(file_analyses)
|
||||||
|
|
||||||
|
overall_bus_factor = bus_factor_calc.calculate_repository_bus_factor(file_analyses)
|
||||||
|
gini = bus_factor_calc.calculate_repository_gini(file_analyses)
|
||||||
|
|
||||||
|
hotspots = risk_analyzer.identify_hotspots(file_analyses)
|
||||||
|
suggestions = risk_analyzer.generate_suggestions(file_analyses)
|
||||||
|
risk_summary = risk_analyzer.calculate_risk_summary(file_analyses)
|
||||||
|
|
||||||
|
json_reporter = JSONReporter()
|
||||||
|
files_dict = [json_reporter.generate_file_dict(f) for f in file_analyses]
|
||||||
|
|
||||||
|
hotspots_dict = [
|
||||||
|
{
|
||||||
|
"file_path": h.file_path,
|
||||||
|
"risk_level": h.risk_level,
|
||||||
|
"bus_factor": round(h.bus_factor, 2),
|
||||||
|
"top_author": h.top_author,
|
||||||
|
"top_author_share": round(h.top_author_share, 3),
|
||||||
|
"total_commits": h.total_commits,
|
||||||
|
"num_authors": h.num_authors,
|
||||||
|
"module": h.module,
|
||||||
|
"suggestion": h.suggestion
|
||||||
|
}
|
||||||
|
for h in hotspots
|
||||||
|
]
|
||||||
|
|
||||||
|
suggestions_dict = [
|
||||||
|
{
|
||||||
|
"file_path": s.file_path,
|
||||||
|
"current_author": s.current_author,
|
||||||
|
"suggested_authors": s.suggested_authors,
|
||||||
|
"priority": s.priority,
|
||||||
|
"reason": s.reason,
|
||||||
|
"action": s.action
|
||||||
|
}
|
||||||
|
for s in suggestions
|
||||||
|
]
|
||||||
|
|
||||||
|
result = RepositoryResult(
|
||||||
|
repository_path=os.path.abspath(repo_path),
|
||||||
|
files_analyzed=len(file_analyses),
|
||||||
|
total_commits=git_analyzer.get_commit_count(),
|
||||||
|
unique_authors=len(all_authors),
|
||||||
|
overall_bus_factor=overall_bus_factor,
|
||||||
|
gini_coefficient=gini,
|
||||||
|
files=files_dict,
|
||||||
|
hotspots=hotspots_dict,
|
||||||
|
suggestions=suggestions_dict,
|
||||||
|
risk_summary=risk_summary,
|
||||||
|
metadata={
|
||||||
|
"depth": depth,
|
||||||
|
"path_filter": path_filter,
|
||||||
|
"extensions": ext_list,
|
||||||
|
"min_commits": min_commits
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
@click.group()
|
||||||
|
@click.version_option(version="1.0.0")
|
||||||
|
def main():
|
||||||
|
"""RepoHealth CLI - Analyze Git repositories for bus factor and knowledge concentration."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@main.command()
|
||||||
|
@click.argument(
|
||||||
|
"repo_path",
|
||||||
|
type=click.Path(file_okay=False, dir_okay=True),
|
||||||
|
default="."
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"--depth",
|
||||||
|
type=int,
|
||||||
|
default=None,
|
||||||
|
help="Limit commit history depth"
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"--path",
|
||||||
|
"path_filter",
|
||||||
|
type=str,
|
||||||
|
default=None,
|
||||||
|
help="Analyze specific paths within the repository"
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"--extensions",
|
||||||
|
type=str,
|
||||||
|
default=None,
|
||||||
|
help="Filter by file extensions (comma-separated, e.g., 'py,js,ts')"
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"--min-commits",
|
||||||
|
type=int,
|
||||||
|
default=1,
|
||||||
|
help="Minimum commits to consider a file (default: 1)"
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"--json",
|
||||||
|
"output_json",
|
||||||
|
is_flag=True,
|
||||||
|
default=False,
|
||||||
|
help="Output in JSON format"
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"--output",
|
||||||
|
type=click.Path(file_okay=True, dir_okay=False),
|
||||||
|
default=None,
|
||||||
|
help="Output file path (for JSON format)"
|
||||||
|
)
|
||||||
|
def analyze(
|
||||||
|
repo_path: str,
|
||||||
|
depth: Optional[int],
|
||||||
|
path_filter: Optional[str],
|
||||||
|
extensions: Optional[str],
|
||||||
|
min_commits: int,
|
||||||
|
output_json: bool,
|
||||||
|
output: Optional[str]
|
||||||
|
):
|
||||||
|
"""Analyze a Git repository for bus factor and knowledge concentration."""
|
||||||
|
cli = RepoHealthCLI()
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = cli.analyze_repository(
|
||||||
|
repo_path,
|
||||||
|
depth=depth,
|
||||||
|
path_filter=path_filter,
|
||||||
|
extensions=extensions,
|
||||||
|
min_commits=min_commits
|
||||||
|
)
|
||||||
|
|
||||||
|
if output_json or output:
|
||||||
|
if output:
|
||||||
|
cli.json_reporter.save(result, output)
|
||||||
|
click.echo(f"JSON report saved to: {output}")
|
||||||
|
else:
|
||||||
|
click.echo(cli.json_reporter.generate(result))
|
||||||
|
else:
|
||||||
|
cli.terminal_reporter.display_result(result)
|
||||||
|
|
||||||
|
except click.ClickException:
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
raise click.ClickException(f"Analysis failed: {str(e)}") from e
|
||||||
|
|
||||||
|
|
||||||
|
@main.command()
|
||||||
|
@click.argument(
|
||||||
|
"repo_path",
|
||||||
|
type=click.Path(file_okay=False, dir_okay=True),
|
||||||
|
default="."
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"--format",
|
||||||
|
"output_format",
|
||||||
|
type=click.Choice(["json", "html", "terminal"]),
|
||||||
|
default="terminal",
|
||||||
|
help="Output format (default: terminal)"
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"--output",
|
||||||
|
type=click.Path(file_okay=True, dir_okay=False),
|
||||||
|
default=None,
|
||||||
|
help="Output file path (for JSON/HTML formats)"
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"--depth",
|
||||||
|
type=int,
|
||||||
|
default=None,
|
||||||
|
help="Limit commit history depth"
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"--path",
|
||||||
|
"path_filter",
|
||||||
|
type=str,
|
||||||
|
default=None,
|
||||||
|
help="Analyze specific paths within the repository"
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"--extensions",
|
||||||
|
type=str,
|
||||||
|
default=None,
|
||||||
|
help="Filter by file extensions (comma-separated)"
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"--min-commits",
|
||||||
|
type=int,
|
||||||
|
default=1,
|
||||||
|
help="Minimum commits to consider a file"
|
||||||
|
)
|
||||||
|
def report(
|
||||||
|
repo_path: str,
|
||||||
|
output_format: str,
|
||||||
|
output: Optional[str],
|
||||||
|
depth: Optional[int],
|
||||||
|
path_filter: Optional[str],
|
||||||
|
extensions: Optional[str],
|
||||||
|
min_commits: int
|
||||||
|
):
|
||||||
|
"""Generate a detailed report of repository analysis."""
|
||||||
|
cli = RepoHealthCLI()
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = cli.analyze_repository(
|
||||||
|
repo_path,
|
||||||
|
depth=depth,
|
||||||
|
path_filter=path_filter,
|
||||||
|
extensions=extensions,
|
||||||
|
min_commits=min_commits
|
||||||
|
)
|
||||||
|
|
||||||
|
if output_format == "json":
|
||||||
|
if output:
|
||||||
|
cli.json_reporter.save(result, output)
|
||||||
|
click.echo(f"JSON report saved to: {output}")
|
||||||
|
else:
|
||||||
|
click.echo(cli.json_reporter.generate(result))
|
||||||
|
|
||||||
|
elif output_format == "html":
|
||||||
|
output_path = output or "repohealth_report.html"
|
||||||
|
cli.html_reporter.save_standalone(result, output_path)
|
||||||
|
click.echo(f"HTML report saved to: {output_path}")
|
||||||
|
|
||||||
|
else:
|
||||||
|
cli.terminal_reporter.display_result(result)
|
||||||
|
|
||||||
|
except click.ClickException:
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
raise click.ClickException(f"Report generation failed: {str(e)}") from e
|
||||||
|
|
||||||
|
|
||||||
|
@main.command()
|
||||||
|
@click.argument(
|
||||||
|
"repo_path",
|
||||||
|
type=click.Path(file_okay=False, dir_okay=True),
|
||||||
|
default="."
|
||||||
|
)
|
||||||
|
def health(
|
||||||
|
repo_path: str
|
||||||
|
):
|
||||||
|
"""Show repository health summary."""
|
||||||
|
cli = RepoHealthCLI()
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = cli.analyze_repository(repo_path)
|
||||||
|
|
||||||
|
risk = result.risk_summary.get("overall_risk", "unknown")
|
||||||
|
bus_factor = result.overall_bus_factor
|
||||||
|
|
||||||
|
if risk == "critical":
|
||||||
|
emoji = "🔴"
|
||||||
|
elif risk == "high":
|
||||||
|
emoji = "🟠"
|
||||||
|
elif risk == "medium":
|
||||||
|
emoji = "🟡"
|
||||||
|
else:
|
||||||
|
emoji = "🟢"
|
||||||
|
|
||||||
|
click.echo(f"{emoji} Repository Health: {risk.upper()}")
|
||||||
|
click.echo(f" Bus Factor: {bus_factor:.2f}")
|
||||||
|
click.echo(f" Files Analyzed: {result.files_analyzed}")
|
||||||
|
click.echo(f" Critical Files: {result.risk_summary.get('critical', 0)}")
|
||||||
|
click.echo(f" High Risk Files: {result.risk_summary.get('high', 0)}")
|
||||||
|
|
||||||
|
except click.ClickException:
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
raise click.ClickException(f"Health check failed: {str(e)}") from e
|
||||||
45
repohealth-cli/src/repohealth/models/file_stats.py
Normal file
45
repohealth-cli/src/repohealth/models/file_stats.py
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class FileAnalysis:
|
||||||
|
"""Analysis result for a single file."""
|
||||||
|
|
||||||
|
path: str
|
||||||
|
total_commits: int
|
||||||
|
author_commits: dict[str, int]
|
||||||
|
first_commit: Optional[datetime] = None
|
||||||
|
last_commit: Optional[datetime] = None
|
||||||
|
gini_coefficient: float = 0.0
|
||||||
|
bus_factor: float = 1.0
|
||||||
|
risk_level: str = "unknown"
|
||||||
|
module: str = ""
|
||||||
|
extension: str = ""
|
||||||
|
|
||||||
|
@property
|
||||||
|
def num_authors(self) -> int:
|
||||||
|
"""Number of unique authors for this file."""
|
||||||
|
return len(self.author_commits)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def top_author(self) -> Optional[tuple[str, int]]:
|
||||||
|
"""Get the author with most commits."""
|
||||||
|
if not self.author_commits:
|
||||||
|
return None
|
||||||
|
return max(self.author_commits.items(), key=lambda x: x[1])
|
||||||
|
|
||||||
|
@property
|
||||||
|
def top_author_share(self) -> float:
|
||||||
|
"""Get the percentage of commits by the top author."""
|
||||||
|
if not self.author_commits or self.total_commits == 0:
|
||||||
|
return 0.0
|
||||||
|
top_count = self.top_author[1] if self.top_author else 0
|
||||||
|
return top_count / self.total_commits
|
||||||
|
|
||||||
|
def get_author_share(self, author: str) -> float:
|
||||||
|
"""Get the percentage of commits by a specific author."""
|
||||||
|
if not self.author_commits or self.total_commits == 0:
|
||||||
|
return 0.0
|
||||||
|
return self.author_commits.get(author, 0) / self.total_commits
|
||||||
62
repohealth-cli/src/repohealth/models/result.py
Normal file
62
repohealth-cli/src/repohealth/models/result.py
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
from dataclasses import dataclass, field
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
|
||||||
|
class RiskLevel(Enum):
|
||||||
|
"""Risk classification levels."""
|
||||||
|
|
||||||
|
CRITICAL = "critical"
|
||||||
|
HIGH = "high"
|
||||||
|
MEDIUM = "medium"
|
||||||
|
LOW = "low"
|
||||||
|
UNKNOWN = "unknown"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class RepositoryResult:
|
||||||
|
"""Complete analysis result for a repository."""
|
||||||
|
|
||||||
|
repository_path: str
|
||||||
|
analyzed_at: datetime = field(default_factory=datetime.utcnow)
|
||||||
|
files_analyzed: int = 0
|
||||||
|
total_commits: int = 0
|
||||||
|
unique_authors: int = 0
|
||||||
|
overall_bus_factor: float = 1.0
|
||||||
|
gini_coefficient: float = 0.0
|
||||||
|
files: list = field(default_factory=list)
|
||||||
|
hotspots: list = field(default_factory=list)
|
||||||
|
suggestions: list = field(default_factory=list)
|
||||||
|
risk_summary: dict = field(default_factory=dict)
|
||||||
|
metadata: dict = field(default_factory=dict)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def high_risk_count(self) -> int:
|
||||||
|
"""Count of high-risk files."""
|
||||||
|
return sum(1 for f in self.files if f.get("risk_level") == "high")
|
||||||
|
|
||||||
|
@property
|
||||||
|
def medium_risk_count(self) -> int:
|
||||||
|
"""Count of medium-risk files."""
|
||||||
|
return sum(1 for f in self.files if f.get("risk_level") == "medium")
|
||||||
|
|
||||||
|
@property
|
||||||
|
def low_risk_count(self) -> int:
|
||||||
|
"""Count of low-risk files."""
|
||||||
|
return sum(1 for f in self.files if f.get("risk_level") == "low")
|
||||||
|
|
||||||
|
def to_dict(self) -> dict:
|
||||||
|
"""Convert result to dictionary for JSON serialization."""
|
||||||
|
return {
|
||||||
|
"repository": self.repository_path,
|
||||||
|
"analyzed_at": self.analyzed_at.isoformat(),
|
||||||
|
"files_analyzed": self.files_analyzed,
|
||||||
|
"total_commits": self.total_commits,
|
||||||
|
"unique_authors": self.unique_authors,
|
||||||
|
"bus_factor_overall": self.overall_bus_factor,
|
||||||
|
"gini_coefficient": self.gini_coefficient,
|
||||||
|
"files": self.files,
|
||||||
|
"hotspots": self.hotspots,
|
||||||
|
"suggestions": self.suggestions,
|
||||||
|
"risk_summary": self.risk_summary,
|
||||||
|
"metadata": self.metadata
|
||||||
|
}
|
||||||
346
repohealth-cli/src/repohealth/reporters/html_reporter.py
Normal file
346
repohealth-cli/src/repohealth/reporters/html_reporter.py
Normal file
@@ -0,0 +1,346 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from jinja2 import Environment, FileSystemLoader, Template
|
||||||
|
|
||||||
|
from repohealth.models.result import RepositoryResult
|
||||||
|
|
||||||
|
|
||||||
|
class HTMLReporter:
|
||||||
|
"""Reporter for HTML output with visualizations."""
|
||||||
|
|
||||||
|
RISK_COLORS = {
|
||||||
|
"critical": "#dc3545",
|
||||||
|
"high": "#fd7e14",
|
||||||
|
"medium": "#ffc107",
|
||||||
|
"low": "#28a745",
|
||||||
|
"unknown": "#6c757d"
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self, template_dir: Optional[str] = None):
|
||||||
|
"""Initialize the reporter.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
template_dir: Directory containing Jinja2 templates.
|
||||||
|
"""
|
||||||
|
if template_dir:
|
||||||
|
self.template_dir = Path(template_dir)
|
||||||
|
else:
|
||||||
|
self.template_dir = Path(__file__).parent / "templates"
|
||||||
|
|
||||||
|
self.env = Environment(
|
||||||
|
loader=FileSystemLoader(str(self.template_dir)),
|
||||||
|
autoescape=True
|
||||||
|
)
|
||||||
|
|
||||||
|
def generate(self, result: RepositoryResult) -> str:
|
||||||
|
"""Generate HTML output from a result.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
result: RepositoryResult to convert.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
HTML string.
|
||||||
|
"""
|
||||||
|
template = self.env.get_template("report.html")
|
||||||
|
return template.render(
|
||||||
|
result=result,
|
||||||
|
risk_colors=self.RISK_COLORS,
|
||||||
|
generated_at=datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC")
|
||||||
|
)
|
||||||
|
|
||||||
|
def save(self, result: RepositoryResult, file_path: str) -> None:
|
||||||
|
"""Save HTML output to a file.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
result: RepositoryResult to save.
|
||||||
|
file_path: Path to output file.
|
||||||
|
"""
|
||||||
|
html_content = self.generate(result)
|
||||||
|
|
||||||
|
with open(file_path, 'w', encoding='utf-8') as f:
|
||||||
|
f.write(html_content)
|
||||||
|
|
||||||
|
self._copy_assets(Path(file_path).parent)
|
||||||
|
|
||||||
|
def _copy_assets(self, output_dir: Path) -> None:
|
||||||
|
"""Copy CSS/JS assets to output directory.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
output_dir: Directory to copy assets to.
|
||||||
|
"""
|
||||||
|
assets_dir = output_dir / "assets"
|
||||||
|
assets_dir.mkdir(exist_ok=True)
|
||||||
|
|
||||||
|
template_assets = self.template_dir / "assets"
|
||||||
|
if template_assets.exists():
|
||||||
|
for asset in template_assets.iterdir():
|
||||||
|
dest = assets_dir / asset.name
|
||||||
|
dest.write_text(asset.read_text())
|
||||||
|
|
||||||
|
def generate_charts_data(self, result: RepositoryResult) -> dict:
|
||||||
|
"""Generate data for JavaScript charts.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
result: RepositoryResult to analyze.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with chart data.
|
||||||
|
"""
|
||||||
|
risk_summary = result.risk_summary
|
||||||
|
|
||||||
|
risk_distribution = {
|
||||||
|
"labels": ["Critical", "High", "Medium", "Low"],
|
||||||
|
"data": [
|
||||||
|
risk_summary.get("critical", 0),
|
||||||
|
risk_summary.get("high", 0),
|
||||||
|
risk_summary.get("medium", 0),
|
||||||
|
risk_summary.get("low", 0)
|
||||||
|
],
|
||||||
|
"colors": [
|
||||||
|
self.RISK_COLORS["critical"],
|
||||||
|
self.RISK_COLORS["high"],
|
||||||
|
self.RISK_COLORS["medium"],
|
||||||
|
self.RISK_COLORS["low"]
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
def get_hotspot_attr(h, attr, default=None):
|
||||||
|
"""Get attribute from hotspot dict or object."""
|
||||||
|
if isinstance(h, dict):
|
||||||
|
return h.get(attr, default)
|
||||||
|
return getattr(h, attr, default)
|
||||||
|
|
||||||
|
top_hotspots = [
|
||||||
|
{
|
||||||
|
"file": get_hotspot_attr(h, "file_path", "")[:30],
|
||||||
|
"author": get_hotspot_attr(h, "top_author", "")[:20],
|
||||||
|
"share": round(get_hotspot_attr(h, "top_author_share", 0) * 100, 1),
|
||||||
|
"risk": get_hotspot_attr(h, "risk_level", "unknown")
|
||||||
|
}
|
||||||
|
for h in result.hotspots[:10]
|
||||||
|
]
|
||||||
|
|
||||||
|
file_data = [
|
||||||
|
{
|
||||||
|
"name": f.get("path", "")[:30],
|
||||||
|
"commits": f.get("total_commits", 0),
|
||||||
|
"authors": f.get("num_authors", 0),
|
||||||
|
"bus_factor": round(f.get("bus_factor", 1), 2),
|
||||||
|
"risk": f.get("risk_level", "unknown")
|
||||||
|
}
|
||||||
|
for f in sorted(
|
||||||
|
result.files,
|
||||||
|
key=lambda x: (
|
||||||
|
{"critical": 0, "high": 1, "medium": 2, "low": 3}.get(
|
||||||
|
x.get("risk_level"), 4
|
||||||
|
),
|
||||||
|
-x.get("bus_factor", 1)
|
||||||
|
)
|
||||||
|
)[:20]
|
||||||
|
]
|
||||||
|
|
||||||
|
return {
|
||||||
|
"risk_distribution": risk_distribution,
|
||||||
|
"top_hotspots": top_hotspots,
|
||||||
|
"file_data": file_data,
|
||||||
|
"summary": {
|
||||||
|
"bus_factor": round(result.overall_bus_factor, 2),
|
||||||
|
"gini": round(result.gini_coefficient, 3),
|
||||||
|
"files": result.files_analyzed,
|
||||||
|
"authors": result.unique_authors
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def create_inline_template(self) -> Template:
|
||||||
|
"""Create an inline template for standalone HTML reports.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Jinja2 Template with inline CSS/JS.
|
||||||
|
"""
|
||||||
|
template_str = """
|
||||||
|
<!DOCTYPE html>
|
||||||
|
<html>
|
||||||
|
<head>
|
||||||
|
<meta charset="UTF-8">
|
||||||
|
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||||
|
<title>Repository Health Report</title>
|
||||||
|
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
|
||||||
|
<style>
|
||||||
|
* { box-sizing: border-box; margin: 0; padding: 0; }
|
||||||
|
body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; line-height: 1.6; color: #333; background: #f5f5f5; }
|
||||||
|
.container { max-width: 1200px; margin: 0 auto; padding: 20px; }
|
||||||
|
.header { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 30px; border-radius: 10px; margin-bottom: 20px; }
|
||||||
|
.header h1 { font-size: 2em; margin-bottom: 10px; }
|
||||||
|
.meta { opacity: 0.9; font-size: 0.9em; }
|
||||||
|
.grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); gap: 20px; margin-bottom: 20px; }
|
||||||
|
.card { background: white; padding: 20px; border-radius: 10px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
|
||||||
|
.card h2 { color: #333; margin-bottom: 15px; border-bottom: 2px solid #667eea; padding-bottom: 10px; }
|
||||||
|
.stat { display: flex; justify-content: space-between; padding: 8px 0; border-bottom: 1px solid #eee; }
|
||||||
|
.stat:last-child { border-bottom: none; }
|
||||||
|
.stat-label { color: #666; }
|
||||||
|
.stat-value { font-weight: bold; }
|
||||||
|
.badge { padding: 4px 12px; border-radius: 20px; font-size: 0.8em; font-weight: bold; color: white; }
|
||||||
|
.badge-critical { background: #dc3545; }
|
||||||
|
.badge-high { background: #fd7e14; }
|
||||||
|
.badge-medium { background: #ffc107; color: #333; }
|
||||||
|
.badge-low { background: #28a745; }
|
||||||
|
table { width: 100%; border-collapse: collapse; }
|
||||||
|
th, td { padding: 10px; text-align: left; border-bottom: 1px solid #eee; }
|
||||||
|
th { background: #f8f9fa; font-weight: 600; }
|
||||||
|
tr:hover { background: #f8f9fa; }
|
||||||
|
.chart-container { position: relative; height: 250px; margin: 20px 0; }
|
||||||
|
.suggestion { background: #f8f9fa; padding: 15px; border-radius: 8px; margin-bottom: 10px; border-left: 4px solid #667eea; }
|
||||||
|
.suggestion-priority-critical { border-left-color: #dc3545; }
|
||||||
|
.suggestion-priority-high { border-left-color: #fd7e14; }
|
||||||
|
.suggestion-priority-medium { border-left-color: #ffc107; }
|
||||||
|
.progress-bar { background: #e9ecef; border-radius: 10px; overflow: hidden; height: 20px; }
|
||||||
|
.progress-fill { height: 100%; border-radius: 10px; transition: width 0.3s; }
|
||||||
|
</style>
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<div class="container">
|
||||||
|
<div class="header">
|
||||||
|
<h1>Repository Health Report</h1>
|
||||||
|
<p class="meta">{{ result.repository_path }}</p>
|
||||||
|
<p class="meta">Generated: {{ generated_at }}</p>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<div class="grid">
|
||||||
|
<div class="card">
|
||||||
|
<h2>Summary</h2>
|
||||||
|
<div class="stat"><span class="stat-label">Files Analyzed</span><span class="stat-value">{{ result.files_analyzed }}</span></div>
|
||||||
|
<div class="stat"><span class="stat-label">Total Commits</span><span class="stat-value">{{ result.total_commits }}</span></div>
|
||||||
|
<div class="stat"><span class="stat-label">Unique Authors</span><span class="stat-value">{{ result.unique_authors }}</span></div>
|
||||||
|
<div class="stat"><span class="stat-label">Bus Factor</span><span class="stat-value">{{ "%.2f"|format(result.overall_bus_factor) }}</span></div>
|
||||||
|
<div class="stat"><span class="stat-label">Gini Coefficient</span><span class="stat-value">{{ "%.3f"|format(result.gini_coefficient) }}</span></div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<div class="card">
|
||||||
|
<h2>Risk Distribution</h2>
|
||||||
|
<div class="stat"><span class="stat-label">Critical</span><span class="stat-value"><span class="badge badge-critical">{{ result.risk_summary.get('critical', 0) }}</span></span></div>
|
||||||
|
<div class="stat"><span class="stat-label">High</span><span class="stat-value"><span class="badge badge-high">{{ result.risk_summary.get('high', 0) }}</span></span></div>
|
||||||
|
<div class="stat"><span class="stat-label">Medium</span><span class="stat-value"><span class="badge badge-medium">{{ result.risk_summary.get('medium', 0) }}</span></span></div>
|
||||||
|
<div class="stat"><span class="stat-label">Low</span><span class="stat-value"><span class="badge badge-low">{{ result.risk_summary.get('low', 0) }}</span></span></div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<div class="card">
|
||||||
|
<h2>Risk by Percentage</h2>
|
||||||
|
<p style="margin-bottom: 10px;">Critical: {{ "%.1f"|format(result.risk_summary.get('percentage_critical', 0)) }}%</p>
|
||||||
|
<div class="progress-bar"><div class="progress-fill" style="width: {{ result.risk_summary.get('percentage_critical', 0) }}%; background: #dc3545;"></div></div>
|
||||||
|
<p style="margin: 10px 0 5px;">High: {{ "%.1f"|format(result.risk_summary.get('percentage_high', 0)) }}%</p>
|
||||||
|
<div class="progress-bar"><div class="progress-fill" style="width: {{ result.risk_summary.get('percentage_high', 0) }}%; background: #fd7e14;"></div></div>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<div class="grid">
|
||||||
|
<div class="card">
|
||||||
|
<h2>Risk Distribution Chart</h2>
|
||||||
|
<div class="chart-container">
|
||||||
|
<canvas id="riskChart"></canvas>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<div class="card">
|
||||||
|
<h2>Top Knowledge Hotspots</h2>
|
||||||
|
<table>
|
||||||
|
<thead><tr><th>File</th><th>Author</th><th>Share</th><th>Risk</th></tr></thead>
|
||||||
|
<tbody>
|
||||||
|
{% for hotspot in result.hotspots[:10] %}
|
||||||
|
<tr>
|
||||||
|
<td>{{ hotspot.file_path[:30] }}</td>
|
||||||
|
<td>{{ hotspot.top_author[:15] }}</td>
|
||||||
|
<td>{{ "%.0f"|format(hotspot.top_author_share * 100) }}%</td>
|
||||||
|
<td><span class="badge badge-{{ hotspot.risk_level }}">{{ hotspot.risk_level }}</span></td>
|
||||||
|
</tr>
|
||||||
|
{% endfor %}
|
||||||
|
</tbody>
|
||||||
|
</table>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
{% if result.suggestions %}
|
||||||
|
<div class="card">
|
||||||
|
<h2>Diversification Suggestions</h2>
|
||||||
|
{% for suggestion in result.suggestions %}
|
||||||
|
<div class="suggestion suggestion-priority-{{ suggestion.priority }}">
|
||||||
|
<strong>{{ suggestion.priority|upper }}</strong>: {{ suggestion.action }}
|
||||||
|
</div>
|
||||||
|
{% endfor %}
|
||||||
|
</div>
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
|
<div class="card">
|
||||||
|
<h2>All Analyzed Files</h2>
|
||||||
|
<table>
|
||||||
|
<thead><tr><th>File</th><th>Commits</th><th>Authors</th><th>Bus Factor</th><th>Risk</th></tr></thead>
|
||||||
|
<tbody>
|
||||||
|
{% for file in result.files[:30] %}
|
||||||
|
<tr>
|
||||||
|
<td>{{ file.path[:40] }}</td>
|
||||||
|
<td>{{ file.total_commits }}</td>
|
||||||
|
<td>{{ file.num_authors }}</td>
|
||||||
|
<td>{{ "%.2f"|format(file.bus_factor) }}</td>
|
||||||
|
<td><span class="badge badge-{{ file.risk_level }}">{{ file.risk_level }}</span></td>
|
||||||
|
</tr>
|
||||||
|
{% endfor %}
|
||||||
|
</tbody>
|
||||||
|
</table>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<script>
|
||||||
|
const riskData = {
|
||||||
|
labels: ['Critical', 'High', 'Medium', 'Low'],
|
||||||
|
datasets: [{
|
||||||
|
data: [
|
||||||
|
{{ result.risk_summary.get('critical', 0) }},
|
||||||
|
{{ result.risk_summary.get('high', 0) }},
|
||||||
|
{{ result.risk_summary.get('medium', 0) }},
|
||||||
|
{{ result.risk_summary.get('low', 0) }}
|
||||||
|
],
|
||||||
|
backgroundColor: ['#dc3545', '#fd7e14', '#ffc107', '#28a745']
|
||||||
|
}]
|
||||||
|
};
|
||||||
|
|
||||||
|
new Chart(document.getElementById('riskChart'), {
|
||||||
|
type: 'doughnut',
|
||||||
|
data: riskData,
|
||||||
|
options: { responsive: true, maintainAspectRatio: false }
|
||||||
|
});
|
||||||
|
</script>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
|
"""
|
||||||
|
return self.env.from_string(template_str)
|
||||||
|
|
||||||
|
def generate_standalone(self, result: RepositoryResult) -> str:
|
||||||
|
"""Generate standalone HTML with inline resources.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
result: RepositoryResult to convert.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Complete HTML string.
|
||||||
|
"""
|
||||||
|
template = self.create_inline_template()
|
||||||
|
charts_data = self.generate_charts_data(result)
|
||||||
|
|
||||||
|
return template.render(
|
||||||
|
result=result,
|
||||||
|
generated_at=datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC"),
|
||||||
|
charts_data=charts_data
|
||||||
|
)
|
||||||
|
|
||||||
|
def save_standalone(self, result: RepositoryResult, file_path: str) -> None:
|
||||||
|
"""Save standalone HTML to a file.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
result: RepositoryResult to save.
|
||||||
|
file_path: Path to output file.
|
||||||
|
"""
|
||||||
|
html_content = self.generate_standalone(result)
|
||||||
|
|
||||||
|
with open(file_path, 'w', encoding='utf-8') as f:
|
||||||
|
f.write(html_content)
|
||||||
130
repohealth-cli/src/repohealth/reporters/json_reporter.py
Normal file
130
repohealth-cli/src/repohealth/reporters/json_reporter.py
Normal file
@@ -0,0 +1,130 @@
|
|||||||
|
import json
|
||||||
|
|
||||||
|
from repohealth.analyzers.risk_analyzer import DiversificationSuggestion, Hotspot
|
||||||
|
from repohealth.models.file_stats import FileAnalysis
|
||||||
|
from repohealth.models.result import RepositoryResult
|
||||||
|
|
||||||
|
|
||||||
|
class JSONReporter:
|
||||||
|
"""Reporter for JSON output."""
|
||||||
|
|
||||||
|
def __init__(self, indent: int = 2):
|
||||||
|
"""Initialize the reporter.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
indent: JSON indentation level.
|
||||||
|
"""
|
||||||
|
self.indent = indent
|
||||||
|
|
||||||
|
def generate(self, result: RepositoryResult) -> str:
|
||||||
|
"""Generate JSON output from a result.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
result: RepositoryResult to convert.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
JSON string.
|
||||||
|
"""
|
||||||
|
output = {
|
||||||
|
"version": "1.0",
|
||||||
|
"repository": result.repository_path,
|
||||||
|
"analyzed_at": result.analyzed_at.isoformat(),
|
||||||
|
"files_analyzed": result.files_analyzed,
|
||||||
|
"summary": {
|
||||||
|
"files_analyzed": result.files_analyzed,
|
||||||
|
"total_commits": result.total_commits,
|
||||||
|
"unique_authors": result.unique_authors,
|
||||||
|
"overall_bus_factor": round(result.overall_bus_factor, 2),
|
||||||
|
"gini_coefficient": round(result.gini_coefficient, 3),
|
||||||
|
"overall_risk": result.risk_summary.get("overall_risk", "unknown")
|
||||||
|
},
|
||||||
|
"risk_summary": result.risk_summary,
|
||||||
|
"files": result.files,
|
||||||
|
"hotspots": result.hotspots,
|
||||||
|
"suggestions": result.suggestions,
|
||||||
|
"metadata": result.metadata
|
||||||
|
}
|
||||||
|
|
||||||
|
indent = self.indent if self.indent else None
|
||||||
|
return json.dumps(output, indent=indent, default=str)
|
||||||
|
|
||||||
|
def save(self, result: RepositoryResult, file_path: str) -> None:
|
||||||
|
"""Save JSON output to a file.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
result: RepositoryResult to save.
|
||||||
|
file_path: Path to output file.
|
||||||
|
"""
|
||||||
|
json_str = self.generate(result)
|
||||||
|
|
||||||
|
with open(file_path, 'w') as f:
|
||||||
|
f.write(json_str)
|
||||||
|
|
||||||
|
def generate_file_dict(self, analysis: FileAnalysis) -> dict:
|
||||||
|
"""Convert a FileAnalysis to a dictionary.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
analysis: FileAnalysis to convert.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary representation.
|
||||||
|
"""
|
||||||
|
return {
|
||||||
|
"path": analysis.path,
|
||||||
|
"total_commits": analysis.total_commits,
|
||||||
|
"num_authors": analysis.num_authors,
|
||||||
|
"author_commits": analysis.author_commits,
|
||||||
|
"gini_coefficient": round(analysis.gini_coefficient, 3),
|
||||||
|
"bus_factor": round(analysis.bus_factor, 2),
|
||||||
|
"risk_level": analysis.risk_level,
|
||||||
|
"top_author_share": round(analysis.top_author_share, 3),
|
||||||
|
"module": analysis.module,
|
||||||
|
"extension": analysis.extension,
|
||||||
|
"first_commit": (
|
||||||
|
analysis.first_commit.isoformat()
|
||||||
|
if analysis.first_commit else None
|
||||||
|
),
|
||||||
|
"last_commit": (
|
||||||
|
analysis.last_commit.isoformat()
|
||||||
|
if analysis.last_commit else None
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
def generate_hotspot_dict(self, hotspot: Hotspot) -> dict:
|
||||||
|
"""Convert a Hotspot to a dictionary.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
hotspot: Hotspot to convert.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary representation.
|
||||||
|
"""
|
||||||
|
return {
|
||||||
|
"file_path": hotspot.file_path,
|
||||||
|
"risk_level": hotspot.risk_level,
|
||||||
|
"bus_factor": round(hotspot.bus_factor, 2),
|
||||||
|
"top_author": hotspot.top_author,
|
||||||
|
"top_author_share": round(hotspot.top_author_share, 3),
|
||||||
|
"total_commits": hotspot.total_commits,
|
||||||
|
"num_authors": hotspot.num_authors,
|
||||||
|
"module": hotspot.module,
|
||||||
|
"suggestion": hotspot.suggestion
|
||||||
|
}
|
||||||
|
|
||||||
|
def generate_suggestion_dict(self, suggestion: DiversificationSuggestion) -> dict:
|
||||||
|
"""Convert a DiversificationSuggestion to a dictionary.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
suggestion: Suggestion to convert.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary representation.
|
||||||
|
"""
|
||||||
|
return {
|
||||||
|
"file_path": suggestion.file_path,
|
||||||
|
"current_author": suggestion.current_author,
|
||||||
|
"suggested_authors": suggestion.suggested_authors,
|
||||||
|
"priority": suggestion.priority,
|
||||||
|
"reason": suggestion.reason,
|
||||||
|
"action": suggestion.action
|
||||||
|
}
|
||||||
251
repohealth-cli/src/repohealth/reporters/terminal.py
Normal file
251
repohealth-cli/src/repohealth/reporters/terminal.py
Normal file
@@ -0,0 +1,251 @@
|
|||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from rich.box import ROUNDED
|
||||||
|
from rich.console import Console
|
||||||
|
from rich.panel import Panel
|
||||||
|
from rich.progress import BarColumn, Progress, SpinnerColumn, TaskProgressColumn, TextColumn
|
||||||
|
from rich.table import Table
|
||||||
|
from rich.text import Text
|
||||||
|
|
||||||
|
from repohealth.models.result import RepositoryResult
|
||||||
|
|
||||||
|
|
||||||
|
class TerminalReporter:
|
||||||
|
"""Reporter for terminal output using Rich."""
|
||||||
|
|
||||||
|
RISK_COLORS = {
|
||||||
|
"critical": "red",
|
||||||
|
"high": "orange3",
|
||||||
|
"medium": "yellow",
|
||||||
|
"low": "green",
|
||||||
|
"unknown": "grey"
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self, console: Optional[Console] = None):
|
||||||
|
"""Initialize the reporter.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
console: Rich Console instance.
|
||||||
|
"""
|
||||||
|
self.console = console or Console()
|
||||||
|
|
||||||
|
def display_result(self, result: RepositoryResult) -> None:
|
||||||
|
"""Display a complete analysis result.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
result: RepositoryResult to display.
|
||||||
|
"""
|
||||||
|
self.console.print(Panel(
|
||||||
|
self._get_overview_text(result),
|
||||||
|
title="Repository Health Analysis",
|
||||||
|
subtitle=f"Analyzed: {result.analyzed_at.strftime('%Y-%m-%d %H:%M')}",
|
||||||
|
expand=False
|
||||||
|
))
|
||||||
|
|
||||||
|
self._display_risk_summary(result)
|
||||||
|
self._display_file_stats(result)
|
||||||
|
self._display_hotspots(result)
|
||||||
|
self._display_suggestions(result)
|
||||||
|
|
||||||
|
def _get_overview_text(self, result: RepositoryResult) -> Text:
|
||||||
|
"""Get overview text for the result.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
result: RepositoryResult to display.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Rich Text object.
|
||||||
|
"""
|
||||||
|
text = Text()
|
||||||
|
text.append("Repository: ", style="bold")
|
||||||
|
text.append(f"{result.repository_path}\n")
|
||||||
|
text.append("Files Analyzed: ", style="bold")
|
||||||
|
text.append(f"{result.files_analyzed}\n")
|
||||||
|
text.append("Total Commits: ", style="bold")
|
||||||
|
text.append(f"{result.total_commits}\n")
|
||||||
|
text.append("Unique Authors: ", style="bold")
|
||||||
|
text.append(f"{result.unique_authors}\n")
|
||||||
|
text.append("Overall Bus Factor: ", style="bold")
|
||||||
|
text.append(f"{result.overall_bus_factor:.2f}\n")
|
||||||
|
text.append("Gini Coefficient: ", style="bold")
|
||||||
|
text.append(f"{result.gini_coefficient:.3f}\n")
|
||||||
|
return text
|
||||||
|
|
||||||
|
def _display_risk_summary(self, result: RepositoryResult) -> None:
|
||||||
|
"""Display risk summary.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
result: RepositoryResult to display.
|
||||||
|
"""
|
||||||
|
summary = result.risk_summary
|
||||||
|
if not summary:
|
||||||
|
return
|
||||||
|
|
||||||
|
table = Table(title="Risk Summary", box=ROUNDED)
|
||||||
|
table.add_column("Risk Level", justify="center")
|
||||||
|
table.add_column("Count", justify="center")
|
||||||
|
table.add_column("Percentage", justify="center")
|
||||||
|
|
||||||
|
levels = ["critical", "high", "medium", "low"]
|
||||||
|
for level in levels:
|
||||||
|
count = summary.get(level, 0)
|
||||||
|
pct = summary.get(f"percentage_{level}", 0)
|
||||||
|
color = self.RISK_COLORS.get(level, "grey")
|
||||||
|
table.add_row(
|
||||||
|
f"[{color}]{level.upper()}[/]",
|
||||||
|
str(count),
|
||||||
|
f"{pct:.1f}%"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.console.print(Panel(table, title="Risk Overview", expand=False))
|
||||||
|
|
||||||
|
def _display_file_stats(self, result: RepositoryResult) -> None:
|
||||||
|
"""Display file statistics table.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
result: RepositoryResult to display.
|
||||||
|
"""
|
||||||
|
if not result.files:
|
||||||
|
return
|
||||||
|
|
||||||
|
table = Table(title="Top Files by Risk", box=ROUNDED)
|
||||||
|
table.add_column("File", style="dim", width=40)
|
||||||
|
table.add_column("Commits", justify="right")
|
||||||
|
table.add_column("Authors", justify="right")
|
||||||
|
table.add_column("Bus Factor", justify="right")
|
||||||
|
table.add_column("Risk", justify="center")
|
||||||
|
table.add_column("Top Author %", justify="right")
|
||||||
|
|
||||||
|
sorted_files = sorted(
|
||||||
|
result.files,
|
||||||
|
key=lambda x: (
|
||||||
|
{"critical": 0, "high": 1, "medium": 2, "low": 3}.get(x.get("risk_level"), 4),
|
||||||
|
-x.get("bus_factor", 1)
|
||||||
|
)
|
||||||
|
)[:15]
|
||||||
|
|
||||||
|
for file_data in sorted_files:
|
||||||
|
risk_level = file_data.get("risk_level", "unknown")
|
||||||
|
color = self.RISK_COLORS.get(risk_level, "grey")
|
||||||
|
|
||||||
|
table.add_row(
|
||||||
|
file_data.get("path", "")[:40],
|
||||||
|
str(file_data.get("total_commits", 0)),
|
||||||
|
str(file_data.get("num_authors", 0)),
|
||||||
|
f"{file_data.get('bus_factor', 1):.2f}",
|
||||||
|
f"[{color}]{risk_level.upper()}[/]",
|
||||||
|
f"{file_data.get('top_author_share', 0):%}"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.console.print(Panel(table, title="File Analysis", expand=False))
|
||||||
|
|
||||||
|
def _display_hotspots(self, result: RepositoryResult) -> None:
|
||||||
|
"""Display knowledge hotspots.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
result: RepositoryResult to display.
|
||||||
|
"""
|
||||||
|
if not result.hotspots:
|
||||||
|
return
|
||||||
|
|
||||||
|
table = Table(title="Knowledge Hotspots", box=ROUNDED)
|
||||||
|
table.add_column("File", style="dim", width=35)
|
||||||
|
table.add_column("Top Author", width=20)
|
||||||
|
table.add_column("Ownership", justify="right")
|
||||||
|
table.add_column("Bus Factor", justify="right")
|
||||||
|
table.add_column("Risk", justify="center")
|
||||||
|
|
||||||
|
for hotspot in result.hotspots[:10]:
|
||||||
|
color = self.RISK_COLORS.get(hotspot.risk_level, "grey")
|
||||||
|
table.add_row(
|
||||||
|
hotspot.file_path[:35],
|
||||||
|
hotspot.top_author[:20],
|
||||||
|
f"{hotspot.top_author_share:.0%}",
|
||||||
|
f"{hotspot.bus_factor:.2f}",
|
||||||
|
f"[{color}]{hotspot.risk_level.upper()}[/]"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.console.print(Panel(table, title="Hotspots", expand=False))
|
||||||
|
|
||||||
|
def _display_suggestions(self, result: RepositoryResult) -> None:
|
||||||
|
"""Display diversification suggestions.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
result: RepositoryResult to display.
|
||||||
|
"""
|
||||||
|
if not result.suggestions:
|
||||||
|
return
|
||||||
|
|
||||||
|
table = Table(title="Diversification Suggestions", box=ROUNDED)
|
||||||
|
table.add_column("Priority", width=10)
|
||||||
|
table.add_column("File", style="dim", width=30)
|
||||||
|
table.add_column("Action", width=40)
|
||||||
|
|
||||||
|
priority_colors = {
|
||||||
|
"critical": "red",
|
||||||
|
"high": "orange3",
|
||||||
|
"medium": "yellow"
|
||||||
|
}
|
||||||
|
|
||||||
|
for suggestion in result.suggestions[:10]:
|
||||||
|
color = priority_colors.get(suggestion.priority, "grey")
|
||||||
|
table.add_row(
|
||||||
|
f"[{color}]{suggestion.priority.upper()}[/]",
|
||||||
|
suggestion.file_path[:30],
|
||||||
|
suggestion.action[:40]
|
||||||
|
)
|
||||||
|
|
||||||
|
self.console.print(Panel(table, title="Suggestions", expand=False))
|
||||||
|
|
||||||
|
def display_progress(self, message: str) -> Progress:
|
||||||
|
"""Display a progress indicator.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
message: Progress message.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Progress instance for updating.
|
||||||
|
"""
|
||||||
|
return Progress(
|
||||||
|
SpinnerColumn(),
|
||||||
|
TextColumn("[progress.description]{task.description}"),
|
||||||
|
BarColumn(),
|
||||||
|
TaskProgressColumn(),
|
||||||
|
console=self.console
|
||||||
|
)
|
||||||
|
|
||||||
|
def display_error(self, message: str) -> None:
|
||||||
|
"""Display an error message.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
message: Error message to display.
|
||||||
|
"""
|
||||||
|
self.console.print(Panel(
|
||||||
|
Text(message, style="red"),
|
||||||
|
title="Error",
|
||||||
|
expand=False
|
||||||
|
))
|
||||||
|
|
||||||
|
def display_warning(self, message: str) -> None:
|
||||||
|
"""Display a warning message.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
message: Warning message to display.
|
||||||
|
"""
|
||||||
|
self.console.print(Panel(
|
||||||
|
Text(message, style="yellow"),
|
||||||
|
title="Warning",
|
||||||
|
expand=False
|
||||||
|
))
|
||||||
|
|
||||||
|
def display_info(self, message: str) -> None:
|
||||||
|
"""Display an info message.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
message: Info message to display.
|
||||||
|
"""
|
||||||
|
self.console.print(Panel(
|
||||||
|
Text(message, style="blue"),
|
||||||
|
title="Info",
|
||||||
|
expand=False
|
||||||
|
))
|
||||||
Reference in New Issue
Block a user