import logging
from datetime import datetime
from typing import TYPE_CHECKING, List, Optional, Set

from git import GitCommandError, Repo

from src.models import Commit

if TYPE_CHECKING:
    # Only needed for annotations; the runtime imports of these names stay
    # local to the methods that use them, as in the original code.
    from src.models import Author, FileChange

logger = logging.getLogger(__name__)


class GitRepository:
    """Thin wrapper around a GitPython ``Repo`` that yields project models.

    The underlying ``Repo`` is opened lazily on first use and released by
    :meth:`close`. The instance can also be used as a context manager, in
    which case :meth:`close` is called on exit.
    """

    def __init__(self, path: str):
        """Remember *path*; the repository itself is not opened yet."""
        self.path = path
        self._repo: Optional[Repo] = None

    def __enter__(self) -> "GitRepository":
        """Return ``self`` so the repository can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the repository when leaving a ``with`` block."""
        self.close()

    def get_repo(self) -> Repo:
        """Return the cached ``Repo``, opening it from ``self.path`` if needed."""
        if self._repo is None:
            self._repo = Repo(self.path)
        return self._repo

    def get_commits(
        self,
        since: Optional[datetime] = None,
        until: Optional[datetime] = None,
    ) -> List[Commit]:
        """Return the commits of the repository as ``Commit`` models.

        Args:
            since: If given, passed to git as ``--since`` in ISO-8601 form.
            until: If given, passed to git as ``--until`` in ISO-8601 form.

        Returns:
            The converted commits, in the order GitPython yields them.
            Commits for which conversion yields a falsy result are skipped.

        Raises:
            ValueError: If git reports an error while listing commits. The
                original ``GitCommandError`` is attached as ``__cause__``.
        """
        repo = self.get_repo()
        commits: List[Commit] = []
        try:
            # rev=None / max_count=None leave the starting revision and the
            # commit limit to GitPython's defaults.
            commit_iter = repo.iter_commits(
                rev=None,
                since=since.isoformat() if since else None,
                until=until.isoformat() if until else None,
                max_count=None,
            )
            for git_commit in commit_iter:
                commit = self._convert_git_commit(git_commit)
                if commit:
                    commits.append(commit)
        except GitCommandError as e:
            raise ValueError(f"Error reading git repository: {e}") from e
        return commits

    def _convert_git_commit(self, git_commit) -> Optional[Commit]:
        """Convert a GitPython commit object into a ``Commit`` model.

        Per-file statistics are best-effort: if they cannot be read, the
        commit is still returned with whatever was collected up to that
        point (possibly nothing). If the conversion itself fails, ``None``
        is returned. Both failure modes are logged as warnings.
        """
        try:
            parents = [p.hexsha for p in git_commit.parents]
            is_merge = len(parents) > 1
            # Any message starting with "reverted" also starts with "revert",
            # so a single prefix test is sufficient.
            is_revert = git_commit.message.lower().startswith("revert")

            file_changes = []
            additions = 0
            deletions = 0
            try:
                # Read ``stats`` once and reuse it: GitPython computes it on
                # access, so repeated attribute reads repeat the git call.
                commit_stats = git_commit.stats
                if commit_stats and commit_stats.files:
                    for filepath, stats in commit_stats.files.items():
                        file_changes.append(
                            self._create_file_change(filepath, stats)
                        )
                        additions += stats.get('insertions', 0)
                        deletions += stats.get('deletions', 0)
            except Exception as exc:
                # Deliberately best-effort: keep the commit, report the gap.
                logger.warning(
                    "Could not read file stats for commit %s: %s",
                    getattr(git_commit, "hexsha", "<unknown>"),
                    exc,
                )

            return Commit(
                sha=git_commit.hexsha,
                message=git_commit.message.strip(),
                author_name=git_commit.author.name or "Unknown",
                author_email=git_commit.author.email or "",
                # NOTE: fromtimestamp() without a tz produces naive datetimes
                # in the local timezone of the running process.
                committed_datetime=datetime.fromtimestamp(
                    git_commit.committed_date
                ),
                author_datetime=datetime.fromtimestamp(
                    git_commit.authored_date
                ),
                parents=parents,
                additions=additions,
                deletions=deletions,
                files_changed=len(file_changes),
                file_changes=file_changes,
                is_merge=is_merge,
                is_revert=is_revert,
            )
        except Exception as exc:
            # Callers treat None as "skip this commit"; log so the drop is
            # visible instead of silent.
            logger.warning(
                "Skipping commit %s: conversion failed: %s",
                getattr(git_commit, "hexsha", "<unknown>"),
                exc,
            )
            return None

    def _create_file_change(self, filepath: str, stats: dict) -> "FileChange":
        """Build a ``FileChange`` from one entry of a commit's per-file stats.

        Missing ``'insertions'`` / ``'deletions'`` keys count as 0;
        ``changes`` is the sum of both.
        """
        from src.models import FileChange

        insertions = stats.get('insertions', 0)
        deletions = stats.get('deletions', 0)
        return FileChange(
            filepath=filepath,
            additions=insertions,
            deletions=deletions,
            changes=insertions + deletions,
        )

    def get_commit_count(self) -> int:
        """Return the number of commits yielded by ``iter_commits()``."""
        return sum(1 for _ in self.get_repo().iter_commits())

    def get_unique_authors(self) -> Set[str]:
        """Return the set of distinct, non-empty author names."""
        return {
            commit.author.name
            for commit in self.get_repo().iter_commits()
            if commit.author.name
        }

    def get_authors(self) -> List["Author"]:
        """Return one ``Author`` per distinct author e-mail address.

        Commits whose author has no name or no e-mail are ignored. The name
        stored is the one from the first commit seen for that e-mail. Only
        ``commit_count`` is accumulated here; ``lines_added`` and
        ``lines_deleted`` are initialised to 0 and not updated by this method.
        """
        from src.models import Author

        authors = {}
        for commit in self.get_repo().iter_commits():
            name = commit.author.name
            email = commit.author.email
            if not (name and email):
                continue
            if email not in authors:
                authors[email] = Author(
                    name=name,
                    email=email,
                    commit_count=0,
                    lines_added=0,
                    lines_deleted=0,
                )
            authors[email].commit_count += 1
        return list(authors.values())

    def close(self):
        """Close the underlying ``Repo`` if it was opened, and forget it."""
        if self._repo:
            self._repo.close()
            self._repo = None