Add git repository analyzer module
This commit is contained in:
@@ -5,6 +5,8 @@ from src.models import Commit
|
|||||||
|
|
||||||
|
|
||||||
class GitRepository:
|
class GitRepository:
|
||||||
|
"""Wrapper for git repository operations."""
|
||||||
|
|
||||||
def __init__(self, path: str):
|
def __init__(self, path: str):
|
||||||
self.path = path
|
self.path = path
|
||||||
self._repo: Optional[Repo] = None
|
self._repo: Optional[Repo] = None
|
||||||
@@ -19,18 +21,10 @@ class GitRepository:
|
|||||||
commits = []
|
commits = []
|
||||||
|
|
||||||
try:
|
try:
|
||||||
commit_iter = repo.iter_commits(
|
for git_commit in repo.iter_commits():
|
||||||
rev=None,
|
|
||||||
since=since.isoformat() if since else None,
|
|
||||||
until=until.isoformat() if until else None,
|
|
||||||
max_count=None,
|
|
||||||
)
|
|
||||||
|
|
||||||
for git_commit in commit_iter:
|
|
||||||
commit = self._convert_git_commit(git_commit)
|
commit = self._convert_git_commit(git_commit)
|
||||||
if commit:
|
if commit:
|
||||||
commits.append(commit)
|
commits.append(commit)
|
||||||
|
|
||||||
except GitCommandError as e:
|
except GitCommandError as e:
|
||||||
raise ValueError(f"Error reading git repository: {e}")
|
raise ValueError(f"Error reading git repository: {e}")
|
||||||
|
|
||||||
@@ -38,12 +32,8 @@ class GitRepository:
|
|||||||
|
|
||||||
def _convert_git_commit(self, git_commit) -> Optional[Commit]:
|
def _convert_git_commit(self, git_commit) -> Optional[Commit]:
|
||||||
try:
|
try:
|
||||||
parents = [p.hexsha for p in git_commit.parents]
|
is_merge = len(git_commit.parents) > 1
|
||||||
is_merge = len(parents) > 1
|
is_revert = git_commit.message.lower().startswith(("revert", "reverted"))
|
||||||
|
|
||||||
is_revert = False
|
|
||||||
if git_commit.message.lower().startswith(("revert", "reverted")):
|
|
||||||
is_revert = True
|
|
||||||
|
|
||||||
file_changes = []
|
file_changes = []
|
||||||
additions = 0
|
additions = 0
|
||||||
@@ -52,8 +42,7 @@ class GitRepository:
|
|||||||
try:
|
try:
|
||||||
if git_commit.stats and git_commit.stats.files:
|
if git_commit.stats and git_commit.stats.files:
|
||||||
for filepath, stats in git_commit.stats.files.items():
|
for filepath, stats in git_commit.stats.files.items():
|
||||||
file_change = self._create_file_change(filepath, stats)
|
file_changes.append(filepath)
|
||||||
file_changes.append(file_change)
|
|
||||||
additions += stats.get('insertions', 0)
|
additions += stats.get('insertions', 0)
|
||||||
deletions += stats.get('deletions', 0)
|
deletions += stats.get('deletions', 0)
|
||||||
except Exception:
|
except Exception:
|
||||||
@@ -62,56 +51,28 @@ class GitRepository:
|
|||||||
return Commit(
|
return Commit(
|
||||||
sha=git_commit.hexsha,
|
sha=git_commit.hexsha,
|
||||||
message=git_commit.message.strip(),
|
message=git_commit.message.strip(),
|
||||||
author_name=git_commit.author.name or "Unknown",
|
author=git_commit.author.name or "Unknown",
|
||||||
author_email=git_commit.author.email or "",
|
author_email=git_commit.author.email or "",
|
||||||
committed_datetime=datetime.fromtimestamp(git_commit.committed_date),
|
timestamp=datetime.fromtimestamp(git_commit.committed_date),
|
||||||
author_datetime=datetime.fromtimestamp(git_commit.authored_date),
|
lines_added=additions,
|
||||||
parents=parents,
|
lines_deleted=deletions,
|
||||||
additions=additions,
|
files_changed=file_changes,
|
||||||
deletions=deletions,
|
|
||||||
files_changed=len(file_changes),
|
|
||||||
file_changes=file_changes,
|
|
||||||
is_merge=is_merge,
|
is_merge=is_merge,
|
||||||
is_revert=is_revert,
|
is_revert=is_revert,
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _create_file_change(self, filepath: str, stats: dict) -> "FileChange":
|
def get_authors(self) -> List[Author]:
|
||||||
from src.models import FileChange
|
"""Get list of authors from commits."""
|
||||||
return FileChange(
|
authors_dict = {}
|
||||||
filepath=filepath,
|
for commit in self.get_commits():
|
||||||
additions=stats.get('insertions', 0),
|
if commit.author_email not in authors_dict:
|
||||||
deletions=stats.get('deletions', 0),
|
authors_dict[commit.author_email] = Author(
|
||||||
changes=stats.get('insertions', 0) + stats.get('deletions', 0),
|
name=commit.author,
|
||||||
)
|
email=commit.author_email,
|
||||||
|
)
|
||||||
def get_commit_count(self) -> int:
|
return list(authors_dict.values())
|
||||||
return sum(1 for _ in self.get_repo().iter_commits())
|
|
||||||
|
|
||||||
def get_unique_authors(self) -> set:
|
|
||||||
authors = set()
|
|
||||||
for commit in self.get_repo().iter_commits():
|
|
||||||
if commit.author.name:
|
|
||||||
authors.add(commit.author.name)
|
|
||||||
return authors
|
|
||||||
|
|
||||||
def get_authors(self):
|
|
||||||
from src.models import Author
|
|
||||||
authors = {}
|
|
||||||
for commit in self.get_repo().iter_commits():
|
|
||||||
if commit.author.name and commit.author.email:
|
|
||||||
email = commit.author.email
|
|
||||||
if email not in authors:
|
|
||||||
authors[email] = Author(
|
|
||||||
name=commit.author.name,
|
|
||||||
email=email,
|
|
||||||
commit_count=0,
|
|
||||||
lines_added=0,
|
|
||||||
lines_deleted=0,
|
|
||||||
)
|
|
||||||
authors[email].commit_count += 1
|
|
||||||
return list(authors.values())
|
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
if self._repo:
|
if self._repo:
|
||||||
|
|||||||
Reference in New Issue
Block a user