"""Git repository analyzer using GitPython."""

import os
from pathlib import Path
from typing import Optional, Generator
from datetime import datetime

from git import Repo, Commit, Diff
from git.exc import InvalidGitRepositoryError, NoSuchPathError

from repohealth.models.file_stats import FileAnalysis
from repohealth.models.author import AuthorStats


class GitAnalyzer:
    """Analyzer for Git repository commit and authorship data.

    Call :meth:`validate_repository` before any other method; until it has
    succeeded ``self.repo`` is ``None`` and every query returns an empty
    result (``0``, ``{}``, ``[]``, ``None`` or an empty ``FileAnalysis``).
    A repository that has no commits yet is treated the same way.
    """

    def __init__(self, repo_path: str):
        """Initialize the analyzer with a repository path.

        The repository is not opened here; see :meth:`validate_repository`.

        Args:
            repo_path: Path to the Git repository.
        """
        self.repo_path = Path(repo_path)
        self.repo: Optional[Repo] = None
        # Result of the most recent get_unique_authors() call.
        self._authors: dict[str, AuthorStats] = {}

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _iter_commits(self, **kwargs) -> Generator[Commit, None, None]:
        """Yield commits reachable from HEAD, or nothing if there are none.

        ``Repo.iter_commits()`` resolves ``HEAD`` and raises ``ValueError``
        on a repository without commits; this wrapper turns both "no
        repository opened" and "no commits yet" into an empty iteration.

        Args:
            **kwargs: Passed through to ``Repo.iter_commits`` (e.g. ``paths``).
        """
        if self.get_head_commit() is None:
            return
        yield from self.repo.iter_commits(**kwargs)

    @staticmethod
    def _normalize_extensions(
        extensions: Optional[list[str]]
    ) -> Optional[frozenset[str]]:
        """Return the extension filter as a set of dot-less suffixes.

        Accepts both ``"py"`` and ``".py"``. Returns ``None`` when no filter
        was requested (``None`` or an empty list).
        """
        if not extensions:
            return None
        return frozenset(ext.lstrip('.') for ext in extensions)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def validate_repository(self) -> bool:
        """Validate that the path is a valid Git repository.

        On success the opened repository is stored in ``self.repo``. A bare
        repository is opened (and stored) but reported as invalid.

        Returns:
            True if the path is a non-bare Git repository, False otherwise.
        """
        try:
            self.repo = Repo(self.repo_path)
        except (InvalidGitRepositoryError, NoSuchPathError):
            # Do not keep a handle from an earlier, successful validation.
            self.repo = None
            return False
        return not self.repo.bare

    def get_commit_count(self) -> int:
        """Get total commit count in the repository.

        Returns:
            Number of commits reachable from HEAD; 0 if no repository is
            open or it has no commits.
        """
        # Count lazily instead of materialising every Commit object.
        return sum(1 for _ in self._iter_commits())

    def get_unique_authors(self) -> dict[str, AuthorStats]:
        """Get all unique authors in the repository.

        For each author the total number of commits and the earliest and
        latest authored timestamps are recorded. The result is also cached
        on ``self._authors``.

        Returns:
            Dictionary mapping author email to AuthorStats.
        """
        if self.repo is None:
            return {}

        authors: dict[str, AuthorStats] = {}
        for commit in self._iter_commits():
            email = commit.author.email
            stats = authors.get(email)
            if stats is None:
                stats = AuthorStats(name=commit.author.name, email=email)
                authors[email] = stats

            stats.total_commits += 1

            # History is walked newest-first, so "first"/"last" must be
            # decided by comparing timestamps, not by encounter order.
            authored = commit.authored_datetime
            if not stats.first_commit or authored < stats.first_commit:
                stats.first_commit = authored
            if not stats.last_commit or authored > stats.last_commit:
                stats.last_commit = authored

        self._authors = authors
        return authors

    def iter_file_commits(
        self,
        path: Optional[str] = None,
        extensions: Optional[list[str]] = None,
        depth: Optional[int] = None
    ) -> Generator[tuple[str, Commit], None, None]:
        """Iterate through commits with file information.

        Args:
            path: Optional path prefix; only files whose path starts with
                this string are yielded.
            extensions: Optional list of file extensions to include, with
                or without a leading dot.
            depth: Optional limit on the number of commits examined.
                ``None`` or ``0`` means no limit.

        Yields:
            Tuples of (file_path, commit), one per file touched by a commit.
        """
        wanted = self._normalize_extensions(extensions)

        examined = 0
        for commit in self._iter_commits():
            if depth and examined >= depth:
                break

            # Only the stats computation is guarded; the yield stays outside
            # the try so exceptions raised by the consumer are not swallowed.
            try:
                changed_files = list(commit.stats.files)
            except (ValueError, KeyError):
                # Commits whose stats cannot be read are skipped and do not
                # count towards ``depth``.
                continue
            examined += 1

            for changed in changed_files:
                if path and not changed.startswith(path):
                    continue
                if wanted is not None:
                    if Path(changed).suffix.lstrip('.') not in wanted:
                        continue
                yield changed, commit

    def analyze_file_authors(
        self,
        file_path: str,
        depth: Optional[int] = None
    ) -> FileAnalysis:
        """Analyze authorship for a single file.

        Args:
            file_path: Path to the file.
            depth: Optional limit on the number of (most recent) commits
                examined. ``None`` or ``0`` means no limit.

        Returns:
            FileAnalysis with authorship statistics. If no repository is
            open, or the file has no history, the analysis has zero commits
            and ``None`` timestamps.
        """
        author_commits: dict[str, int] = {}
        first_commit: Optional[datetime] = None
        last_commit: Optional[datetime] = None
        total_commits = 0

        for commit in self._iter_commits(paths=file_path):
            if depth and total_commits >= depth:
                break
            total_commits += 1

            email = commit.author.email
            author_commits[email] = author_commits.get(email, 0) + 1

            # Commits arrive newest-first; keep the true earliest/latest.
            authored = commit.authored_datetime
            if first_commit is None or authored < first_commit:
                first_commit = authored
            if last_commit is None or authored > last_commit:
                last_commit = authored

        file_as_path = Path(file_path)
        return FileAnalysis(
            path=file_path,
            total_commits=total_commits,
            author_commits=author_commits,
            first_commit=first_commit,
            last_commit=last_commit,
            module=str(file_as_path.parent),
            extension=file_as_path.suffix.lstrip('.')
        )

    def get_all_files(
        self,
        extensions: Optional[list[str]] = None
    ) -> list[str]:
        """Get all tracked files in the repository.

        Walks the tree of the HEAD commit and collects every blob.

        Args:
            extensions: Optional list of file extensions to include, with
                or without a leading dot.

        Returns:
            List of file paths; empty if no repository is open or it has
            no commits.
        """
        # Repo.tree() needs a HEAD commit; an empty repo has no files.
        if self.get_head_commit() is None:
            return []

        wanted = self._normalize_extensions(extensions)

        files: list[str] = []
        for item in self.repo.tree().traverse():
            if item.type != 'blob':
                continue
            if wanted is not None:
                if Path(item.path).suffix.lstrip('.') not in wanted:
                    continue
            files.append(item.path)

        return files

    def get_file_modules(self) -> dict[str, list[str]]:
        """Group files by their module/directory.

        Returns:
            Dictionary mapping module (the file's parent directory as a
            string) to the list of files directly inside it.
        """
        modules: dict[str, list[str]] = {}
        for file_path in self.get_all_files():
            module = str(Path(file_path).parent)
            modules.setdefault(module, []).append(file_path)
        return modules

    def get_head_commit(self) -> Optional[Commit]:
        """Get the HEAD commit of the repository.

        Returns:
            HEAD Commit, or None if no repository is open or it is empty.
        """
        if self.repo is None:
            return None
        try:
            return self.repo.head.commit
        except ValueError:
            # HEAD points at a branch that has no commits yet.
            return None

    def get_branch_count(self) -> int:
        """Get the number of branches in the repository.

        Returns:
            Number of local branches; 0 if no repository is open.
        """
        if self.repo is None:
            return 0
        return len(self.repo.branches)