diff --git a/src/watchers/file_watcher.py b/src/watchers/file_watcher.py new file mode 100644 index 0000000..3fbee71 --- /dev/null +++ b/src/watchers/file_watcher.py @@ -0,0 +1,250 @@ +"""File watcher module for monitoring file changes.""" + +import os +import time +import threading +from typing import Callable, List, Optional, Set +from pathlib import Path +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler, FileSystemEvent +from rich.console import Console + + +class FileWatcherHandler(FileSystemEventHandler): + """Handler for file system events.""" + + def __init__(self, callback: Callable[[str], None], debounce_seconds: float = 1.0): + """Initialize the handler. + + Args: + callback: Function to call when file changes + debounce_seconds: Debounce delay in seconds + """ + self.callback = callback + self.debounce_seconds = debounce_seconds + self.last_event_time: dict = {} + self.lock = threading.Lock() + self._pending_files: Set[str] = set() + + def _should_process(self, filepath: str) -> bool: + """Check if file should be processed (debounced). + + Args: + filepath: Path to the file + + Returns: + True if file should be processed + """ + current_time = time.time() + with self.lock: + if filepath in self._pending_files: + return False + + last_time = self.last_event_time.get(filepath, 0) + if current_time - last_time < self.debounce_seconds: + self._pending_files.add(filepath) + return False + + self.last_event_time[filepath] = current_time + return True + + def on_modified(self, event: FileSystemEvent) -> None: + """Handle file modification events. + + Args: + event: File system event + """ + if event.is_directory: + return + + filepath = str(event.src_path) + if self._should_process(filepath): + self.callback(filepath) + + def on_created(self, event: FileSystemEvent) -> None: + """Handle file creation events. + + Args: + event: File system event + """ + if event.is_directory: + return + + filepath = str(event.src_path) + if self._should_process(filepath): + self.callback(filepath) + + def cleanup(self) -> None: + """Clean up pending files.""" + with self.lock: + self._pending_files.clear() + + +class FileWatcher: + """Watches files for changes and triggers callbacks.""" + + def __init__(self, console: Console = None): + """Initialize the file watcher. + + Args: + console: Rich Console instance for output + """ + self.console = console or Console() + self.observer = Observer() + self.handler: Optional[FileWatcherHandler] = None + self.running = False + self._thread: Optional[threading.Thread] = None + self._convert_callback: Optional[Callable] = None + + def _default_callback(self, filepath: str) -> None: + """Default callback for file changes. + + Args: + filepath: Path to the changed file + """ + if self._convert_callback: + try: + self._convert_callback(filepath) + self.console.print(f"Processed: {filepath}", style='green') + except Exception as e: + self.console.print(f"Error processing {filepath}: {e}", style='red') + + def watch( + self, + paths: List[str], + callback: Callable[[str], None] = None, + debounce_seconds: float = 1.0 + ) -> None: + """Start watching files for changes. + + Args: + paths: List of paths to watch (files or directories) + callback: Optional callback function + debounce_seconds: Debounce delay in seconds + """ + handler_callback = callback or self._default_callback + self.handler = FileWatcherHandler(handler_callback, debounce_seconds) + + for path in paths: + path_obj = Path(path) + if path_obj.exists(): + self.observer.schedule(self.handler, str(path_obj), recursive=False) + self.console.print(f"Watching: {path}", style='cyan') + else: + self.console.print(f"Path not found: {path}", style='red') + + self.running = True + self.observer.start() + + def watch_with_conversion( + self, + paths: List[str], + converter_func: Callable[[str], None], + debounce_seconds: float = 1.0 + ) -> None: + """Watch files and convert on changes. + + Args: + paths: List of paths to watch + converter_func: Function that converts a file + debounce_seconds: Debounce delay in seconds + """ + self._convert_callback = converter_func + self.watch(paths, debounce_seconds=debounce_seconds) + + def stop(self) -> None: + """Stop watching files.""" + if self.running: + self.observer.stop() + self.observer.join() + self.running = False + if self.handler: + self.handler.cleanup() + self.console.print("File watcher stopped", style='yellow') + + def run_forever(self) -> None: + """Run the watcher in the foreground until interrupted.""" + self.console.print("Press Ctrl+C to stop watching", style='dim') + try: + while self.running: + time.sleep(0.5) + except KeyboardInterrupt: + self.stop() + + @staticmethod + def convert_file( + filepath: str, + output_dir: str, + target_format: str, + source_format: str = None + ) -> Optional[str]: + """Convert a single file. + + Args: + filepath: Path to source file + output_dir: Output directory + target_format: Target format + source_format: Source format (auto-detected if None) + + Returns: + Path to output file or None on error + """ + from src.validators import detect_format + from src.converters import get_converter as gc + + if not source_format: + source_format = detect_format(filepath) + if not source_format: + return None + + try: + converter = gc(source_format) + target_converter = gc(target_format) + + content = converter.read_file(filepath) + result = converter.convert(content, target_converter) + + if result.success: + filename = os.path.basename(filepath) + name_without_ext = os.path.splitext(filename)[0] + ext = '.' + target_format + output_path = os.path.join(output_dir, name_without_ext + ext) + + target_converter.write_file(output_path, result.data) + return output_path + except Exception: + pass + + return None + + +def watch_files( + paths: List[str], + output_dir: str, + target_format: str, + debounce_seconds: float = 1.0, + console: Console = None +) -> None: + """Convenience function to watch files and convert on change. + + Args: + paths: List of paths to watch + output_dir: Output directory for converted files + target_format: Target format + debounce_seconds: Debounce delay in seconds + console: Rich Console instance + """ + console = console or Console() + + def convert_func(filepath: str) -> None: + output = FileWatcher.convert_file( + filepath, output_dir, target_format + ) + if output: + console.print(f"Converted: {filepath} -> {output}", style='green') + else: + console.print(f"Conversion failed: {filepath}", style='red') + + watcher = FileWatcher(console) + watcher.watch_with_conversion(paths, convert_func, debounce_seconds) + watcher.run_forever()