Initial upload: git-insights-cli with CI/CD workflow
This commit is contained in:
138
src/analyzers/git_repository.py
Normal file
138
src/analyzers/git_repository.py
Normal file
@@ -0,0 +1,138 @@
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from git import Repo, Commit as GitCommit
|
||||
from git.exc import GitCommandError
|
||||
|
||||
from src.models.data_structures import Author, Commit, FileChange
|
||||
|
||||
|
||||
class GitRepository:
|
||||
"""Wrapper for git repository operations."""
|
||||
|
||||
def __init__(self, repo_path: str) -> None:
|
||||
"""Initialize git repository wrapper."""
|
||||
self.repo_path = repo_path
|
||||
self.repo: Optional[Repo] = None
|
||||
self._load_repo()
|
||||
|
||||
def _load_repo(self) -> None:
|
||||
"""Load the git repository."""
|
||||
try:
|
||||
self.repo = Repo(self.repo_path)
|
||||
except GitCommandError as e:
|
||||
raise ValueError(f"Not a valid git repository: {self.repo_path}") from e
|
||||
|
||||
def get_commits(
|
||||
self,
|
||||
since: Optional[datetime] = None,
|
||||
until: Optional[datetime] = None,
|
||||
) -> List[Commit]:
|
||||
"""Get list of commits in the repository."""
|
||||
if not self.repo:
|
||||
return []
|
||||
|
||||
commits = []
|
||||
try:
|
||||
git_commits = list(self.repo.iter_commits("HEAD"))
|
||||
|
||||
for gc in git_commits:
|
||||
if since and gc.committed_datetime < since:
|
||||
continue
|
||||
if until and gc.committed_datetime > until:
|
||||
continue
|
||||
|
||||
commit = self._convert_git_commit(gc)
|
||||
commits.append(commit)
|
||||
|
||||
except GitCommandError:
|
||||
pass
|
||||
|
||||
return commits
|
||||
|
||||
def _convert_git_commit(self, gc: GitCommit) -> Commit:
|
||||
"""Convert GitPython commit to our Commit model."""
|
||||
message = gc.message.strip() if gc.message else ""
|
||||
is_merge = "Merge" in message
|
||||
is_revert = message.lower().startswith("revert")
|
||||
|
||||
try:
|
||||
lines_added = 0
|
||||
lines_deleted = 0
|
||||
files_changed = []
|
||||
|
||||
if gc.parents:
|
||||
diff = gc.parents[0].diff(gc, create_patch=True)
|
||||
for d in diff:
|
||||
lines_added += d.change_type == "A" and 1 or 0
|
||||
lines_deleted += d.change_type == "D" and 1 or 0
|
||||
files_changed.append(d.b_path)
|
||||
except Exception:
|
||||
lines_added = 0
|
||||
lines_deleted = 0
|
||||
files_changed = []
|
||||
|
||||
return Commit(
|
||||
sha=gc.hexsha,
|
||||
message=message,
|
||||
author=gc.author.name or "Unknown",
|
||||
author_email=gc.author.email or "unknown@example.com",
|
||||
timestamp=gc.committed_datetime,
|
||||
lines_added=lines_added,
|
||||
lines_deleted=lines_deleted,
|
||||
files_changed=files_changed,
|
||||
is_merge=is_merge,
|
||||
is_revert=is_revert,
|
||||
)
|
||||
|
||||
def get_authors(self) -> List[Author]:
|
||||
"""Get list of authors in the repository."""
|
||||
if not self.repo:
|
||||
return []
|
||||
|
||||
authors = {}
|
||||
for commit in self.get_commits():
|
||||
key = commit.author_email
|
||||
if key not in authors:
|
||||
authors[key] = Author(
|
||||
name=commit.author,
|
||||
email=commit.author_email,
|
||||
)
|
||||
authors[key].commit_count += 1
|
||||
authors[key].lines_added += commit.lines_added
|
||||
authors[key].lines_deleted += commit.lines_deleted
|
||||
|
||||
return list(authors.values())
|
||||
|
||||
def get_file_changes(self, commit: Commit) -> List[FileChange]:
|
||||
"""Get file changes for a commit."""
|
||||
changes = []
|
||||
try:
|
||||
gc = self.repo.commit(commit.sha)
|
||||
if gc.parents:
|
||||
diff = gc.parents[0].diff(gc)
|
||||
for d in diff:
|
||||
change = FileChange(
|
||||
filepath=d.b_path,
|
||||
lines_added=0,
|
||||
lines_deleted=0,
|
||||
change_type=d.change_type,
|
||||
)
|
||||
changes.append(change)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return changes
|
||||
|
||||
def get_commit_count(self) -> int:
|
||||
"""Get total commit count."""
|
||||
if not self.repo:
|
||||
return 0
|
||||
try:
|
||||
return sum(1 for _ in self.repo.iter_commits("HEAD"))
|
||||
except GitCommandError:
|
||||
return 0
|
||||
|
||||
def is_valid(self) -> bool:
|
||||
"""Check if the path is a valid git repository."""
|
||||
return self.repo is not None and not self.repo.bare
|
||||
Reference in New Issue
Block a user