From 909619cfe0f59a0fafdd9ba5367f956923d63b13 Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Thu, 5 Feb 2026 06:34:37 +0000 Subject: [PATCH] Initial upload: Local AI Commit Reviewer CLI with CI/CD workflow --- src/git/git.py | 278 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 278 insertions(+) create mode 100644 src/git/git.py diff --git a/src/git/git.py b/src/git/git.py new file mode 100644 index 0000000..b522580 --- /dev/null +++ b/src/git/git.py @@ -0,0 +1,278 @@ +import subprocess +from dataclasses import dataclass +from pathlib import Path + + +@dataclass +class FileChange: + filename: str + status: str + diff: str + old_content: str | None = None + new_content: str | None = None + + +@dataclass +class CommitInfo: + sha: str + message: str + author: str + date: str + changes: list[FileChange] + + +class GitRepo: + def __init__(self, path: Path | None = None): + self.path = path or Path.cwd() + self.repo = self._get_repo() + + def _get_repo(self) -> Path | None: + try: + result = subprocess.run( + ["git", "rev-parse", "--show-toplevel"], + cwd=self.path, + capture_output=True, + text=True, + timeout=5, + check=False + ) + if result.returncode == 0: + return Path(result.stdout.strip()) + except subprocess.TimeoutExpired: + pass + return None + + def is_valid(self) -> bool: + return self.repo is not None and (self.repo / ".git").exists() + + def get_staged_files(self) -> list[str]: + try: + result = subprocess.run( + ["git", "diff", "--cached", "--name-only"], + cwd=self.repo, + capture_output=True, + text=True, + timeout=10, + check=False + ) + if result.returncode == 0: + return result.stdout.strip().split("\n") if result.stdout.strip() else [] + except subprocess.TimeoutExpired: + pass + return [] + + def get_staged_diff(self, filename: str) -> str: + try: + result = subprocess.run( + ["git", "diff", "--cached", "--", filename], + cwd=self.repo, + capture_output=True, + text=True, + timeout=10, + check=False + ) + return result.stdout if result.returncode == 0 else "" + except subprocess.TimeoutExpired: + return "" + + def get_all_staged_changes(self) -> list[FileChange]: + files = self.get_staged_files() + changes = [] + for filename in files: + diff = self.get_staged_diff(filename) + status = self._get_file_status(filename) + changes.append(FileChange( + filename=filename, + status=status, + diff=diff + )) + return changes + + def _get_file_status(self, filename: str) -> str: + try: + result = subprocess.run( + ["git", "diff", "--cached", "--name-status", "--", filename], + cwd=self.repo, + capture_output=True, + text=True, + timeout=10, + check=False + ) + if result.returncode == 0 and result.stdout.strip(): + return result.stdout.strip().split()[0] + except subprocess.TimeoutExpired: + pass + return "M" + + def get_commit_info(self, sha: str) -> CommitInfo | None: + try: + message_result = subprocess.run( + ["git", "log", "-1", "--format=%B", sha], + cwd=self.repo, + capture_output=True, + text=True, + timeout=10, + check=False + ) + + author_result = subprocess.run( + ["git", "log", "-1", "--format=%an|%ae|%ad", "--date=iso", sha], + cwd=self.repo, + capture_output=True, + text=True, + timeout=10, + check=False + ) + + if message_result.returncode == 0 and author_result.returncode == 0: + message = message_result.stdout.strip() + author_parts = author_result.stdout.strip().split("|") + author = author_parts[0] if author_parts else "Unknown" + date = author_parts[2] if len(author_parts) > 2 else "" + + changes = self._get_commit_changes(sha) + + return CommitInfo( + sha=sha, + message=message, + author=author, + date=date, + changes=changes + ) + except subprocess.TimeoutExpired: + pass + return None + + def _get_commit_changes(self, sha: str) -> list[FileChange]: + try: + result = subprocess.run( + ["git", "show", "--stat", sha], + cwd=self.repo, + capture_output=True, + text=True, + timeout=10, + check=False + ) + files = [] + if result.returncode == 0: + for line in result.stdout.split("\n"): + if line.startswith(" ") and "|" in line: + filename = line.split("|")[0].strip() + diff = self._get_commit_file_diff(sha, filename) + files.append(FileChange( + filename=filename, + status="M", + diff=diff + )) + return files + except subprocess.TimeoutExpired: + return [] + return [] + + def _get_commit_file_diff(self, sha: str, filename: str) -> str: + try: + result = subprocess.run( + ["git", "show", f"{sha} -- {filename}"], + cwd=self.repo, + capture_output=True, + text=True, + timeout=10, + check=False + ) + return result.stdout if result.returncode == 0 else "" + except subprocess.TimeoutExpired: + return "" + + def get_current_branch(self) -> str: + try: + result = subprocess.run( + ["git", "rev-parse", "--abbrev-ref", "HEAD"], + cwd=self.repo, + capture_output=True, + text=True, + timeout=5, + check=False + ) + return result.stdout.strip() if result.returncode == 0 else "unknown" + except subprocess.TimeoutExpired: + return "unknown" + + def get_file_language(self, filename: str) -> str: + ext_map = { + ".py": "python", + ".js": "javascript", + ".ts": "typescript", + ".go": "go", + ".rs": "rust", + ".java": "java", + ".c": "c", + ".cpp": "cpp", + ".h": "c", + ".hpp": "cpp", + ".jsx": "javascript", + ".tsx": "typescript", + } + ext = Path(filename).suffix.lower() + return ext_map.get(ext, "unknown") + + def get_diff_stats(self, diff: str) -> tuple[int, int]: + additions = 0 + deletions = 0 + for line in diff.split("\n"): + if line.startswith("+") and not line.startswith("+++"): + additions += 1 + elif line.startswith("-") and not line.startswith("---"): + deletions += 1 + return additions, deletions + + +def get_staged_changes(path: Path | None = None) -> list[FileChange]: + repo = GitRepo(path) + return repo.get_all_staged_changes() + + +def get_commit_context(sha: str, path: Path | None = None) -> CommitInfo | None: + repo = GitRepo(path) + return repo.get_commit_info(sha) + + +def install_hook(repo_path: Path, hook_type: str = "pre-commit", content: str | None = None) -> bool: + hooks_dir = repo_path / ".git" / "hooks" + hooks_dir.mkdir(parents=True, exist_ok=True) + + hook_path = hooks_dir / hook_type + + if hook_path.exists(): + backup_path = hooks_dir / f"{hook_type}.backup" + hook_path.rename(backup_path) + + if content is None: + content = _get_default_hook_script() + + try: + hook_path.write_text(content) + hook_path.chmod(0o755) + return True + except OSError: + return False + + +def _get_default_hook_script() -> str: + return """#!/bin/bash +# Local AI Commit Reviewer - Pre-commit Hook +# This hook runs code review on staged changes before committing + +set -e + +# Check if running with --no-verify +if [ "$1" = "--no-verify" ]; then + exit 0 +fi + +# Get the directory where this script is located +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +# Run the AI commit reviewer +cd "$SCRIPT_DIR/../.." +python -m aicr review --hook || exit 1 +"""