Initial upload: Local AI Commit Reviewer CLI with CI/CD workflow
This commit is contained in:
278
src/git/git.py
Normal file
278
src/git/git.py
Normal file
@@ -0,0 +1,278 @@
|
||||
import subprocess
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
@dataclass
|
||||
class FileChange:
|
||||
filename: str
|
||||
status: str
|
||||
diff: str
|
||||
old_content: str | None = None
|
||||
new_content: str | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class CommitInfo:
|
||||
sha: str
|
||||
message: str
|
||||
author: str
|
||||
date: str
|
||||
changes: list[FileChange]
|
||||
|
||||
|
||||
class GitRepo:
|
||||
def __init__(self, path: Path | None = None):
|
||||
self.path = path or Path.cwd()
|
||||
self.repo = self._get_repo()
|
||||
|
||||
def _get_repo(self) -> Path | None:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "rev-parse", "--show-toplevel"],
|
||||
cwd=self.path,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5,
|
||||
check=False
|
||||
)
|
||||
if result.returncode == 0:
|
||||
return Path(result.stdout.strip())
|
||||
except subprocess.TimeoutExpired:
|
||||
pass
|
||||
return None
|
||||
|
||||
def is_valid(self) -> bool:
|
||||
return self.repo is not None and (self.repo / ".git").exists()
|
||||
|
||||
def get_staged_files(self) -> list[str]:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "diff", "--cached", "--name-only"],
|
||||
cwd=self.repo,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
check=False
|
||||
)
|
||||
if result.returncode == 0:
|
||||
return result.stdout.strip().split("\n") if result.stdout.strip() else []
|
||||
except subprocess.TimeoutExpired:
|
||||
pass
|
||||
return []
|
||||
|
||||
def get_staged_diff(self, filename: str) -> str:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "diff", "--cached", "--", filename],
|
||||
cwd=self.repo,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
check=False
|
||||
)
|
||||
return result.stdout if result.returncode == 0 else ""
|
||||
except subprocess.TimeoutExpired:
|
||||
return ""
|
||||
|
||||
def get_all_staged_changes(self) -> list[FileChange]:
|
||||
files = self.get_staged_files()
|
||||
changes = []
|
||||
for filename in files:
|
||||
diff = self.get_staged_diff(filename)
|
||||
status = self._get_file_status(filename)
|
||||
changes.append(FileChange(
|
||||
filename=filename,
|
||||
status=status,
|
||||
diff=diff
|
||||
))
|
||||
return changes
|
||||
|
||||
def _get_file_status(self, filename: str) -> str:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "diff", "--cached", "--name-status", "--", filename],
|
||||
cwd=self.repo,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
check=False
|
||||
)
|
||||
if result.returncode == 0 and result.stdout.strip():
|
||||
return result.stdout.strip().split()[0]
|
||||
except subprocess.TimeoutExpired:
|
||||
pass
|
||||
return "M"
|
||||
|
||||
def get_commit_info(self, sha: str) -> CommitInfo | None:
|
||||
try:
|
||||
message_result = subprocess.run(
|
||||
["git", "log", "-1", "--format=%B", sha],
|
||||
cwd=self.repo,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
check=False
|
||||
)
|
||||
|
||||
author_result = subprocess.run(
|
||||
["git", "log", "-1", "--format=%an|%ae|%ad", "--date=iso", sha],
|
||||
cwd=self.repo,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
check=False
|
||||
)
|
||||
|
||||
if message_result.returncode == 0 and author_result.returncode == 0:
|
||||
message = message_result.stdout.strip()
|
||||
author_parts = author_result.stdout.strip().split("|")
|
||||
author = author_parts[0] if author_parts else "Unknown"
|
||||
date = author_parts[2] if len(author_parts) > 2 else ""
|
||||
|
||||
changes = self._get_commit_changes(sha)
|
||||
|
||||
return CommitInfo(
|
||||
sha=sha,
|
||||
message=message,
|
||||
author=author,
|
||||
date=date,
|
||||
changes=changes
|
||||
)
|
||||
except subprocess.TimeoutExpired:
|
||||
pass
|
||||
return None
|
||||
|
||||
def _get_commit_changes(self, sha: str) -> list[FileChange]:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "show", "--stat", sha],
|
||||
cwd=self.repo,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
check=False
|
||||
)
|
||||
files = []
|
||||
if result.returncode == 0:
|
||||
for line in result.stdout.split("\n"):
|
||||
if line.startswith(" ") and "|" in line:
|
||||
filename = line.split("|")[0].strip()
|
||||
diff = self._get_commit_file_diff(sha, filename)
|
||||
files.append(FileChange(
|
||||
filename=filename,
|
||||
status="M",
|
||||
diff=diff
|
||||
))
|
||||
return files
|
||||
except subprocess.TimeoutExpired:
|
||||
return []
|
||||
return []
|
||||
|
||||
def _get_commit_file_diff(self, sha: str, filename: str) -> str:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "show", f"{sha} -- {filename}"],
|
||||
cwd=self.repo,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
check=False
|
||||
)
|
||||
return result.stdout if result.returncode == 0 else ""
|
||||
except subprocess.TimeoutExpired:
|
||||
return ""
|
||||
|
||||
def get_current_branch(self) -> str:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "rev-parse", "--abbrev-ref", "HEAD"],
|
||||
cwd=self.repo,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5,
|
||||
check=False
|
||||
)
|
||||
return result.stdout.strip() if result.returncode == 0 else "unknown"
|
||||
except subprocess.TimeoutExpired:
|
||||
return "unknown"
|
||||
|
||||
def get_file_language(self, filename: str) -> str:
|
||||
ext_map = {
|
||||
".py": "python",
|
||||
".js": "javascript",
|
||||
".ts": "typescript",
|
||||
".go": "go",
|
||||
".rs": "rust",
|
||||
".java": "java",
|
||||
".c": "c",
|
||||
".cpp": "cpp",
|
||||
".h": "c",
|
||||
".hpp": "cpp",
|
||||
".jsx": "javascript",
|
||||
".tsx": "typescript",
|
||||
}
|
||||
ext = Path(filename).suffix.lower()
|
||||
return ext_map.get(ext, "unknown")
|
||||
|
||||
def get_diff_stats(self, diff: str) -> tuple[int, int]:
|
||||
additions = 0
|
||||
deletions = 0
|
||||
for line in diff.split("\n"):
|
||||
if line.startswith("+") and not line.startswith("+++"):
|
||||
additions += 1
|
||||
elif line.startswith("-") and not line.startswith("---"):
|
||||
deletions += 1
|
||||
return additions, deletions
|
||||
|
||||
|
||||
def get_staged_changes(path: Path | None = None) -> list[FileChange]:
|
||||
repo = GitRepo(path)
|
||||
return repo.get_all_staged_changes()
|
||||
|
||||
|
||||
def get_commit_context(sha: str, path: Path | None = None) -> CommitInfo | None:
|
||||
repo = GitRepo(path)
|
||||
return repo.get_commit_info(sha)
|
||||
|
||||
|
||||
def install_hook(repo_path: Path, hook_type: str = "pre-commit", content: str | None = None) -> bool:
|
||||
hooks_dir = repo_path / ".git" / "hooks"
|
||||
hooks_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
hook_path = hooks_dir / hook_type
|
||||
|
||||
if hook_path.exists():
|
||||
backup_path = hooks_dir / f"{hook_type}.backup"
|
||||
hook_path.rename(backup_path)
|
||||
|
||||
if content is None:
|
||||
content = _get_default_hook_script()
|
||||
|
||||
try:
|
||||
hook_path.write_text(content)
|
||||
hook_path.chmod(0o755)
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
|
||||
def _get_default_hook_script() -> str:
|
||||
return """#!/bin/bash
|
||||
# Local AI Commit Reviewer - Pre-commit Hook
|
||||
# This hook runs code review on staged changes before committing
|
||||
|
||||
set -e
|
||||
|
||||
# Check if running with --no-verify
|
||||
if [ "$1" = "--no-verify" ]; then
|
||||
exit 0
|
||||
fi
|
||||
|
||||
# Get the directory where this script is located
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
|
||||
# Run the AI commit reviewer
|
||||
cd "$SCRIPT_DIR/../.."
|
||||
python -m aicr review --hook || exit 1
|
||||
"""
|
||||
Reference in New Issue
Block a user