"""Core review engine: runs LLM-backed code review over git changes.

Defines the issue/severity/category data model, result containers with
JSON/Markdown rendering, and :class:`ReviewEngine`, which prompts an LLM
provider per changed file, parses its response (JSON preferred, free text
as fallback), filters by the configured strictness profile, and aggregates
a summary.
"""

import json
import re
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path

from ..config import Config, StrictnessProfile
from ..git import FileChange, GitRepo
from ..llm import LLMProvider, OllamaProvider
from ..llm.templates import ReviewPromptTemplates


class IssueSeverity(str, Enum):
    """Severity levels, ordered critical > warning > info."""

    CRITICAL = "critical"
    WARNING = "warning"
    INFO = "info"


class IssueCategory(str, Enum):
    """Closed set of categories an issue may be filed under."""

    BUG = "bug"
    SECURITY = "security"
    STYLE = "style"
    PERFORMANCE = "performance"
    DOCUMENTATION = "documentation"


@dataclass
class Issue:
    """One reported issue, anchored to a file and line number."""

    file: str
    line: int
    severity: IssueSeverity
    category: IssueCategory
    message: str
    suggestion: str | None = None
    raw_line: str | None = None

    def to_dict(self) -> dict:
        """Serialize to a JSON-compatible dict (enums become their values)."""
        return {
            "file": self.file,
            "line": self.line,
            "severity": self.severity.value,
            "category": self.category.value,
            "message": self.message,
            "suggestion": self.suggestion,
            "raw_line": self.raw_line
        }

    @classmethod
    def from_dict(cls, data: dict) -> "Issue":
        """Inverse of :meth:`to_dict`.

        Raises KeyError on missing required keys and ValueError on an
        unknown severity/category value.
        """
        return cls(
            file=data["file"],
            line=data["line"],
            severity=IssueSeverity(data["severity"]),
            category=IssueCategory(data["category"]),
            message=data["message"],
            suggestion=data.get("suggestion"),
            raw_line=data.get("raw_line")
        )


@dataclass
class ReviewSummary:
    """Aggregated counts and per-category/per-file indexes for one review."""

    critical_count: int = 0
    warning_count: int = 0
    info_count: int = 0
    files_reviewed: int = 0
    lines_changed: int = 0
    overall_assessment: str = ""
    # category value -> list of filenames (one entry per issue)
    issues_by_category: dict = field(default_factory=dict)
    # filename -> list of line numbers (one entry per issue)
    issues_by_file: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Serialize to a JSON-compatible dict."""
        return {
            "critical_count": self.critical_count,
            "warning_count": self.warning_count,
            "info_count": self.info_count,
            "files_reviewed": self.files_reviewed,
            "lines_changed": self.lines_changed,
            "overall_assessment": self.overall_assessment,
            "issues_by_category": self.issues_by_category,
            "issues_by_file": self.issues_by_file
        }


@dataclass
class ReviewResult:
    """Full outcome of one review run: issues, summary, and run metadata."""

    issues: list[Issue] = field(default_factory=list)
    summary: ReviewSummary = field(default_factory=ReviewSummary)
    model_used: str = ""
    tokens_used: int = 0
    review_mode: str = ""
    # Non-None when the run failed (e.g. provider unavailable); issues may be partial.
    error: str | None = None

    def has_critical_issues(self) -> bool:
        """Return True if any issue is CRITICAL."""
        return any(issue.severity == IssueSeverity.CRITICAL for issue in self.issues)

    def has_issues(self) -> bool:
        """Return True if at least one issue was recorded."""
        return len(self.issues) > 0

    def get_issues_by_severity(self, severity: IssueSeverity) -> list[Issue]:
        """Return all issues with exactly the given severity."""
        return [issue for issue in self.issues if issue.severity == severity]

    def get_issues_by_file(self, filename: str) -> list[Issue]:
        """Return all issues filed against *filename*."""
        return [issue for issue in self.issues if issue.file == filename]

    def get_issues_by_category(self, category: IssueCategory) -> list[Issue]:
        """Return all issues in the given category."""
        return [issue for issue in self.issues if issue.category == category]

    def to_json(self) -> str:
        """Render the result (minus `error`) as pretty-printed JSON."""
        return json.dumps({
            "issues": [issue.to_dict() for issue in self.issues],
            "summary": self.summary.to_dict(),
            "model_used": self.model_used,
            "tokens_used": self.tokens_used,
            "review_mode": self.review_mode
        }, indent=2)

    def to_markdown(self) -> str:
        """Render the result as a human-readable Markdown report."""
        lines = ["# AI Commit Review Results\n"]

        lines.append("## Summary\n")
        lines.append(f"- **Files Reviewed**: {self.summary.files_reviewed}")
        lines.append(f"- **Lines Changed**: {self.summary.lines_changed}")
        lines.append(f"- **Critical Issues**: {self.summary.critical_count}")
        lines.append(f"- **Warnings**: {self.summary.warning_count}")
        lines.append(f"- **Info**: {self.summary.info_count}")
        lines.append(f"- **Assessment**: {self.summary.overall_assessment}\n")

        if self.issues:
            lines.append("## Issues Found\n")

            # Emit sections in fixed severity order, most urgent first.
            for severity in [IssueSeverity.CRITICAL, IssueSeverity.WARNING, IssueSeverity.INFO]:
                severity_issues = self.get_issues_by_severity(severity)
                if severity_issues:
                    lines.append(f"### {severity.value.upper()} ({len(severity_issues)})\n")
                    for issue in severity_issues:
                        lines.append(f"#### {issue.file}:{issue.line}")
                        lines.append(f"- **Category**: {issue.category.value}")
                        lines.append(f"- **Message**: {issue.message}")
                        if issue.suggestion:
                            lines.append(f"- **Suggestion**: {issue.suggestion}")
                        lines.append("")

        return "\n".join(lines)


class ReviewEngine:
    """Drives a per-file LLM review of staged changes or a specific commit."""

    def __init__(
        self,
        config: Config | None = None,
        llm_provider: LLMProvider | None = None
    ):
        """Create an engine; defaults to `Config()` and an Ollama provider built from it."""
        self.config = config or Config()
        self.llm_provider = llm_provider or OllamaProvider(
            endpoint=self.config.llm.endpoint,
            model=self.config.llm.model,
            timeout=self.config.llm.timeout
        )
        self.repo: GitRepo | None = None

    def set_repo(self, path: Path) -> None:
        """Bind the engine to the git repository at *path*."""
        self.repo = GitRepo(path)

    def _parse_llm_response(self, response_text: str, files: list[FileChange]) -> ReviewResult:
        """Parse one LLM reply into a ReviewResult.

        Prefers an embedded JSON object (`{"issues": [...], "summary": {...}}`);
        falls back to free-text parsing when no JSON is found or it is invalid.
        """
        result = ReviewResult()

        try:
            # Greedy match: grab from the first '{' to the last '}' in the reply.
            json_match = re.search(r'\{[\s\S]*\}', response_text)
            if json_match:
                json_str = json_match.group()
                data = json.loads(json_str)

                issues_data = data.get("issues", [])
                for issue_data in issues_data:
                    try:
                        issue = Issue.from_dict(issue_data)
                        result.issues.append(issue)
                    except Exception:
                        # Best-effort: skip malformed issue entries rather than
                        # discarding the whole response.
                        pass

                summary_data = data.get("summary", {})
                result.summary.critical_count = summary_data.get("critical_count", 0)
                result.summary.warning_count = summary_data.get("warning_count", 0)
                result.summary.info_count = summary_data.get("info_count", 0)
                result.summary.overall_assessment = summary_data.get("overall_assessment", "")
            else:
                text_issues = self._parse_text_response(response_text, files)
                result.issues = text_issues

        except json.JSONDecodeError:
            result.issues = self._parse_text_response(response_text, files)

        return result

    def _parse_text_response(self, response_text: str, files: list[FileChange]) -> list[Issue]:  # noqa: ARG002
        """Fallback parser for free-text replies.

        Recognizes lines shaped like ``**path/to/file**: 42 <message>`` and
        infers severity/category from keywords in the line. ``->`` splits the
        message from a suggestion.
        """
        issues = []
        lines = response_text.split("\n")

        current_file = ""
        for line in lines:
            file_match = re.match(r'^\*\*(.+?)\*\*:\s*(\d+)', line)
            if file_match:
                current_file = file_match.group(1)
                line_num = int(file_match.group(2))
                lowered = line.lower()

                severity = IssueSeverity.WARNING
                # FIX: default the category up front. The original assigned
                # `category` only in the "security" and else branches, so a line
                # containing "critical" raised UnboundLocalError (or reused the
                # previous iteration's category) when building the Issue below.
                category = IssueCategory.BUG
                if "critical" in lowered:
                    severity = IssueSeverity.CRITICAL
                elif "security" in lowered:
                    severity = IssueSeverity.CRITICAL
                    category = IssueCategory.SECURITY

                message = line
                suggestion = None
                if "->" in line:
                    parts = line.split("->")
                    message = parts[0].strip()
                    suggestion = "->".join(parts[1:]).strip()

                issues.append(Issue(
                    file=current_file,
                    line=line_num,
                    severity=severity,
                    category=category,
                    message=message,
                    suggestion=suggestion
                ))

        return issues

    def _get_strictness_profile(self) -> StrictnessProfile:
        """Return the profile matching the configured strictness level."""
        return self.config.strictness_profiles.get_profile(
            self.config.review.strictness
        )

    def _filter_issues_by_strictness(self, issues: list[Issue]) -> list[Issue]:
        """Drop issues below the profile's minimum severity or in disabled categories."""
        profile = self._get_strictness_profile()

        # Lower number = more severe; unknown severities rank as INFO.
        severity_order = {
            IssueSeverity.CRITICAL: 0,
            IssueSeverity.WARNING: 1,
            IssueSeverity.INFO: 2
        }

        min_severity = profile.min_severity.lower()
        min_level = 2
        if min_severity == "critical":
            min_level = 0
        elif min_severity == "warning":
            min_level = 1

        filtered = []
        for issue in issues:
            level = severity_order.get(issue.severity, 2)
            if level <= min_level:
                if issue.category == IssueCategory.SECURITY and not profile.check_security:
                    continue
                if issue.category == IssueCategory.BUG and not profile.check_bugs:
                    continue
                if issue.category == IssueCategory.STYLE and not profile.check_style:
                    continue
                if issue.category == IssueCategory.PERFORMANCE and not profile.check_performance:
                    continue
                if issue.category == IssueCategory.DOCUMENTATION and not profile.check_documentation:
                    continue
                filtered.append(issue)

        return filtered

    def _aggregate_summary(self, issues: list[Issue], files: list[FileChange]) -> ReviewSummary:
        """Build a ReviewSummary from the final issue list and reviewed files."""
        summary = ReviewSummary()
        summary.files_reviewed = len(files)
        # Count added lines: diff lines starting with '+' but not the '+++' header.
        summary.lines_changed = sum(
            sum(1 for line in f.diff.split("\n") if line.startswith("+") and not line.startswith("+++"))
            for f in files
        )

        for issue in issues:
            if issue.severity == IssueSeverity.CRITICAL:
                summary.critical_count += 1
            elif issue.severity == IssueSeverity.WARNING:
                summary.warning_count += 1
            else:
                summary.info_count += 1

            if issue.category.value not in summary.issues_by_category:
                summary.issues_by_category[issue.category.value] = []
            summary.issues_by_category[issue.category.value].append(issue.file)

            if issue.file not in summary.issues_by_file:
                summary.issues_by_file[issue.file] = []
            summary.issues_by_file[issue.file].append(issue.line)

        if summary.critical_count > 0:
            summary.overall_assessment = "Critical issues found. Review recommended before committing."
        elif summary.warning_count > 0:
            summary.overall_assessment = "Warnings found. Consider addressing before committing."
        elif summary.info_count > 0:
            summary.overall_assessment = "Minor issues found. Ready for commit with optional fixes."
        else:
            summary.overall_assessment = "No issues found. Code is ready for commit."

        return summary

    def review_staged_changes(
        self,
        files: list[FileChange] | None = None,
        strictness: str | None = None,
        language: str | None = None
    ) -> ReviewResult:
        """Review staged changes (or the given *files*) one file at a time.

        Returns a ReviewResult; on provider failure or unavailability, the
        result carries `error` and no issues.
        """
        if files is None:
            if self.repo is None:
                self.repo = GitRepo(Path.cwd())
            files = self.repo.get_all_staged_changes()

        if not files:
            return ReviewResult(error="No staged changes found")

        # Resolve strictness once; it drives both the prompt and review_mode.
        if strictness is None:
            strictness = self.config.review.strictness

        result = ReviewResult()
        result.review_mode = strictness

        all_issues = []

        for file_change in files:
            if not file_change.diff.strip():
                continue

            file_language = language
            if not file_language and self.repo is not None:
                file_language = self.repo.get_file_language(file_change.filename)

            prompt = ReviewPromptTemplates.get_prompt(
                diff=file_change.diff,
                strictness=strictness,
                language=file_language or ""
            )

            try:
                if self.llm_provider.is_available():
                    response = self.llm_provider.generate(
                        prompt,
                        max_tokens=self.config.llm.max_tokens,
                        temperature=self.config.llm.temperature
                    )
                    result.model_used = response.model
                    result.tokens_used += response.tokens_used

                    file_result = self._parse_llm_response(response.text, [file_change])
                    all_issues.extend(file_result.issues)
                else:
                    result.error = "LLM provider is not available"
                    return result
            except Exception as e:
                result.error = f"Review failed: {e!s}"
                return result

        filtered_issues = self._filter_issues_by_strictness(all_issues)
        # Cap total issues proportionally to the number of files reviewed.
        max_issues = self.config.review.max_issues_per_file
        limited_issues = filtered_issues[:max_issues * len(files)]
        result.issues = limited_issues
        result.summary = self._aggregate_summary(limited_issues, files)

        return result

    def review_commit(
        self,
        sha: str,
        strictness: str | None = None
    ) -> ReviewResult:
        """Review every file changed by commit *sha*.

        Mirrors `review_staged_changes` but prompts with the commit message
        for context.
        """
        if self.repo is None:
            self.repo = GitRepo(Path.cwd())

        commit_info = self.repo.get_commit_info(sha)
        if commit_info is None:
            return ReviewResult(error=f"Commit {sha} not found")

        if strictness is None:
            strictness = self.config.review.strictness

        result = ReviewResult()
        result.review_mode = strictness

        all_issues = []

        for file_change in commit_info.changes:
            if not file_change.diff.strip():
                continue

            prompt = ReviewPromptTemplates.get_commit_review_prompt(
                diff=file_change.diff,
                commit_message=commit_info.message,
                strictness=strictness
            )

            try:
                if self.llm_provider.is_available():
                    response = self.llm_provider.generate(
                        prompt,
                        max_tokens=self.config.llm.max_tokens,
                        temperature=self.config.llm.temperature
                    )
                    result.model_used = response.model
                    result.tokens_used += response.tokens_used

                    file_result = self._parse_llm_response(response.text, [file_change])
                    all_issues.extend(file_result.issues)
                else:
                    result.error = "LLM provider is not available"
                    return result
            except Exception as e:
                result.error = f"Review failed: {e!s}"
                return result

        filtered_issues = self._filter_issues_by_strictness(all_issues)
        # Consistency with review_staged_changes: apply the same per-file cap
        # so a single noisy commit cannot produce an unbounded report.
        max_issues = self.config.review.max_issues_per_file
        limited_issues = filtered_issues[:max_issues * len(commit_info.changes)]
        result.issues = limited_issues
        result.summary = self._aggregate_summary(limited_issues, commit_info.changes)

        return result