diff --git a/src/git_utils.py b/src/git_utils.py new file mode 100644 index 0000000..1da558d --- /dev/null +++ b/src/git_utils.py @@ -0,0 +1,264 @@ +"""Git operations module using GitPython.""" + +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Optional + +from git import Repo, exc + + +@dataclass +class GitChange: + """Represents a file change in git.""" + + file_path: str + change_type: str + diff_content: str + staged: bool + + +@dataclass +class GitCommit: + """Represents a git commit.""" + + sha: str + message: str + author: str + author_email: str + date: datetime + commit_type: Optional[str] = None + scope: Optional[str] = None + body: Optional[str] = None + + +class GitError(Exception): + """Base exception for git operations.""" + + pass + + +class NotGitRepositoryError(GitError): + """Raised when the directory is not a git repository.""" + + pass + + +def get_repo(path: str | Path | None = None) -> Repo: + """Get a GitPython Repo object for the given path. + + Args: + path: Path to the git repository. Defaults to current directory. + + Returns: + Repo object. + + Raises: + NotGitRepositoryError: If the path is not a git repository. + """ + try: + if path is None: + path = Path.cwd() + return Repo(str(path)) + except exc.InvalidGitRepositoryError: + raise NotGitRepositoryError(f"{path} is not a git repository") + except exc.GitCommandError as e: + raise GitError(f"Git error: {e}") + + +def get_staged_diff(repo: Repo) -> str: + """Get the staged diff (changes ready to be committed). + + Args: + repo: GitPython Repo object. + + Returns: + String containing the staged diff. + """ + try: + index = repo.index + staged_changes = index.diff("HEAD") + diff_str = "" + for change in staged_changes: + diff_str += f"=== {change.a_path} ===\n" + diff_str += change.diff.decode("utf-8", errors="replace") if change.diff else "" + diff_str += "\n" + return diff_str + except exc.GitCommandError as e: + raise GitError(f"Error getting staged diff: {e}") + + +def get_unstaged_diff(repo: Repo) -> str: + """Get the unstaged diff (working directory changes). + + Args: + repo: GitPython Repo object. + + Returns: + String containing the unstaged diff. + """ + try: + diff_str = "" + for item in repo.index.diff("working_tree"): + diff_str += f"=== {item.a_path} ===\n" + diff_str += item.diff.decode("utf-8", errors="replace") if item.diff else "" + diff_str += "\n" + return diff_str + except exc.GitCommandError as e: + raise GitError(f"Error getting unstaged diff: {e}") + + +def get_all_changes(repo: Repo) -> list[GitChange]: + """Get all changes (staged and unstaged). + + Args: + repo: GitPython Repo object. + + Returns: + List of GitChange objects. + """ + changes: list[GitChange] = [] + try: + diff_index = repo.index.diff("HEAD") + for change in diff_index: + changes.append( + GitChange( + file_path=change.a_path, + change_type=change.change_type, + diff_content=change.diff.decode("utf-8", errors="replace") if change.diff else "", + staged=True, + ) + ) + diff_working = repo.index.diff(None) + for change in diff_working: + changes.append( + GitChange( + file_path=change.a_path, + change_type=change.change_type, + diff_content=change.diff.decode("utf-8", errors="replace") if change.diff else "", + staged=False, + ) + ) + return changes + except exc.GitCommandError as e: + raise GitError(f"Error getting changes: {e}") + + +def get_commit_history( + repo: Repo, + from_ref: str | None = None, + to_ref: str | None = None, + limit: int = 100, +) -> list[GitCommit]: + """Get commit history between two refs. + + Args: + repo: GitPython Repo object. + from_ref: Starting ref (commit, tag, branch). Defaults to first commit. + to_ref: Ending ref. Defaults to HEAD. + limit: Maximum number of commits to return. + + Returns: + List of GitCommit objects. + """ + try: + if from_ref and to_ref: + rev = f"{from_ref}..{to_ref}" + elif from_ref: + rev = f"{from_ref}..HEAD" + elif to_ref: + rev = to_ref + else: + rev = "HEAD" + commits = list(repo.iter_commits(rev=rev, max_count=limit)) + result: list[GitCommit] = [] + for commit in commits: + message = commit.message.strip() + commit_type, scope, body = parse_conventional_commit(message) + result.append( + GitCommit( + sha=commit.hexsha[:7], + message=message, + author=commit.author.name, + author_email=commit.author.email, + date=commit.authored_datetime, + commit_type=commit_type, + scope=scope, + body=body, + ) + ) + return result + except exc.GitCommandError: + return [] + + +def parse_conventional_commit(message: str) -> tuple[Optional[str], Optional[str], Optional[str]]: + """Parse a conventional commit message. + + Format: (): + + + Args: + message: The commit message to parse. + + Returns: + Tuple of (type, scope, body). + """ + lines = message.split("\n") + first_line = lines[0] if lines else "" + + if ":" not in first_line: + return None, None, None + + type_scope, subject = first_line.split(":", 1) + type_scope = type_scope.strip() + subject = subject.strip() + + if "(" in type_scope and ")" in type_scope: + type_part = type_scope.split("(")[0].strip() + scope = type_scope.split("(")[1].split(")")[0].strip() + else: + type_part = type_scope.strip() + scope = None + + body = "\n".join(lines[1:]).strip() if len(lines) > 1 else None + + return type_part, scope, body + + +def get_changed_files(repo: Repo, ref: str | None = None) -> list[str]: + """Get list of changed files between ref and current state. + + Args: + repo: GitPython Repo object. + ref: Reference commit/tag to compare against. + + Returns: + List of file paths. + """ + try: + if ref: + diff = repo.head.commit.diff(f"{ref}..HEAD") + else: + diff = repo.head.commit.diff() + return [change.a_path for change in diff] + except exc.GitCommandError as e: + raise GitError(f"Error getting changed files: {e}") + + +def get_file_content_at_ref(repo: Repo, file_path: str, ref: str) -> str: + """Get file content at a specific ref. + + Args: + repo: GitPython Repo object. + file_path: Path to the file. + ref: Git reference (commit, tag, etc.). + + Returns: + File content as string. + """ + try: + commit = repo.commit(ref) + return commit.tree[file_path].data_stream.read().decode("utf-8") + except (exc.GitCommandError, KeyError) as e: + raise GitError(f"Error getting file content at {ref}: {e}")