fix: resolve CI issues - remove unused imports and fix code quality
Some checks failed
CI / lint (push) Has been cancelled
CI / build (push) Has been cancelled
CI / test (push) Has been cancelled
repohealth-cli CI / test (push) Has been cancelled

This commit is contained in:
2026-02-05 17:30:09 +00:00
parent 2b989c5ce0
commit 8ed5ebf41e

View File

@@ -0,0 +1,307 @@
from dataclasses import dataclass
from typing import Optional
from repohealth.analyzers.bus_factor import BusFactorCalculator
from repohealth.models.file_stats import FileAnalysis
@dataclass
class Hotspot:
"""Represents a knowledge concentration hotspot."""
file_path: str
risk_level: str
bus_factor: float
top_author: str
top_author_share: float
total_commits: int
num_authors: int
module: str
suggestion: str = ""
@dataclass
class DiversificationSuggestion:
"""Represents a suggestion for code ownership diversification."""
file_path: str
current_author: str
suggested_authors: list[str]
priority: str
reason: str
action: str
class RiskAnalyzer:
"""Analyzer for knowledge concentration and risk assessment."""
CRITICAL_THRESHOLD = 0.8
HIGH_THRESHOLD = 0.6
MEDIUM_THRESHOLD = 0.4
def __init__(self, risk_threshold: float = 0.7):
"""Initialize the analyzer.
Args:
risk_threshold: Threshold for risk detection.
"""
self.risk_threshold = risk_threshold
self.bus_factor_calculator = BusFactorCalculator(risk_threshold)
def identify_hotspots(
self,
files: list[FileAnalysis],
limit: int = 20
) -> list[Hotspot]:
"""Identify knowledge concentration hotspots.
Args:
files: List of FileAnalysis objects.
limit: Maximum number of hotspots to return.
Returns:
List of Hotspot objects sorted by risk.
"""
hotspots = []
for analysis in files:
if analysis.total_commits == 0:
continue
top_author_data = analysis.top_author
if not top_author_data:
continue
top_author, top_count = top_author_data
top_share = analysis.top_author_share
if top_share >= self.CRITICAL_THRESHOLD:
risk_level = "critical"
elif top_share >= self.HIGH_THRESHOLD:
risk_level = "high"
elif top_share >= self.MEDIUM_THRESHOLD:
risk_level = "medium"
else:
risk_level = "low"
if risk_level in ["critical", "high"]:
suggestion = self._generate_suggestion(analysis, top_author)
hotspots.append(Hotspot(
file_path=analysis.path,
risk_level=risk_level,
bus_factor=analysis.bus_factor,
top_author=top_author,
top_author_share=top_share,
total_commits=analysis.total_commits,
num_authors=analysis.num_authors,
module=analysis.module,
suggestion=suggestion
))
hotspots.sort(key=lambda x: (x.risk_level, -x.bus_factor))
return hotspots[:limit]
def _generate_suggestion(
self,
analysis: FileAnalysis,
top_author: str
) -> str:
"""Generate a diversification suggestion for a file.
Args:
analysis: FileAnalysis for the file.
top_author: The primary author.
Returns:
Suggestion string.
"""
if analysis.num_authors == 1:
return (
f"This file is entirely owned by {top_author}. "
"Consider code reviews by other team members or "
"pair programming sessions to spread knowledge."
)
elif analysis.top_author_share >= 0.8:
return (
f"This file is {analysis.top_author_share:.0%} owned by {top_author}. "
"Encourage other developers to contribute to this file."
)
else:
return (
f"Primary ownership by {top_author} at {analysis.top_author_share:.0%}. "
"Gradually increase contributions from other team members."
)
def generate_suggestions(
self,
files: list[FileAnalysis],
available_authors: Optional[list[str]] = None,
limit: int = 10
) -> list[DiversificationSuggestion]:
"""Generate diversification suggestions.
Args:
files: List of FileAnalysis objects.
available_authors: List of available authors to suggest.
limit: Maximum number of suggestions to return.
Returns:
List of DiversificationSuggestion objects.
"""
suggestions = []
for analysis in files:
if analysis.total_commits == 0:
continue
top_author_data = analysis.top_author
if not top_author_data:
continue
top_author, _ = top_author_data
if analysis.top_author_share < self.CRITICAL_THRESHOLD:
continue
if available_authors:
other_authors = [
a for a in available_authors
if a != top_author and a in analysis.author_commits
]
if len(other_authors) < 2:
other_authors.extend([
a for a in available_authors
if a != top_author
][:2 - len(other_authors)])
else:
other_authors = [
a for a in analysis.author_commits.keys()
if a != top_author
][:3]
if not other_authors:
continue
if analysis.top_author_share >= 0.9:
priority = "critical"
elif analysis.top_author_share >= 0.8:
priority = "high"
else:
priority = "medium"
reason = (
f"File has {analysis.top_author_share:.0%} ownership by {top_author} "
f"across {analysis.total_commits} commits with {analysis.num_authors} authors."
)
action = (
f"Assign code reviews to {', '.join(other_authors[:2])} "
f"for changes to {analysis.path}"
)
suggestions.append(DiversificationSuggestion(
file_path=analysis.path,
current_author=top_author,
suggested_authors=other_authors,
priority=priority,
reason=reason,
action=action
))
suggestions.sort(key=lambda x: (
{"critical": 0, "high": 1, "medium": 2}[x.priority],
x.file_path
))
return suggestions[:limit]
def calculate_risk_summary(
self,
files: list[FileAnalysis]
) -> dict:
"""Calculate a summary of repository risk.
Args:
files: List of FileAnalysis objects.
Returns:
Dictionary with risk summary statistics.
"""
if not files:
return {
"critical": 0,
"high": 0,
"medium": 0,
"low": 0,
"unknown": 0,
"overall_risk": "unknown"
}
risk_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0, "unknown": 0}
for analysis in files:
risk_counts[analysis.risk_level] += 1
total = len(files)
if risk_counts["critical"] >= total * 0.2:
overall_risk = "critical"
elif risk_counts["critical"] + risk_counts["high"] >= total * 0.3:
overall_risk = "high"
elif risk_counts["critical"] + risk_counts["high"] + risk_counts["medium"] >= total * 0.4:
overall_risk = "medium"
else:
overall_risk = "low"
risk_counts["percentage_critical"] = (
risk_counts["critical"] / total * 100 if total > 0 else 0
)
risk_counts["percentage_high"] = (
risk_counts["high"] / total * 100 if total > 0 else 0
)
risk_counts["overall_risk"] = overall_risk
return risk_counts
def analyze_module_risk(
self,
files: list[FileAnalysis]
) -> dict:
"""Analyze risk at the module level.
Args:
files: List of FileAnalysis objects.
Returns:
Dictionary mapping modules to risk statistics.
"""
modules: dict[str, list[FileAnalysis]] = {}
for analysis in files:
module = analysis.module or "root"
if module not in modules:
modules[module] = []
modules[module].append(analysis)
module_risk = {}
for module, module_files in modules.items():
avg_bus_factor = self.bus_factor_calculator.calculate_repository_bus_factor(
module_files
)
risk_summary = self.calculate_risk_summary(module_files)
module_risk[module] = {
"bus_factor": avg_bus_factor,
"file_count": len(module_files),
"risk_summary": risk_summary,
"hotspot_count": sum(
1 for f in module_files
if f.risk_level in ["critical", "high"]
)
}
return module_risk