"""Backup and restore functionality for dev-env-sync."""

import hashlib
import json
import shutil
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from dev_env_sync.core.config import ConfigSchema
from dev_env_sync.core.platform import PlatformDetector
from dev_env_sync.utils.file_ops import FileOps, path_expand
from dev_env_sync.utils.logging import get_logger


class BackupError(Exception):
    """Raised when a backup operation fails."""


@dataclass
class BackupEntry:
    """Entry in a backup manifest."""

    source: str
    backup_path: str
    timestamp: str
    file_hash: str


@dataclass
class BackupManifest:
    """Manifest tracking backed-up files."""

    timestamp: str
    config_name: Optional[str]
    entries: List[BackupEntry]
    platform: str

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the manifest to a JSON-compatible dict."""
        return {
            "timestamp": self.timestamp,
            "config_name": self.config_name,
            "entries": [
                {
                    "source": entry.source,
                    "backup_path": entry.backup_path,
                    "timestamp": entry.timestamp,
                    "file_hash": entry.file_hash,
                }
                for entry in self.entries
            ],
            "platform": self.platform,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "BackupManifest":
        """Rebuild a manifest from its dict form."""
        return cls(
            timestamp=data["timestamp"],
            config_name=data.get("config_name"),
            entries=[
                BackupEntry(
                    source=entry["source"],
                    backup_path=entry["backup_path"],
                    timestamp=entry["timestamp"],
                    file_hash=entry["file_hash"],
                )
                for entry in data["entries"]
            ],
            platform=data["platform"],
        )
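
# A quick round-trip sketch of the manifest serialization (illustrative values;
# the platform string and timestamp format here are assumptions, not project
# defaults). Dataclass equality makes the round trip directly checkable:
#
#     manifest = BackupManifest(
#         timestamp="20240101-120000",
#         config_name="laptop",
#         entries=[],
#         platform="macos",
#     )
#     assert BackupManifest.from_dict(manifest.to_dict()) == manifest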


class BackupManager:
    """Manages backups of dotfiles and configurations."""

    def __init__(
        self,
        backup_dir: Optional[str] = None,
        dry_run: bool = False,
        logger=None,
    ):
        self.backup_dir = (
            path_expand(backup_dir) if backup_dir else PlatformDetector.get_backup_dir()
        )
        self.dry_run = dry_run
        self.logger = logger or get_logger(__name__)
        self.file_ops = FileOps(dry_run=dry_run, logger=logger)
        self.current_backup_path: Optional[Path] = None

    def create_backup(
        self,
        config: ConfigSchema,
        files: Optional[List[Path]] = None,
    ) -> Path:
        """Create a backup of the specified files and write a manifest."""
        timestamp = datetime.now().strftime(config.backup.timestamp_format)
        backup_root = path_expand(str(self.backup_dir))
        backup_path = backup_root / timestamp

        self.current_backup_path = backup_path
        entries = []

        self.logger.info(f"Creating backup at: {backup_path}")

        if not self.dry_run:
            backup_path.mkdir(parents=True, exist_ok=True)

        manifest_path = backup_path / "manifest.json"

        files_to_backup = files or []

        for file_path in files_to_backup:
            if file_path.exists() and (file_path.is_file() or file_path.is_symlink()):
                file_hash = self._get_file_hash(file_path)

                # Backups are stored flat by basename inside the timestamped directory.
                relative_path = file_path.name
                backup_file_path = backup_path / relative_path

                self.logger.info(f"  Backing up: {file_path} -> {backup_file_path}")

                if self.dry_run:
                    entry = BackupEntry(
                        source=str(file_path),
                        backup_path=str(backup_file_path),
                        timestamp=timestamp,
                        file_hash=file_hash,
                    )
                    entries.append(entry)
                else:
                    try:
                        if file_path.is_symlink():
                            # Preserve the link itself: recreate it inside the backup,
                            # pointing at the original's resolved target.
                            target = file_path.resolve()
                            backup_file_path.symlink_to(target)
                        else:
                            shutil.copy2(str(file_path), str(backup_file_path))

                        entry = BackupEntry(
                            source=str(file_path),
                            backup_path=str(backup_file_path),
                            timestamp=timestamp,
                            file_hash=file_hash,
                        )
                        entries.append(entry)
                    except OSError as e:
                        self.logger.error(f"  Failed to backup {file_path}: {e}")

        manifest = BackupManifest(
            timestamp=timestamp,
            config_name=config.name,
            entries=entries,
            platform=PlatformDetector.detect().platform.value,
        )

        if self.dry_run:
            self.logger.info(f"[DRY-RUN] Would create manifest at {manifest_path}")
        else:
            with open(manifest_path, "w", encoding="utf-8") as f:
                json.dump(manifest.to_dict(), f, indent=2)

        self.logger.info(f"Backup complete: {len(entries)} files backed up")

        return backup_path
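
    # Minimal usage sketch (hypothetical `config`; its `backup.timestamp_format`
    # field comes from ConfigSchema):
    #
    #     manager = BackupManager(dry_run=True)
    #     backup_path = manager.create_backup(config, files=[Path.home() / ".zshrc"])
    #
    # With dry_run=True the planned entries are logged but nothing is written.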

    def _get_file_hash(self, path: Path) -> str:
        """Return the MD5 hex digest of a file for integrity verification."""
        if path.is_symlink():
            path = path.resolve()

        if not path.exists():
            return ""

        hash_md5 = hashlib.md5()
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()
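
    # Equivalent one-shot form for small files (shown for comparison; the
    # chunked loop above avoids loading a large file into memory at once):
    #
    #     hashlib.md5(path.read_bytes()).hexdigest()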

    def list_backups(self) -> List[Dict[str, Any]]:
        """List all available backups, newest first (by directory name)."""
        backup_root = path_expand(str(self.backup_dir))

        if not backup_root.exists():
            return []

        backups = []
        for entry in sorted(backup_root.iterdir(), reverse=True):
            if entry.is_dir():
                manifest_path = entry / "manifest.json"
                if manifest_path.exists():
                    try:
                        with open(manifest_path, "r", encoding="utf-8") as f:
                            manifest_data = json.load(f)
                        backups.append({
                            "timestamp": entry.name,
                            "path": str(entry),
                            "config_name": manifest_data.get("config_name"),
                            "file_count": len(manifest_data.get("entries", [])),
                            "platform": manifest_data.get("platform"),
                        })
                    except (json.JSONDecodeError, IOError):
                        # Unreadable manifest: still list the backup, with
                        # unknown metadata.
                        backups.append({
                            "timestamp": entry.name,
                            "path": str(entry),
                            "config_name": None,
                            "file_count": 0,
                            "platform": None,
                        })

        return backups
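
    # Each listing entry is a plain dict; a backup with a readable manifest
    # yields roughly this shape (all values illustrative):
    #
    #     {
    #         "timestamp": "20240101-120000",
    #         "path": "~/.backups/20240101-120000",
    #         "config_name": "laptop",
    #         "file_count": 3,
    #         "platform": "linux",
    #     }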

    def restore_backup(
        self,
        timestamp: str,
        files: Optional[List[str]] = None,
        dry_run: bool = False,
    ) -> Dict[str, Any]:
        """Restore files from a backup, optionally limited to specific sources."""
        backup_root = path_expand(str(self.backup_dir))
        backup_path = backup_root / timestamp

        if not backup_path.exists():
            raise BackupError(f"Backup not found: {timestamp}")

        manifest_path = backup_path / "manifest.json"
        if not manifest_path.exists():
            raise BackupError(f"Backup manifest not found: {manifest_path}")

        with open(manifest_path, "r", encoding="utf-8") as f:
            manifest_data = json.load(f)

        manifest = BackupManifest.from_dict(manifest_data)

        restored = []
        failed = []

        for entry in manifest.entries:
            # When a source filter is given, skip entries outside it.
            if files and entry.source not in files:
                continue

            backup_file = Path(entry.backup_path)
            target = Path(entry.source)

            if not backup_file.exists():
                self.logger.error(f"Backup file missing: {backup_file}")
                failed.append(entry.source)
                continue

            self.logger.info(f"Restoring: {entry.source}")

            if dry_run:
                self.logger.info(f"  [DRY-RUN] Would restore to {target}")
                restored.append(entry.source)
                continue

            try:
                target.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(str(backup_file), str(target))
                restored.append(entry.source)
            except OSError as e:
                self.logger.error(f"  Failed to restore {target}: {e}")
                failed.append(entry.source)

        return {
            "restored": restored,
            "failed": failed,
            "timestamp": timestamp,
        }
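
    # Restore sketch (the timestamp string is hypothetical and must match an
    # existing backup directory name; dry_run previews without writing):
    #
    #     result = manager.restore_backup("20240101-120000", dry_run=True)
    #     print(result["restored"], result["failed"])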

    def cleanup_old_backups(self, keep_count: int = 5) -> int:
        """Remove old backups, keeping the `keep_count` most recent."""
        backups = self.list_backups()

        if len(backups) <= keep_count:
            self.logger.info("No old backups to clean up")
            return 0

        # list_backups() returns newest first, so everything past keep_count
        # is an older backup.
        to_remove = backups[keep_count:]
        removed = 0

        for backup in to_remove:
            backup_path = Path(backup["path"])
            self.logger.info(f"Removing old backup: {backup_path}")

            if self.dry_run:
                self.logger.info(f"  [DRY-RUN] Would remove {backup_path}")
            else:
                try:
                    shutil.rmtree(backup_path)
                    removed += 1
                except OSError as e:
                    self.logger.error(f"  Failed to remove {backup_path}: {e}")

        return removed

    def verify_backup(self, timestamp: str) -> Dict[str, Any]:
        """Verify a backup's integrity against the hashes in its manifest."""
        backup_root = path_expand(str(self.backup_dir))
        backup_path = backup_root / timestamp

        if not backup_path.exists():
            return {"valid": False, "error": "Backup not found"}

        manifest_path = backup_path / "manifest.json"
        if not manifest_path.exists():
            return {"valid": False, "error": "Manifest not found"}

        with open(manifest_path, "r", encoding="utf-8") as f:
            manifest_data = json.load(f)

        manifest = BackupManifest.from_dict(manifest_data)

        verified = []
        failed = []

        for entry in manifest.entries:
            backup_file = Path(entry.backup_path)

            if not backup_file.exists():
                failed.append(entry.source)
                continue

            # Re-hash the stored copy and compare with the hash recorded
            # at backup time.
            current_hash = self._get_file_hash(backup_file)
            if current_hash == entry.file_hash:
                verified.append(entry.source)
            else:
                failed.append(entry.source)

        return {
            "valid": len(failed) == 0,
            "verified": len(verified),
            "failed": len(failed),
            "details": {
                "verified_files": verified,
                "failed_files": failed,
            },
        }
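

# End-to-end sketch of the backup lifecycle (hypothetical `config`; the
# timestamped directory name doubles as the backup identifier):
#
#     manager = BackupManager()
#     path = manager.create_backup(config, files=[Path.home() / ".gitconfig"])
#     report = manager.verify_backup(path.name)
#     if report["valid"]:
#         manager.cleanup_old_backups(keep_count=5)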