# Recovered from a "new file" git patch for
# repohealth-cli/src/repohealth/analyzers/risk_analyzer.py
"""Knowledge-concentration risk analysis over per-file commit statistics."""

from dataclasses import dataclass
from typing import Optional

from repohealth.analyzers.bus_factor import BusFactorCalculator
from repohealth.models.file_stats import FileAnalysis

# Sort rank of every risk / priority label, most severe first. Sorting on the
# label text itself would only work by alphabetical accident ("low" sorts
# before "medium").
_SEVERITY_RANK = {"critical": 0, "high": 1, "medium": 2, "low": 3}

# Risk levels that qualify a file as a hotspot.
_HOTSPOT_LEVELS = ("critical", "high")


@dataclass
class Hotspot:
    """Represents a knowledge concentration hotspot.

    Attributes:
        file_path: Path of the file, taken from ``FileAnalysis.path``.
        risk_level: Risk label derived from the top author's share.
        bus_factor: Bus factor reported by the file's ``FileAnalysis``.
        top_author: Author with the largest share of the file.
        top_author_share: Top author's share as a fraction (0.8 means 80%).
        total_commits: Number of commits recorded for the file.
        num_authors: Number of distinct authors recorded for the file.
        module: Module the file belongs to.
        suggestion: Human-readable advice for spreading ownership.
    """

    file_path: str
    risk_level: str
    bus_factor: float
    top_author: str
    top_author_share: float
    total_commits: int
    num_authors: int
    module: str
    suggestion: str = ""


@dataclass
class DiversificationSuggestion:
    """Represents a suggestion for code ownership diversification.

    Attributes:
        file_path: Path of the file the suggestion applies to.
        current_author: Author who currently dominates the file.
        suggested_authors: Authors proposed to share ownership.
        priority: Urgency label, ``"critical"`` or ``"high"``.
        reason: Sentence describing the current ownership concentration.
        action: Concrete next step (who should review changes).
    """

    file_path: str
    current_author: str
    suggested_authors: list[str]
    priority: str
    reason: str
    action: str


class RiskAnalyzer:
    """Analyzer for knowledge concentration and risk assessment."""

    # Top-author share (as a fraction) at which each risk level starts.
    CRITICAL_THRESHOLD = 0.8
    HIGH_THRESHOLD = 0.6
    MEDIUM_THRESHOLD = 0.4

    # Top-author share from which a diversification suggestion is "critical"
    # rather than "high".
    CRITICAL_PRIORITY_SHARE = 0.9

    def __init__(self, risk_threshold: float = 0.7):
        """Initialize the analyzer.

        Args:
            risk_threshold: Threshold for risk detection. It is stored and
                forwarded unchanged to ``BusFactorCalculator``.
        """
        self.risk_threshold = risk_threshold
        self.bus_factor_calculator = BusFactorCalculator(risk_threshold)

    def _classify_share(self, top_share: float) -> str:
        """Map a top-author share to a risk level label.

        Args:
            top_share: Top author's share of a file as a fraction.

        Returns:
            ``"critical"``, ``"high"``, ``"medium"`` or ``"low"``.
        """
        if top_share >= self.CRITICAL_THRESHOLD:
            return "critical"
        if top_share >= self.HIGH_THRESHOLD:
            return "high"
        if top_share >= self.MEDIUM_THRESHOLD:
            return "medium"
        return "low"

    def identify_hotspots(
        self,
        files: list[FileAnalysis],
        limit: int = 20
    ) -> list[Hotspot]:
        """Identify knowledge concentration hotspots.

        A file is a hotspot when its top author's share puts it at the
        "critical" or "high" level. Files without commits or without a top
        author are ignored.

        Args:
            files: List of FileAnalysis objects.
            limit: Maximum number of hotspots to return.

        Returns:
            List of Hotspot objects, most severe level first; ties are
            ordered by descending bus factor.
        """
        hotspots = []

        for analysis in files:
            if analysis.total_commits == 0:
                continue

            top_author_data = analysis.top_author
            if not top_author_data:
                continue

            # top_author is an (author, commit_count) pair; only the author
            # is needed here.
            top_author, _ = top_author_data
            top_share = analysis.top_author_share

            risk_level = self._classify_share(top_share)
            if risk_level not in _HOTSPOT_LEVELS:
                # Only these levels get a suggestion, so lower levels are
                # skipped instead of being emitted with an unbound or stale
                # suggestion.
                continue

            hotspots.append(Hotspot(
                file_path=analysis.path,
                risk_level=risk_level,
                bus_factor=analysis.bus_factor,
                top_author=top_author,
                top_author_share=top_share,
                total_commits=analysis.total_commits,
                num_authors=analysis.num_authors,
                module=analysis.module,
                suggestion=self._generate_suggestion(analysis, top_author)
            ))

        # NOTE(review): the tie-break keeps the original descending
        # bus-factor order; confirm that is the intended direction.
        hotspots.sort(
            key=lambda h: (_SEVERITY_RANK[h.risk_level], -h.bus_factor)
        )

        return hotspots[:limit]

    def _generate_suggestion(
        self,
        analysis: FileAnalysis,
        top_author: str
    ) -> str:
        """Generate a diversification suggestion for a file.

        Args:
            analysis: FileAnalysis for the file.
            top_author: The primary author.

        Returns:
            Suggestion string whose wording depends on whether the file has
            a single author, a critical-level share, or a lower share.
        """
        share = analysis.top_author_share

        if analysis.num_authors == 1:
            return (
                f"This file is entirely owned by {top_author}. "
                "Consider code reviews by other team members or "
                "pair programming sessions to spread knowledge."
            )

        if share >= self.CRITICAL_THRESHOLD:
            return (
                f"This file is {share:.0%} owned by {top_author}. "
                "Encourage other developers to contribute to this file."
            )

        return (
            f"Primary ownership by {top_author} at {share:.0%}. "
            "Gradually increase contributions from other team members."
        )

    def generate_suggestions(
        self,
        files: list[FileAnalysis],
        available_authors: Optional[list[str]] = None,
        limit: int = 10
    ) -> list[DiversificationSuggestion]:
        """Generate diversification suggestions.

        Only files whose top-author share reaches ``CRITICAL_THRESHOLD`` are
        considered. Files for which no alternative author can be found are
        skipped.

        Args:
            files: List of FileAnalysis objects.
            available_authors: List of available authors to suggest. When
                given, authors who already committed to the file are
                preferred and the list is topped up to two names from the
                remaining available authors. When omitted, up to three of
                the file's other committers are used.
            limit: Maximum number of suggestions to return.

        Returns:
            List of DiversificationSuggestion objects, ordered by priority
            and then by file path.
        """
        suggestions = []

        for analysis in files:
            if analysis.total_commits == 0:
                continue

            top_author_data = analysis.top_author
            if not top_author_data:
                continue

            top_author, _ = top_author_data
            share = analysis.top_author_share

            if share < self.CRITICAL_THRESHOLD:
                continue

            if available_authors:
                # De-duplicate while keeping the caller's order.
                candidates = [
                    a for a in dict.fromkeys(available_authors)
                    if a != top_author
                ]
                other_authors = [
                    a for a in candidates
                    if a in analysis.author_commits
                ]
                if len(other_authors) < 2:
                    # Top up only with authors not already selected, so the
                    # same name is never suggested twice.
                    already_chosen = set(other_authors)
                    extras = [
                        a for a in candidates
                        if a not in already_chosen
                    ]
                    other_authors.extend(extras[:2 - len(other_authors)])
            else:
                other_authors = [
                    a for a in analysis.author_commits
                    if a != top_author
                ][:3]

            if not other_authors:
                continue

            # Every file reaching this point is at or above
            # CRITICAL_THRESHOLD, so the priority is never below "high".
            if share >= self.CRITICAL_PRIORITY_SHARE:
                priority = "critical"
            else:
                priority = "high"

            reason = (
                f"File has {share:.0%} ownership by {top_author} "
                f"across {analysis.total_commits} commits "
                f"with {analysis.num_authors} authors."
            )

            action = (
                f"Assign code reviews to {', '.join(other_authors[:2])} "
                f"for changes to {analysis.path}"
            )

            suggestions.append(DiversificationSuggestion(
                file_path=analysis.path,
                current_author=top_author,
                suggested_authors=other_authors,
                priority=priority,
                reason=reason,
                action=action
            ))

        suggestions.sort(
            key=lambda s: (_SEVERITY_RANK[s.priority], s.file_path)
        )

        return suggestions[:limit]

    def calculate_risk_summary(
        self,
        files: list[FileAnalysis]
    ) -> dict:
        """Calculate a summary of repository risk.

        Args:
            files: List of FileAnalysis objects.

        Returns:
            Dictionary with one count per risk level ("critical", "high",
            "medium", "low", "unknown"), the percentage of critical and high
            files, and an "overall_risk" label. Files whose ``risk_level``
            is not one of the known labels are counted as "unknown". For
            empty input all numbers are zero and "overall_risk" is
            "unknown".
        """
        summary = {
            "critical": 0,
            "high": 0,
            "medium": 0,
            "low": 0,
            "unknown": 0
        }

        if not files:
            # Same keys as the non-empty result so callers can rely on them.
            summary["percentage_critical"] = 0.0
            summary["percentage_high"] = 0.0
            summary["overall_risk"] = "unknown"
            return summary

        for analysis in files:
            level = analysis.risk_level
            if level not in summary:
                level = "unknown"
            summary[level] += 1

        total = len(files)
        critical = summary["critical"]
        critical_or_high = critical + summary["high"]
        medium_or_worse = critical_or_high + summary["medium"]

        if critical >= total * 0.2:
            overall_risk = "critical"
        elif critical_or_high >= total * 0.3:
            overall_risk = "high"
        elif medium_or_worse >= total * 0.4:
            overall_risk = "medium"
        else:
            overall_risk = "low"

        summary["percentage_critical"] = critical / total * 100
        summary["percentage_high"] = summary["high"] / total * 100
        summary["overall_risk"] = overall_risk

        return summary

    def analyze_module_risk(
        self,
        files: list[FileAnalysis]
    ) -> dict:
        """Analyze risk at the module level.

        Files are grouped by their ``module`` attribute; files with an empty
        module are grouped under "root".

        Args:
            files: List of FileAnalysis objects.

        Returns:
            Dictionary mapping each module name to a dictionary with:
            "bus_factor" (result of the bus factor calculator's
            ``calculate_repository_bus_factor`` for the module's files),
            "file_count", "risk_summary" (see ``calculate_risk_summary``)
            and "hotspot_count" (files at "critical" or "high" level).
        """
        modules: dict[str, list[FileAnalysis]] = {}

        for analysis in files:
            modules.setdefault(analysis.module or "root", []).append(analysis)

        module_risk = {}

        for module, module_files in modules.items():
            calculator = self.bus_factor_calculator
            module_bus_factor = calculator.calculate_repository_bus_factor(
                module_files
            )

            module_risk[module] = {
                "bus_factor": module_bus_factor,
                "file_count": len(module_files),
                "risk_summary": self.calculate_risk_summary(module_files),
                "hotspot_count": sum(
                    1 for f in module_files
                    if f.risk_level in _HOTSPOT_LEVELS
                )
            }

        return module_risk