diff --git a/src/storage/database.py b/src/storage/database.py new file mode 100644 index 0000000..163a834 --- /dev/null +++ b/src/storage/database.py @@ -0,0 +1,325 @@ +"""Database management for DevTrace.""" + +import sqlite3 +from datetime import datetime +from pathlib import Path +from typing import Optional, List, Dict, Any +from dataclasses import dataclass + +from .models import ( + Session, + FileEvent, + CommandEvent, + GitEvent, + EventType +) + + +class Database: + """Manages SQLite database operations for DevTrace.""" + + def __init__(self, db_path: Path): + self.db_path = db_path + self._connection: Optional[sqlite3.Connection] = None + + def _get_connection(self) -> sqlite3.Connection: + """Get database connection.""" + if self._connection is None: + self._connection = sqlite3.connect(str(self.db_path)) + self._connection.row_factory = sqlite3.Row + self._connection.execute("PRAGMA foreign_keys = ON") + return self._connection + + def close(self) -> None: + """Close database connection.""" + if self._connection: + self._connection.close() + self._connection = None + + def initialize(self) -> None: + """Initialize database schema.""" + conn = self._get_connection() + + conn.executescript(""" + CREATE TABLE IF NOT EXISTS sessions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + directory TEXT NOT NULL, + start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + end_time TIMESTAMP, + notes TEXT + ); + + CREATE TABLE IF NOT EXISTS file_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id INTEGER NOT NULL, + event_type TEXT NOT NULL, + file_path TEXT NOT NULL, + timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + details TEXT, + content_hash TEXT, + FOREIGN KEY (session_id) REFERENCES sessions(id) + ); + + CREATE TABLE IF NOT EXISTS command_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id INTEGER NOT NULL, + command TEXT NOT NULL, + timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + exit_code INTEGER, + working_directory TEXT, + details TEXT, + FOREIGN KEY (session_id) REFERENCES sessions(id) + ); + + CREATE TABLE IF NOT EXISTS git_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id INTEGER NOT NULL, + operation TEXT NOT NULL, + branch TEXT, + commit_hash TEXT, + timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + details TEXT, + diff TEXT, + FOREIGN KEY (session_id) REFERENCES sessions(id) + ); + + CREATE INDEX IF NOT EXISTS idx_file_events_session ON file_events(session_id); + CREATE INDEX IF NOT EXISTS idx_command_events_session ON command_events(session_id); + CREATE INDEX IF NOT EXISTS idx_git_events_session ON git_events(session_id); + """) + + conn.commit() + + def create_session( + self, + name: str, + directory: str, + notes: Optional[str] = None + ) -> Session: + """Create a new session.""" + conn = self._get_connection() + cursor = conn.execute( + """ + INSERT INTO sessions (name, directory, notes) + VALUES (?, ?, ?) + """, + (name, directory, notes) + ) + conn.commit() + + return Session( + id=cursor.lastrowid, + name=name, + directory=directory, + start_time=datetime.now(), + end_time=None, + notes=notes + ) + + def get_session(self, session_id: int) -> Optional[Session]: + """Get a session by ID.""" + conn = self._get_connection() + row = conn.execute( + "SELECT * FROM sessions WHERE id = ?", + (session_id,) + ).fetchone() + + if row is None: + return None + + return Session( + id=row["id"], + name=row["name"], + directory=row["directory"], + start_time=datetime.fromisoformat(row["start_time"]) if row["start_time"] else None, + end_time=datetime.fromisoformat(row["end_time"]) if row["end_time"] else None, + notes=row["notes"] + ) + + def get_all_sessions(self) -> List[Session]: + """Get all sessions.""" + conn = self._get_connection() + rows = conn.execute("SELECT * FROM sessions ORDER BY start_time DESC").fetchall() + + return [ + Session( + id=row["id"], + name=row["name"], + directory=row["directory"], + start_time=datetime.fromisoformat(row["start_time"]) if row["start_time"] else None, + end_time=datetime.fromisoformat(row["end_time"]) if row["end_time"] else None, + notes=row["notes"] + ) + for row in rows + ] + + def end_session(self, session_id: int) -> bool: + """End a session.""" + conn = self._get_connection() + cursor = conn.execute( + "UPDATE sessions SET end_time = CURRENT_TIMESTAMP WHERE id = ?", + (session_id,) + ) + conn.commit() + return cursor.rowcount > 0 + + def add_file_event( + self, + session_id: int, + event_type: str, + file_path: str, + details: Optional[str] = None, + content_hash: Optional[str] = None + ) -> int: + """Add a file event.""" + conn = self._get_connection() + cursor = conn.execute( + """ + INSERT INTO file_events (session_id, event_type, file_path, details, content_hash) + VALUES (?, ?, ?, ?, ?) + """, + (session_id, event_type, file_path, details, content_hash) + ) + conn.commit() + return cursor.lastrowid + + def get_file_events(self, session_id: int, limit: int = 100) -> List[FileEvent]: + """Get file events for a session.""" + conn = self._get_connection() + rows = conn.execute( + "SELECT * FROM file_events WHERE session_id = ? ORDER BY timestamp DESC LIMIT ?", + (session_id, limit) + ).fetchall() + + return [ + FileEvent( + id=row["id"], + session_id=row["session_id"], + event_type=row["event_type"], + file_path=row["file_path"], + timestamp=datetime.fromisoformat(row["timestamp"]) if row["timestamp"] else None, + details=row["details"], + content_hash=row["content_hash"] + ) + for row in rows + ] + + def add_command_event( + self, + session_id: int, + command: str, + exit_code: Optional[int] = None, + working_directory: Optional[str] = None, + details: Optional[str] = None + ) -> int: + """Add a command event.""" + conn = self._get_connection() + cursor = conn.execute( + """ + INSERT INTO command_events (session_id, command, exit_code, working_directory, details) + VALUES (?, ?, ?, ?, ?) + """, + (session_id, command, exit_code, working_directory, details) + ) + conn.commit() + return cursor.lastrowid + + def get_command_events(self, session_id: int, limit: int = 100) -> List[CommandEvent]: + """Get command events for a session.""" + conn = self._get_connection() + rows = conn.execute( + "SELECT * FROM command_events WHERE session_id = ? ORDER BY timestamp DESC LIMIT ?", + (session_id, limit) + ).fetchall() + + return [ + CommandEvent( + id=row["id"], + session_id=row["session_id"], + command=row["command"], + timestamp=datetime.fromisoformat(row["timestamp"]) if row["timestamp"] else None, + exit_code=row["exit_code"], + working_directory=row["working_directory"], + details=row["details"] + ) + for row in rows + ] + + def add_git_event( + self, + session_id: int, + operation: str, + branch: Optional[str] = None, + commit_hash: Optional[str] = None, + details: Optional[str] = None, + diff: Optional[str] = None + ) -> int: + """Add a git event.""" + conn = self._get_connection() + cursor = conn.execute( + """ + INSERT INTO git_events (session_id, operation, branch, commit_hash, details, diff) + VALUES (?, ?, ?, ?, ?, ?) + """, + (session_id, operation, branch, commit_hash, details, diff) + ) + conn.commit() + return cursor.lastrowid + + def get_git_events(self, session_id: int, limit: int = 100) -> List[GitEvent]: + """Get git events for a session.""" + conn = self._get_connection() + rows = conn.execute( + "SELECT * FROM git_events WHERE session_id = ? ORDER BY timestamp DESC LIMIT ?", + (session_id, limit) + ).fetchall() + + return [ + GitEvent( + id=row["id"], + session_id=row["session_id"], + operation=row["operation"], + branch=row["branch"], + commit_hash=row["commit_hash"], + timestamp=datetime.fromisoformat(row["timestamp"]) if row["timestamp"] else None, + details=row["details"], + diff=row["diff"] + ) + for row in rows + ] + + def get_session_events( + self, + session_id: int, + limit: int = 100 + ) -> List[FileEvent | CommandEvent | GitEvent]: + """Get all events for a session.""" + events: List[FileEvent | CommandEvent | GitEvent] = [] + events.extend(self.get_file_events(session_id, limit)) + events.extend(self.get_command_events(session_id, limit)) + events.extend(self.get_git_events(session_id, limit)) + events.sort(key=lambda e: e.timestamp or datetime.min, reverse=True) + return events[:limit] + + def get_stats(self) -> Dict[str, Any]: + """Get database statistics.""" + conn = self._get_connection() + + session_count = conn.execute("SELECT COUNT(*) FROM sessions").fetchone()[0] + active_sessions = conn.execute( + "SELECT COUNT(*) FROM sessions WHERE end_time IS NULL" + ).fetchone()[0] + file_event_count = conn.execute("SELECT COUNT(*) FROM file_events").fetchone()[0] + command_event_count = conn.execute("SELECT COUNT(*) FROM command_events").fetchone()[0] + git_event_count = conn.execute("SELECT COUNT(*) FROM git_events").fetchone()[0] + + return { + "total_sessions": session_count, + "active_sessions": active_sessions, + "file_events": file_event_count, + "command_events": command_event_count, + "git_events": git_event_count, + "total_events": file_event_count + command_event_count + git_event_count, + "db_path": str(self.db_path) + }