"""GitPython-backed access to a repository's commit history."""

import logging
import os
from datetime import datetime, timedelta
from typing import List, Optional

from git import GitCommandError, InvalidGitRepositoryError, NoSuchPathError, Repo

from src.models import Commit

logger = logging.getLogger(__name__)


class GitRepository:
    """Wrapper for GitPython repository operations.

    The underlying ``Repo`` is opened lazily: every query method connects on
    first use, so calling :meth:`connect` explicitly is optional.
    """

    def __init__(self, repo_path: str) -> None:
        """Remember ``repo_path``; the repository is not opened yet."""
        self.repo_path = repo_path
        self.repo: Optional[Repo] = None

    def connect(self) -> None:
        """Open the git repository at ``self.repo_path``.

        Raises:
            ValueError: if the path is not a directory, or GitPython does not
                recognise it as a git repository.
        """
        if not os.path.isdir(self.repo_path):
            raise ValueError(f"Directory not found: {self.repo_path}")

        # Let GitPython decide what counts as a repository instead of
        # requiring a ``.git`` *directory*: in linked worktrees and
        # submodules ``.git`` is a regular file.
        try:
            self.repo = Repo(self.repo_path)
        except (InvalidGitRepositoryError, NoSuchPathError) as exc:
            raise ValueError(f"Not a git repository: {self.repo_path}") from exc

    def close(self) -> None:
        """Release the underlying ``Repo`` handle, if one is open."""
        if self.repo is not None:
            self.repo.close()
            self.repo = None

    def _require_repo(self) -> Repo:
        """Return the open ``Repo``, connecting first when necessary."""
        if self.repo is None:
            self.connect()
        return self.repo

    @staticmethod
    def _to_commit(git_commit) -> Commit:
        """Convert one GitPython commit object into a ``Commit`` model."""
        # Read ``stats`` once and reuse it for all three derived fields.
        stats = git_commit.stats
        # ``stats.files`` is keyed by file path (it is what feeds
        # ``files_changed``), so per-commit line counts must come from the
        # aggregate ``stats.total`` mapping, not from a lookup by commit hash.
        totals = stats.total
        author = git_commit.author
        message = git_commit.message
        parents = [parent.hexsha for parent in git_commit.parents]

        return Commit(
            # Full hash, so it can be matched against the entries of
            # ``parents``, which are full hashes as well.
            sha=git_commit.hexsha,
            message=message.strip() if message else "",
            author_name=(author.name if author else None) or "Unknown",
            author_email=(author.email if author else None) or "",
            committed_datetime=git_commit.committed_datetime,
            author_datetime=git_commit.authored_datetime,
            additions=totals.get("insertions", 0),
            deletions=totals.get("deletions", 0),
            files_changed=list(stats.files),
            parents=parents,
            is_merge=len(parents) > 1,
        )

    def get_commits(
        self,
        since_days: int = 30,
        author: Optional[str] = None,
    ) -> List[Commit]:
        """Get commits from the repository.

        Args:
            since_days: only commits newer than this many days before now
                are requested.
            author: optional author filter, forwarded to
                ``Repo.iter_commits``; ``None`` means no filter.

        Returns:
            The converted commits. Collection is best-effort: a commit that
            cannot be converted is skipped, and a git failure while listing
            ends the listing early; both are logged as warnings.

        Raises:
            ValueError: if the repository cannot be opened (see ``connect``).
        """
        repo = self._require_repo()

        since_date = datetime.now() - timedelta(days=since_days)
        # Pass an explicit whole-second timestamp rather than relying on the
        # implicit str() of a datetime (which includes microseconds).
        since_arg = since_date.strftime("%Y-%m-%d %H:%M:%S")

        commits: List[Commit] = []
        try:
            for git_commit in repo.iter_commits(
                since=since_arg,
                author=author,
                all=True,
            ):
                try:
                    commits.append(self._to_commit(git_commit))
                except Exception:
                    # One unreadable commit must not abort the whole listing.
                    logger.warning(
                        "Skipping commit %s: conversion failed",
                        getattr(git_commit, "hexsha", "<unknown>"),
                        exc_info=True,
                    )
        except GitCommandError as exc:
            # The git command behind the iterator failed; return what was
            # collected so far.
            logger.warning("Error reading commits from %s: %s", self.repo_path, exc)

        return commits

    def get_commit_count(self) -> int:
        """Get total commit count, as yielded by ``Repo.iter_commits()``.

        Raises:
            ValueError: if the repository cannot be opened (see ``connect``).
        """
        repo = self._require_repo()
        return sum(1 for _ in repo.iter_commits())

    def get_active_authors(self) -> List[str]:
        """Get the distinct commit authors as ``"Name <email>"`` strings.

        Returns:
            A sorted list without duplicates. If git fails while listing
            commits, the authors seen up to that point are returned and the
            failure is logged as a warning.

        Raises:
            ValueError: if the repository cannot be opened (see ``connect``).
        """
        repo = self._require_repo()

        authors = set()
        try:
            for git_commit in repo.iter_commits():
                if git_commit.author:
                    authors.add(
                        f"{git_commit.author.name} <{git_commit.author.email}>"
                    )
        except GitCommandError as exc:
            logger.warning("Error reading authors from %s: %s", self.repo_path, exc)

        # Sorted so the result is deterministic across runs.
        return sorted(authors)