"""Filesystem monitoring using watchdog."""

import hashlib
import os
from pathlib import Path
from typing import Set, Optional, Callable

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

from ..storage import Database, FileEvent

# Number of bytes read per iteration when hashing a file, so that large
# files are streamed instead of being loaded into memory in one piece.
_HASH_CHUNK_SIZE = 64 * 1024


class FileEventHandler(FileSystemEventHandler):
    """Handler that records filesystem events in the database.

    Only non-directory events whose path passes ``_should_watch`` are
    recorded. Each recorded event is stored through ``db.add_file_event``
    and then forwarded to the optional event callback.
    """

    def __init__(
        self,
        db: Database,
        session_id: int,
        watch_patterns: Optional[Set[str]] = None,
        ignored_patterns: Optional[Set[str]] = None
    ):
        """Create a handler bound to a database and a session.

        Args:
            db: Storage object; its ``add_file_event`` method is called
                for every recorded event.
            session_id: Identifier passed along with every stored event.
            watch_patterns: File suffixes (including the leading dot,
                lower-case) to record. ``None`` or an empty set selects
                the built-in default suffixes.
            ignored_patterns: Path component names to skip. ``None`` or
                an empty set selects the built-in default names.
        """
        super().__init__()
        self.db = db
        self.session_id = session_id
        self.watch_patterns = watch_patterns or {
            ".py", ".js", ".ts", ".html", ".css", ".md"
        }
        self.ignored_patterns = ignored_patterns or {
            "__pycache__", ".git", ".tox", ".venv", "node_modules", ".cache"
        }
        self._event_callback: Optional[Callable] = None

    def set_event_callback(self, callback: Callable) -> None:
        """Set a callback for when events are processed.

        The callback is invoked as
        ``callback(event_id, "file", event_type, path)`` after the event
        has been stored.
        """
        self._event_callback = callback

    def _should_watch(self, path: str) -> bool:
        """Return True if events for ``path`` should be recorded.

        A path is rejected when any of its components equals an ignored
        name; otherwise it is accepted when its lower-cased suffix is one
        of the watched suffixes.
        """
        path_obj = Path(path)

        # Compare whole path components. A substring test against the
        # stringified parts tuple would make ".git" also exclude
        # everything below ".github", for example.
        if any(part in self.ignored_patterns for part in path_obj.parts):
            return False

        return path_obj.suffix.lower() in self.watch_patterns

    def _compute_content_hash(self, path: str) -> Optional[str]:
        """Compute the SHA256 hex digest of a file's content.

        Returns:
            The hex digest, or ``None`` when ``path`` is not a regular
            file or cannot be read.
        """
        try:
            if not os.path.isfile(path):
                return None
            hasher = hashlib.sha256()
            with open(path, "rb") as f:
                # Stream the file so its size does not bound memory use.
                for chunk in iter(lambda: f.read(_HASH_CHUNK_SIZE), b""):
                    hasher.update(chunk)
            return hasher.hexdigest()
        except OSError:
            # Best effort: the file may vanish or be unreadable between
            # the event and the read; the event is still recorded
            # without a hash.
            return None

    def _add_file_event(self, event_type: str, path: str) -> None:
        """Store a file event and notify the callback.

        Does nothing when ``path`` is filtered out by ``_should_watch``.
        No content hash is computed for ``"deleted"`` events.
        """
        if not self._should_watch(path):
            return

        content_hash = (
            self._compute_content_hash(path) if event_type != "deleted" else None
        )

        event_id = self.db.add_file_event(
            session_id=self.session_id,
            event_type=event_type,
            file_path=path,
            details=None,
            content_hash=content_hash
        )

        if self._event_callback:
            self._event_callback(event_id, "file", event_type, path)

    def on_created(self, event) -> None:
        """Handle file creation events."""
        if not event.is_directory:
            self._add_file_event("created", event.src_path)

    def on_modified(self, event) -> None:
        """Handle file modification events."""
        if not event.is_directory:
            self._add_file_event("modified", event.src_path)

    def on_deleted(self, event) -> None:
        """Handle file deletion events."""
        if not event.is_directory:
            self._add_file_event("deleted", event.src_path)

    def on_moved(self, event) -> None:
        """Handle file move events.

        The event is recorded against the destination path.
        """
        # NOTE(review): the source path is not recorded, and a move to a
        # destination that is filtered out records nothing at all.
        # Confirm whether that is intended.
        if not event.is_directory:
            self._add_file_event("moved", event.dest_path)


class FileSystemMonitor:
    """Monitors filesystem changes in a directory.

    Wraps a watchdog ``Observer`` together with a ``FileEventHandler``
    that records the observed events.
    """

    def __init__(
        self,
        db: Database,
        session_id: int,
        watch_patterns: Optional[Set[str]] = None,
        ignored_patterns: Optional[Set[str]] = None
    ):
        """Store the configuration; nothing is watched until ``start``.

        Args:
            db: Storage object handed to the event handler.
            session_id: Identifier handed to the event handler.
            watch_patterns: Passed through to ``FileEventHandler``.
            ignored_patterns: Passed through to ``FileEventHandler``.
        """
        self.db = db
        self.session_id = session_id
        self.watch_patterns = watch_patterns
        self.ignored_patterns = ignored_patterns
        self.observer: Optional[Observer] = None
        self.handler: Optional[FileEventHandler] = None
        self._is_running = False
        # Kept here so that a callback registered before start() is not
        # lost; it is applied to the handler once that is created.
        self._event_callback: Optional[Callable] = None

    def start(self, directory: str) -> bool:
        """Start monitoring a directory recursively.

        If monitoring is already running, the previous observer is
        stopped first so that it does not keep recording events.

        Args:
            directory: Path of the directory to watch.

        Returns:
            ``True`` once the observer has been started.

        Raises:
            FileNotFoundError: If ``directory`` does not exist.
        """
        watch_dir = Path(directory)

        if not watch_dir.exists():
            raise FileNotFoundError(f"Directory does not exist: {directory}")

        if self._is_running:
            self.stop()

        handler = FileEventHandler(
            db=self.db,
            session_id=self.session_id,
            watch_patterns=self.watch_patterns,
            ignored_patterns=self.ignored_patterns
        )
        if self._event_callback is not None:
            handler.set_event_callback(self._event_callback)

        observer = Observer()
        observer.schedule(handler, str(watch_dir), recursive=True)
        observer.start()

        # Publish the new state only after a successful start, so a
        # failure above never leaves an unstarted observer for stop().
        self.handler = handler
        self.observer = observer
        self._is_running = True

        return True

    def stop(self) -> None:
        """Stop monitoring and wait for the observer to finish.

        Safe to call when monitoring was never started.
        """
        observer = self.observer
        if observer is not None:
            observer.stop()
            observer.join()
            # Discard the finished observer; start() always creates a
            # fresh one.
            self.observer = None
        self._is_running = False

    def is_running(self) -> bool:
        """Check if monitoring is running."""
        return self._is_running

    def set_event_callback(self, callback) -> None:
        """Set a callback for events.

        The callback is remembered, so it takes effect whether it is set
        before or after ``start``.
        """
        self._event_callback = callback
        if self.handler:
            self.handler.set_event_callback(callback)