"""LLM-backed review engine: sends diffs to a provider and collects issues."""

import json
import logging
import re
from collections.abc import Callable
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path

from ..config import Config, StrictnessProfile
from ..git import FileChange, GitRepo
from ..llm import LLMProvider, OllamaProvider
from ..llm.templates import ReviewPromptTemplates

logger = logging.getLogger(__name__)

# Greedy match from the first "{" to the last "}" in an LLM reply.
_JSON_OBJECT_RE = re.compile(r'\{[\s\S]*\}')
# Plain-text issue line of the form "**path**: <line number> ...".
_TEXT_ISSUE_RE = re.compile(r'^\*\*(.+?)\*\*:\s*(\d+)')


class IssueSeverity(str, Enum):
    """Severity levels an issue can carry, most severe first."""

    CRITICAL = "critical"
    WARNING = "warning"
    INFO = "info"


class IssueCategory(str, Enum):
    """Kinds of problems the review can report."""

    BUG = "bug"
    SECURITY = "security"
    STYLE = "style"
    PERFORMANCE = "performance"
    DOCUMENTATION = "documentation"


def _normalise_enum_value(value):
    """Strip and lower-case string values so "Critical " maps to "critical"."""
    if isinstance(value, str):
        return value.strip().lower()
    return value


def _coerce_line(value) -> int:
    """Return *value* as an int, or 0 when it cannot be converted."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return 0


@dataclass
class Issue:
    """A single finding attached to a file and line."""

    file: str
    line: int
    severity: IssueSeverity
    category: IssueCategory
    message: str
    suggestion: str | None = None
    raw_line: str | None = None

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict; enums are emitted as their values."""
        return {
            "file": self.file,
            "line": self.line,
            "severity": self.severity.value,
            "category": self.category.value,
            "message": self.message,
            "suggestion": self.suggestion,
            "raw_line": self.raw_line,
        }

    @classmethod
    def from_dict(cls, data: dict) -> "Issue":
        """Build an Issue from a dict shaped like the output of ``to_dict``.

        Severity and category are matched case-insensitively, and the line
        number is coerced to ``int`` (0 when it is not numeric), because the
        dict may come straight from model output.

        Raises:
            KeyError: a required key is missing.
            ValueError: severity or category is not a known value.
        """
        return cls(
            file=data["file"],
            line=_coerce_line(data["line"]),
            severity=IssueSeverity(_normalise_enum_value(data["severity"])),
            category=IssueCategory(_normalise_enum_value(data["category"])),
            message=data["message"],
            suggestion=data.get("suggestion"),
            raw_line=data.get("raw_line"),
        )


@dataclass
class ReviewSummary:
    """Aggregate counts and groupings for one review run."""

    critical_count: int = 0
    warning_count: int = 0
    info_count: int = 0
    files_reviewed: int = 0
    lines_changed: int = 0
    overall_assessment: str = ""
    # category value -> file name of each issue in that category
    issues_by_category: dict[str, list[str]] = field(default_factory=dict)
    # file name -> line number of each issue in that file
    issues_by_file: dict[str, list[int]] = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Return the summary as a plain dict."""
        return {
            "critical_count": self.critical_count,
            "warning_count": self.warning_count,
            "info_count": self.info_count,
            "files_reviewed": self.files_reviewed,
            "lines_changed": self.lines_changed,
            "overall_assessment": self.overall_assessment,
            "issues_by_category": self.issues_by_category,
            "issues_by_file": self.issues_by_file,
        }


@dataclass
class ReviewResult:
    """Outcome of a review: the issues, their summary and run metadata."""

    issues: list[Issue] = field(default_factory=list)
    summary: ReviewSummary = field(default_factory=ReviewSummary)
    model_used: str = ""
    tokens_used: int = 0
    review_mode: str = ""
    error: str | None = None

    def has_critical_issues(self) -> bool:
        """Return True when at least one issue is CRITICAL."""
        return any(
            issue.severity == IssueSeverity.CRITICAL for issue in self.issues
        )

    def has_issues(self) -> bool:
        """Return True when any issue was recorded."""
        return bool(self.issues)

    def get_issues_by_severity(self, severity: IssueSeverity) -> list[Issue]:
        """Return the issues with the given severity, in original order."""
        return [issue for issue in self.issues if issue.severity == severity]

    def get_issues_by_file(self, filename: str) -> list[Issue]:
        """Return the issues whose ``file`` equals *filename*."""
        return [issue for issue in self.issues if issue.file == filename]

    def get_issues_by_category(self, category: IssueCategory) -> list[Issue]:
        """Return the issues with the given category, in original order."""
        return [issue for issue in self.issues if issue.category == category]

    def to_json(self) -> str:
        """Serialise the result, including any error, as indented JSON."""
        return json.dumps(
            {
                "issues": [issue.to_dict() for issue in self.issues],
                "summary": self.summary.to_dict(),
                "model_used": self.model_used,
                "tokens_used": self.tokens_used,
                "review_mode": self.review_mode,
                # Without this key a failed review serialises like a clean one.
                "error": self.error,
            },
            indent=2,
        )

    def to_markdown(self) -> str:
        """Render the result as a markdown report grouped by severity."""
        lines = ["# AI Commit Review Results\n"]
        lines.append("## Summary\n")
        lines.append(f"- **Files Reviewed**: {self.summary.files_reviewed}")
        lines.append(f"- **Lines Changed**: {self.summary.lines_changed}")
        lines.append(f"- **Critical Issues**: {self.summary.critical_count}")
        lines.append(f"- **Warnings**: {self.summary.warning_count}")
        lines.append(f"- **Info**: {self.summary.info_count}")
        lines.append(f"- **Assessment**: {self.summary.overall_assessment}\n")

        if self.error:
            # A failed run has zero counts; show why instead of looking clean.
            lines.append("## Error\n")
            lines.append(f"{self.error}\n")

        if self.issues:
            lines.append("## Issues Found\n")
            for severity in (
                IssueSeverity.CRITICAL,
                IssueSeverity.WARNING,
                IssueSeverity.INFO,
            ):
                severity_issues = self.get_issues_by_severity(severity)
                if not severity_issues:
                    continue
                lines.append(
                    f"### {severity.value.upper()} ({len(severity_issues)})\n"
                )
                for issue in severity_issues:
                    lines.append(f"#### {issue.file}:{issue.line}")
                    lines.append(f"- **Category**: {issue.category.value}")
                    lines.append(f"- **Message**: {issue.message}")
                    if issue.suggestion:
                        lines.append(f"- **Suggestion**: {issue.suggestion}")
                    lines.append("")

        return "\n".join(lines)


class ReviewEngine:
    """Runs diffs through an LLM provider and turns replies into issues."""

    def __init__(
        self,
        config: Config | None = None,
        llm_provider: LLMProvider | None = None,
    ):
        """Create an engine.

        Args:
            config: Settings to use; a default ``Config()`` when omitted.
            llm_provider: Provider to query; an ``OllamaProvider`` built from
                ``config.llm`` when omitted.
        """
        self.config = config or Config()
        self.llm_provider = llm_provider or OllamaProvider(
            endpoint=self.config.llm.endpoint,
            model=self.config.llm.model,
            timeout=self.config.llm.timeout,
        )
        self.repo: GitRepo | None = None

    def set_repo(self, path: Path) -> None:
        """Point the engine at the repository located at *path*."""
        self.repo = GitRepo(path)

    def _parse_llm_response(
        self, response_text: str, files: list[FileChange]
    ) -> ReviewResult:
        """Parse a provider reply into a ReviewResult.

        The first ``{ ... }`` span is decoded as JSON with ``issues`` and
        ``summary`` keys. When no such span exists, or it is not valid JSON,
        the reply is parsed as plain text instead. Issue entries that cannot
        be converted are logged and skipped.
        """
        result = ReviewResult()
        json_match = _JSON_OBJECT_RE.search(response_text)
        if json_match is None:
            result.issues = self._parse_text_response(response_text, files)
            return result

        try:
            data = json.loads(json_match.group())
        except json.JSONDecodeError:
            result.issues = self._parse_text_response(response_text, files)
            return result

        # The model may emit null or a non-list here; treat that as no issues.
        issues_data = data.get("issues") or []
        if not isinstance(issues_data, list):
            issues_data = []
        for issue_data in issues_data:
            try:
                result.issues.append(Issue.from_dict(issue_data))
            except (KeyError, TypeError, ValueError, AttributeError) as exc:
                # One bad entry must not discard the rest of the reply.
                logger.warning(
                    "Skipping malformed issue in LLM response: %r", exc
                )

        summary_data = data.get("summary") or {}
        if isinstance(summary_data, dict):
            summary = result.summary
            summary.critical_count = summary_data.get("critical_count", 0)
            summary.warning_count = summary_data.get("warning_count", 0)
            summary.info_count = summary_data.get("info_count", 0)
            summary.overall_assessment = summary_data.get(
                "overall_assessment", ""
            )
        return result

    def _parse_text_response(self, response_text: str, files: list[FileChange]) -> list[Issue]:  # noqa: ARG002
        """Extract issues from a plain-text reply.

        Only lines starting with ``**<file>**: <line number>`` produce an
        issue. A line mentioning "critical" or "security" is CRITICAL,
        otherwise WARNING; a line mentioning "security" is categorised
        SECURITY, otherwise BUG. Text after the first ``->`` becomes the
        suggestion.
        """
        issues: list[Issue] = []
        for line in response_text.split("\n"):
            file_match = _TEXT_ISSUE_RE.match(line)
            if file_match is None:
                continue

            lowered = line.lower()
            mentions_security = "security" in lowered
            # Both values are always assigned: the category used to be left
            # unset for lines that mention "critical".
            if mentions_security or "critical" in lowered:
                severity = IssueSeverity.CRITICAL
            else:
                severity = IssueSeverity.WARNING
            if mentions_security:
                category = IssueCategory.SECURITY
            else:
                category = IssueCategory.BUG

            message = line
            suggestion = None
            if "->" in line:
                head, _, tail = line.partition("->")
                message = head.strip()
                suggestion = tail.strip()

            issues.append(
                Issue(
                    file=file_match.group(1),
                    line=int(file_match.group(2)),
                    severity=severity,
                    category=category,
                    message=message,
                    suggestion=suggestion,
                )
            )
        return issues

    def _get_strictness_profile(
        self, strictness: str | None = None
    ) -> StrictnessProfile:
        """Return the profile for *strictness*, or the configured one."""
        return self.config.strictness_profiles.get_profile(
            strictness or self.config.review.strictness
        )

    def _filter_issues_by_strictness(
        self, issues: list[Issue], strictness: str | None = None
    ) -> list[Issue]:
        """Drop issues the strictness profile does not ask for.

        An issue is kept when it is at least as severe as the profile's
        ``min_severity`` and the profile's check flag for its category is on.

        Args:
            issues: Issues to filter.
            strictness: Profile name to use instead of the configured one.
        """
        profile = self._get_strictness_profile(strictness)
        severity_rank = {
            IssueSeverity.CRITICAL: 0,
            IssueSeverity.WARNING: 1,
            IssueSeverity.INFO: 2,
        }
        # Any min_severity other than critical/warning keeps every level.
        threshold = {"critical": 0, "warning": 1}.get(
            profile.min_severity.lower(), 2
        )
        category_enabled = {
            IssueCategory.SECURITY: profile.check_security,
            IssueCategory.BUG: profile.check_bugs,
            IssueCategory.STYLE: profile.check_style,
            IssueCategory.PERFORMANCE: profile.check_performance,
            IssueCategory.DOCUMENTATION: profile.check_documentation,
        }
        return [
            issue
            for issue in issues
            if severity_rank.get(issue.severity, 2) <= threshold
            and category_enabled.get(issue.category, True)
        ]

    def _aggregate_summary(
        self, issues: list[Issue], files: list[FileChange]
    ) -> ReviewSummary:
        """Build a ReviewSummary from the final issues and reviewed files.

        ``lines_changed`` counts added diff lines only: lines starting with
        "+" but not with "+++".
        """
        summary = ReviewSummary()
        summary.files_reviewed = len(files)
        summary.lines_changed = sum(
            1
            for change in files
            for diff_line in change.diff.split("\n")
            if diff_line.startswith("+") and not diff_line.startswith("+++")
        )

        for issue in issues:
            if issue.severity == IssueSeverity.CRITICAL:
                summary.critical_count += 1
            elif issue.severity == IssueSeverity.WARNING:
                summary.warning_count += 1
            else:
                summary.info_count += 1
            summary.issues_by_category.setdefault(
                issue.category.value, []
            ).append(issue.file)
            summary.issues_by_file.setdefault(issue.file, []).append(
                issue.line
            )

        if summary.critical_count > 0:
            summary.overall_assessment = "Critical issues found. Review recommended before committing."
        elif summary.warning_count > 0:
            summary.overall_assessment = "Warnings found. Consider addressing before committing."
        elif summary.info_count > 0:
            summary.overall_assessment = "Minor issues found. Ready for commit with optional fixes."
        else:
            summary.overall_assessment = "No issues found. Code is ready for commit."
        return summary

    def _run_review(
        self,
        files: list[FileChange],
        build_prompt: Callable[[FileChange], str],
        strictness: str,
        max_issues_per_file: int | None = None,
    ) -> ReviewResult:
        """Review each non-empty diff in *files* and assemble the result.

        Args:
            files: Changes to review; entries with a blank diff are skipped.
            build_prompt: Returns the prompt to send for one file change.
            strictness: Profile name used for filtering and ``review_mode``.
            max_issues_per_file: Cap on issues kept per reviewed file, or
                None for no cap.

        Returns:
            The result. On provider failure ``error`` is set and the result
            is returned at once with no issues or summary filled in.
        """
        result = ReviewResult()
        result.review_mode = strictness
        kept_issues: list[Issue] = []
        provider_checked = False

        for file_change in files:
            if not file_change.diff.strip():
                continue
            prompt = build_prompt(file_change)
            try:
                # Availability is checked once per run rather than per file.
                if not provider_checked:
                    if not self.llm_provider.is_available():
                        result.error = "LLM provider is not available"
                        return result
                    provider_checked = True
                response = self.llm_provider.generate(
                    prompt,
                    max_tokens=self.config.llm.max_tokens,
                    temperature=self.config.llm.temperature,
                )
                result.model_used = response.model
                result.tokens_used += response.tokens_used
                parsed = self._parse_llm_response(response.text, [file_change])
            except Exception as e:
                # Boundary with the provider: report the failure to the
                # caller through the result instead of raising.
                result.error = f"Review failed: {e!s}"
                return result

            file_issues = self._filter_issues_by_strictness(
                parsed.issues, strictness
            )
            if max_issues_per_file is not None:
                # Cap per reviewed file so one noisy file cannot use up the
                # budget of every other file.
                file_issues = file_issues[:max_issues_per_file]
            kept_issues.extend(file_issues)

        result.issues = kept_issues
        result.summary = self._aggregate_summary(kept_issues, files)
        return result

    def review_staged_changes(
        self,
        files: list[FileChange] | None = None,
        strictness: str | None = None,
        language: str | None = None,
    ) -> ReviewResult:
        """Review staged changes.

        Args:
            files: Changes to review. When None they are read from the
                repository, which defaults to the current working directory
                if none was set.
            strictness: Profile name overriding the configured strictness.
            language: Language passed to the prompt for every file. When
                omitted and a repository is set, it is looked up per file.

        Returns:
            The review result; ``error`` is set when there is nothing to
            review or the provider fails.
        """
        if files is None:
            if self.repo is None:
                self.repo = GitRepo(Path.cwd())
            files = self.repo.get_all_staged_changes()
        if not files:
            return ReviewResult(error="No staged changes found")

        effective_strictness = strictness or self.config.review.strictness

        def build_prompt(file_change: FileChange) -> str:
            file_language = language
            if not file_language and self.repo is not None:
                file_language = self.repo.get_file_language(
                    file_change.filename
                )
            return ReviewPromptTemplates.get_prompt(
                diff=file_change.diff,
                strictness=effective_strictness,
                language=file_language or "",
            )

        return self._run_review(
            files,
            build_prompt,
            effective_strictness,
            max_issues_per_file=self.config.review.max_issues_per_file,
        )

    def review_commit(
        self,
        sha: str,
        strictness: str | None = None,
    ) -> ReviewResult:
        """Review the changes introduced by commit *sha*.

        Args:
            sha: Commit to review, looked up in the repository (the current
                working directory if none was set).
            strictness: Profile name overriding the configured strictness.

        Returns:
            The review result; ``error`` is set when the commit is not found
            or the provider fails. No per-file issue cap is applied.
        """
        if self.repo is None:
            self.repo = GitRepo(Path.cwd())
        commit_info = self.repo.get_commit_info(sha)
        if commit_info is None:
            return ReviewResult(error=f"Commit {sha} not found")

        effective_strictness = strictness or self.config.review.strictness

        def build_prompt(file_change: FileChange) -> str:
            return ReviewPromptTemplates.get_commit_review_prompt(
                diff=file_change.diff,
                commit_message=commit_info.message,
                strictness=effective_strictness,
            )

        return self._run_review(
            commit_info.changes, build_prompt, effective_strictness
        )