Add manager modules: dotfile, backup, package, and editor
Some checks failed
CI / test (push) Has been cancelled
Some checks failed
CI / test (push) Has been cancelled
This commit is contained in:
336
dev_env_sync/managers/backup.py
Normal file
336
dev_env_sync/managers/backup.py
Normal file
@@ -0,0 +1,336 @@
|
||||
"""Backup and restore functionality for dev-env-sync."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
from dataclasses import dataclass
|
||||
|
||||
from dev_env_sync.core.config import ConfigSchema, DotfileConfig
|
||||
from dev_env_sync.core.platform import PlatformDetector
|
||||
from dev_env_sync.utils.file_ops import FileOps, path_expand
|
||||
from dev_env_sync.utils.logging import get_logger
|
||||
|
||||
|
||||
class BackupError(Exception):
|
||||
"""Raised when a backup operation fails."""
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class BackupEntry:
|
||||
"""Entry in a backup manifest."""
|
||||
source: str
|
||||
backup_path: str
|
||||
timestamp: str
|
||||
file_hash: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class BackupManifest:
|
||||
"""Manifest tracking backed up files."""
|
||||
timestamp: str
|
||||
config_name: Optional[str]
|
||||
entries: List[BackupEntry]
|
||||
platform: str
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"timestamp": self.timestamp,
|
||||
"config_name": self.config_name,
|
||||
"entries": [
|
||||
{
|
||||
"source": entry.source,
|
||||
"backup_path": entry.backup_path,
|
||||
"timestamp": entry.timestamp,
|
||||
"file_hash": entry.file_hash,
|
||||
}
|
||||
for entry in self.entries
|
||||
],
|
||||
"platform": self.platform,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "BackupManifest":
|
||||
return cls(
|
||||
timestamp=data["timestamp"],
|
||||
config_name=data.get("config_name"),
|
||||
entries=[
|
||||
BackupEntry(
|
||||
source=entry["source"],
|
||||
backup_path=entry["backup_path"],
|
||||
timestamp=entry["timestamp"],
|
||||
file_hash=entry["file_hash"],
|
||||
)
|
||||
for entry in data["entries"]
|
||||
],
|
||||
platform=data["platform"],
|
||||
)
|
||||
|
||||
|
||||
class BackupManager:
|
||||
"""Manages backups of dotfiles and configurations."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
backup_dir: Optional[str] = None,
|
||||
dry_run: bool = False,
|
||||
logger=None,
|
||||
):
|
||||
self.backup_dir = path_expand(backup_dir) if backup_dir else PlatformDetector.get_backup_dir()
|
||||
self.dry_run = dry_run
|
||||
self.logger = logger or get_logger(__name__)
|
||||
self.file_ops = FileOps(dry_run=dry_run, logger=logger)
|
||||
self.current_backup_path: Optional[Path] = None
|
||||
|
||||
def create_backup(
|
||||
self,
|
||||
config: ConfigSchema,
|
||||
files: Optional[List[Path]] = None,
|
||||
) -> Path:
|
||||
"""Create a backup of specified files."""
|
||||
timestamp = datetime.now().strftime(config.backup.timestamp_format)
|
||||
backup_root = path_expand(str(self.backup_dir))
|
||||
backup_path = backup_root / timestamp
|
||||
|
||||
self.current_backup_path = backup_path
|
||||
entries = []
|
||||
|
||||
self.logger.info(f"Creating backup at: {backup_path}")
|
||||
|
||||
if not self.dry_run:
|
||||
backup_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
manifest_path = backup_path / "manifest.json"
|
||||
entries_path = backup_path / "entries.json"
|
||||
|
||||
files_to_backup = files or []
|
||||
|
||||
for file_path in files_to_backup:
|
||||
if file_path.exists() and (file_path.is_file() or file_path.is_symlink()):
|
||||
file_hash = self._get_file_hash(file_path)
|
||||
|
||||
relative_path = file_path.name
|
||||
backup_file_path = backup_path / relative_path
|
||||
|
||||
self.logger.info(f" Backing up: {file_path} -> {backup_file_path}")
|
||||
|
||||
if self.dry_run:
|
||||
entry = BackupEntry(
|
||||
source=str(file_path),
|
||||
backup_path=str(backup_file_path),
|
||||
timestamp=timestamp,
|
||||
file_hash=file_hash,
|
||||
)
|
||||
entries.append(entry)
|
||||
else:
|
||||
try:
|
||||
if file_path.is_symlink():
|
||||
target = file_path.resolve()
|
||||
target.symlink_to(backup_file_path)
|
||||
else:
|
||||
shutil.copy2(str(file_path), str(backup_file_path))
|
||||
|
||||
entry = BackupEntry(
|
||||
source=str(file_path),
|
||||
backup_path=str(backup_file_path),
|
||||
timestamp=timestamp,
|
||||
file_hash=file_hash,
|
||||
)
|
||||
entries.append(entry)
|
||||
except OSError as e:
|
||||
self.logger.error(f" Failed to backup {file_path}: {e}")
|
||||
|
||||
manifest = BackupManifest(
|
||||
timestamp=timestamp,
|
||||
config_name=config.name,
|
||||
entries=entries,
|
||||
platform=PlatformDetector.detect().platform.value,
|
||||
)
|
||||
|
||||
if self.dry_run:
|
||||
self.logger.info(f"[DRY-RUN] Would create manifest at {manifest_path}")
|
||||
else:
|
||||
with open(manifest_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(manifest.to_dict(), f, indent=2)
|
||||
|
||||
self.logger.info(f"Backup complete: {len(entries)} files backed up")
|
||||
|
||||
return backup_path
|
||||
|
||||
def _get_file_hash(self, path: Path) -> str:
|
||||
"""Get the hash of a file for integrity verification."""
|
||||
if path.is_symlink():
|
||||
path = path.resolve()
|
||||
|
||||
if not path.exists():
|
||||
return ""
|
||||
|
||||
import hashlib
|
||||
hash_md5 = hashlib.md5()
|
||||
with open(path, "rb") as f:
|
||||
for chunk in iter(lambda: f.read(4096), b""):
|
||||
hash_md5.update(chunk)
|
||||
return hash_md5.hexdigest()
|
||||
|
||||
def list_backups(self) -> List[Dict[str, Any]]:
|
||||
"""List all available backups."""
|
||||
backup_root = path_expand(str(self.backup_dir))
|
||||
|
||||
if not backup_root.exists():
|
||||
return []
|
||||
|
||||
backups = []
|
||||
for entry in sorted(backup_root.iterdir(), reverse=True):
|
||||
if entry.is_dir():
|
||||
manifest_path = entry / "manifest.json"
|
||||
if manifest_path.exists():
|
||||
try:
|
||||
with open(manifest_path, 'r', encoding='utf-8') as f:
|
||||
manifest_data = json.load(f)
|
||||
backups.append({
|
||||
"timestamp": entry.name,
|
||||
"path": str(entry),
|
||||
"config_name": manifest_data.get("config_name"),
|
||||
"file_count": len(manifest_data.get("entries", [])),
|
||||
"platform": manifest_data.get("platform"),
|
||||
})
|
||||
except (json.JSONDecodeError, IOError):
|
||||
backups.append({
|
||||
"timestamp": entry.name,
|
||||
"path": str(entry),
|
||||
"config_name": None,
|
||||
"file_count": 0,
|
||||
"platform": None,
|
||||
})
|
||||
|
||||
return backups
|
||||
|
||||
def restore_backup(
|
||||
self,
|
||||
timestamp: str,
|
||||
files: Optional[List[str]] = None,
|
||||
dry_run: bool = False,
|
||||
) -> Dict[str, Any]:
|
||||
"""Restore files from a backup."""
|
||||
backup_root = path_expand(str(self.backup_dir))
|
||||
backup_path = backup_root / timestamp
|
||||
|
||||
if not backup_path.exists():
|
||||
raise BackupError(f"Backup not found: {timestamp}")
|
||||
|
||||
manifest_path = backup_path / "manifest.json"
|
||||
if not manifest_path.exists():
|
||||
raise BackupError(f"Backup manifest not found: {manifest_path}")
|
||||
|
||||
with open(manifest_path, 'r', encoding='utf-8') as f:
|
||||
manifest_data = json.load(f)
|
||||
|
||||
manifest = BackupManifest.from_dict(manifest_data)
|
||||
|
||||
restored = []
|
||||
failed = []
|
||||
|
||||
for entry in manifest.entries:
|
||||
if files and entry.source not in files:
|
||||
continue
|
||||
|
||||
backup_file = Path(entry.backup_path)
|
||||
target = Path(entry.source)
|
||||
|
||||
if not backup_file.exists():
|
||||
self.logger.error(f"Backup file missing: {backup_file}")
|
||||
failed.append(entry.source)
|
||||
continue
|
||||
|
||||
self.logger.info(f"Restoring: {entry.source}")
|
||||
|
||||
if dry_run:
|
||||
self.logger.info(f" [DRY-RUN] Would restore to {target}")
|
||||
restored.append(entry.source)
|
||||
continue
|
||||
|
||||
try:
|
||||
target.parent.mkdir(parents=True, exist_ok=True)
|
||||
shutil.copy2(str(backup_file), str(target))
|
||||
restored.append(entry.source)
|
||||
except OSError as e:
|
||||
self.logger.error(f" Failed to restore {target}: {e}")
|
||||
failed.append(entry.source)
|
||||
|
||||
return {
|
||||
"restored": restored,
|
||||
"failed": failed,
|
||||
"timestamp": timestamp,
|
||||
}
|
||||
|
||||
def cleanup_old_backups(self, keep_count: int = 5) -> int:
|
||||
"""Remove old backups, keeping the most recent ones."""
|
||||
backups = self.list_backups()
|
||||
|
||||
if len(backups) <= keep_count:
|
||||
self.logger.info("No old backups to clean up")
|
||||
return 0
|
||||
|
||||
to_remove = backups[keep_count:]
|
||||
removed = 0
|
||||
|
||||
for backup in to_remove:
|
||||
backup_path = Path(backup["path"])
|
||||
self.logger.info(f"Removing old backup: {backup_path}")
|
||||
|
||||
if self.dry_run:
|
||||
self.logger.info(f" [DRY-RUN] Would remove {backup_path}")
|
||||
else:
|
||||
try:
|
||||
shutil.rmtree(backup_path)
|
||||
removed += 1
|
||||
except OSError as e:
|
||||
self.logger.error(f" Failed to remove {backup_path}: {e}")
|
||||
|
||||
return removed
|
||||
|
||||
def verify_backup(self, timestamp: str) -> Dict[str, Any]:
|
||||
"""Verify the integrity of a backup."""
|
||||
backup_root = path_expand(str(self.backup_dir))
|
||||
backup_path = backup_root / timestamp
|
||||
|
||||
if not backup_path.exists():
|
||||
return {"valid": False, "error": "Backup not found"}
|
||||
|
||||
manifest_path = backup_path / "manifest.json"
|
||||
if not manifest_path.exists():
|
||||
return {"valid": False, "error": "Manifest not found"}
|
||||
|
||||
with open(manifest_path, 'r', encoding='utf-8') as f:
|
||||
manifest_data = json.load(f)
|
||||
|
||||
manifest = BackupManifest.from_dict(manifest_data)
|
||||
|
||||
verified = []
|
||||
failed = []
|
||||
|
||||
for entry in manifest.entries:
|
||||
backup_file = Path(entry.backup_path)
|
||||
|
||||
if not backup_file.exists():
|
||||
failed.append(entry.source)
|
||||
continue
|
||||
|
||||
current_hash = self._get_file_hash(backup_file)
|
||||
if current_hash == entry.file_hash:
|
||||
verified.append(entry.source)
|
||||
else:
|
||||
failed.append(entry.source)
|
||||
|
||||
return {
|
||||
"valid": len(failed) == 0,
|
||||
"verified": len(verified),
|
||||
"failed": len(failed),
|
||||
"details": {
|
||||
"verified_files": verified,
|
||||
"failed_files": failed,
|
||||
},
|
||||
}
|
||||
Reference in New Issue
Block a user