Add utility modules: file operations and logging
Some checks failed
CI / test (push) Has been cancelled
Some checks failed
CI / test (push) Has been cancelled
This commit is contained in:
321
dev_env_sync/utils/file_ops.py
Normal file
321
dev_env_sync/utils/file_ops.py
Normal file
@@ -0,0 +1,321 @@
|
||||
"""File operations utilities for dev-env-sync."""
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
|
||||
def path_expand(path: str) -> Path:
|
||||
"""Expand user home directory and environment variables in path."""
|
||||
expanded = os.path.expanduser(path)
|
||||
expanded = os.path.expandvars(expanded)
|
||||
return Path(expanded)
|
||||
|
||||
|
||||
def path_normalize(path: Path, make_absolute: bool = True) -> Path:
|
||||
"""Normalize a path for the current platform."""
|
||||
normalized = Path(path).resolve() if make_absolute else Path(path)
|
||||
return normalized
|
||||
|
||||
|
||||
def compute_file_hash(file_path: Path) -> str:
|
||||
"""Compute MD5 hash of a file for change detection."""
|
||||
if not file_path.exists() or not file_path.is_file():
|
||||
return ""
|
||||
|
||||
hash_md5 = hashlib.md5()
|
||||
with open(file_path, "rb") as f:
|
||||
for chunk in iter(lambda: f.read(4096), b""):
|
||||
hash_md5.update(chunk)
|
||||
return hash_md5.hexdigest()
|
||||
|
||||
|
||||
def compute_content_hash(content: str) -> str:
|
||||
"""Compute MD5 hash of a string content."""
|
||||
return hashlib.md5(content.encode()).hexdigest()
|
||||
|
||||
|
||||
class FileOps:
|
||||
"""Utility class for file operations with dry-run support."""
|
||||
|
||||
def __init__(self, dry_run: bool = False, logger=None):
|
||||
self.dry_run = dry_run
|
||||
self.logger = logger
|
||||
self._operations: List[Dict] = []
|
||||
|
||||
def log_operation(self, operation: Dict):
|
||||
"""Log an operation for dry-run mode."""
|
||||
self._operations.append(operation)
|
||||
if self.logger:
|
||||
self.logger.debug(f"[DRY-RUN] Would execute: {operation}")
|
||||
|
||||
def get_operations(self) -> List[Dict]:
|
||||
"""Get list of logged operations."""
|
||||
return self._operations.copy()
|
||||
|
||||
def clear_operations(self):
|
||||
"""Clear the operations log."""
|
||||
self._operations.clear()
|
||||
|
||||
def create_directory(self, path: Path, exist_ok: bool = True) -> bool:
|
||||
"""Create a directory, optionally logging for dry-run."""
|
||||
if self.dry_run:
|
||||
self.log_operation({
|
||||
"action": "create_directory",
|
||||
"path": str(path),
|
||||
"exist_ok": exist_ok,
|
||||
})
|
||||
return True
|
||||
|
||||
try:
|
||||
path.mkdir(parents=True, exist_ok=exist_ok)
|
||||
return True
|
||||
except OSError as e:
|
||||
if self.logger:
|
||||
self.logger.error(f"Failed to create directory {path}: {e}")
|
||||
raise
|
||||
|
||||
def copy_file(
|
||||
self,
|
||||
source: Path,
|
||||
destination: Path,
|
||||
preserve_metadata: bool = True,
|
||||
backup: bool = False,
|
||||
backup_dir: Optional[Path] = None,
|
||||
) -> bool:
|
||||
"""Copy a file with optional backup."""
|
||||
if not source.exists():
|
||||
if self.logger:
|
||||
self.logger.error(f"Source file does not exist: {source}")
|
||||
return False
|
||||
|
||||
if backup and destination.exists():
|
||||
if backup_dir is None:
|
||||
backup_dir = destination.parent
|
||||
|
||||
if self.dry_run:
|
||||
self.log_operation({
|
||||
"action": "backup_file",
|
||||
"source": str(destination),
|
||||
"backup_dir": str(backup_dir),
|
||||
})
|
||||
else:
|
||||
self._backup_file(destination, backup_dir)
|
||||
|
||||
if self.dry_run:
|
||||
self.log_operation({
|
||||
"action": "copy_file",
|
||||
"source": str(source),
|
||||
"destination": str(destination),
|
||||
"preserve_metadata": preserve_metadata,
|
||||
})
|
||||
return True
|
||||
|
||||
try:
|
||||
if preserve_metadata:
|
||||
shutil.copy2(str(source), str(destination))
|
||||
else:
|
||||
shutil.copy(str(source), str(destination))
|
||||
return True
|
||||
except OSError as e:
|
||||
if self.logger:
|
||||
self.logger.error(f"Failed to copy {source} to {destination}: {e}")
|
||||
raise
|
||||
|
||||
def _backup_file(self, file_path: Path, backup_dir: Path) -> Path:
|
||||
"""Create a backup of a file."""
|
||||
backup_dir = path_expand(str(backup_dir))
|
||||
backup_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
timestamp = file_path.stat().st_mtime
|
||||
backup_name = f"{file_path.name}.{int(timestamp)}"
|
||||
backup_path = backup_dir / backup_name
|
||||
|
||||
shutil.copy2(str(file_path), str(backup_path))
|
||||
return backup_path
|
||||
|
||||
def create_symlink(
|
||||
self,
|
||||
source: Path,
|
||||
target: Path,
|
||||
force: bool = False,
|
||||
backup: bool = False,
|
||||
backup_dir: Optional[Path] = None,
|
||||
) -> bool:
|
||||
"""Create a symbolic link."""
|
||||
source = path_expand(str(source))
|
||||
target = path_expand(str(target))
|
||||
|
||||
if target.is_symlink():
|
||||
target.unlink()
|
||||
elif target.exists():
|
||||
if backup:
|
||||
if backup_dir is None:
|
||||
backup_dir = target.parent
|
||||
if self.dry_run:
|
||||
self.log_operation({
|
||||
"action": "backup_file",
|
||||
"source": str(target),
|
||||
"backup_dir": str(backup_dir),
|
||||
})
|
||||
else:
|
||||
self._backup_file(target, backup_dir)
|
||||
elif force:
|
||||
if self.dry_run:
|
||||
self.log_operation({
|
||||
"action": "remove_file",
|
||||
"path": str(target),
|
||||
})
|
||||
else:
|
||||
target.unlink()
|
||||
else:
|
||||
if self.logger:
|
||||
self.logger.error(f"Target already exists and not forcing: {target}")
|
||||
return False
|
||||
|
||||
if self.dry_run:
|
||||
self.log_operation({
|
||||
"action": "create_symlink",
|
||||
"source": str(source),
|
||||
"target": str(target),
|
||||
})
|
||||
return True
|
||||
|
||||
try:
|
||||
target.symlink_to(source)
|
||||
return True
|
||||
except OSError as e:
|
||||
if self.logger:
|
||||
self.logger.error(f"Failed to create symlink {target} -> {source}: {e}")
|
||||
raise
|
||||
|
||||
def remove_file(self, path: Path, force: bool = False) -> bool:
|
||||
"""Remove a file or symlink."""
|
||||
path = path_expand(str(path))
|
||||
|
||||
if not path.exists() and not force:
|
||||
if self.logger:
|
||||
self.logger.warning(f"File does not exist: {path}")
|
||||
return False
|
||||
|
||||
if self.dry_run:
|
||||
self.log_operation({
|
||||
"action": "remove_file",
|
||||
"path": str(path),
|
||||
})
|
||||
return True
|
||||
|
||||
try:
|
||||
if path.is_symlink():
|
||||
path.unlink()
|
||||
else:
|
||||
path.unlink()
|
||||
return True
|
||||
except OSError as e:
|
||||
if self.logger:
|
||||
self.logger.error(f"Failed to remove {path}: {e}")
|
||||
raise
|
||||
|
||||
def compare_files(self, file1: Path, file2: Path) -> bool:
|
||||
"""Compare two files for equality."""
|
||||
f1 = path_expand(str(file1))
|
||||
f2 = path_expand(str(file2))
|
||||
|
||||
if not f1.exists() or not f2.exists():
|
||||
return False
|
||||
|
||||
return compute_file_hash(f1) == compute_file_hash(f2)
|
||||
|
||||
def read_file(self, path: Path) -> Optional[str]:
|
||||
"""Read file contents."""
|
||||
path = path_expand(str(path))
|
||||
|
||||
try:
|
||||
with open(path, 'r', encoding='utf-8') as f:
|
||||
return f.read()
|
||||
except (IOError, UnicodeDecodeError) as e:
|
||||
if self.logger:
|
||||
self.logger.error(f"Failed to read {path}: {e}")
|
||||
return None
|
||||
|
||||
def write_file(self, path: Path, content: str, overwrite: bool = False) -> bool:
|
||||
"""Write content to a file."""
|
||||
path = path_expand(str(path))
|
||||
|
||||
if path.exists() and not overwrite:
|
||||
if self.logger:
|
||||
self.logger.error(f"File already exists: {path}")
|
||||
return False
|
||||
|
||||
if self.dry_run:
|
||||
self.log_operation({
|
||||
"action": "write_file",
|
||||
"path": str(path),
|
||||
"content_length": len(content),
|
||||
})
|
||||
return True
|
||||
|
||||
try:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(path, 'w', encoding='utf-8') as f:
|
||||
f.write(content)
|
||||
return True
|
||||
except IOError as e:
|
||||
if self.logger:
|
||||
self.logger.error(f"Failed to write {path}: {e}")
|
||||
raise
|
||||
|
||||
def file_exists(self, path: Path) -> bool:
|
||||
"""Check if a file exists."""
|
||||
return path_expand(str(path)).exists()
|
||||
|
||||
def is_symlink(self, path: Path) -> bool:
|
||||
"""Check if path is a symlink."""
|
||||
return path_expand(str(path)).is_symlink()
|
||||
|
||||
def get_file_info(self, path: Path) -> Optional[Dict]:
|
||||
"""Get information about a file."""
|
||||
path = path_expand(str(path))
|
||||
|
||||
try:
|
||||
stat = path.stat()
|
||||
return {
|
||||
"path": str(path),
|
||||
"exists": path.exists(),
|
||||
"is_symlink": path.is_symlink(),
|
||||
"is_file": path.is_file(),
|
||||
"is_dir": path.is_dir(),
|
||||
"size": stat.st_size,
|
||||
"mtime": stat.st_mtime,
|
||||
"mode": stat.st_mode,
|
||||
}
|
||||
except OSError:
|
||||
return None
|
||||
|
||||
def merge_files(
|
||||
self,
|
||||
source1: Path,
|
||||
source2: Path,
|
||||
destination: Path,
|
||||
strategy: str = "source2_wins",
|
||||
) -> bool:
|
||||
"""Merge two files into one."""
|
||||
s1 = path_expand(str(source1))
|
||||
s2 = path_expand(str(source2))
|
||||
dest = path_expand(str(destination))
|
||||
|
||||
content1 = self.read_file(s1) or ""
|
||||
content2 = self.read_file(s2) or ""
|
||||
|
||||
if strategy == "source1_wins":
|
||||
merged = content1 + content2
|
||||
elif strategy == "source2_wins":
|
||||
merged = content2 + content1
|
||||
elif strategy == "append":
|
||||
merged = content1 + "\n" + content2
|
||||
else:
|
||||
merged = content2
|
||||
|
||||
return self.write_file(dest, merged)
|
||||
Reference in New Issue
Block a user