Initial upload: Add repohealth-cli project with CI/CD workflow
This commit is contained in:
311
src/repohealth/analyzers/risk_analyzer.py
Normal file
311
src/repohealth/analyzers/risk_analyzer.py
Normal file
@@ -0,0 +1,311 @@
|
||||
"""Risk analysis and hotspot identification module."""
|
||||
|
||||
from typing import Optional
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
|
||||
from repohealth.models.file_stats import FileAnalysis
|
||||
from repohealth.models.result import RepositoryResult, RiskLevel
|
||||
from repohealth.analyzers.bus_factor import BusFactorCalculator
|
||||
|
||||
|
||||
@dataclass
|
||||
class Hotspot:
|
||||
"""Represents a knowledge concentration hotspot."""
|
||||
|
||||
file_path: str
|
||||
risk_level: str
|
||||
bus_factor: float
|
||||
top_author: str
|
||||
top_author_share: float
|
||||
total_commits: int
|
||||
num_authors: int
|
||||
module: str
|
||||
suggestion: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class DiversificationSuggestion:
|
||||
"""Represents a suggestion for code ownership diversification."""
|
||||
|
||||
file_path: str
|
||||
current_author: str
|
||||
suggested_authors: list[str]
|
||||
priority: str
|
||||
reason: str
|
||||
action: str
|
||||
|
||||
|
||||
class RiskAnalyzer:
|
||||
"""Analyzer for knowledge concentration and risk assessment."""
|
||||
|
||||
CRITICAL_THRESHOLD = 0.8
|
||||
HIGH_THRESHOLD = 0.6
|
||||
MEDIUM_THRESHOLD = 0.4
|
||||
|
||||
def __init__(self, risk_threshold: float = 0.7):
|
||||
"""Initialize the analyzer.
|
||||
|
||||
Args:
|
||||
risk_threshold: Threshold for risk detection.
|
||||
"""
|
||||
self.risk_threshold = risk_threshold
|
||||
self.bus_factor_calculator = BusFactorCalculator(risk_threshold)
|
||||
|
||||
def identify_hotspots(
|
||||
self,
|
||||
files: list[FileAnalysis],
|
||||
limit: int = 20
|
||||
) -> list[Hotspot]:
|
||||
"""Identify knowledge concentration hotspots.
|
||||
|
||||
Args:
|
||||
files: List of FileAnalysis objects.
|
||||
limit: Maximum number of hotspots to return.
|
||||
|
||||
Returns:
|
||||
List of Hotspot objects sorted by risk.
|
||||
"""
|
||||
hotspots = []
|
||||
|
||||
for analysis in files:
|
||||
if analysis.total_commits == 0:
|
||||
continue
|
||||
|
||||
top_author_data = analysis.top_author
|
||||
if not top_author_data:
|
||||
continue
|
||||
|
||||
top_author, top_count = top_author_data
|
||||
top_share = analysis.top_author_share
|
||||
|
||||
if top_share >= self.CRITICAL_THRESHOLD:
|
||||
risk_level = "critical"
|
||||
elif top_share >= self.HIGH_THRESHOLD:
|
||||
risk_level = "high"
|
||||
elif top_share >= self.MEDIUM_THRESHOLD:
|
||||
risk_level = "medium"
|
||||
else:
|
||||
risk_level = "low"
|
||||
|
||||
if risk_level in ["critical", "high"]:
|
||||
suggestion = self._generate_suggestion(analysis, top_author)
|
||||
|
||||
hotspots.append(Hotspot(
|
||||
file_path=analysis.path,
|
||||
risk_level=risk_level,
|
||||
bus_factor=analysis.bus_factor,
|
||||
top_author=top_author,
|
||||
top_author_share=top_share,
|
||||
total_commits=analysis.total_commits,
|
||||
num_authors=analysis.num_authors,
|
||||
module=analysis.module,
|
||||
suggestion=suggestion
|
||||
))
|
||||
|
||||
hotspots.sort(key=lambda x: (x.risk_level, -x.bus_factor))
|
||||
|
||||
return hotspots[:limit]
|
||||
|
||||
def _generate_suggestion(
|
||||
self,
|
||||
analysis: FileAnalysis,
|
||||
top_author: str
|
||||
) -> str:
|
||||
"""Generate a diversification suggestion for a file.
|
||||
|
||||
Args:
|
||||
analysis: FileAnalysis for the file.
|
||||
top_author: The primary author.
|
||||
|
||||
Returns:
|
||||
Suggestion string.
|
||||
"""
|
||||
if analysis.num_authors == 1:
|
||||
return (
|
||||
f"This file is entirely owned by {top_author}. "
|
||||
"Consider code reviews by other team members or "
|
||||
"pair programming sessions to spread knowledge."
|
||||
)
|
||||
elif analysis.top_author_share >= 0.8:
|
||||
return (
|
||||
f"This file is {analysis.top_author_share:.0%} owned by {top_author}. "
|
||||
"Encourage other developers to contribute to this file."
|
||||
)
|
||||
else:
|
||||
return (
|
||||
f"Primary ownership by {top_author} at {analysis.top_author_share:.0%}. "
|
||||
"Gradually increase contributions from other team members."
|
||||
)
|
||||
|
||||
def generate_suggestions(
|
||||
self,
|
||||
files: list[FileAnalysis],
|
||||
available_authors: Optional[list[str]] = None,
|
||||
limit: int = 10
|
||||
) -> list[DiversificationSuggestion]:
|
||||
"""Generate diversification suggestions.
|
||||
|
||||
Args:
|
||||
files: List of FileAnalysis objects.
|
||||
available_authors: List of available authors to suggest.
|
||||
limit: Maximum number of suggestions to return.
|
||||
|
||||
Returns:
|
||||
List of DiversificationSuggestion objects.
|
||||
"""
|
||||
suggestions = []
|
||||
|
||||
for analysis in files:
|
||||
if analysis.total_commits == 0:
|
||||
continue
|
||||
|
||||
top_author_data = analysis.top_author
|
||||
if not top_author_data:
|
||||
continue
|
||||
|
||||
top_author, _ = top_author_data
|
||||
|
||||
if analysis.top_author_share < self.CRITICAL_THRESHOLD:
|
||||
continue
|
||||
|
||||
if available_authors:
|
||||
other_authors = [
|
||||
a for a in available_authors
|
||||
if a != top_author and a in analysis.author_commits
|
||||
]
|
||||
if len(other_authors) < 2:
|
||||
other_authors.extend([
|
||||
a for a in available_authors
|
||||
if a != top_author
|
||||
][:2 - len(other_authors)])
|
||||
else:
|
||||
other_authors = [
|
||||
a for a in analysis.author_commits.keys()
|
||||
if a != top_author
|
||||
][:3]
|
||||
|
||||
if not other_authors:
|
||||
continue
|
||||
|
||||
if analysis.top_author_share >= 0.9:
|
||||
priority = "critical"
|
||||
elif analysis.top_author_share >= 0.8:
|
||||
priority = "high"
|
||||
else:
|
||||
priority = "medium"
|
||||
|
||||
reason = (
|
||||
f"File has {analysis.top_author_share:.0%} ownership by {top_author} "
|
||||
f"across {analysis.total_commits} commits with {analysis.num_authors} authors."
|
||||
)
|
||||
|
||||
action = (
|
||||
f"Assign code reviews to {', '.join(other_authors[:2])} "
|
||||
f"for changes to {analysis.path}"
|
||||
)
|
||||
|
||||
suggestions.append(DiversificationSuggestion(
|
||||
file_path=analysis.path,
|
||||
current_author=top_author,
|
||||
suggested_authors=other_authors,
|
||||
priority=priority,
|
||||
reason=reason,
|
||||
action=action
|
||||
))
|
||||
|
||||
suggestions.sort(key=lambda x: (
|
||||
{"critical": 0, "high": 1, "medium": 2}[x.priority],
|
||||
x.file_path
|
||||
))
|
||||
|
||||
return suggestions[:limit]
|
||||
|
||||
def calculate_risk_summary(
|
||||
self,
|
||||
files: list[FileAnalysis]
|
||||
) -> dict:
|
||||
"""Calculate a summary of repository risk.
|
||||
|
||||
Args:
|
||||
files: List of FileAnalysis objects.
|
||||
|
||||
Returns:
|
||||
Dictionary with risk summary statistics.
|
||||
"""
|
||||
if not files:
|
||||
return {
|
||||
"critical": 0,
|
||||
"high": 0,
|
||||
"medium": 0,
|
||||
"low": 0,
|
||||
"unknown": 0,
|
||||
"overall_risk": "unknown"
|
||||
}
|
||||
|
||||
risk_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0, "unknown": 0}
|
||||
|
||||
for analysis in files:
|
||||
risk_counts[analysis.risk_level] += 1
|
||||
|
||||
total = len(files)
|
||||
|
||||
if risk_counts["critical"] >= total * 0.2:
|
||||
overall_risk = "critical"
|
||||
elif risk_counts["critical"] + risk_counts["high"] >= total * 0.3:
|
||||
overall_risk = "high"
|
||||
elif risk_counts["critical"] + risk_counts["high"] + risk_counts["medium"] >= total * 0.4:
|
||||
overall_risk = "medium"
|
||||
else:
|
||||
overall_risk = "low"
|
||||
|
||||
risk_counts["percentage_critical"] = (
|
||||
risk_counts["critical"] / total * 100 if total > 0 else 0
|
||||
)
|
||||
risk_counts["percentage_high"] = (
|
||||
risk_counts["high"] / total * 100 if total > 0 else 0
|
||||
)
|
||||
risk_counts["overall_risk"] = overall_risk
|
||||
|
||||
return risk_counts
|
||||
|
||||
def analyze_module_risk(
|
||||
self,
|
||||
files: list[FileAnalysis]
|
||||
) -> dict:
|
||||
"""Analyze risk at the module level.
|
||||
|
||||
Args:
|
||||
files: List of FileAnalysis objects.
|
||||
|
||||
Returns:
|
||||
Dictionary mapping modules to risk statistics.
|
||||
"""
|
||||
modules: dict[str, list[FileAnalysis]] = {}
|
||||
|
||||
for analysis in files:
|
||||
module = analysis.module or "root"
|
||||
if module not in modules:
|
||||
modules[module] = []
|
||||
modules[module].append(analysis)
|
||||
|
||||
module_risk = {}
|
||||
|
||||
for module, module_files in modules.items():
|
||||
avg_bus_factor = self.bus_factor_calculator.calculate_repository_bus_factor(
|
||||
module_files
|
||||
)
|
||||
|
||||
risk_summary = self.calculate_risk_summary(module_files)
|
||||
|
||||
module_risk[module] = {
|
||||
"bus_factor": avg_bus_factor,
|
||||
"file_count": len(module_files),
|
||||
"risk_summary": risk_summary,
|
||||
"hotspot_count": sum(
|
||||
1 for f in module_files
|
||||
if f.risk_level in ["critical", "high"]
|
||||
)
|
||||
}
|
||||
|
||||
return module_risk
|
||||
Reference in New Issue
Block a user