Initial upload: Git AI Documentation Generator v0.1.0
Some checks failed
CI / test (push) Has been cancelled
Some checks failed
CI / test (push) Has been cancelled
This commit is contained in:
264
src/git_utils.py
Normal file
264
src/git_utils.py
Normal file
@@ -0,0 +1,264 @@
|
||||
"""Git operations module using GitPython."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from git import Repo, exc
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitChange:
|
||||
"""Represents a file change in git."""
|
||||
|
||||
file_path: str
|
||||
change_type: str
|
||||
diff_content: str
|
||||
staged: bool
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitCommit:
|
||||
"""Represents a git commit."""
|
||||
|
||||
sha: str
|
||||
message: str
|
||||
author: str
|
||||
author_email: str
|
||||
date: datetime
|
||||
commit_type: Optional[str] = None
|
||||
scope: Optional[str] = None
|
||||
body: Optional[str] = None
|
||||
|
||||
|
||||
class GitError(Exception):
|
||||
"""Base exception for git operations."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class NotGitRepositoryError(GitError):
|
||||
"""Raised when the directory is not a git repository."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
def get_repo(path: str | Path | None = None) -> Repo:
|
||||
"""Get a GitPython Repo object for the given path.
|
||||
|
||||
Args:
|
||||
path: Path to the git repository. Defaults to current directory.
|
||||
|
||||
Returns:
|
||||
Repo object.
|
||||
|
||||
Raises:
|
||||
NotGitRepositoryError: If the path is not a git repository.
|
||||
"""
|
||||
try:
|
||||
if path is None:
|
||||
path = Path.cwd()
|
||||
return Repo(str(path))
|
||||
except exc.InvalidGitRepositoryError:
|
||||
raise NotGitRepositoryError(f"{path} is not a git repository")
|
||||
except exc.GitCommandError as e:
|
||||
raise GitError(f"Git error: {e}")
|
||||
|
||||
|
||||
def get_staged_diff(repo: Repo) -> str:
|
||||
"""Get the staged diff (changes ready to be committed).
|
||||
|
||||
Args:
|
||||
repo: GitPython Repo object.
|
||||
|
||||
Returns:
|
||||
String containing the staged diff.
|
||||
"""
|
||||
try:
|
||||
index = repo.index
|
||||
staged_changes = index.diff("HEAD")
|
||||
diff_str = ""
|
||||
for change in staged_changes:
|
||||
diff_str += f"=== {change.a_path} ===\n"
|
||||
diff_str += change.diff.decode("utf-8", errors="replace") if change.diff else ""
|
||||
diff_str += "\n"
|
||||
return diff_str
|
||||
except exc.GitCommandError as e:
|
||||
raise GitError(f"Error getting staged diff: {e}")
|
||||
|
||||
|
||||
def get_unstaged_diff(repo: Repo) -> str:
|
||||
"""Get the unstaged diff (working directory changes).
|
||||
|
||||
Args:
|
||||
repo: GitPython Repo object.
|
||||
|
||||
Returns:
|
||||
String containing the unstaged diff.
|
||||
"""
|
||||
try:
|
||||
diff_str = ""
|
||||
for item in repo.index.diff("working_tree"):
|
||||
diff_str += f"=== {item.a_path} ===\n"
|
||||
diff_str += item.diff.decode("utf-8", errors="replace") if item.diff else ""
|
||||
diff_str += "\n"
|
||||
return diff_str
|
||||
except exc.GitCommandError as e:
|
||||
raise GitError(f"Error getting unstaged diff: {e}")
|
||||
|
||||
|
||||
def get_all_changes(repo: Repo) -> list[GitChange]:
|
||||
"""Get all changes (staged and unstaged).
|
||||
|
||||
Args:
|
||||
repo: GitPython Repo object.
|
||||
|
||||
Returns:
|
||||
List of GitChange objects.
|
||||
"""
|
||||
changes: list[GitChange] = []
|
||||
try:
|
||||
diff_index = repo.index.diff("HEAD")
|
||||
for change in diff_index:
|
||||
changes.append(
|
||||
GitChange(
|
||||
file_path=change.a_path,
|
||||
change_type=change.change_type,
|
||||
diff_content=change.diff.decode("utf-8", errors="replace") if change.diff else "",
|
||||
staged=True,
|
||||
)
|
||||
)
|
||||
diff_working = repo.index.diff(None)
|
||||
for change in diff_working:
|
||||
changes.append(
|
||||
GitChange(
|
||||
file_path=change.a_path,
|
||||
change_type=change.change_type,
|
||||
diff_content=change.diff.decode("utf-8", errors="replace") if change.diff else "",
|
||||
staged=False,
|
||||
)
|
||||
)
|
||||
return changes
|
||||
except exc.GitCommandError as e:
|
||||
raise GitError(f"Error getting changes: {e}")
|
||||
|
||||
|
||||
def get_commit_history(
|
||||
repo: Repo,
|
||||
from_ref: str | None = None,
|
||||
to_ref: str | None = None,
|
||||
limit: int = 100,
|
||||
) -> list[GitCommit]:
|
||||
"""Get commit history between two refs.
|
||||
|
||||
Args:
|
||||
repo: GitPython Repo object.
|
||||
from_ref: Starting ref (commit, tag, branch). Defaults to first commit.
|
||||
to_ref: Ending ref. Defaults to HEAD.
|
||||
limit: Maximum number of commits to return.
|
||||
|
||||
Returns:
|
||||
List of GitCommit objects.
|
||||
"""
|
||||
try:
|
||||
if from_ref and to_ref:
|
||||
rev = f"{from_ref}..{to_ref}"
|
||||
elif from_ref:
|
||||
rev = f"{from_ref}..HEAD"
|
||||
elif to_ref:
|
||||
rev = to_ref
|
||||
else:
|
||||
rev = "HEAD"
|
||||
commits = list(repo.iter_commits(rev=rev, max_count=limit))
|
||||
result: list[GitCommit] = []
|
||||
for commit in commits:
|
||||
message = commit.message.strip()
|
||||
commit_type, scope, body = parse_conventional_commit(message)
|
||||
result.append(
|
||||
GitCommit(
|
||||
sha=commit.hexsha[:7],
|
||||
message=message,
|
||||
author=commit.author.name,
|
||||
author_email=commit.author.email,
|
||||
date=commit.authored_datetime,
|
||||
commit_type=commit_type,
|
||||
scope=scope,
|
||||
body=body,
|
||||
)
|
||||
)
|
||||
return result
|
||||
except exc.GitCommandError:
|
||||
return []
|
||||
|
||||
|
||||
def parse_conventional_commit(message: str) -> tuple[Optional[str], Optional[str], Optional[str]]:
|
||||
"""Parse a conventional commit message.
|
||||
|
||||
Format: <type>(<scope>): <subject>
|
||||
<body>
|
||||
|
||||
Args:
|
||||
message: The commit message to parse.
|
||||
|
||||
Returns:
|
||||
Tuple of (type, scope, body).
|
||||
"""
|
||||
lines = message.split("\n")
|
||||
first_line = lines[0] if lines else ""
|
||||
|
||||
if ":" not in first_line:
|
||||
return None, None, None
|
||||
|
||||
type_scope, subject = first_line.split(":", 1)
|
||||
type_scope = type_scope.strip()
|
||||
subject = subject.strip()
|
||||
|
||||
if "(" in type_scope and ")" in type_scope:
|
||||
type_part = type_scope.split("(")[0].strip()
|
||||
scope = type_scope.split("(")[1].split(")")[0].strip()
|
||||
else:
|
||||
type_part = type_scope.strip()
|
||||
scope = None
|
||||
|
||||
body = "\n".join(lines[1:]).strip() if len(lines) > 1 else None
|
||||
|
||||
return type_part, scope, body
|
||||
|
||||
|
||||
def get_changed_files(repo: Repo, ref: str | None = None) -> list[str]:
|
||||
"""Get list of changed files between ref and current state.
|
||||
|
||||
Args:
|
||||
repo: GitPython Repo object.
|
||||
ref: Reference commit/tag to compare against.
|
||||
|
||||
Returns:
|
||||
List of file paths.
|
||||
"""
|
||||
try:
|
||||
if ref:
|
||||
diff = repo.head.commit.diff(f"{ref}..HEAD")
|
||||
else:
|
||||
diff = repo.head.commit.diff()
|
||||
return [change.a_path for change in diff]
|
||||
except exc.GitCommandError as e:
|
||||
raise GitError(f"Error getting changed files: {e}")
|
||||
|
||||
|
||||
def get_file_content_at_ref(repo: Repo, file_path: str, ref: str) -> str:
|
||||
"""Get file content at a specific ref.
|
||||
|
||||
Args:
|
||||
repo: GitPython Repo object.
|
||||
file_path: Path to the file.
|
||||
ref: Git reference (commit, tag, etc.).
|
||||
|
||||
Returns:
|
||||
File content as string.
|
||||
"""
|
||||
try:
|
||||
commit = repo.commit(ref)
|
||||
return commit.tree[file_path].data_stream.read().decode("utf-8")
|
||||
except (exc.GitCommandError, KeyError) as e:
|
||||
raise GitError(f"Error getting file content at {ref}: {e}")
|
||||
Reference in New Issue
Block a user