From be4dd0f0a77a1f70b35d7cf1cdb29841ef5724ba Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Fri, 30 Jan 2026 20:35:20 +0000 Subject: [PATCH] Initial upload: git-insights-cli with CI/CD workflow --- src/analyzers/git_repository.py | 138 ++++++++++++++++++++++++++++++++ 1 file changed, 138 insertions(+) create mode 100644 src/analyzers/git_repository.py diff --git a/src/analyzers/git_repository.py b/src/analyzers/git_repository.py new file mode 100644 index 0000000..ba5cc5c --- /dev/null +++ b/src/analyzers/git_repository.py @@ -0,0 +1,138 @@ +from datetime import datetime +from typing import Any, Dict, List, Optional + +from git import Repo, Commit as GitCommit +from git.exc import GitCommandError + +from src.models.data_structures import Author, Commit, FileChange + + +class GitRepository: + """Wrapper for git repository operations.""" + + def __init__(self, repo_path: str) -> None: + """Initialize git repository wrapper.""" + self.repo_path = repo_path + self.repo: Optional[Repo] = None + self._load_repo() + + def _load_repo(self) -> None: + """Load the git repository.""" + try: + self.repo = Repo(self.repo_path) + except GitCommandError as e: + raise ValueError(f"Not a valid git repository: {self.repo_path}") from e + + def get_commits( + self, + since: Optional[datetime] = None, + until: Optional[datetime] = None, + ) -> List[Commit]: + """Get list of commits in the repository.""" + if not self.repo: + return [] + + commits = [] + try: + git_commits = list(self.repo.iter_commits("HEAD")) + + for gc in git_commits: + if since and gc.committed_datetime < since: + continue + if until and gc.committed_datetime > until: + continue + + commit = self._convert_git_commit(gc) + commits.append(commit) + + except GitCommandError: + pass + + return commits + + def _convert_git_commit(self, gc: GitCommit) -> Commit: + """Convert GitPython commit to our Commit model.""" + message = gc.message.strip() if gc.message else "" + is_merge = "Merge" in message + is_revert = message.lower().startswith("revert") + + try: + lines_added = 0 + lines_deleted = 0 + files_changed = [] + + if gc.parents: + diff = gc.parents[0].diff(gc, create_patch=True) + for d in diff: + lines_added += d.change_type == "A" and 1 or 0 + lines_deleted += d.change_type == "D" and 1 or 0 + files_changed.append(d.b_path) + except Exception: + lines_added = 0 + lines_deleted = 0 + files_changed = [] + + return Commit( + sha=gc.hexsha, + message=message, + author=gc.author.name or "Unknown", + author_email=gc.author.email or "unknown@example.com", + timestamp=gc.committed_datetime, + lines_added=lines_added, + lines_deleted=lines_deleted, + files_changed=files_changed, + is_merge=is_merge, + is_revert=is_revert, + ) + + def get_authors(self) -> List[Author]: + """Get list of authors in the repository.""" + if not self.repo: + return [] + + authors = {} + for commit in self.get_commits(): + key = commit.author_email + if key not in authors: + authors[key] = Author( + name=commit.author, + email=commit.author_email, + ) + authors[key].commit_count += 1 + authors[key].lines_added += commit.lines_added + authors[key].lines_deleted += commit.lines_deleted + + return list(authors.values()) + + def get_file_changes(self, commit: Commit) -> List[FileChange]: + """Get file changes for a commit.""" + changes = [] + try: + gc = self.repo.commit(commit.sha) + if gc.parents: + diff = gc.parents[0].diff(gc) + for d in diff: + change = FileChange( + filepath=d.b_path, + lines_added=0, + lines_deleted=0, + change_type=d.change_type, + ) + changes.append(change) + except Exception: + pass + + return changes + + def get_commit_count(self) -> int: + """Get total commit count.""" + if not self.repo: + return 0 + try: + return sum(1 for _ in self.repo.iter_commits("HEAD")) + except GitCommandError: + return 0 + + def is_valid(self) -> bool: + """Check if the path is a valid git repository.""" + return self.repo is not None and not self.repo.bare