Initial upload: Add repohealth-cli project with CI/CD workflow
This commit is contained in:
230
src/repohealth/analyzers/git_analyzer.py
Normal file
230
src/repohealth/analyzers/git_analyzer.py
Normal file
@@ -0,0 +1,230 @@
|
||||
"""Git repository analyzer using GitPython."""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Optional, Generator
|
||||
from datetime import datetime
|
||||
|
||||
from git import Repo, Commit, Diff
|
||||
from git.exc import InvalidGitRepositoryError, NoSuchPathError
|
||||
|
||||
from repohealth.models.file_stats import FileAnalysis
|
||||
from repohealth.models.author import AuthorStats
|
||||
|
||||
|
||||
class GitAnalyzer:
    """Analyzer for Git repository commit and authorship data.

    ``validate_repository`` must succeed before the query methods return
    anything useful; until then ``self.repo`` is ``None`` and every query
    returns an empty result.
    """

    def __init__(self, repo_path: str):
        """Initialize the analyzer with a repository path.

        Args:
            repo_path: Path to the Git repository.
        """
        self.repo_path = Path(repo_path)
        self.repo: Optional[Repo] = None
        # Result of the most recent get_unique_authors() call.
        self._authors: dict[str, AuthorStats] = {}

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _normalize_extensions(
        extensions: Optional[list[str]]
    ) -> Optional[frozenset[str]]:
        """Return the extension filter as a set of dot-less suffixes.

        Both ``"py"`` and ``".py"`` are accepted. ``None`` (or an empty
        list) means "no filtering" and is returned as ``None``.
        """
        if not extensions:
            return None
        return frozenset(ext.lstrip('.') for ext in extensions)

    @staticmethod
    def _extend_span(first, last, when):
        """Return ``(first, last)`` widened so that it also covers ``when``.

        Comparing instead of relying on iteration order keeps ``first`` the
        earliest and ``last`` the latest date even though history is walked
        newest-first.
        """
        if not first or when < first:
            first = when
        if not last or when > last:
            last = when
        return first, last

    def _iter_commits(
        self,
        paths: str = "",
        depth: Optional[int] = None
    ) -> Generator["Commit", None, None]:
        """Yield commits reachable from HEAD, optionally limited.

        Args:
            paths: When non-empty, only commits touching this path.
            depth: Maximum number of commits to yield; ``None`` or ``0``
                means no limit.

        Yields:
            Commits in the order GitPython returns them (newest first).
        """
        if not self.repo:
            return
        try:
            if paths:
                commits = self.repo.iter_commits(paths=paths)
            else:
                commits = self.repo.iter_commits()
        except ValueError:
            # HEAD does not point at a commit yet (empty repository); this
            # mirrors the handling in get_head_commit().
            return

        for index, commit in enumerate(commits):
            if depth and index >= depth:
                break
            yield commit

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def validate_repository(self) -> bool:
        """Validate that the path is a valid Git repository.

        On success the opened repository is stored on ``self.repo``.

        Returns:
            True if the path is a non-bare Git repository, False otherwise.
        """
        try:
            self.repo = Repo(self.repo_path)
            return not self.repo.bare
        except (InvalidGitRepositoryError, NoSuchPathError):
            return False

    def get_commit_count(self) -> int:
        """Get total commit count in the repository.

        Returns:
            Total number of commits reachable from HEAD; 0 when no
            repository is loaded or it has no commits.
        """
        # Count while streaming instead of building a list of every commit.
        return sum(1 for _ in self._iter_commits())

    def get_unique_authors(self) -> dict[str, "AuthorStats"]:
        """Get all unique authors in the repository.

        Authors are keyed by e-mail address. The name recorded for an
        author is the one on the first commit encountered for that e-mail.

        Returns:
            Dictionary mapping author email to AuthorStats.
        """
        if not self.repo:
            return {}

        authors: dict[str, AuthorStats] = {}
        for commit in self._iter_commits():
            email = commit.author.email
            stats = authors.get(email)
            if stats is None:
                stats = AuthorStats(name=commit.author.name, email=email)
                authors[email] = stats

            stats.total_commits += 1
            stats.first_commit, stats.last_commit = self._extend_span(
                stats.first_commit, stats.last_commit, commit.authored_datetime
            )

        self._authors = authors
        return authors

    def iter_file_commits(
        self,
        path: Optional[str] = None,
        extensions: Optional[list[str]] = None,
        depth: Optional[int] = None
    ) -> Generator[tuple[str, "Commit"], None, None]:
        """Iterate through commits with file information.

        Args:
            path: Optional path to filter files. This is a plain string
                prefix match against the changed file path.
            extensions: Optional list of file extensions to include, with
                or without a leading dot.
            depth: Optional limit on the number of commits inspected;
                ``None`` or ``0`` means no limit.

        Yields:
            Tuples of (file_path, commit).
        """
        wanted = self._normalize_extensions(extensions)

        for commit in self._iter_commits(depth=depth):
            # Only the stats lookup is guarded; a commit whose stats cannot
            # be read is skipped rather than aborting the whole walk.
            try:
                changed_files = list(commit.stats.files)
            except (ValueError, KeyError):
                continue

            for changed in changed_files:
                if path and not changed.startswith(path):
                    continue
                if wanted is not None:
                    if Path(changed).suffix.lstrip('.') not in wanted:
                        continue
                yield changed, commit

    def analyze_file_authors(
        self,
        file_path: str,
        depth: Optional[int] = None
    ) -> "FileAnalysis":
        """Analyze authorship for a single file.

        Args:
            file_path: Path to the file.
            depth: Optional limit on commit history depth; ``None`` or
                ``0`` means no limit.

        Returns:
            FileAnalysis with authorship statistics. When no repository is
            loaded or the file has no history, the counts are zero and the
            commit dates are ``None``.
        """
        author_commits: dict[str, int] = {}
        first_commit: Optional[datetime] = None
        last_commit: Optional[datetime] = None

        for commit in self._iter_commits(paths=file_path, depth=depth):
            email = commit.author.email
            author_commits[email] = author_commits.get(email, 0) + 1
            first_commit, last_commit = self._extend_span(
                first_commit, last_commit, commit.authored_datetime
            )

        location = Path(file_path)
        return FileAnalysis(
            path=file_path,
            total_commits=sum(author_commits.values()),
            author_commits=author_commits,
            first_commit=first_commit,
            last_commit=last_commit,
            module=str(location.parent),
            extension=location.suffix.lstrip('.')
        )

    def get_all_files(
        self,
        extensions: Optional[list[str]] = None
    ) -> list[str]:
        """Get all tracked files in the repository.

        Args:
            extensions: Optional list of file extensions to include, with
                or without a leading dot.

        Returns:
            List of file paths; empty when no repository is loaded or it
            has no commits.
        """
        if not self.repo:
            return []

        try:
            tree = self.repo.tree()
        except ValueError:
            # No HEAD commit yet, hence no tree to walk.
            return []

        wanted = self._normalize_extensions(extensions)
        files = []
        for item in tree.traverse():
            if item.type != 'blob':
                continue
            if wanted is None or Path(item.path).suffix.lstrip('.') in wanted:
                files.append(item.path)
        return files

    def get_file_modules(self) -> dict[str, list[str]]:
        """Group files by their module/directory.

        Returns:
            Dictionary mapping module (the file's parent directory, ``.``
            for top-level files) to list of files.
        """
        modules: dict[str, list[str]] = {}
        for file_path in self.get_all_files():
            modules.setdefault(str(Path(file_path).parent), []).append(file_path)
        return modules

    def get_head_commit(self) -> Optional["Commit"]:
        """Get the HEAD commit of the repository.

        Returns:
            HEAD Commit or None if repository is empty.
        """
        if not self.repo:
            return None
        try:
            return self.repo.head.commit
        except ValueError:
            return None

    def get_branch_count(self) -> int:
        """Get the number of branches in the repository.

        Returns:
            Number of branches.
        """
        if not self.repo:
            return 0
        return len(self.repo.branches)
|
||||
Reference in New Issue
Block a user