From 9e67d25517e65bf51c6d4985bcf32807ba5b6e57 Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Sun, 1 Feb 2026 08:19:56 +0000 Subject: [PATCH] fix: add analyzer modules --- src/analyzers/git_repository.py | 179 +++++++++++++++++++------------- 1 file changed, 104 insertions(+), 75 deletions(-) diff --git a/src/analyzers/git_repository.py b/src/analyzers/git_repository.py index b3e342b..033df58 100644 --- a/src/analyzers/git_repository.py +++ b/src/analyzers/git_repository.py @@ -1,90 +1,119 @@ -import os +from datetime import datetime from typing import List, Optional - from git import Repo, GitCommandError - from src.models import Commit class GitRepository: - """Wrapper for GitPython repository operations.""" + def __init__(self, path: str): + self.path = path + self._repo: Optional[Repo] = None - def __init__(self, repo_path: str) -> None: - """Initialize GitRepository.""" - self.repo_path = repo_path - self.repo: Optional[Repo] = None - - def connect(self) -> None: - """Connect to the git repository.""" - if not os.path.isdir(self.repo_path): - raise ValueError(f"Directory not found: {self.repo_path}") - - git_path = os.path.join(self.repo_path, ".git") - if not os.path.isdir(git_path): - raise ValueError(f"Not a git repository: {self.repo_path}") - - self.repo = Repo(self.repo_path) - - def get_commits( - self, - since_days: int = 30, - author: Optional[str] = None, - ) -> List[Commit]: - """Get commits from the repository.""" - if self.repo is None: - self.connect() - - from datetime import datetime, timedelta - - since_date = datetime.now() - timedelta(days=since_days) + def get_repo(self) -> Repo: + if self._repo is None: + self._repo = Repo(self.path) + return self._repo + def get_commits(self, since: Optional[datetime] = None, until: Optional[datetime] = None) -> List[Commit]: + repo = self.get_repo() commits = [] + try: - for commit in self.repo.iter_commits( - since=since_date, - author=author, - all=True, - ): - try: - commit_obj = Commit( - sha=commit.hexsha[:7], - message=commit.message.strip() if commit.message else "", - author_name=commit.author.name if commit.author else "Unknown", - author_email=commit.author.email if commit.author else "", - committed_datetime=commit.committed_datetime, - author_datetime=commit.authored_datetime, - additions=commit.stats.files.get(commit.hexsha, {}).get("insertions", 0) if hasattr(commit, 'stats') else 0, - deletions=commit.stats.files.get(commit.hexsha, {}).get("deletions", 0) if hasattr(commit, 'stats') else 0, - files_changed=list(commit.stats.files.keys()) if hasattr(commit, 'stats') else [], - parents=[p.hexsha for p in commit.parents] if commit.parents else [], - is_merge=len(commit.parents) > 1 if commit.parents else False, - ) - commits.append(commit_obj) - except Exception: - continue - except GitCommandError: - pass + commit_iter = repo.iter_commits( + rev=None, + since=since.isoformat() if since else None, + until=until.isoformat() if until else None, + max_count=None, + ) + + for git_commit in commit_iter: + commit = self._convert_git_commit(git_commit) + if commit: + commits.append(commit) + + except GitCommandError as e: + raise ValueError(f"Error reading git repository: {e}") return commits - def get_commit_count(self) -> int: - """Get total commit count.""" - if self.repo is None: - self.connect() - - return sum(1 for _ in self.repo.iter_commits()) - - def get_active_authors(self) -> List[str]: - """Get list of active authors.""" - if self.repo is None: - self.connect() - - authors = set() + def _convert_git_commit(self, git_commit) -> Optional[Commit]: try: - for commit in self.repo.iter_commits(): - if commit.author: - authors.add(f"{commit.author.name} <{commit.author.email}>") - except GitCommandError: - pass + parents = [p.hexsha for p in git_commit.parents] + is_merge = len(parents) > 1 - return list(authors) + is_revert = False + if git_commit.message.lower().startswith(("revert", "reverted")): + is_revert = True + + file_changes = [] + additions = 0 + deletions = 0 + + try: + if git_commit.stats and git_commit.stats.files: + for filepath, stats in git_commit.stats.files.items(): + file_change = self._create_file_change(filepath, stats) + file_changes.append(file_change) + additions += stats.get('insertions', 0) + deletions += stats.get('deletions', 0) + except Exception: + pass + + return Commit( + sha=git_commit.hexsha, + message=git_commit.message.strip(), + author_name=git_commit.author.name or "Unknown", + author_email=git_commit.author.email or "", + committed_datetime=datetime.fromtimestamp(git_commit.committed_date), + author_datetime=datetime.fromtimestamp(git_commit.authored_date), + parents=parents, + additions=additions, + deletions=deletions, + files_changed=len(file_changes), + file_changes=file_changes, + is_merge=is_merge, + is_revert=is_revert, + ) + except Exception: + return None + + def _create_file_change(self, filepath: str, stats: dict) -> "FileChange": + from src.models import FileChange + return FileChange( + filepath=filepath, + additions=stats.get('insertions', 0), + deletions=stats.get('deletions', 0), + changes=stats.get('insertions', 0) + stats.get('deletions', 0), + ) + + def get_commit_count(self) -> int: + return sum(1 for _ in self.get_repo().iter_commits()) + + def get_unique_authors(self) -> set: + authors = set() + for commit in self.get_repo().iter_commits(): + if commit.author.name: + authors.add(commit.author.name) + return authors + + def get_authors(self): + from src.models import Author + authors = {} + for commit in self.get_repo().iter_commits(): + if commit.author.name and commit.author.email: + email = commit.author.email + if email not in authors: + authors[email] = Author( + name=commit.author.name, + email=email, + commit_count=0, + lines_added=0, + lines_deleted=0, + ) + authors[email].commit_count += 1 + return list(authors.values()) + + def close(self): + if self._repo: + self._repo.close() + self._repo = None