Add utility modules: encryption, file_utils, git_utils, path_utils
This commit is contained in:
217
confsync/utils/git_utils.py
Normal file
217
confsync/utils/git_utils.py
Normal file
@@ -0,0 +1,217 @@
|
||||
"""Git utilities for ConfSync."""
|
||||
|
||||
import os
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
|
||||
class GitManager:
|
||||
"""Manages Git operations for sync and history tracking."""
|
||||
|
||||
def __init__(self, repo_path: Optional[str] = None):
|
||||
self.repo_path = repo_path
|
||||
self._repo = None
|
||||
|
||||
@property
|
||||
def repo(self):
|
||||
"""Get the GitPython repo object."""
|
||||
if self._repo is None and self.repo_path:
|
||||
try:
|
||||
import git
|
||||
self._repo = git.Repo(self.repo_path)
|
||||
except ImportError:
|
||||
pass
|
||||
return self._repo
|
||||
|
||||
def init_repo(self, path: str, bare: bool = False) -> bool:
|
||||
"""Initialize a new Git repository."""
|
||||
try:
|
||||
import git
|
||||
if bare:
|
||||
git.Repo.init(path, bare=True)
|
||||
else:
|
||||
git.Repo.init(path)
|
||||
self.repo_path = path
|
||||
return True
|
||||
except ImportError:
|
||||
print("Error: GitPython not installed")
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"Error initializing repository: {e}")
|
||||
return False
|
||||
|
||||
def clone_repo(self, url: str, path: str, branch: Optional[str] = None) -> bool:
|
||||
"""Clone a remote repository."""
|
||||
try:
|
||||
import git
|
||||
if branch:
|
||||
git.Repo.clone_from(url, to_path=path, branch=branch)
|
||||
else:
|
||||
git.Repo.clone_from(url, to_path=path)
|
||||
self.repo_path = path
|
||||
return True
|
||||
except ImportError:
|
||||
print("Error: GitPython not installed")
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"Error cloning repository: {e}")
|
||||
return False
|
||||
|
||||
def add_files(self, files: List[str], repo_path: Optional[str] = None) -> bool:
|
||||
"""Add files to staging."""
|
||||
try:
|
||||
repo = self._get_repo(repo_path)
|
||||
repo.index.add(files)
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"Error adding files: {e}")
|
||||
return False
|
||||
|
||||
def commit(self, message: str, repo_path: Optional[str] = None) -> bool:
|
||||
"""Commit staged changes."""
|
||||
try:
|
||||
repo = self._get_repo(repo_path)
|
||||
if repo.is_dirty() or repo.untracked_files:
|
||||
repo.index.commit(message)
|
||||
return True
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"Error committing: {e}")
|
||||
return False
|
||||
|
||||
def push(self, remote: str = "origin", branch: Optional[str] = None, repo_path: Optional[str] = None) -> bool:
|
||||
"""Push changes to remote."""
|
||||
try:
|
||||
repo = self._get_repo(repo_path)
|
||||
push_kwargs = {}
|
||||
if branch:
|
||||
push_kwargs["refspec"] = branch
|
||||
repo.remotes[remote].push(**push_kwargs)
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"Error pushing: {e}")
|
||||
return False
|
||||
|
||||
def pull(self, remote: str = "origin", branch: Optional[str] = None, repo_path: Optional[str] = None) -> bool:
|
||||
"""Pull changes from remote."""
|
||||
try:
|
||||
repo = self._get_repo(repo_path)
|
||||
pull_kwargs = {}
|
||||
if branch:
|
||||
pull_kwargs["branch"] = branch
|
||||
repo.remotes[remote].pull(**pull_kwargs)
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"Error pulling: {e}")
|
||||
return False
|
||||
|
||||
def get_status(self, repo_path: Optional[str] = None) -> Dict:
|
||||
"""Get repository status."""
|
||||
try:
|
||||
repo = self._get_repo(repo_path)
|
||||
return {
|
||||
"is_dirty": repo.is_dirty(),
|
||||
"untracked_files": repo.untracked_files,
|
||||
"modified_files": [item.a_path for item in repo.index.diff(None)],
|
||||
"staged_files": [item.a_path for item in repo.index.diff("HEAD")],
|
||||
"ahead": 0,
|
||||
"behind": 0,
|
||||
}
|
||||
except Exception as e:
|
||||
print(f"Error getting status: {e}")
|
||||
return {}
|
||||
|
||||
def get_file_diff(self, file_path: str, repo_path: Optional[str] = None) -> str:
|
||||
"""Get diff for a specific file."""
|
||||
try:
|
||||
repo = self._get_repo(repo_path)
|
||||
if os.path.exists(os.path.join(repo.working_dir, file_path)):
|
||||
diff = repo.git.diff(file_path)
|
||||
return diff
|
||||
return ""
|
||||
except Exception as e:
|
||||
print(f"Error getting diff: {e}")
|
||||
return ""
|
||||
|
||||
def get_commit_history(self, path: Optional[str] = None, max_count: int = 20, repo_path: Optional[str] = None) -> List[Dict]:
|
||||
"""Get commit history."""
|
||||
try:
|
||||
repo = self._get_repo(repo_path)
|
||||
commits = []
|
||||
if path:
|
||||
for commit in repo.iter_commits("HEAD", paths=path, max_count=max_count):
|
||||
commits.append({
|
||||
"hexsha": commit.hexsha,
|
||||
"message": commit.message,
|
||||
"author": str(commit.author),
|
||||
"authored_datetime": commit.authored_datetime.isoformat(),
|
||||
})
|
||||
else:
|
||||
for commit in repo.iter_commits("HEAD", max_count=max_count):
|
||||
commits.append({
|
||||
"hexsha": commit.hexsha,
|
||||
"message": commit.message,
|
||||
"author": str(commit.author),
|
||||
"authored_datetime": commit.authored_datetime.isoformat(),
|
||||
})
|
||||
return commits
|
||||
except Exception as e:
|
||||
print(f"Error getting history: {e}")
|
||||
return []
|
||||
|
||||
def checkout(self, commit_or_branch: str, repo_path: Optional[str] = None) -> bool:
|
||||
"""Checkout a specific commit or branch."""
|
||||
try:
|
||||
repo = self._get_repo(repo_path)
|
||||
repo.git.checkout(commit_or_branch)
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"Error checking out: {e}")
|
||||
return False
|
||||
|
||||
def create_branch(self, branch_name: str, repo_path: Optional[str] = None) -> bool:
|
||||
"""Create a new branch."""
|
||||
try:
|
||||
repo = self._get_repo(repo_path)
|
||||
repo.create_head(branch_name)
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"Error creating branch: {e}")
|
||||
return False
|
||||
|
||||
def get_branches(self, repo_path: Optional[str] = None) -> List[str]:
|
||||
"""Get list of branches."""
|
||||
try:
|
||||
repo = self._get_repo(repo_path)
|
||||
return [head.name for head in repo.heads]
|
||||
except Exception as e:
|
||||
print(f"Error getting branches: {e}")
|
||||
return []
|
||||
|
||||
def add_remote(self, name: str, url: str, repo_path: Optional[str] = None) -> bool:
|
||||
"""Add a remote repository."""
|
||||
try:
|
||||
repo = self._get_repo(repo_path)
|
||||
repo.create_remote(name, url)
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"Error adding remote: {e}")
|
||||
return False
|
||||
|
||||
def get_remotes(self, repo_path: Optional[str] = None) -> Dict[str, str]:
|
||||
"""Get list of remotes and their URLs."""
|
||||
try:
|
||||
repo = self._get_repo(repo_path)
|
||||
return {remote.name: remote.url for remote in repo.remotes}
|
||||
except Exception as e:
|
||||
print(f"Error getting remotes: {e}")
|
||||
return {}
|
||||
|
||||
def _get_repo(self, repo_path: Optional[str] = None):
|
||||
"""Get or initialize the repository."""
|
||||
import git
|
||||
path = repo_path or self.repo_path
|
||||
if path is None:
|
||||
raise ValueError("Repository path not specified")
|
||||
if self._repo is None or self._repo.working_dir != os.path.abspath(path):
|
||||
self._repo = git.Repo(path)
|
||||
return self._repo
|
||||
Reference in New Issue
Block a user