Initial upload: Local AI Commit Reviewer CLI with CI/CD workflow
This commit is contained in:
421
src/core/review_engine.py
Normal file
421
src/core/review_engine.py
Normal file
@@ -0,0 +1,421 @@
|
||||
import json
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
|
||||
from ..config import Config, StrictnessProfile
|
||||
from ..git import FileChange, GitRepo
|
||||
from ..llm import LLMProvider, OllamaProvider
|
||||
from ..llm.templates import ReviewPromptTemplates
|
||||
|
||||
|
||||
class IssueSeverity(str, Enum):
    """Severity levels for review issues, from most to least urgent.

    Subclasses str so members compare equal to, and serialize as, their
    plain string values (e.g. in JSON output).
    """

    CRITICAL = "critical"
    WARNING = "warning"
    INFO = "info"
|
||||
|
||||
|
||||
class IssueCategory(str, Enum):
    """Categories a review issue can be assigned to.

    Subclasses str so members compare equal to, and serialize as, their
    plain string values (e.g. in JSON output).
    """

    BUG = "bug"
    SECURITY = "security"
    STYLE = "style"
    PERFORMANCE = "performance"
    DOCUMENTATION = "documentation"
|
||||
|
||||
|
||||
@dataclass
class Issue:
    """One finding reported by the review engine against a single source line."""

    file: str
    line: int
    severity: IssueSeverity
    category: IssueCategory
    message: str
    suggestion: str | None = None
    raw_line: str | None = None

    def to_dict(self) -> dict:
        """Serialize this issue into a JSON-friendly dict (enums become strings)."""
        payload = {
            "file": self.file,
            "line": self.line,
            "severity": self.severity.value,
            "category": self.category.value,
            "message": self.message,
            "suggestion": self.suggestion,
            "raw_line": self.raw_line,
        }
        return payload

    @classmethod
    def from_dict(cls, data: dict) -> "Issue":
        """Rebuild an Issue from a dict shaped like to_dict() output.

        Required keys raise KeyError when absent; invalid severity/category
        strings raise ValueError. Optional keys default to None.
        """
        optional = {key: data.get(key) for key in ("suggestion", "raw_line")}
        return cls(
            file=data["file"],
            line=data["line"],
            severity=IssueSeverity(data["severity"]),
            category=IssueCategory(data["category"]),
            message=data["message"],
            **optional,
        )
|
||||
|
||||
|
||||
@dataclass
class ReviewSummary:
    """Aggregate statistics for one review run (counts, per-file groupings)."""

    critical_count: int = 0
    warning_count: int = 0
    info_count: int = 0
    files_reviewed: int = 0
    lines_changed: int = 0
    overall_assessment: str = ""
    issues_by_category: dict = field(default_factory=dict)
    issues_by_file: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Serialize every field into a JSON-friendly dict.

        Insertion order matches the field declaration order so downstream
        JSON rendering stays stable.
        """
        serialized = {
            "critical_count": self.critical_count,
            "warning_count": self.warning_count,
            "info_count": self.info_count,
        }
        serialized["files_reviewed"] = self.files_reviewed
        serialized["lines_changed"] = self.lines_changed
        serialized["overall_assessment"] = self.overall_assessment
        serialized["issues_by_category"] = self.issues_by_category
        serialized["issues_by_file"] = self.issues_by_file
        return serialized
|
||||
|
||||
|
||||
@dataclass
class ReviewResult:
    """Complete outcome of one review run: issues, aggregate summary, LLM metadata."""

    issues: list[Issue] = field(default_factory=list)
    summary: ReviewSummary = field(default_factory=ReviewSummary)
    model_used: str = ""       # name of the model that produced the review
    tokens_used: int = 0       # cumulative token usage across all per-file calls
    review_mode: str = ""      # strictness profile name used for this run
    error: str | None = None   # set when the run failed or was aborted early

    def has_critical_issues(self) -> bool:
        """Return True if any issue has CRITICAL severity."""
        return any(issue.severity == IssueSeverity.CRITICAL for issue in self.issues)

    def has_issues(self) -> bool:
        """Return True if the review produced at least one issue."""
        return len(self.issues) > 0

    def get_issues_by_severity(self, severity: IssueSeverity) -> list[Issue]:
        """Return all issues with the given severity, preserving order."""
        return [issue for issue in self.issues if issue.severity == severity]

    def get_issues_by_file(self, filename: str) -> list[Issue]:
        """Return all issues reported against *filename*, preserving order."""
        return [issue for issue in self.issues if issue.file == filename]

    def get_issues_by_category(self, category: IssueCategory) -> list[Issue]:
        """Return all issues in the given category, preserving order."""
        return [issue for issue in self.issues if issue.category == category]

    def to_json(self) -> str:
        """Serialize the result to a pretty-printed JSON string.

        Includes the ``error`` field so machine consumers can distinguish a
        clean empty review from a failed run (previously ``error`` was
        silently dropped from the JSON output).
        """
        return json.dumps({
            "issues": [issue.to_dict() for issue in self.issues],
            "summary": self.summary.to_dict(),
            "model_used": self.model_used,
            "tokens_used": self.tokens_used,
            "review_mode": self.review_mode,
            "error": self.error
        }, indent=2)

    def to_markdown(self) -> str:
        """Render the result as a human-readable Markdown report."""
        lines = ["# AI Commit Review Results\n"]

        # Surface a failed run prominently instead of silently omitting it.
        if self.error:
            lines.append(f"> **Error**: {self.error}\n")

        lines.append("## Summary\n")
        lines.append(f"- **Files Reviewed**: {self.summary.files_reviewed}")
        lines.append(f"- **Lines Changed**: {self.summary.lines_changed}")
        lines.append(f"- **Critical Issues**: {self.summary.critical_count}")
        lines.append(f"- **Warnings**: {self.summary.warning_count}")
        lines.append(f"- **Info**: {self.summary.info_count}")
        lines.append(f"- **Assessment**: {self.summary.overall_assessment}\n")

        if self.issues:
            lines.append("## Issues Found\n")

            # Render in fixed severity order: critical first, info last.
            for severity in [IssueSeverity.CRITICAL, IssueSeverity.WARNING, IssueSeverity.INFO]:
                severity_issues = self.get_issues_by_severity(severity)
                if severity_issues:
                    lines.append(f"### {severity.value.upper()} ({len(severity_issues)})\n")
                    for issue in severity_issues:
                        lines.append(f"#### {issue.file}:{issue.line}")
                        lines.append(f"- **Category**: {issue.category.value}")
                        lines.append(f"- **Message**: {issue.message}")
                        if issue.suggestion:
                            lines.append(f"- **Suggestion**: {issue.suggestion}")
                        lines.append("")

        return "\n".join(lines)
|
||||
|
||||
|
||||
class ReviewEngine:
    """Coordinates git diffs, prompt templating, and the LLM provider.

    Produces a ReviewResult for either the currently staged changes or an
    existing commit, applying the configured strictness profile to filter
    what the model reports.
    """

    def __init__(
        self,
        config: Config | None = None,
        llm_provider: LLMProvider | None = None
    ):
        """Create an engine; defaults to an OllamaProvider built from config."""
        self.config = config or Config()
        self.llm_provider = llm_provider or OllamaProvider(
            endpoint=self.config.llm.endpoint,
            model=self.config.llm.model,
            timeout=self.config.llm.timeout
        )
        self.repo: GitRepo | None = None

    def set_repo(self, path: Path) -> None:
        """Point the engine at the git repository located at *path*."""
        self.repo = GitRepo(path)

    def _ensure_repo(self) -> GitRepo:
        """Return the active repo, lazily opening the current working directory."""
        if self.repo is None:
            self.repo = GitRepo(Path.cwd())
        return self.repo

    def _parse_llm_response(self, response_text: str, files: list[FileChange]) -> ReviewResult:
        """Parse an LLM reply: prefer an embedded JSON object, else text heuristics.

        Malformed individual issue entries are skipped rather than failing
        the whole review.
        """
        result = ReviewResult()

        try:
            # Grab the outermost {...} span; models often wrap JSON in prose.
            json_match = re.search(r'\{[\s\S]*\}', response_text)
            if json_match:
                data = json.loads(json_match.group())

                for issue_data in data.get("issues", []):
                    # Best-effort: from_dict raises KeyError for missing keys,
                    # ValueError for bad enum strings, TypeError for non-dicts.
                    try:
                        result.issues.append(Issue.from_dict(issue_data))
                    except (KeyError, ValueError, TypeError):
                        continue

                summary_data = data.get("summary", {})
                result.summary.critical_count = summary_data.get("critical_count", 0)
                result.summary.warning_count = summary_data.get("warning_count", 0)
                result.summary.info_count = summary_data.get("info_count", 0)
                result.summary.overall_assessment = summary_data.get("overall_assessment", "")
            else:
                result.issues = self._parse_text_response(response_text, files)

        except json.JSONDecodeError:
            # The {...} span was not valid JSON; fall back to text parsing.
            result.issues = self._parse_text_response(response_text, files)

        return result

    def _parse_text_response(self, response_text: str, files: list[FileChange]) -> list[Issue]:
        """Heuristically extract issues from free-text replies.

        Matches lines shaped like ``**file**: <line> message [-> suggestion]``.
        """
        issues = []
        current_file = ""

        for line in response_text.split("\n"):
            file_match = re.match(r'^\*\*(.+?)\*\*:\s*(\d+)', line)
            if not file_match:
                continue

            current_file = file_match.group(1)
            line_num = int(file_match.group(2))
            lowered = line.lower()

            # Defaults up front: previously `category` was left unassigned when
            # the line mentioned "critical", raising UnboundLocalError on the
            # first such line (or reusing a stale value from a prior iteration).
            severity = IssueSeverity.WARNING
            category = IssueCategory.BUG
            if "critical" in lowered:
                severity = IssueSeverity.CRITICAL
            elif "security" in lowered:
                severity = IssueSeverity.CRITICAL
                category = IssueCategory.SECURITY

            message = line
            suggestion = None
            if "->" in line:
                # Split on the first arrow only; later arrows stay in the suggestion.
                head, _, tail = line.partition("->")
                message = head.strip()
                suggestion = tail.strip()

            issues.append(Issue(
                file=current_file,
                line=line_num,
                severity=severity,
                category=category,
                message=message,
                suggestion=suggestion
            ))

        return issues

    def _get_strictness_profile(self) -> StrictnessProfile:
        """Resolve the configured strictness name to its profile object."""
        return self.config.strictness_profiles.get_profile(
            self.config.review.strictness
        )

    def _filter_issues_by_strictness(self, issues: list[Issue]) -> list[Issue]:
        """Drop issues below the profile's minimum severity or in disabled categories."""
        profile = self._get_strictness_profile()

        severity_rank = {
            IssueSeverity.CRITICAL: 0,
            IssueSeverity.WARNING: 1,
            IssueSeverity.INFO: 2,
        }
        # Unknown min_severity strings fall back to the most permissive level (info).
        min_level = {"critical": 0, "warning": 1}.get(profile.min_severity.lower(), 2)

        category_enabled = {
            IssueCategory.SECURITY: profile.check_security,
            IssueCategory.BUG: profile.check_bugs,
            IssueCategory.STYLE: profile.check_style,
            IssueCategory.PERFORMANCE: profile.check_performance,
            IssueCategory.DOCUMENTATION: profile.check_documentation,
        }

        return [
            issue for issue in issues
            if severity_rank.get(issue.severity, 2) <= min_level
            and category_enabled.get(issue.category, True)
        ]

    def _cap_issues_per_file(self, issues: list[Issue]) -> list[Issue]:
        """Keep at most config.review.max_issues_per_file issues for each file.

        Previously a single aggregate slice of max * len(files) was taken,
        which let one noisy file exceed its per-file cap.
        """
        max_issues = self.config.review.max_issues_per_file
        counts: dict[str, int] = {}
        capped = []
        for issue in issues:
            seen = counts.get(issue.file, 0)
            if seen < max_issues:
                capped.append(issue)
                counts[issue.file] = seen + 1
        return capped

    def _aggregate_summary(self, issues: list[Issue], files: list[FileChange]) -> ReviewSummary:
        """Recompute counts and groupings from the final, filtered issue list.

        Ignores any counts the LLM itself reported, so the summary always
        matches the issues actually returned.
        """
        summary = ReviewSummary()
        summary.files_reviewed = len(files)
        # Count only added lines ("+" but not the "+++" diff file header).
        summary.lines_changed = sum(
            sum(1 for line in f.diff.split("\n") if line.startswith("+") and not line.startswith("+++"))
            for f in files
        )

        for issue in issues:
            if issue.severity == IssueSeverity.CRITICAL:
                summary.critical_count += 1
            elif issue.severity == IssueSeverity.WARNING:
                summary.warning_count += 1
            else:
                summary.info_count += 1

            summary.issues_by_category.setdefault(issue.category.value, []).append(issue.file)
            summary.issues_by_file.setdefault(issue.file, []).append(issue.line)

        if summary.critical_count > 0:
            summary.overall_assessment = "Critical issues found. Review recommended before committing."
        elif summary.warning_count > 0:
            summary.overall_assessment = "Warnings found. Consider addressing before committing."
        elif summary.info_count > 0:
            summary.overall_assessment = "Minor issues found. Ready for commit with optional fixes."
        else:
            summary.overall_assessment = "No issues found. Code is ready for commit."

        return summary

    def _invoke_llm(self, prompt: str, file_change: FileChange, result: ReviewResult) -> list[Issue] | None:
        """Run one prompt through the provider and parse the reply.

        Updates result's model/token bookkeeping. Returns the parsed issues,
        or None after setting result.error when the provider is unavailable
        or the call fails. Shared by staged-change and commit review paths.
        """
        try:
            if not self.llm_provider.is_available():
                result.error = "LLM provider is not available"
                return None
            response = self.llm_provider.generate(
                prompt,
                max_tokens=self.config.llm.max_tokens,
                temperature=self.config.llm.temperature
            )
            result.model_used = response.model
            result.tokens_used += response.tokens_used
            return self._parse_llm_response(response.text, [file_change]).issues
        except Exception as e:
            # Boundary catch: provider/network failures abort the whole run
            # with a descriptive error rather than crashing the CLI.
            result.error = f"Review failed: {e!s}"
            return None

    def review_staged_changes(
        self,
        files: list[FileChange] | None = None,
        strictness: str | None = None,
        language: str | None = None
    ) -> ReviewResult:
        """Review staged changes (or an explicit file list) and return a ReviewResult.

        Args:
            files: Changes to review; defaults to all staged changes in the repo.
            strictness: Profile name; defaults to the configured strictness.
            language: Force a language for all files; defaults to per-file detection.
        """
        # Open the repo up front: it is needed both to list staged changes and
        # to detect each file's language, even when `files` is passed in
        # explicitly (previously that path crashed with AttributeError because
        # self.repo was still None).
        repo = self._ensure_repo()
        if files is None:
            files = repo.get_all_staged_changes()

        if not files:
            return ReviewResult(error="No staged changes found")

        result = ReviewResult()
        if strictness is None:
            strictness = self.config.review.strictness
        result.review_mode = strictness or self.config.review.strictness

        all_issues: list[Issue] = []

        for file_change in files:
            # Skip entries with an empty diff (e.g. mode-only changes).
            if not file_change.diff.strip():
                continue

            file_language = language or repo.get_file_language(file_change.filename)

            prompt = ReviewPromptTemplates.get_prompt(
                diff=file_change.diff,
                strictness=strictness,
                language=file_language
            )

            issues = self._invoke_llm(prompt, file_change, result)
            if issues is None:
                # result.error is already set; abort with partial bookkeeping.
                return result
            all_issues.extend(issues)

        filtered_issues = self._filter_issues_by_strictness(all_issues)
        result.issues = self._cap_issues_per_file(filtered_issues)
        result.summary = self._aggregate_summary(result.issues, files)

        return result

    def review_commit(
        self,
        sha: str,
        strictness: str | None = None
    ) -> ReviewResult:
        """Review the diff of the existing commit identified by *sha*.

        Returns a ReviewResult with error set when the commit is not found
        or the LLM call fails.
        """
        repo = self._ensure_repo()

        commit_info = repo.get_commit_info(sha)
        if commit_info is None:
            return ReviewResult(error=f"Commit {sha} not found")

        result = ReviewResult()
        if strictness is None:
            strictness = self.config.review.strictness
        result.review_mode = strictness or self.config.review.strictness

        all_issues: list[Issue] = []

        for file_change in commit_info.changes:
            # Skip entries with an empty diff (e.g. mode-only changes).
            if not file_change.diff.strip():
                continue

            prompt = ReviewPromptTemplates.get_commit_review_prompt(
                diff=file_change.diff,
                commit_message=commit_info.message,
                strictness=strictness
            )

            issues = self._invoke_llm(prompt, file_change, result)
            if issues is None:
                # result.error is already set; abort with partial bookkeeping.
                return result
            all_issues.extend(issues)

        # NOTE: unlike staged review, commit review applies no per-file cap;
        # it reports the full filtered list, matching the original behavior.
        result.issues = self._filter_issues_by_strictness(all_issues)
        result.summary = self._aggregate_summary(result.issues, commit_info.changes)

        return result
|
||||
Reference in New Issue
Block a user