Add storage models and monitor modules
Some checks failed
CI / test (push) Has been cancelled

This commit is contained in:
2026-01-30 09:10:46 +00:00
parent e84b39633c
commit 8627defa85

152
src/monitor/filesystem.py Normal file
View File

@@ -0,0 +1,152 @@
"""Filesystem monitoring using watchdog."""
import hashlib
import os
from pathlib import Path
from typing import Set, Optional, Callable
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from ..storage import Database, FileEvent
class FileEventHandler(FileSystemEventHandler):
"""Handler for filesystem events."""
def __init__(
self,
db: Database,
session_id: int,
watch_patterns: Optional[Set[str]] = None,
ignored_patterns: Optional[Set[str]] = None
):
self.db = db
self.session_id = session_id
self.watch_patterns = watch_patterns or {".py", ".js", ".ts", ".html", ".css", ".md"}
self.ignored_patterns = ignored_patterns or {
"__pycache__", ".git", ".tox", ".venv", "node_modules", ".cache"
}
self._event_callback: Optional[Callable] = None
def set_event_callback(self, callback: Callable) -> None:
"""Set a callback for when events are processed."""
self._event_callback = callback
def _should_watch(self, path: str) -> bool:
"""Check if a path should be watched."""
path_obj = Path(path)
for ignored in self.ignored_patterns:
if ignored in str(path_obj.parts):
return False
if path_obj.suffix.lower() in self.watch_patterns:
return True
return False
def _compute_content_hash(self, path: str) -> Optional[str]:
"""Compute SHA256 hash of file content."""
try:
if os.path.isfile(path):
hasher = hashlib.sha256()
with open(path, "rb") as f:
hasher.update(f.read())
return hasher.hexdigest()
except (IOError, OSError):
pass
return None
def _add_file_event(self, event_type: str, path: str) -> None:
"""Add a file event to the database."""
if not self._should_watch(path):
return
content_hash = self._compute_content_hash(path) if event_type != "deleted" else None
event_id = self.db.add_file_event(
session_id=self.session_id,
event_type=event_type,
file_path=path,
details=None,
content_hash=content_hash
)
if self._event_callback:
self._event_callback(event_id, "file", event_type, path)
def on_created(self, event) -> None:
"""Handle file creation events."""
if not event.is_directory:
self._add_file_event("created", event.src_path)
def on_modified(self, event) -> None:
"""Handle file modification events."""
if not event.is_directory:
self._add_file_event("modified", event.src_path)
def on_deleted(self, event) -> None:
"""Handle file deletion events."""
if not event.is_directory:
self._add_file_event("deleted", event.src_path)
def on_moved(self, event) -> None:
"""Handle file move events."""
if not event.is_directory:
self._add_file_event("moved", event.dest_path)
class FileSystemMonitor:
"""Monitors filesystem changes in a directory."""
def __init__(
self,
db: Database,
session_id: int,
watch_patterns: Optional[Set[str]] = None,
ignored_patterns: Optional[Set[str]] = None
):
self.db = db
self.session_id = session_id
self.watch_patterns = watch_patterns
self.ignored_patterns = ignored_patterns
self.observer: Optional[Observer] = None
self.handler: Optional[FileEventHandler] = None
self._is_running = False
def start(self, directory: str) -> bool:
"""Start monitoring a directory."""
watch_dir = Path(directory)
if not watch_dir.exists():
raise FileNotFoundError(f"Directory does not exist: {directory}")
self.handler = FileEventHandler(
db=self.db,
session_id=self.session_id,
watch_patterns=self.watch_patterns,
ignored_patterns=self.ignored_patterns
)
self.observer = Observer()
self.observer.schedule(self.handler, str(watch_dir), recursive=True)
self.observer.start()
self._is_running = True
return True
def stop(self) -> None:
"""Stop monitoring."""
if self.observer:
self.observer.stop()
self.observer.join()
self._is_running = False
def is_running(self) -> bool:
"""Check if monitoring is running."""
return self._is_running
def set_event_callback(self, callback) -> None:
"""Set a callback for events."""
if self.handler:
self.handler.set_event_callback(callback)