This commit is contained in:
250
src/watchers/file_watcher.py
Normal file
250
src/watchers/file_watcher.py
Normal file
@@ -0,0 +1,250 @@
|
||||
"""File watcher module for monitoring file changes."""
|
||||
|
||||
import os
|
||||
import time
|
||||
import threading
|
||||
from typing import Callable, List, Optional, Set
|
||||
from pathlib import Path
|
||||
from watchdog.observers import Observer
|
||||
from watchdog.events import FileSystemEventHandler, FileSystemEvent
|
||||
from rich.console import Console
|
||||
|
||||
|
||||
class FileWatcherHandler(FileSystemEventHandler):
|
||||
"""Handler for file system events."""
|
||||
|
||||
def __init__(self, callback: Callable[[str], None], debounce_seconds: float = 1.0):
|
||||
"""Initialize the handler.
|
||||
|
||||
Args:
|
||||
callback: Function to call when file changes
|
||||
debounce_seconds: Debounce delay in seconds
|
||||
"""
|
||||
self.callback = callback
|
||||
self.debounce_seconds = debounce_seconds
|
||||
self.last_event_time: dict = {}
|
||||
self.lock = threading.Lock()
|
||||
self._pending_files: Set[str] = set()
|
||||
|
||||
def _should_process(self, filepath: str) -> bool:
|
||||
"""Check if file should be processed (debounced).
|
||||
|
||||
Args:
|
||||
filepath: Path to the file
|
||||
|
||||
Returns:
|
||||
True if file should be processed
|
||||
"""
|
||||
current_time = time.time()
|
||||
with self.lock:
|
||||
if filepath in self._pending_files:
|
||||
return False
|
||||
|
||||
last_time = self.last_event_time.get(filepath, 0)
|
||||
if current_time - last_time < self.debounce_seconds:
|
||||
self._pending_files.add(filepath)
|
||||
return False
|
||||
|
||||
self.last_event_time[filepath] = current_time
|
||||
return True
|
||||
|
||||
def on_modified(self, event: FileSystemEvent) -> None:
|
||||
"""Handle file modification events.
|
||||
|
||||
Args:
|
||||
event: File system event
|
||||
"""
|
||||
if event.is_directory:
|
||||
return
|
||||
|
||||
filepath = str(event.src_path)
|
||||
if self._should_process(filepath):
|
||||
self.callback(filepath)
|
||||
|
||||
def on_created(self, event: FileSystemEvent) -> None:
|
||||
"""Handle file creation events.
|
||||
|
||||
Args:
|
||||
event: File system event
|
||||
"""
|
||||
if event.is_directory:
|
||||
return
|
||||
|
||||
filepath = str(event.src_path)
|
||||
if self._should_process(filepath):
|
||||
self.callback(filepath)
|
||||
|
||||
def cleanup(self) -> None:
|
||||
"""Clean up pending files."""
|
||||
with self.lock:
|
||||
self._pending_files.clear()
|
||||
|
||||
|
||||
class FileWatcher:
|
||||
"""Watches files for changes and triggers callbacks."""
|
||||
|
||||
def __init__(self, console: Console = None):
|
||||
"""Initialize the file watcher.
|
||||
|
||||
Args:
|
||||
console: Rich Console instance for output
|
||||
"""
|
||||
self.console = console or Console()
|
||||
self.observer = Observer()
|
||||
self.handler: Optional[FileWatcherHandler] = None
|
||||
self.running = False
|
||||
self._thread: Optional[threading.Thread] = None
|
||||
self._convert_callback: Optional[Callable] = None
|
||||
|
||||
def _default_callback(self, filepath: str) -> None:
|
||||
"""Default callback for file changes.
|
||||
|
||||
Args:
|
||||
filepath: Path to the changed file
|
||||
"""
|
||||
if self._convert_callback:
|
||||
try:
|
||||
self._convert_callback(filepath)
|
||||
self.console.print(f"Processed: {filepath}", style='green')
|
||||
except Exception as e:
|
||||
self.console.print(f"Error processing {filepath}: {e}", style='red')
|
||||
|
||||
def watch(
|
||||
self,
|
||||
paths: List[str],
|
||||
callback: Callable[[str], None] = None,
|
||||
debounce_seconds: float = 1.0
|
||||
) -> None:
|
||||
"""Start watching files for changes.
|
||||
|
||||
Args:
|
||||
paths: List of paths to watch (files or directories)
|
||||
callback: Optional callback function
|
||||
debounce_seconds: Debounce delay in seconds
|
||||
"""
|
||||
handler_callback = callback or self._default_callback
|
||||
self.handler = FileWatcherHandler(handler_callback, debounce_seconds)
|
||||
|
||||
for path in paths:
|
||||
path_obj = Path(path)
|
||||
if path_obj.exists():
|
||||
self.observer.schedule(self.handler, str(path_obj), recursive=False)
|
||||
self.console.print(f"Watching: {path}", style='cyan')
|
||||
else:
|
||||
self.console.print(f"Path not found: {path}", style='red')
|
||||
|
||||
self.running = True
|
||||
self.observer.start()
|
||||
|
||||
def watch_with_conversion(
|
||||
self,
|
||||
paths: List[str],
|
||||
converter_func: Callable[[str], None],
|
||||
debounce_seconds: float = 1.0
|
||||
) -> None:
|
||||
"""Watch files and convert on changes.
|
||||
|
||||
Args:
|
||||
paths: List of paths to watch
|
||||
converter_func: Function that converts a file
|
||||
debounce_seconds: Debounce delay in seconds
|
||||
"""
|
||||
self._convert_callback = converter_func
|
||||
self.watch(paths, debounce_seconds=debounce_seconds)
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop watching files."""
|
||||
if self.running:
|
||||
self.observer.stop()
|
||||
self.observer.join()
|
||||
self.running = False
|
||||
if self.handler:
|
||||
self.handler.cleanup()
|
||||
self.console.print("File watcher stopped", style='yellow')
|
||||
|
||||
def run_forever(self) -> None:
|
||||
"""Run the watcher in the foreground until interrupted."""
|
||||
self.console.print("Press Ctrl+C to stop watching", style='dim')
|
||||
try:
|
||||
while self.running:
|
||||
time.sleep(0.5)
|
||||
except KeyboardInterrupt:
|
||||
self.stop()
|
||||
|
||||
@staticmethod
|
||||
def convert_file(
|
||||
filepath: str,
|
||||
output_dir: str,
|
||||
target_format: str,
|
||||
source_format: str = None
|
||||
) -> Optional[str]:
|
||||
"""Convert a single file.
|
||||
|
||||
Args:
|
||||
filepath: Path to source file
|
||||
output_dir: Output directory
|
||||
target_format: Target format
|
||||
source_format: Source format (auto-detected if None)
|
||||
|
||||
Returns:
|
||||
Path to output file or None on error
|
||||
"""
|
||||
from src.validators import detect_format
|
||||
from src.converters import get_converter as gc
|
||||
|
||||
if not source_format:
|
||||
source_format = detect_format(filepath)
|
||||
if not source_format:
|
||||
return None
|
||||
|
||||
try:
|
||||
converter = gc(source_format)
|
||||
target_converter = gc(target_format)
|
||||
|
||||
content = converter.read_file(filepath)
|
||||
result = converter.convert(content, target_converter)
|
||||
|
||||
if result.success:
|
||||
filename = os.path.basename(filepath)
|
||||
name_without_ext = os.path.splitext(filename)[0]
|
||||
ext = '.' + target_format
|
||||
output_path = os.path.join(output_dir, name_without_ext + ext)
|
||||
|
||||
target_converter.write_file(output_path, result.data)
|
||||
return output_path
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def watch_files(
|
||||
paths: List[str],
|
||||
output_dir: str,
|
||||
target_format: str,
|
||||
debounce_seconds: float = 1.0,
|
||||
console: Console = None
|
||||
) -> None:
|
||||
"""Convenience function to watch files and convert on change.
|
||||
|
||||
Args:
|
||||
paths: List of paths to watch
|
||||
output_dir: Output directory for converted files
|
||||
target_format: Target format
|
||||
debounce_seconds: Debounce delay in seconds
|
||||
console: Rich Console instance
|
||||
"""
|
||||
console = console or Console()
|
||||
|
||||
def convert_func(filepath: str) -> None:
|
||||
output = FileWatcher.convert_file(
|
||||
filepath, output_dir, target_format
|
||||
)
|
||||
if output:
|
||||
console.print(f"Converted: {filepath} -> {output}", style='green')
|
||||
else:
|
||||
console.print(f"Conversion failed: {filepath}", style='red')
|
||||
|
||||
watcher = FileWatcher(console)
|
||||
watcher.watch_with_conversion(paths, convert_func, debounce_seconds)
|
||||
watcher.run_forever()
|
||||
Reference in New Issue
Block a user