322 lines
10 KiB
Python
322 lines
10 KiB
Python
"""File operations utilities for dev-env-sync."""
|
|
|
|
import hashlib
|
|
import os
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
|
|
def path_expand(path: str) -> Path:
    """Return *path* as a ``Path`` with ``~`` and environment variables expanded.

    The home-directory shorthand is resolved first; environment-variable
    references are then substituted in the result.
    """
    with_home = os.path.expanduser(path)
    return Path(os.path.expandvars(with_home))
|
|
|
|
|
|
def path_normalize(path: Path, make_absolute: bool = True) -> Path:
    """Coerce *path* to a ``Path``, resolving it to an absolute path by default.

    With ``make_absolute=False`` the value is only wrapped in ``Path`` and is
    otherwise returned as given.
    """
    candidate = Path(path)
    if not make_absolute:
        return candidate
    return candidate.resolve()
|
|
|
|
|
|
def compute_file_hash(file_path: Path) -> str:
    """Return the hex MD5 digest of a file's bytes, for change detection.

    An empty string is returned when *file_path* does not exist or is not a
    file. The content is read in 4096-byte chunks rather than all at once.
    """
    if not (file_path.exists() and file_path.is_file()):
        return ""

    digest = hashlib.md5()
    with open(file_path, "rb") as stream:
        while True:
            block = stream.read(4096)
            if not block:
                # An empty read marks end of file.
                break
            digest.update(block)
    return digest.hexdigest()
|
|
|
|
|
|
def compute_content_hash(content: str) -> str:
    """Return the hex MD5 digest of *content* after ``str.encode()`` with its default codec."""
    digest = hashlib.md5()
    digest.update(content.encode())
    return digest.hexdigest()
|
|
|
|
|
|
class FileOps:
    """Utility class for file operations with dry-run support.

    With ``dry_run=True`` the mutating methods append a dict describing the
    intended action to an internal log (see ``get_operations``) and return
    without modifying the file system. Read-only checks such as existence
    tests, ``compare_files``, ``read_file`` and ``get_file_info`` always
    consult the disk.
    """

    def __init__(self, dry_run: bool = False, logger=None):
        """Store the dry-run flag and optional logger; start with an empty operation log."""
        self.dry_run = dry_run
        self.logger = logger
        self._operations: List[Dict] = []

    def log_operation(self, operation: Dict):
        """Append *operation* to the operation log and emit a debug message if a logger is set."""
        self._operations.append(operation)
        if self.logger:
            self.logger.debug(f"[DRY-RUN] Would execute: {operation}")

    def get_operations(self) -> List[Dict]:
        """Return a copy of the logged operations.

        Each entry is copied too, so callers can modify the returned dicts
        without altering the internal log.
        """
        return [dict(entry) for entry in self._operations]

    def clear_operations(self):
        """Clear the operations log."""
        self._operations.clear()

    def create_directory(self, path: Path, exist_ok: bool = True) -> bool:
        """Create *path* and any missing parents.

        In dry-run mode the request is only logged. Returns ``True`` on
        success; an ``OSError`` from ``mkdir`` is logged and re-raised.
        """
        if self.dry_run:
            self.log_operation({
                "action": "create_directory",
                "path": str(path),
                "exist_ok": exist_ok,
            })
            return True

        try:
            path.mkdir(parents=True, exist_ok=exist_ok)
            return True
        except OSError as e:
            if self.logger:
                self.logger.error(f"Failed to create directory {path}: {e}")
            raise

    def copy_file(
        self,
        source: Path,
        destination: Path,
        preserve_metadata: bool = True,
        backup: bool = False,
        backup_dir: Optional[Path] = None,
    ) -> bool:
        """Copy *source* to *destination*, optionally backing up an existing destination.

        *source* and *destination* are used as given (no ``~`` or environment
        variable expansion is applied to them here).

        Args:
            source: File to copy; if it does not exist, ``False`` is returned.
            destination: Path to copy to.
            preserve_metadata: Use ``shutil.copy2`` when true, ``shutil.copy`` otherwise.
            backup: Back up an existing *destination* before copying.
            backup_dir: Where the backup goes; defaults to the destination's parent.

        Returns:
            ``True`` when the copy succeeded (or was logged in dry-run mode),
            ``False`` when *source* is missing.

        Raises:
            OSError: If the copy fails; the error is logged first.
        """
        if not source.exists():
            if self.logger:
                self.logger.error(f"Source file does not exist: {source}")
            return False

        if backup and destination.exists():
            if backup_dir is None:
                backup_dir = destination.parent

            if self.dry_run:
                self.log_operation({
                    "action": "backup_file",
                    "source": str(destination),
                    "backup_dir": str(backup_dir),
                })
            else:
                self._backup_file(destination, backup_dir)

        if self.dry_run:
            self.log_operation({
                "action": "copy_file",
                "source": str(source),
                "destination": str(destination),
                "preserve_metadata": preserve_metadata,
            })
            return True

        try:
            if preserve_metadata:
                shutil.copy2(str(source), str(destination))
            else:
                shutil.copy(str(source), str(destination))
            return True
        except OSError as e:
            if self.logger:
                self.logger.error(f"Failed to copy {source} to {destination}: {e}")
            raise

    def _backup_file(self, file_path: Path, backup_dir: Path) -> Path:
        """Copy *file_path* into *backup_dir* and return the backup's path.

        The backup directory is expanded and created if needed. The backup is
        named ``<original name>.<mtime as integer seconds>``.
        """
        backup_dir = path_expand(str(backup_dir))
        backup_dir.mkdir(parents=True, exist_ok=True)

        timestamp = file_path.stat().st_mtime
        backup_name = f"{file_path.name}.{int(timestamp)}"
        backup_path = backup_dir / backup_name

        shutil.copy2(str(file_path), str(backup_path))
        return backup_path

    def create_symlink(
        self,
        source: Path,
        target: Path,
        force: bool = False,
        backup: bool = False,
        backup_dir: Optional[Path] = None,
    ) -> bool:
        """Create a symbolic link at *target* pointing to *source*.

        Both paths are expanded with ``path_expand``. What happens to an
        existing *target* depends on what it is:

        * a symlink (dangling or not) is always replaced;
        * anything else is backed up and replaced when ``backup`` is true,
          replaced without backup when only ``force`` is true, and otherwise
          left alone with ``False`` returned.

        Replacement removes the old entry with ``Path.unlink`` and backups are
        made with ``shutil.copy2``, so an existing directory at *target* is
        not handled and results in an ``OSError``.

        In dry-run mode nothing on disk changes; the backup, the removal and
        the link creation are each recorded in the operation log.

        Args:
            source: Path the link should point to.
            target: Path at which the link is created.
            force: Replace an existing non-symlink target without a backup.
            backup: Back up an existing non-symlink target before replacing it.
            backup_dir: Where the backup goes; defaults to the target's parent.

        Returns:
            ``True`` when the link was created (or logged in dry-run mode),
            ``False`` when the target exists and neither ``backup`` nor
            ``force`` was requested.

        Raises:
            OSError: If removing the old target or creating the link fails;
                the error is logged first.
        """
        source = path_expand(str(source))
        target = path_expand(str(target))

        replace_existing = False
        if target.is_symlink():
            replace_existing = True
        elif target.exists():
            if backup:
                if backup_dir is None:
                    backup_dir = target.parent
                if self.dry_run:
                    self.log_operation({
                        "action": "backup_file",
                        "source": str(target),
                        "backup_dir": str(backup_dir),
                    })
                else:
                    self._backup_file(target, backup_dir)
                # The original must go once it is backed up, otherwise
                # symlink_to() would fail because the target still exists.
                replace_existing = True
            elif force:
                replace_existing = True
            else:
                if self.logger:
                    self.logger.error(f"Target already exists and not forcing: {target}")
                return False

        if self.dry_run:
            # A dry run must never touch the disk: record the removal instead.
            if replace_existing:
                self.log_operation({
                    "action": "remove_file",
                    "path": str(target),
                })
            self.log_operation({
                "action": "create_symlink",
                "source": str(source),
                "target": str(target),
            })
            return True

        try:
            if replace_existing:
                target.unlink()
            target.symlink_to(source)
            return True
        except OSError as e:
            if self.logger:
                self.logger.error(f"Failed to create symlink {target} -> {source}: {e}")
            raise

    def remove_file(self, path: Path, force: bool = False) -> bool:
        """Remove a file or symlink.

        The path is expanded with ``path_expand``. A dangling symlink counts
        as present, since ``Path.exists`` alone reports ``False`` for it.

        Args:
            path: File or symlink to remove.
            force: Treat a missing path as success instead of a failure.

        Returns:
            ``True`` when the path was removed, was logged in dry-run mode, or
            was already missing with ``force`` set; ``False`` when it is
            missing and ``force`` is not set.

        Raises:
            OSError: If the removal fails for any other reason; the error is
                logged first.
        """
        path = path_expand(str(path))

        present = path.exists() or path.is_symlink()
        if not present and not force:
            if self.logger:
                self.logger.warning(f"File does not exist: {path}")
            return False

        if self.dry_run:
            self.log_operation({
                "action": "remove_file",
                "path": str(path),
            })
            return True

        try:
            path.unlink()
        except OSError as e:
            if force and isinstance(e, FileNotFoundError):
                # With force, "nothing to remove" is not an error; this also
                # keeps real runs in line with what a dry run reports.
                return True
            if self.logger:
                self.logger.error(f"Failed to remove {path}: {e}")
            raise
        return True

    def compare_files(self, file1: Path, file2: Path) -> bool:
        """Return ``True`` when both paths exist and their ``compute_file_hash`` values match."""
        f1 = path_expand(str(file1))
        f2 = path_expand(str(file2))

        if not f1.exists() or not f2.exists():
            return False

        return compute_file_hash(f1) == compute_file_hash(f2)

    def read_file(self, path: Path) -> Optional[str]:
        """Return the UTF-8 text of *path*, or ``None`` if it cannot be read or decoded.

        Failures are reported through the logger, when one is set, rather
        than raised.
        """
        path = path_expand(str(path))

        try:
            with open(path, 'r', encoding='utf-8') as f:
                return f.read()
        except (IOError, UnicodeDecodeError) as e:
            if self.logger:
                self.logger.error(f"Failed to read {path}: {e}")
            return None

    def write_file(self, path: Path, content: str, overwrite: bool = False) -> bool:
        """Write *content* to *path* as UTF-8, creating parent directories as needed.

        Args:
            path: Destination file; expanded with ``path_expand``.
            content: Text to write.
            overwrite: Allow replacing an existing file.

        Returns:
            ``True`` when the file was written (or logged in dry-run mode),
            ``False`` when the file exists and ``overwrite`` is false.

        Raises:
            OSError: If writing fails; the error is logged first.
        """
        path = path_expand(str(path))

        if path.exists() and not overwrite:
            if self.logger:
                self.logger.error(f"File already exists: {path}")
            return False

        if self.dry_run:
            self.log_operation({
                "action": "write_file",
                "path": str(path),
                "content_length": len(content),
            })
            return True

        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            with open(path, 'w', encoding='utf-8') as f:
                f.write(content)
            return True
        except IOError as e:
            if self.logger:
                self.logger.error(f"Failed to write {path}: {e}")
            raise

    def file_exists(self, path: Path) -> bool:
        """Return whether the expanded *path* exists."""
        return path_expand(str(path)).exists()

    def is_symlink(self, path: Path) -> bool:
        """Return whether the expanded *path* is a symlink."""
        return path_expand(str(path)).is_symlink()

    def get_file_info(self, path: Path) -> Optional[Dict]:
        """Return a dict of basic facts about *path*, or ``None`` if it cannot be stat'ed.

        The dict holds ``path``, ``exists``, ``is_symlink``, ``is_file``,
        ``is_dir``, ``size``, ``mtime`` and ``mode``. The size, mtime and mode
        come from ``Path.stat``, which follows symlinks, so a dangling symlink
        yields ``None``.
        """
        path = path_expand(str(path))

        try:
            stat = path.stat()
            return {
                "path": str(path),
                "exists": path.exists(),
                "is_symlink": path.is_symlink(),
                "is_file": path.is_file(),
                "is_dir": path.is_dir(),
                "size": stat.st_size,
                "mtime": stat.st_mtime,
                "mode": stat.st_mode,
            }
        except OSError:
            return None

    def merge_files(
        self,
        source1: Path,
        source2: Path,
        destination: Path,
        strategy: str = "source2_wins",
    ) -> bool:
        """Concatenate the text of two files and write the result to *destination*.

        A source that cannot be read contributes an empty string. The
        *strategy* selects the order:

        * ``"source1_wins"``: source1 followed by source2;
        * ``"source2_wins"``: source2 followed by source1;
        * ``"append"``: source1, a newline, then source2;
        * anything else: source2 only.

        The result is written through ``write_file`` with its default
        ``overwrite=False``, so an existing *destination* is left untouched
        and ``False`` is returned.
        """
        s1 = path_expand(str(source1))
        s2 = path_expand(str(source2))
        dest = path_expand(str(destination))

        # read_file() returns None on failure; fall back to empty content.
        content1 = self.read_file(s1) or ""
        content2 = self.read_file(s2) or ""

        if strategy == "source1_wins":
            merged = content1 + content2
        elif strategy == "source2_wins":
            merged = content2 + content1
        elif strategy == "append":
            merged = content1 + "\n" + content2
        else:
            merged = content2

        return self.write_file(dest, merged)
|