From edcedf99602978b5decd0a27e7b272f5f31b5952 Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Fri, 30 Jan 2026 04:11:15 +0000 Subject: [PATCH] Add utility modules: file operations and logging --- dev_env_sync/utils/file_ops.py | 321 +++++++++++++++++++++++++++++++++ 1 file changed, 321 insertions(+) create mode 100644 dev_env_sync/utils/file_ops.py diff --git a/dev_env_sync/utils/file_ops.py b/dev_env_sync/utils/file_ops.py new file mode 100644 index 0000000..b9975b1 --- /dev/null +++ b/dev_env_sync/utils/file_ops.py @@ -0,0 +1,321 @@ +"""File operations utilities for dev-env-sync.""" + +import hashlib +import os +import shutil +from pathlib import Path +from typing import Dict, List, Optional, Tuple + + +def path_expand(path: str) -> Path: + """Expand user home directory and environment variables in path.""" + expanded = os.path.expanduser(path) + expanded = os.path.expandvars(expanded) + return Path(expanded) + + +def path_normalize(path: Path, make_absolute: bool = True) -> Path: + """Normalize a path for the current platform.""" + normalized = Path(path).resolve() if make_absolute else Path(path) + return normalized + + +def compute_file_hash(file_path: Path) -> str: + """Compute MD5 hash of a file for change detection.""" + if not file_path.exists() or not file_path.is_file(): + return "" + + hash_md5 = hashlib.md5() + with open(file_path, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hash_md5.update(chunk) + return hash_md5.hexdigest() + + +def compute_content_hash(content: str) -> str: + """Compute MD5 hash of a string content.""" + return hashlib.md5(content.encode()).hexdigest() + + +class FileOps: + """Utility class for file operations with dry-run support.""" + + def __init__(self, dry_run: bool = False, logger=None): + self.dry_run = dry_run + self.logger = logger + self._operations: List[Dict] = [] + + def log_operation(self, operation: Dict): + """Log an operation for dry-run mode.""" + self._operations.append(operation) + if self.logger: + self.logger.debug(f"[DRY-RUN] Would execute: {operation}") + + def get_operations(self) -> List[Dict]: + """Get list of logged operations.""" + return self._operations.copy() + + def clear_operations(self): + """Clear the operations log.""" + self._operations.clear() + + def create_directory(self, path: Path, exist_ok: bool = True) -> bool: + """Create a directory, optionally logging for dry-run.""" + if self.dry_run: + self.log_operation({ + "action": "create_directory", + "path": str(path), + "exist_ok": exist_ok, + }) + return True + + try: + path.mkdir(parents=True, exist_ok=exist_ok) + return True + except OSError as e: + if self.logger: + self.logger.error(f"Failed to create directory {path}: {e}") + raise + + def copy_file( + self, + source: Path, + destination: Path, + preserve_metadata: bool = True, + backup: bool = False, + backup_dir: Optional[Path] = None, + ) -> bool: + """Copy a file with optional backup.""" + if not source.exists(): + if self.logger: + self.logger.error(f"Source file does not exist: {source}") + return False + + if backup and destination.exists(): + if backup_dir is None: + backup_dir = destination.parent + + if self.dry_run: + self.log_operation({ + "action": "backup_file", + "source": str(destination), + "backup_dir": str(backup_dir), + }) + else: + self._backup_file(destination, backup_dir) + + if self.dry_run: + self.log_operation({ + "action": "copy_file", + "source": str(source), + "destination": str(destination), + "preserve_metadata": preserve_metadata, + }) + return True + + try: + if preserve_metadata: + shutil.copy2(str(source), str(destination)) + else: + shutil.copy(str(source), str(destination)) + return True + except OSError as e: + if self.logger: + self.logger.error(f"Failed to copy {source} to {destination}: {e}") + raise + + def _backup_file(self, file_path: Path, backup_dir: Path) -> Path: + """Create a backup of a file.""" + backup_dir = path_expand(str(backup_dir)) + backup_dir.mkdir(parents=True, exist_ok=True) + + timestamp = file_path.stat().st_mtime + backup_name = f"{file_path.name}.{int(timestamp)}" + backup_path = backup_dir / backup_name + + shutil.copy2(str(file_path), str(backup_path)) + return backup_path + + def create_symlink( + self, + source: Path, + target: Path, + force: bool = False, + backup: bool = False, + backup_dir: Optional[Path] = None, + ) -> bool: + """Create a symbolic link.""" + source = path_expand(str(source)) + target = path_expand(str(target)) + + if target.is_symlink(): + target.unlink() + elif target.exists(): + if backup: + if backup_dir is None: + backup_dir = target.parent + if self.dry_run: + self.log_operation({ + "action": "backup_file", + "source": str(target), + "backup_dir": str(backup_dir), + }) + else: + self._backup_file(target, backup_dir) + elif force: + if self.dry_run: + self.log_operation({ + "action": "remove_file", + "path": str(target), + }) + else: + target.unlink() + else: + if self.logger: + self.logger.error(f"Target already exists and not forcing: {target}") + return False + + if self.dry_run: + self.log_operation({ + "action": "create_symlink", + "source": str(source), + "target": str(target), + }) + return True + + try: + target.symlink_to(source) + return True + except OSError as e: + if self.logger: + self.logger.error(f"Failed to create symlink {target} -> {source}: {e}") + raise + + def remove_file(self, path: Path, force: bool = False) -> bool: + """Remove a file or symlink.""" + path = path_expand(str(path)) + + if not path.exists() and not force: + if self.logger: + self.logger.warning(f"File does not exist: {path}") + return False + + if self.dry_run: + self.log_operation({ + "action": "remove_file", + "path": str(path), + }) + return True + + try: + if path.is_symlink(): + path.unlink() + else: + path.unlink() + return True + except OSError as e: + if self.logger: + self.logger.error(f"Failed to remove {path}: {e}") + raise + + def compare_files(self, file1: Path, file2: Path) -> bool: + """Compare two files for equality.""" + f1 = path_expand(str(file1)) + f2 = path_expand(str(file2)) + + if not f1.exists() or not f2.exists(): + return False + + return compute_file_hash(f1) == compute_file_hash(f2) + + def read_file(self, path: Path) -> Optional[str]: + """Read file contents.""" + path = path_expand(str(path)) + + try: + with open(path, 'r', encoding='utf-8') as f: + return f.read() + except (IOError, UnicodeDecodeError) as e: + if self.logger: + self.logger.error(f"Failed to read {path}: {e}") + return None + + def write_file(self, path: Path, content: str, overwrite: bool = False) -> bool: + """Write content to a file.""" + path = path_expand(str(path)) + + if path.exists() and not overwrite: + if self.logger: + self.logger.error(f"File already exists: {path}") + return False + + if self.dry_run: + self.log_operation({ + "action": "write_file", + "path": str(path), + "content_length": len(content), + }) + return True + + try: + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, 'w', encoding='utf-8') as f: + f.write(content) + return True + except IOError as e: + if self.logger: + self.logger.error(f"Failed to write {path}: {e}") + raise + + def file_exists(self, path: Path) -> bool: + """Check if a file exists.""" + return path_expand(str(path)).exists() + + def is_symlink(self, path: Path) -> bool: + """Check if path is a symlink.""" + return path_expand(str(path)).is_symlink() + + def get_file_info(self, path: Path) -> Optional[Dict]: + """Get information about a file.""" + path = path_expand(str(path)) + + try: + stat = path.stat() + return { + "path": str(path), + "exists": path.exists(), + "is_symlink": path.is_symlink(), + "is_file": path.is_file(), + "is_dir": path.is_dir(), + "size": stat.st_size, + "mtime": stat.st_mtime, + "mode": stat.st_mode, + } + except OSError: + return None + + def merge_files( + self, + source1: Path, + source2: Path, + destination: Path, + strategy: str = "source2_wins", + ) -> bool: + """Merge two files into one.""" + s1 = path_expand(str(source1)) + s2 = path_expand(str(source2)) + dest = path_expand(str(destination)) + + content1 = self.read_file(s1) or "" + content2 = self.read_file(s2) or "" + + if strategy == "source1_wins": + merged = content1 + content2 + elif strategy == "source2_wins": + merged = content2 + content1 + elif strategy == "append": + merged = content1 + "\n" + content2 + else: + merged = content2 + + return self.write_file(dest, merged)