Add CLI and storage modules
Some checks failed
CI / test (push) Failing after 9s

This commit is contained in:
2026-01-30 09:09:43 +00:00
parent 88f5af3c88
commit 32ceb1ff80

325
src/storage/database.py Normal file
View File

@@ -0,0 +1,325 @@
"""Database management for DevTrace."""
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict, Any
from dataclasses import dataclass
from .models import (
Session,
FileEvent,
CommandEvent,
GitEvent,
EventType
)
class Database:
"""Manages SQLite database operations for DevTrace."""
def __init__(self, db_path: Path):
self.db_path = db_path
self._connection: Optional[sqlite3.Connection] = None
def _get_connection(self) -> sqlite3.Connection:
"""Get database connection."""
if self._connection is None:
self._connection = sqlite3.connect(str(self.db_path))
self._connection.row_factory = sqlite3.Row
self._connection.execute("PRAGMA foreign_keys = ON")
return self._connection
def close(self) -> None:
"""Close database connection."""
if self._connection:
self._connection.close()
self._connection = None
def initialize(self) -> None:
"""Initialize database schema."""
conn = self._get_connection()
conn.executescript("""
CREATE TABLE IF NOT EXISTS sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
directory TEXT NOT NULL,
start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
end_time TIMESTAMP,
notes TEXT
);
CREATE TABLE IF NOT EXISTS file_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id INTEGER NOT NULL,
event_type TEXT NOT NULL,
file_path TEXT NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
details TEXT,
content_hash TEXT,
FOREIGN KEY (session_id) REFERENCES sessions(id)
);
CREATE TABLE IF NOT EXISTS command_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id INTEGER NOT NULL,
command TEXT NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
exit_code INTEGER,
working_directory TEXT,
details TEXT,
FOREIGN KEY (session_id) REFERENCES sessions(id)
);
CREATE TABLE IF NOT EXISTS git_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id INTEGER NOT NULL,
operation TEXT NOT NULL,
branch TEXT,
commit_hash TEXT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
details TEXT,
diff TEXT,
FOREIGN KEY (session_id) REFERENCES sessions(id)
);
CREATE INDEX IF NOT EXISTS idx_file_events_session ON file_events(session_id);
CREATE INDEX IF NOT EXISTS idx_command_events_session ON command_events(session_id);
CREATE INDEX IF NOT EXISTS idx_git_events_session ON git_events(session_id);
""")
conn.commit()
def create_session(
self,
name: str,
directory: str,
notes: Optional[str] = None
) -> Session:
"""Create a new session."""
conn = self._get_connection()
cursor = conn.execute(
"""
INSERT INTO sessions (name, directory, notes)
VALUES (?, ?, ?)
""",
(name, directory, notes)
)
conn.commit()
return Session(
id=cursor.lastrowid,
name=name,
directory=directory,
start_time=datetime.now(),
end_time=None,
notes=notes
)
def get_session(self, session_id: int) -> Optional[Session]:
"""Get a session by ID."""
conn = self._get_connection()
row = conn.execute(
"SELECT * FROM sessions WHERE id = ?",
(session_id,)
).fetchone()
if row is None:
return None
return Session(
id=row["id"],
name=row["name"],
directory=row["directory"],
start_time=datetime.fromisoformat(row["start_time"]) if row["start_time"] else None,
end_time=datetime.fromisoformat(row["end_time"]) if row["end_time"] else None,
notes=row["notes"]
)
def get_all_sessions(self) -> List[Session]:
"""Get all sessions."""
conn = self._get_connection()
rows = conn.execute("SELECT * FROM sessions ORDER BY start_time DESC").fetchall()
return [
Session(
id=row["id"],
name=row["name"],
directory=row["directory"],
start_time=datetime.fromisoformat(row["start_time"]) if row["start_time"] else None,
end_time=datetime.fromisoformat(row["end_time"]) if row["end_time"] else None,
notes=row["notes"]
)
for row in rows
]
def end_session(self, session_id: int) -> bool:
"""End a session."""
conn = self._get_connection()
cursor = conn.execute(
"UPDATE sessions SET end_time = CURRENT_TIMESTAMP WHERE id = ?",
(session_id,)
)
conn.commit()
return cursor.rowcount > 0
def add_file_event(
self,
session_id: int,
event_type: str,
file_path: str,
details: Optional[str] = None,
content_hash: Optional[str] = None
) -> int:
"""Add a file event."""
conn = self._get_connection()
cursor = conn.execute(
"""
INSERT INTO file_events (session_id, event_type, file_path, details, content_hash)
VALUES (?, ?, ?, ?, ?)
""",
(session_id, event_type, file_path, details, content_hash)
)
conn.commit()
return cursor.lastrowid
def get_file_events(self, session_id: int, limit: int = 100) -> List[FileEvent]:
"""Get file events for a session."""
conn = self._get_connection()
rows = conn.execute(
"SELECT * FROM file_events WHERE session_id = ? ORDER BY timestamp DESC LIMIT ?",
(session_id, limit)
).fetchall()
return [
FileEvent(
id=row["id"],
session_id=row["session_id"],
event_type=row["event_type"],
file_path=row["file_path"],
timestamp=datetime.fromisoformat(row["timestamp"]) if row["timestamp"] else None,
details=row["details"],
content_hash=row["content_hash"]
)
for row in rows
]
def add_command_event(
self,
session_id: int,
command: str,
exit_code: Optional[int] = None,
working_directory: Optional[str] = None,
details: Optional[str] = None
) -> int:
"""Add a command event."""
conn = self._get_connection()
cursor = conn.execute(
"""
INSERT INTO command_events (session_id, command, exit_code, working_directory, details)
VALUES (?, ?, ?, ?, ?)
""",
(session_id, command, exit_code, working_directory, details)
)
conn.commit()
return cursor.lastrowid
def get_command_events(self, session_id: int, limit: int = 100) -> List[CommandEvent]:
"""Get command events for a session."""
conn = self._get_connection()
rows = conn.execute(
"SELECT * FROM command_events WHERE session_id = ? ORDER BY timestamp DESC LIMIT ?",
(session_id, limit)
).fetchall()
return [
CommandEvent(
id=row["id"],
session_id=row["session_id"],
command=row["command"],
timestamp=datetime.fromisoformat(row["timestamp"]) if row["timestamp"] else None,
exit_code=row["exit_code"],
working_directory=row["working_directory"],
details=row["details"]
)
for row in rows
]
def add_git_event(
self,
session_id: int,
operation: str,
branch: Optional[str] = None,
commit_hash: Optional[str] = None,
details: Optional[str] = None,
diff: Optional[str] = None
) -> int:
"""Add a git event."""
conn = self._get_connection()
cursor = conn.execute(
"""
INSERT INTO git_events (session_id, operation, branch, commit_hash, details, diff)
VALUES (?, ?, ?, ?, ?, ?)
""",
(session_id, operation, branch, commit_hash, details, diff)
)
conn.commit()
return cursor.lastrowid
def get_git_events(self, session_id: int, limit: int = 100) -> List[GitEvent]:
"""Get git events for a session."""
conn = self._get_connection()
rows = conn.execute(
"SELECT * FROM git_events WHERE session_id = ? ORDER BY timestamp DESC LIMIT ?",
(session_id, limit)
).fetchall()
return [
GitEvent(
id=row["id"],
session_id=row["session_id"],
operation=row["operation"],
branch=row["branch"],
commit_hash=row["commit_hash"],
timestamp=datetime.fromisoformat(row["timestamp"]) if row["timestamp"] else None,
details=row["details"],
diff=row["diff"]
)
for row in rows
]
def get_session_events(
self,
session_id: int,
limit: int = 100
) -> List[FileEvent | CommandEvent | GitEvent]:
"""Get all events for a session."""
events: List[FileEvent | CommandEvent | GitEvent] = []
events.extend(self.get_file_events(session_id, limit))
events.extend(self.get_command_events(session_id, limit))
events.extend(self.get_git_events(session_id, limit))
events.sort(key=lambda e: e.timestamp or datetime.min, reverse=True)
return events[:limit]
def get_stats(self) -> Dict[str, Any]:
"""Get database statistics."""
conn = self._get_connection()
session_count = conn.execute("SELECT COUNT(*) FROM sessions").fetchone()[0]
active_sessions = conn.execute(
"SELECT COUNT(*) FROM sessions WHERE end_time IS NULL"
).fetchone()[0]
file_event_count = conn.execute("SELECT COUNT(*) FROM file_events").fetchone()[0]
command_event_count = conn.execute("SELECT COUNT(*) FROM command_events").fetchone()[0]
git_event_count = conn.execute("SELECT COUNT(*) FROM git_events").fetchone()[0]
return {
"total_sessions": session_count,
"active_sessions": active_sessions,
"file_events": file_event_count,
"command_events": command_event_count,
"git_events": git_event_count,
"total_events": file_event_count + command_event_count + git_event_count,
"db_path": str(self.db_path)
}