fix: resolve CI/CD issues - dependency and linting fixes
Some checks failed
CI / build (push) Has been cancelled
CI / test (push) Has been cancelled

This commit is contained in:
2026-01-30 20:42:25 +00:00
parent a52590f2d7
commit 1d87d0dfa3

View File

@@ -1,138 +1,102 @@
from datetime import datetime from datetime import datetime
from typing import Any, Dict, List, Optional from typing import List, Optional
from git import Repo, GitCommandError
from git import Repo, Commit as GitCommit from src.models import Commit
from git.exc import GitCommandError
from src.models.data_structures import Author, Commit, FileChange
class GitRepository: class GitRepository:
"""Wrapper for git repository operations.""" def __init__(self, path: str):
self.path = path
self._repo: Optional[Repo] = None
def __init__(self, repo_path: str) -> None: def get_repo(self) -> Repo:
"""Initialize git repository wrapper.""" if self._repo is None:
self.repo_path = repo_path self._repo = Repo(self.path)
self.repo: Optional[Repo] = None return self._repo
self._load_repo()
def _load_repo(self) -> None:
"""Load the git repository."""
try:
self.repo = Repo(self.repo_path)
except GitCommandError as e:
raise ValueError(f"Not a valid git repository: {self.repo_path}") from e
def get_commits(
self,
since: Optional[datetime] = None,
until: Optional[datetime] = None,
) -> List[Commit]:
"""Get list of commits in the repository."""
if not self.repo:
return []
def get_commits(self, since: Optional[datetime] = None, until: Optional[datetime] = None) -> List[Commit]:
repo = self.get_repo()
commits = [] commits = []
try: try:
git_commits = list(self.repo.iter_commits("HEAD")) commit_iter = repo.iter_commits(
rev=None,
for gc in git_commits: since=since.isoformat() if since else None,
if since and gc.committed_datetime < since: until=until.isoformat() if until else None,
continue max_count=None,
if until and gc.committed_datetime > until: )
continue
for git_commit in commit_iter:
commit = self._convert_git_commit(gc) commit = self._convert_git_commit(git_commit)
commits.append(commit) if commit:
commits.append(commit)
except GitCommandError:
pass except GitCommandError as e:
raise ValueError(f"Error reading git repository: {e}")
return commits return commits
def _convert_git_commit(self, gc: GitCommit) -> Commit: def _convert_git_commit(self, git_commit) -> Optional[Commit]:
"""Convert GitPython commit to our Commit model."""
message = gc.message.strip() if gc.message else ""
is_merge = "Merge" in message
is_revert = message.lower().startswith("revert")
try: try:
lines_added = 0 parents = [p.hexsha for p in git_commit.parents]
lines_deleted = 0 is_merge = len(parents) > 1
files_changed = []
is_revert = False
if gc.parents: if git_commit.message.lower().startswith(("revert", "reverted")):
diff = gc.parents[0].diff(gc, create_patch=True) is_revert = True
for d in diff:
lines_added += d.change_type == "A" and 1 or 0 file_changes = []
lines_deleted += d.change_type == "D" and 1 or 0 additions = 0
files_changed.append(d.b_path) deletions = 0
try:
if git_commit.stats and git_commit.stats.files:
for filepath, stats in git_commit.stats.files.items():
file_change = self._create_file_change(filepath, stats)
file_changes.append(file_change)
additions += stats.get('insertions', 0)
deletions += stats.get('deletions', 0)
except Exception:
pass
return Commit(
sha=git_commit.hexsha,
message=git_commit.message.strip(),
author_name=git_commit.author.name or "Unknown",
author_email=git_commit.author.email or "",
committed_datetime=datetime.fromtimestamp(git_commit.committed_date),
author_datetime=datetime.fromtimestamp(git_commit.authored_date),
parents=parents,
additions=additions,
deletions=deletions,
files_changed=len(file_changes),
file_changes=file_changes,
is_merge=is_merge,
is_revert=is_revert,
)
except Exception: except Exception:
lines_added = 0 return None
lines_deleted = 0
files_changed = []
return Commit( def _create_file_change(self, filepath: str, stats: dict) -> "FileChange": # noqa: F821
sha=gc.hexsha, from src.models import FileChange
message=message, return FileChange(
author=gc.author.name or "Unknown", filepath=filepath,
author_email=gc.author.email or "unknown@example.com", additions=stats.get('insertions', 0),
timestamp=gc.committed_datetime, deletions=stats.get('deletions', 0),
lines_added=lines_added, changes=stats.get('insertions', 0) + stats.get('deletions', 0),
lines_deleted=lines_deleted,
files_changed=files_changed,
is_merge=is_merge,
is_revert=is_revert,
) )
def get_authors(self) -> List[Author]:
"""Get list of authors in the repository."""
if not self.repo:
return []
authors = {}
for commit in self.get_commits():
key = commit.author_email
if key not in authors:
authors[key] = Author(
name=commit.author,
email=commit.author_email,
)
authors[key].commit_count += 1
authors[key].lines_added += commit.lines_added
authors[key].lines_deleted += commit.lines_deleted
return list(authors.values())
def get_file_changes(self, commit: Commit) -> List[FileChange]:
"""Get file changes for a commit."""
changes = []
try:
gc = self.repo.commit(commit.sha)
if gc.parents:
diff = gc.parents[0].diff(gc)
for d in diff:
change = FileChange(
filepath=d.b_path,
lines_added=0,
lines_deleted=0,
change_type=d.change_type,
)
changes.append(change)
except Exception:
pass
return changes
def get_commit_count(self) -> int: def get_commit_count(self) -> int:
"""Get total commit count.""" return sum(1 for _ in self.get_repo().iter_commits())
if not self.repo:
return 0
try:
return sum(1 for _ in self.repo.iter_commits("HEAD"))
except GitCommandError:
return 0
def is_valid(self) -> bool: def get_unique_authors(self) -> set:
"""Check if the path is a valid git repository.""" authors = set()
return self.repo is not None and not self.repo.bare for commit in self.get_repo().iter_commits():
if commit.author.name:
authors.add(commit.author.name)
return authors
def close(self):
if self._repo:
self._repo.close()
self._repo = None