Add models, analyzers, and formatters
Some checks failed
CI / test (push) Has been cancelled

This commit is contained in:
2026-02-01 07:57:58 +00:00
parent 3728e92633
commit 568f75ff29

View File

@@ -1,102 +1,90 @@
from datetime import datetime import os
from typing import List, Optional from typing import List, Optional
from git import Repo, GitCommandError from git import Repo, GitCommandError
from src.models import Commit from src.models import Commit
class GitRepository: class GitRepository:
def __init__(self, path: str): """Wrapper for GitPython repository operations."""
self.path = path
self._repo: Optional[Repo] = None
def get_repo(self) -> Repo: def __init__(self, repo_path: str) -> None:
if self._repo is None: """Initialize GitRepository."""
self._repo = Repo(self.path) self.repo_path = repo_path
return self._repo self.repo: Optional[Repo] = None
def connect(self) -> None:
"""Connect to the git repository."""
if not os.path.isdir(self.repo_path):
raise ValueError(f"Directory not found: {self.repo_path}")
git_path = os.path.join(self.repo_path, ".git")
if not os.path.isdir(git_path):
raise ValueError(f"Not a git repository: {self.repo_path}")
self.repo = Repo(self.repo_path)
def get_commits(
self,
since_days: int = 30,
author: Optional[str] = None,
) -> List[Commit]:
"""Get commits from the repository."""
if self.repo is None:
self.connect()
from datetime import datetime, timedelta
since_date = datetime.now() - timedelta(days=since_days)
def get_commits(self, since: Optional[datetime] = None, until: Optional[datetime] = None) -> List[Commit]:
repo = self.get_repo()
commits = [] commits = []
try: try:
commit_iter = repo.iter_commits( for commit in self.repo.iter_commits(
rev=None, since=since_date,
since=since.isoformat() if since else None, author=author,
until=until.isoformat() if until else None, all=True,
max_count=None, ):
) try:
commit_obj = Commit(
for git_commit in commit_iter: sha=commit.hexsha[:7],
commit = self._convert_git_commit(git_commit) message=commit.message.strip() if commit.message else "",
if commit: author_name=commit.author.name if commit.author else "Unknown",
commits.append(commit) author_email=commit.author.email if commit.author else "",
committed_datetime=commit.committed_datetime,
except GitCommandError as e: author_datetime=commit.authored_datetime,
raise ValueError(f"Error reading git repository: {e}") additions=commit.stats.files.get(commit.hexsha, {}).get("insertions", 0) if hasattr(commit, 'stats') else 0,
deletions=commit.stats.files.get(commit.hexsha, {}).get("deletions", 0) if hasattr(commit, 'stats') else 0,
files_changed=list(commit.stats.files.keys()) if hasattr(commit, 'stats') else [],
parents=[p.hexsha for p in commit.parents] if commit.parents else [],
is_merge=len(commit.parents) > 1 if commit.parents else False,
)
commits.append(commit_obj)
except Exception:
continue
except GitCommandError:
pass
return commits return commits
def _convert_git_commit(self, git_commit) -> Optional[Commit]:
try:
parents = [p.hexsha for p in git_commit.parents]
is_merge = len(parents) > 1
is_revert = False
if git_commit.message.lower().startswith(("revert", "reverted")):
is_revert = True
file_changes = []
additions = 0
deletions = 0
try:
if git_commit.stats and git_commit.stats.files:
for filepath, stats in git_commit.stats.files.items():
file_change = self._create_file_change(filepath, stats)
file_changes.append(file_change)
additions += stats.get('insertions', 0)
deletions += stats.get('deletions', 0)
except Exception:
pass
return Commit(
sha=git_commit.hexsha,
message=git_commit.message.strip(),
author_name=git_commit.author.name or "Unknown",
author_email=git_commit.author.email or "",
committed_datetime=datetime.fromtimestamp(git_commit.committed_date),
author_datetime=datetime.fromtimestamp(git_commit.authored_date),
parents=parents,
additions=additions,
deletions=deletions,
files_changed=len(file_changes),
file_changes=file_changes,
is_merge=is_merge,
is_revert=is_revert,
)
except Exception:
return None
def _create_file_change(self, filepath: str, stats: dict) -> "FileChange": # noqa: F821
from src.models import FileChange
return FileChange(
filepath=filepath,
additions=stats.get('insertions', 0),
deletions=stats.get('deletions', 0),
changes=stats.get('insertions', 0) + stats.get('deletions', 0),
)
def get_commit_count(self) -> int: def get_commit_count(self) -> int:
return sum(1 for _ in self.get_repo().iter_commits()) """Get total commit count."""
if self.repo is None:
self.connect()
return sum(1 for _ in self.repo.iter_commits())
def get_active_authors(self) -> List[str]:
"""Get list of active authors."""
if self.repo is None:
self.connect()
def get_unique_authors(self) -> set:
authors = set() authors = set()
for commit in self.get_repo().iter_commits(): try:
if commit.author.name: for commit in self.repo.iter_commits():
authors.add(commit.author.name) if commit.author:
return authors authors.add(f"{commit.author.name} <{commit.author.email}>")
except GitCommandError:
pass
def close(self): return list(authors)
if self._repo:
self._repo.close()
self._repo = None