"""Risk analysis and hotspot identification module.""" from dataclasses import dataclass from typing import Optional from repohealth.analyzers.bus_factor import BusFactorCalculator from repohealth.models.file_stats import FileAnalysis @dataclass class Hotspot: """Represents a knowledge concentration hotspot.""" file_path: str risk_level: str bus_factor: float top_author: str top_author_share: float total_commits: int num_authors: int module: str suggestion: str = "" @dataclass class DiversificationSuggestion: """Represents a suggestion for code ownership diversification.""" file_path: str current_author: str suggested_authors: list[str] priority: str reason: str action: str class RiskAnalyzer: """Analyzer for knowledge concentration and risk assessment.""" CRITICAL_THRESHOLD = 0.8 HIGH_THRESHOLD = 0.6 MEDIUM_THRESHOLD = 0.4 def __init__(self, risk_threshold: float = 0.7): """Initialize the analyzer. Args: risk_threshold: Threshold for risk detection. """ self.risk_threshold = risk_threshold self.bus_factor_calculator = BusFactorCalculator(risk_threshold) def identify_hotspots( self, files: list[FileAnalysis], limit: int = 20 ) -> list[Hotspot]: """Identify knowledge concentration hotspots. Args: files: List of FileAnalysis objects. limit: Maximum number of hotspots to return. Returns: List of Hotspot objects sorted by risk. """ hotspots = [] for analysis in files: if analysis.total_commits == 0: continue top_author_data = analysis.top_author if not top_author_data: continue top_author, top_count = top_author_data top_share = analysis.top_author_share if top_share >= self.CRITICAL_THRESHOLD: risk_level = "critical" elif top_share >= self.HIGH_THRESHOLD: risk_level = "high" elif top_share >= self.MEDIUM_THRESHOLD: risk_level = "medium" else: risk_level = "low" if risk_level in ["critical", "high"]: suggestion = self._generate_suggestion(analysis, top_author) hotspots.append(Hotspot( file_path=analysis.path, risk_level=risk_level, bus_factor=analysis.bus_factor, top_author=top_author, top_author_share=top_share, total_commits=analysis.total_commits, num_authors=analysis.num_authors, module=analysis.module, suggestion=suggestion )) hotspots.sort(key=lambda x: (x.risk_level, -x.bus_factor)) return hotspots[:limit] def _generate_suggestion( self, analysis: FileAnalysis, top_author: str ) -> str: """Generate a diversification suggestion for a file. Args: analysis: FileAnalysis for the file. top_author: The primary author. Returns: Suggestion string. """ if analysis.num_authors == 1: return ( f"This file is entirely owned by {top_author}. " "Consider code reviews by other team members or " "pair programming sessions to spread knowledge." ) elif analysis.top_author_share >= 0.8: return ( f"This file is {analysis.top_author_share:.0%} owned by {top_author}. " "Encourage other developers to contribute to this file." ) else: return ( f"Primary ownership by {top_author} at {analysis.top_author_share:.0%}. " "Gradually increase contributions from other team members." ) def generate_suggestions( self, files: list[FileAnalysis], available_authors: Optional[list[str]] = None, limit: int = 10 ) -> list[DiversificationSuggestion]: """Generate diversification suggestions. Args: files: List of FileAnalysis objects. available_authors: List of available authors to suggest. limit: Maximum number of suggestions to return. Returns: List of DiversificationSuggestion objects. """ suggestions = [] for analysis in files: if analysis.total_commits == 0: continue top_author_data = analysis.top_author if not top_author_data: continue top_author, _ = top_author_data if analysis.top_author_share < self.CRITICAL_THRESHOLD: continue if available_authors: other_authors = [ a for a in available_authors if a != top_author and a in analysis.author_commits ] if len(other_authors) < 2: other_authors.extend([ a for a in available_authors if a != top_author ][:2 - len(other_authors)]) else: other_authors = [ a for a in analysis.author_commits.keys() if a != top_author ][:3] if not other_authors: continue if analysis.top_author_share >= 0.9: priority = "critical" elif analysis.top_author_share >= 0.8: priority = "high" else: priority = "medium" reason = ( f"File has {analysis.top_author_share:.0%} ownership by {top_author} " f"across {analysis.total_commits} commits with {analysis.num_authors} authors." ) action = ( f"Assign code reviews to {', '.join(other_authors[:2])} " f"for changes to {analysis.path}" ) suggestions.append(DiversificationSuggestion( file_path=analysis.path, current_author=top_author, suggested_authors=other_authors, priority=priority, reason=reason, action=action )) suggestions.sort(key=lambda x: ( {"critical": 0, "high": 1, "medium": 2}[x.priority], x.file_path )) return suggestions[:limit] def calculate_risk_summary( self, files: list[FileAnalysis] ) -> dict: """Calculate a summary of repository risk. Args: files: List of FileAnalysis objects. Returns: Dictionary with risk summary statistics. """ if not files: return { "critical": 0, "high": 0, "medium": 0, "low": 0, "unknown": 0, "overall_risk": "unknown" } risk_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0, "unknown": 0} for analysis in files: risk_counts[analysis.risk_level] += 1 total = len(files) if risk_counts["critical"] >= total * 0.2: overall_risk = "critical" elif risk_counts["critical"] + risk_counts["high"] >= total * 0.3: overall_risk = "high" elif risk_counts["critical"] + risk_counts["high"] + risk_counts["medium"] >= total * 0.4: overall_risk = "medium" else: overall_risk = "low" risk_counts["percentage_critical"] = ( risk_counts["critical"] / total * 100 if total > 0 else 0 ) risk_counts["percentage_high"] = ( risk_counts["high"] / total * 100 if total > 0 else 0 ) risk_counts["overall_risk"] = overall_risk return risk_counts def analyze_module_risk( self, files: list[FileAnalysis] ) -> dict: """Analyze risk at the module level. Args: files: List of FileAnalysis objects. Returns: Dictionary mapping modules to risk statistics. """ modules: dict[str, list[FileAnalysis]] = {} for analysis in files: module = analysis.module or "root" if module not in modules: modules[module] = [] modules[module].append(analysis) module_risk = {} for module, module_files in modules.items(): avg_bus_factor = self.bus_factor_calculator.calculate_repository_bus_factor( module_files ) risk_summary = self.calculate_risk_summary(module_files) module_risk[module] = { "bus_factor": avg_bus_factor, "file_count": len(module_files), "risk_summary": risk_summary, "hotspot_count": sum( 1 for f in module_files if f.risk_level in ["critical", "high"] ) } return module_risk