From 8bca577fc69c0508aae7eb2c3d8b9220f75724fd Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Fri, 30 Jan 2026 04:10:20 +0000 Subject: [PATCH] Add manager modules: dotfile, backup, package, and editor --- dev_env_sync/managers/backup.py | 336 ++++++++++++++++++++++++++++++++ 1 file changed, 336 insertions(+) create mode 100644 dev_env_sync/managers/backup.py diff --git a/dev_env_sync/managers/backup.py b/dev_env_sync/managers/backup.py new file mode 100644 index 0000000..d5a7518 --- /dev/null +++ b/dev_env_sync/managers/backup.py @@ -0,0 +1,336 @@ +"""Backup and restore functionality for dev-env-sync.""" + +import json +import os +import shutil +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional +from dataclasses import dataclass + +from dev_env_sync.core.config import ConfigSchema, DotfileConfig +from dev_env_sync.core.platform import PlatformDetector +from dev_env_sync.utils.file_ops import FileOps, path_expand +from dev_env_sync.utils.logging import get_logger + + +class BackupError(Exception): + """Raised when a backup operation fails.""" + pass + + +@dataclass +class BackupEntry: + """Entry in a backup manifest.""" + source: str + backup_path: str + timestamp: str + file_hash: str + + +@dataclass +class BackupManifest: + """Manifest tracking backed up files.""" + timestamp: str + config_name: Optional[str] + entries: List[BackupEntry] + platform: str + + def to_dict(self) -> Dict[str, Any]: + return { + "timestamp": self.timestamp, + "config_name": self.config_name, + "entries": [ + { + "source": entry.source, + "backup_path": entry.backup_path, + "timestamp": entry.timestamp, + "file_hash": entry.file_hash, + } + for entry in self.entries + ], + "platform": self.platform, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "BackupManifest": + return cls( + timestamp=data["timestamp"], + config_name=data.get("config_name"), + entries=[ + BackupEntry( + source=entry["source"], + backup_path=entry["backup_path"], + timestamp=entry["timestamp"], + file_hash=entry["file_hash"], + ) + for entry in data["entries"] + ], + platform=data["platform"], + ) + + +class BackupManager: + """Manages backups of dotfiles and configurations.""" + + def __init__( + self, + backup_dir: Optional[str] = None, + dry_run: bool = False, + logger=None, + ): + self.backup_dir = path_expand(backup_dir) if backup_dir else PlatformDetector.get_backup_dir() + self.dry_run = dry_run + self.logger = logger or get_logger(__name__) + self.file_ops = FileOps(dry_run=dry_run, logger=logger) + self.current_backup_path: Optional[Path] = None + + def create_backup( + self, + config: ConfigSchema, + files: Optional[List[Path]] = None, + ) -> Path: + """Create a backup of specified files.""" + timestamp = datetime.now().strftime(config.backup.timestamp_format) + backup_root = path_expand(str(self.backup_dir)) + backup_path = backup_root / timestamp + + self.current_backup_path = backup_path + entries = [] + + self.logger.info(f"Creating backup at: {backup_path}") + + if not self.dry_run: + backup_path.mkdir(parents=True, exist_ok=True) + + manifest_path = backup_path / "manifest.json" + entries_path = backup_path / "entries.json" + + files_to_backup = files or [] + + for file_path in files_to_backup: + if file_path.exists() and (file_path.is_file() or file_path.is_symlink()): + file_hash = self._get_file_hash(file_path) + + relative_path = file_path.name + backup_file_path = backup_path / relative_path + + self.logger.info(f" Backing up: {file_path} -> {backup_file_path}") + + if self.dry_run: + entry = BackupEntry( + source=str(file_path), + backup_path=str(backup_file_path), + timestamp=timestamp, + file_hash=file_hash, + ) + entries.append(entry) + else: + try: + if file_path.is_symlink(): + target = file_path.resolve() + target.symlink_to(backup_file_path) + else: + shutil.copy2(str(file_path), str(backup_file_path)) + + entry = BackupEntry( + source=str(file_path), + backup_path=str(backup_file_path), + timestamp=timestamp, + file_hash=file_hash, + ) + entries.append(entry) + except OSError as e: + self.logger.error(f" Failed to backup {file_path}: {e}") + + manifest = BackupManifest( + timestamp=timestamp, + config_name=config.name, + entries=entries, + platform=PlatformDetector.detect().platform.value, + ) + + if self.dry_run: + self.logger.info(f"[DRY-RUN] Would create manifest at {manifest_path}") + else: + with open(manifest_path, 'w', encoding='utf-8') as f: + json.dump(manifest.to_dict(), f, indent=2) + + self.logger.info(f"Backup complete: {len(entries)} files backed up") + + return backup_path + + def _get_file_hash(self, path: Path) -> str: + """Get the hash of a file for integrity verification.""" + if path.is_symlink(): + path = path.resolve() + + if not path.exists(): + return "" + + import hashlib + hash_md5 = hashlib.md5() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hash_md5.update(chunk) + return hash_md5.hexdigest() + + def list_backups(self) -> List[Dict[str, Any]]: + """List all available backups.""" + backup_root = path_expand(str(self.backup_dir)) + + if not backup_root.exists(): + return [] + + backups = [] + for entry in sorted(backup_root.iterdir(), reverse=True): + if entry.is_dir(): + manifest_path = entry / "manifest.json" + if manifest_path.exists(): + try: + with open(manifest_path, 'r', encoding='utf-8') as f: + manifest_data = json.load(f) + backups.append({ + "timestamp": entry.name, + "path": str(entry), + "config_name": manifest_data.get("config_name"), + "file_count": len(manifest_data.get("entries", [])), + "platform": manifest_data.get("platform"), + }) + except (json.JSONDecodeError, IOError): + backups.append({ + "timestamp": entry.name, + "path": str(entry), + "config_name": None, + "file_count": 0, + "platform": None, + }) + + return backups + + def restore_backup( + self, + timestamp: str, + files: Optional[List[str]] = None, + dry_run: bool = False, + ) -> Dict[str, Any]: + """Restore files from a backup.""" + backup_root = path_expand(str(self.backup_dir)) + backup_path = backup_root / timestamp + + if not backup_path.exists(): + raise BackupError(f"Backup not found: {timestamp}") + + manifest_path = backup_path / "manifest.json" + if not manifest_path.exists(): + raise BackupError(f"Backup manifest not found: {manifest_path}") + + with open(manifest_path, 'r', encoding='utf-8') as f: + manifest_data = json.load(f) + + manifest = BackupManifest.from_dict(manifest_data) + + restored = [] + failed = [] + + for entry in manifest.entries: + if files and entry.source not in files: + continue + + backup_file = Path(entry.backup_path) + target = Path(entry.source) + + if not backup_file.exists(): + self.logger.error(f"Backup file missing: {backup_file}") + failed.append(entry.source) + continue + + self.logger.info(f"Restoring: {entry.source}") + + if dry_run: + self.logger.info(f" [DRY-RUN] Would restore to {target}") + restored.append(entry.source) + continue + + try: + target.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(str(backup_file), str(target)) + restored.append(entry.source) + except OSError as e: + self.logger.error(f" Failed to restore {target}: {e}") + failed.append(entry.source) + + return { + "restored": restored, + "failed": failed, + "timestamp": timestamp, + } + + def cleanup_old_backups(self, keep_count: int = 5) -> int: + """Remove old backups, keeping the most recent ones.""" + backups = self.list_backups() + + if len(backups) <= keep_count: + self.logger.info("No old backups to clean up") + return 0 + + to_remove = backups[keep_count:] + removed = 0 + + for backup in to_remove: + backup_path = Path(backup["path"]) + self.logger.info(f"Removing old backup: {backup_path}") + + if self.dry_run: + self.logger.info(f" [DRY-RUN] Would remove {backup_path}") + else: + try: + shutil.rmtree(backup_path) + removed += 1 + except OSError as e: + self.logger.error(f" Failed to remove {backup_path}: {e}") + + return removed + + def verify_backup(self, timestamp: str) -> Dict[str, Any]: + """Verify the integrity of a backup.""" + backup_root = path_expand(str(self.backup_dir)) + backup_path = backup_root / timestamp + + if not backup_path.exists(): + return {"valid": False, "error": "Backup not found"} + + manifest_path = backup_path / "manifest.json" + if not manifest_path.exists(): + return {"valid": False, "error": "Manifest not found"} + + with open(manifest_path, 'r', encoding='utf-8') as f: + manifest_data = json.load(f) + + manifest = BackupManifest.from_dict(manifest_data) + + verified = [] + failed = [] + + for entry in manifest.entries: + backup_file = Path(entry.backup_path) + + if not backup_file.exists(): + failed.append(entry.source) + continue + + current_hash = self._get_file_hash(backup_file) + if current_hash == entry.file_hash: + verified.append(entry.source) + else: + failed.append(entry.source) + + return { + "valid": len(failed) == 0, + "verified": len(verified), + "failed": len(failed), + "details": { + "verified_files": verified, + "failed_files": failed, + }, + }