"""File operations utilities for dev-env-sync."""

import hashlib
import os
import shutil
from pathlib import Path
from typing import Dict, List, Optional, Tuple


def path_expand(path: str) -> Path:
    """Expand user home directory and environment variables in path.

    Args:
        path: Path string that may contain ``~`` or ``$VAR`` references.

    Returns:
        A ``Path`` built from the expanded string (not resolved/absolutized).
    """
    expanded = os.path.expanduser(path)
    expanded = os.path.expandvars(expanded)
    return Path(expanded)


def path_normalize(path: Path, make_absolute: bool = True) -> Path:
    """Normalize a path for the current platform.

    Args:
        path: Path to normalize.
        make_absolute: When true the path is passed through ``Path.resolve()``;
            otherwise it is only wrapped in ``Path``.

    Returns:
        The normalized path.
    """
    normalized = Path(path).resolve() if make_absolute else Path(path)
    return normalized


def compute_file_hash(file_path: Path) -> str:
    """Compute MD5 hash of a file for change detection.

    Args:
        file_path: File to hash.

    Returns:
        The hexadecimal MD5 digest, or an empty string when ``file_path`` does
        not exist or is not a regular file.
    """
    if not file_path.exists() or not file_path.is_file():
        return ""

    hash_md5 = hashlib.md5()
    with open(file_path, "rb") as f:
        # Read in fixed-size chunks so large files are never fully in memory.
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()


def compute_content_hash(content: str) -> str:
    """Compute MD5 hash of a string content (encoded with the default UTF-8)."""
    return hashlib.md5(content.encode()).hexdigest()


class FileOps:
    """Utility class for file operations with dry-run support.

    When ``dry_run`` is true, mutating methods record a dictionary describing
    the operation (see ``get_operations``) instead of touching the filesystem.
    """

    def __init__(self, dry_run: bool = False, logger=None):
        """Store the dry-run flag and optional logger; start with an empty log."""
        self.dry_run = dry_run
        self.logger = logger
        self._operations: List[Dict] = []

    def log_operation(self, operation: Dict):
        """Log an operation for dry-run mode.

        The operation is appended to the internal list and, when a logger is
        configured, also emitted at debug level.
        """
        self._operations.append(operation)
        if self.logger:
            self.logger.debug(f"[DRY-RUN] Would execute: {operation}")

    def get_operations(self) -> List[Dict]:
        """Get a shallow copy of the list of logged operations."""
        return self._operations.copy()

    def clear_operations(self):
        """Clear the operations log."""
        self._operations.clear()

    def create_directory(self, path: Path, exist_ok: bool = True) -> bool:
        """Create a directory (including parents), optionally logging for dry-run.

        Args:
            path: Directory to create.
            exist_ok: Passed to ``Path.mkdir``; when false an existing
                directory raises.

        Returns:
            ``True`` on success (or when the operation was only logged).

        Raises:
            OSError: If the directory cannot be created.
        """
        if self.dry_run:
            self.log_operation({
                "action": "create_directory",
                "path": str(path),
                "exist_ok": exist_ok,
            })
            return True

        try:
            path.mkdir(parents=True, exist_ok=exist_ok)
            return True
        except OSError as e:
            if self.logger:
                self.logger.error(f"Failed to create directory {path}: {e}")
            raise

    def copy_file(
        self,
        source: Path,
        destination: Path,
        preserve_metadata: bool = True,
        backup: bool = False,
        backup_dir: Optional[Path] = None,
    ) -> bool:
        """Copy a file with optional backup.

        Args:
            source: File to copy.
            destination: Where to copy it.
            preserve_metadata: Use ``shutil.copy2`` (true) or ``shutil.copy``.
            backup: Back up an existing ``destination`` before overwriting it.
            backup_dir: Directory for the backup; defaults to the
                destination's parent directory.

        Returns:
            ``True`` on success (or when only logged in dry-run mode),
            ``False`` when ``source`` does not exist.

        Raises:
            OSError: If the copy fails.
        """
        if not source.exists():
            if self.logger:
                self.logger.error(f"Source file does not exist: {source}")
            return False

        if backup and destination.exists():
            if backup_dir is None:
                backup_dir = destination.parent
            if self.dry_run:
                self.log_operation({
                    "action": "backup_file",
                    "source": str(destination),
                    "backup_dir": str(backup_dir),
                })
            else:
                self._backup_file(destination, backup_dir)

        if self.dry_run:
            self.log_operation({
                "action": "copy_file",
                "source": str(source),
                "destination": str(destination),
                "preserve_metadata": preserve_metadata,
            })
            return True

        try:
            if preserve_metadata:
                shutil.copy2(str(source), str(destination))
            else:
                shutil.copy(str(source), str(destination))
            return True
        except OSError as e:
            if self.logger:
                self.logger.error(f"Failed to copy {source} to {destination}: {e}")
            raise

    def _backup_file(self, file_path: Path, backup_dir: Path) -> Path:
        """Create a backup of a file.

        The backup is named ``<original name>.<integer mtime>`` and is placed
        in ``backup_dir``, which is created if necessary.

        Returns:
            The path of the backup copy.
        """
        backup_dir = path_expand(str(backup_dir))
        backup_dir.mkdir(parents=True, exist_ok=True)

        timestamp = file_path.stat().st_mtime
        backup_name = f"{file_path.name}.{int(timestamp)}"
        backup_path = backup_dir / backup_name

        shutil.copy2(str(file_path), str(backup_path))
        return backup_path

    def _remove_existing(self, path: Path) -> None:
        """Unlink ``path``, or only record the removal when in dry-run mode."""
        if self.dry_run:
            self.log_operation({
                "action": "remove_file",
                "path": str(path),
            })
        else:
            path.unlink()

    def create_symlink(
        self,
        source: Path,
        target: Path,
        force: bool = False,
        backup: bool = False,
        backup_dir: Optional[Path] = None,
    ) -> bool:
        """Create a symbolic link at ``target`` pointing to ``source``.

        An existing symlink at ``target`` is always replaced. An existing
        non-symlink is backed up and replaced when ``backup`` is true, removed
        when ``force`` is true, and otherwise left alone.

        Args:
            source: What the link should point to.
            target: Location of the link itself.
            force: Remove an existing non-symlink target without a backup.
            backup: Back up an existing non-symlink target before replacing it.
            backup_dir: Directory for the backup; defaults to the target's
                parent directory.

        Returns:
            ``True`` on success (or when only logged in dry-run mode),
            ``False`` when the target exists and neither ``backup`` nor
            ``force`` was requested.

        Raises:
            OSError: If removing the old target or creating the link fails.
        """
        source = path_expand(str(source))
        target = path_expand(str(target))

        if target.is_symlink():
            # Goes through the dry-run aware helper so that a dry run never
            # deletes a link that is really on disk.
            self._remove_existing(target)
        elif target.exists():
            if backup:
                if backup_dir is None:
                    backup_dir = target.parent
                if self.dry_run:
                    self.log_operation({
                        "action": "backup_file",
                        "source": str(target),
                        "backup_dir": str(backup_dir),
                    })
                else:
                    self._backup_file(target, backup_dir)
                # The backup is a copy; the original must still be removed or
                # symlink_to() below would fail with FileExistsError.
                self._remove_existing(target)
            elif force:
                self._remove_existing(target)
            else:
                if self.logger:
                    self.logger.error(f"Target already exists and not forcing: {target}")
                return False

        if self.dry_run:
            self.log_operation({
                "action": "create_symlink",
                "source": str(source),
                "target": str(target),
            })
            return True

        try:
            target.symlink_to(source)
            return True
        except OSError as e:
            if self.logger:
                self.logger.error(f"Failed to create symlink {target} -> {source}: {e}")
            raise

    def remove_file(self, path: Path, force: bool = False) -> bool:
        """Remove a file or symlink.

        Args:
            path: File or symlink to remove.
            force: Treat a missing path as success instead of a failure.

        Returns:
            ``True`` when the path was removed (or logged in dry-run mode), or
            when it is missing and ``force`` is set; ``False`` when it is
            missing and ``force`` is not set.

        Raises:
            OSError: If the path is present but cannot be removed.
        """
        path = path_expand(str(path))

        # exists() follows links and is False for a dangling symlink, so the
        # link itself has to be checked as well.
        present = path.exists() or path.is_symlink()
        if not present:
            if force:
                # Nothing to remove; forced removal of a missing path is a no-op.
                return True
            if self.logger:
                self.logger.warning(f"File does not exist: {path}")
            return False

        if self.dry_run:
            self.log_operation({
                "action": "remove_file",
                "path": str(path),
            })
            return True

        try:
            path.unlink()
            return True
        except OSError as e:
            if self.logger:
                self.logger.error(f"Failed to remove {path}: {e}")
            raise

    def compare_files(self, file1: Path, file2: Path) -> bool:
        """Compare two files for equality by MD5 hash.

        Returns:
            ``True`` when both paths exist and their hashes match, ``False``
            otherwise (including when either path is missing).
        """
        f1 = path_expand(str(file1))
        f2 = path_expand(str(file2))

        if not f1.exists() or not f2.exists():
            return False

        return compute_file_hash(f1) == compute_file_hash(f2)

    def read_file(self, path: Path) -> Optional[str]:
        """Read file contents as UTF-8 text.

        Returns:
            The file contents, or ``None`` when the file cannot be read or
            decoded (the failure is logged when a logger is configured).
        """
        path = path_expand(str(path))
        try:
            with open(path, 'r', encoding='utf-8') as f:
                return f.read()
        except (IOError, UnicodeDecodeError) as e:
            if self.logger:
                self.logger.error(f"Failed to read {path}: {e}")
            return None

    def write_file(self, path: Path, content: str, overwrite: bool = False) -> bool:
        """Write content to a file as UTF-8, creating parent directories.

        Args:
            path: File to write.
            content: Text to write.
            overwrite: Allow replacing an existing file.

        Returns:
            ``True`` on success (or when only logged in dry-run mode),
            ``False`` when the file exists and ``overwrite`` is false.

        Raises:
            IOError: If writing fails.
        """
        path = path_expand(str(path))

        if path.exists() and not overwrite:
            if self.logger:
                self.logger.error(f"File already exists: {path}")
            return False

        if self.dry_run:
            self.log_operation({
                "action": "write_file",
                "path": str(path),
                "content_length": len(content),
            })
            return True

        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            with open(path, 'w', encoding='utf-8') as f:
                f.write(content)
            return True
        except IOError as e:
            if self.logger:
                self.logger.error(f"Failed to write {path}: {e}")
            raise

    def file_exists(self, path: Path) -> bool:
        """Check if a file exists (after ``~``/environment expansion)."""
        return path_expand(str(path)).exists()

    def is_symlink(self, path: Path) -> bool:
        """Check if path is a symlink (after ``~``/environment expansion)."""
        return path_expand(str(path)).is_symlink()

    def get_file_info(self, path: Path) -> Optional[Dict]:
        """Get information about a file.

        Returns:
            A dictionary with ``path``, ``exists``, ``is_symlink``,
            ``is_file``, ``is_dir``, ``size``, ``mtime`` and ``mode``, or
            ``None`` when ``stat()`` fails (for example, a missing path).
        """
        path = path_expand(str(path))
        try:
            # stat() follows symlinks, so size/mtime/mode describe the link's
            # destination rather than the link itself.
            stat = path.stat()
            return {
                "path": str(path),
                "exists": path.exists(),
                "is_symlink": path.is_symlink(),
                "is_file": path.is_file(),
                "is_dir": path.is_dir(),
                "size": stat.st_size,
                "mtime": stat.st_mtime,
                "mode": stat.st_mode,
            }
        except OSError:
            return None

    def merge_files(
        self,
        source1: Path,
        source2: Path,
        destination: Path,
        strategy: str = "source2_wins",
    ) -> bool:
        """Merge two files into one by concatenating their contents.

        Strategies:
            ``"source1_wins"``: contents of source1 followed by source2.
            ``"source2_wins"``: contents of source2 followed by source1.
            ``"append"``: source1, a newline, then source2.
            anything else: contents of source2 only.

        A source that cannot be read contributes an empty string. The result
        is written with ``write_file`` using its default ``overwrite=False``,
        so an existing destination is not replaced.

        Returns:
            The result of ``write_file`` for the destination.
        """
        s1 = path_expand(str(source1))
        s2 = path_expand(str(source2))
        dest = path_expand(str(destination))

        content1 = self.read_file(s1) or ""
        content2 = self.read_file(s2) or ""

        if strategy == "source1_wins":
            merged = content1 + content2
        elif strategy == "source2_wins":
            merged = content2 + content1
        elif strategy == "append":
            merged = content1 + "\n" + content2
        else:
            merged = content2

        return self.write_file(dest, merged)