This commit is contained in:
250
app/src/git_commit_generator/git_utils.py
Normal file
250
app/src/git_commit_generator/git_utils.py
Normal file
@@ -0,0 +1,250 @@
|
||||
"""Git utilities for extracting diffs and commit history."""
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
from git import GitCommandError, Repo
|
||||
|
||||
|
||||
class GitUtils:
|
||||
"""Git operations utility class."""
|
||||
|
||||
def __init__(self, repo_path: Optional[str] = None):
|
||||
"""Initialize git utilities.
|
||||
|
||||
Args:
|
||||
repo_path: Path to git repository. Defaults to current directory.
|
||||
"""
|
||||
self.repo_path = repo_path or "."
|
||||
self.repo = Repo(self.repo_path)
|
||||
|
||||
def is_repo(self) -> bool:
|
||||
"""Check if the path is a git repository.
|
||||
|
||||
Returns:
|
||||
True if it's a git repository, False otherwise.
|
||||
"""
|
||||
try:
|
||||
self.repo = Repo(self.repo_path)
|
||||
return not self.repo.bare
|
||||
except (ValueError, GitCommandError):
|
||||
return False
|
||||
|
||||
def get_staged_diff(self) -> str:
|
||||
"""Get staged changes as a diff.
|
||||
|
||||
Returns:
|
||||
String containing staged diff.
|
||||
"""
|
||||
try:
|
||||
diffs = self.repo.index.diff("HEAD")
|
||||
if not diffs:
|
||||
staged = self.repo.index.diff(None)
|
||||
diff_text = "\n".join(
|
||||
d.diff.decode("utf-8", errors="replace") if isinstance(d.diff, bytes)
|
||||
else d.diff
|
||||
for d in staged
|
||||
)
|
||||
return diff_text
|
||||
diff_text = "\n".join(
|
||||
d.diff.decode("utf-8", errors="replace") if isinstance(d.diff, bytes)
|
||||
else d.diff
|
||||
for d in diffs
|
||||
)
|
||||
return diff_text
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
def get_unstaged_diff(self) -> str:
|
||||
"""Get unstaged changes as a diff.
|
||||
|
||||
Returns:
|
||||
String containing unstaged diff.
|
||||
"""
|
||||
try:
|
||||
diffs = self.repo.index.diff(None)
|
||||
diff_text = "\n".join(
|
||||
d.diff.decode("utf-8", errors="replace") if isinstance(d.diff, bytes)
|
||||
else d.diff
|
||||
for d in diffs
|
||||
)
|
||||
return diff_text
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
def get_all_changes(self, staged: bool = True, unstaged: bool = True) -> str:
|
||||
"""Get all changes (staged and/or unstaged).
|
||||
|
||||
Args:
|
||||
staged: Include staged changes.
|
||||
unstaged: Include unstaged changes.
|
||||
|
||||
Returns:
|
||||
String containing all changes.
|
||||
"""
|
||||
parts = []
|
||||
if staged:
|
||||
staged_diff = self.get_staged_diff()
|
||||
if staged_diff:
|
||||
parts.append(f"=== STAGED CHANGES ===\n{staged_diff}")
|
||||
if unstaged:
|
||||
unstaged_diff = self.get_unstaged_diff()
|
||||
if unstaged_diff:
|
||||
parts.append(f"=== UNSTAGED CHANGES ===\n{unstaged_diff}")
|
||||
return "\n\n".join(parts)
|
||||
|
||||
def get_staged_files(self) -> List[str]:
|
||||
"""Get list of staged files.
|
||||
|
||||
Returns:
|
||||
List of staged file paths.
|
||||
"""
|
||||
try:
|
||||
staged = self.repo.index.diff("HEAD")
|
||||
if not staged:
|
||||
staged = self.repo.index.diff(None)
|
||||
return [d.a_path for d in staged if d.a_path]
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
def get_changed_files(self) -> List[str]:
|
||||
"""Get list of changed files (both staged and unstaged).
|
||||
|
||||
Returns:
|
||||
List of changed file paths.
|
||||
"""
|
||||
try:
|
||||
changed = self.repo.index.diff(None)
|
||||
return [d.a_path for d in changed if d.a_path]
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
def get_commit_history(
|
||||
self, since: Optional[str] = None, limit: int = 50
|
||||
) -> List[dict]:
|
||||
"""Get commit history.
|
||||
|
||||
Args:
|
||||
since: Optional date/filter for commits.
|
||||
limit: Maximum number of commits to return.
|
||||
|
||||
Returns:
|
||||
List of commit dictionaries with hash, message, author, date.
|
||||
"""
|
||||
try:
|
||||
commits = list(self.repo.iter_commits("main"))
|
||||
if limit:
|
||||
commits = commits[:limit]
|
||||
return [
|
||||
{
|
||||
"hash": commit.hexsha[:7],
|
||||
"full_hash": commit.hexsha,
|
||||
"message": commit.message.strip(),
|
||||
"author": commit.author.name,
|
||||
"author_email": commit.author.email,
|
||||
"date": commit.committed_datetime.isoformat(),
|
||||
}
|
||||
for commit in commits
|
||||
]
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
def get_conventional_commits(
|
||||
self, since: Optional[str] = None, limit: int = 100
|
||||
) -> List[dict]:
|
||||
"""Get commits that follow conventional commit format.
|
||||
|
||||
Args:
|
||||
since: Optional date/filter for commits.
|
||||
limit: Maximum number of commits to return.
|
||||
|
||||
Returns:
|
||||
List of conventional commits.
|
||||
"""
|
||||
commits = self.get_commit_history(since=since, limit=limit)
|
||||
conventional = []
|
||||
for commit in commits:
|
||||
message = commit.get("message", "")
|
||||
if self._is_conventional_message(message):
|
||||
parsed = self._parse_conventional_commit(message)
|
||||
parsed.update(
|
||||
{
|
||||
"hash": commit["hash"],
|
||||
"full_hash": commit["full_hash"],
|
||||
"author": commit["author"],
|
||||
"date": commit["date"],
|
||||
}
|
||||
)
|
||||
conventional.append(parsed)
|
||||
return conventional
|
||||
|
||||
def _is_conventional_message(self, message: str) -> bool:
|
||||
"""Check if a message follows conventional commit format.
|
||||
|
||||
Args:
|
||||
message: Commit message to check.
|
||||
|
||||
Returns:
|
||||
True if conventional format detected.
|
||||
"""
|
||||
import re
|
||||
|
||||
pattern = r"^(feat|fix|docs|style|refactor|perf|test|build|ci|chore|revert)(\(.+\))?: .+"
|
||||
return bool(re.match(pattern, message.strip()))
|
||||
|
||||
def _parse_conventional_commit(self, message: str) -> dict:
|
||||
"""Parse a conventional commit message.
|
||||
|
||||
Args:
|
||||
message: Commit message to parse.
|
||||
|
||||
Returns:
|
||||
Dictionary with type, scope, and description.
|
||||
"""
|
||||
import re
|
||||
|
||||
pattern = r"^(feat|fix|docs|style|refactor|perf|test|build|ci|chore|revert)(\((.+)\))?: (.+)"
|
||||
match = re.match(pattern, message.strip())
|
||||
if match:
|
||||
return {
|
||||
"type": match.group(1),
|
||||
"scope": match.group(3) or "",
|
||||
"description": match.group(4),
|
||||
"full_message": message.strip(),
|
||||
}
|
||||
return {"type": "", "scope": "", "description": message.strip(), "full_message": message}
|
||||
|
||||
def get_file_scopes(self, files: Optional[List[str]] = None) -> List[str]:
|
||||
"""Extract potential scopes from file paths.
|
||||
|
||||
Args:
|
||||
files: List of file paths. Uses all changed files if None.
|
||||
|
||||
Returns:
|
||||
List of unique scopes.
|
||||
"""
|
||||
if not files:
|
||||
files = self.get_changed_files()
|
||||
|
||||
scopes = set()
|
||||
for file_path in files:
|
||||
parts = Path(file_path).parts
|
||||
if len(parts) > 1:
|
||||
scope = parts[0]
|
||||
if scope not in ("src", "tests", "docs", "scripts"):
|
||||
scopes.add(scope)
|
||||
else:
|
||||
scopes.add("root")
|
||||
|
||||
return sorted(list(scopes))
|
||||
|
||||
|
||||
def get_git_utils(repo_path: Optional[str] = None) -> GitUtils:
|
||||
"""Get GitUtils instance.
|
||||
|
||||
Args:
|
||||
repo_path: Optional path to repository.
|
||||
|
||||
Returns:
|
||||
GitUtils instance.
|
||||
"""
|
||||
return GitUtils(repo_path)
|
||||
Reference in New Issue
Block a user