Add memory_manager source files and tests
Some checks failed
CI / test (push) Has been cancelled
CI / build (push) Has been cancelled

This commit is contained in:
2026-03-22 16:18:53 +00:00
parent 3a091562b5
commit b5708913d1

View File

@@ -1,27 +1,19 @@
"""SQLAlchemy models for memory manager.""" """SQLAlchemy database models for the memory manager."""
import enum import enum
from datetime import datetime from datetime import datetime
from typing import List, Optional from typing import Any
from sqlalchemy import ( from sqlalchemy import JSON, DateTime, Enum, Index, String, Text, func, text
Column, from sqlalchemy.ext.asyncio import create_async_engine
DateTime, from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
Enum,
ForeignKey,
Integer,
String,
Text,
JSON,
Table,
)
from sqlalchemy.orm import DeclarativeBase, relationship, Mapped, mapped_column
class Base(DeclarativeBase): class Base(DeclarativeBase):
pass pass
class MemoryCategory(str, enum.Enum): class MemoryCategory(enum.StrEnum):
DECISION = "decision" DECISION = "decision"
FEATURE = "feature" FEATURE = "feature"
REFACTORING = "refactoring" REFACTORING = "refactoring"
@@ -30,72 +22,113 @@ class MemoryCategory(str, enum.Enum):
NOTE = "note" NOTE = "note"
memory_tags = Table(
"memory_tags",
Base.metadata,
Column("memory_entry_id", Integer, ForeignKey("memory_entries.id"), primary_key=True),
Column("tag", String(100), primary_key=True),
)
class MemoryEntry(Base): class MemoryEntry(Base):
__tablename__ = "memory_entries" __tablename__ = "memory_entries"
id: Mapped[int] = mapped_column(Integer, primary_key=True) id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(255), nullable=False) title: Mapped[str] = mapped_column(String(255), nullable=False)
content: Mapped[str] = mapped_column(Text, nullable=False) content: Mapped[str] = mapped_column(Text, nullable=False)
category: Mapped[MemoryCategory] = mapped_column( category: Mapped[MemoryCategory] = mapped_column(Enum(MemoryCategory), nullable=False)
Enum(MemoryCategory), nullable=False tags: Mapped[list[str]] = mapped_column(JSON, default=list)
) agent_id: Mapped[str] = mapped_column(String(100), nullable=False)
tags: Mapped[List[str]] = mapped_column(JSON, default=list) project_path: Mapped[str] = mapped_column(String(500), nullable=False)
agent_id: Mapped[str] = mapped_column(String(100), default="unknown") created_at: Mapped[datetime] = mapped_column(DateTime, default=func.now())
project_path: Mapped[str] = mapped_column(String(500), default=".") updated_at: Mapped[datetime] = mapped_column(DateTime, default=func.now(), onupdate=func.now())
created_at: Mapped[datetime] = mapped_column(
DateTime, default=datetime.utcnow __table_args__ = (
) Index("idx_category", "category"),
updated_at: Mapped[datetime] = mapped_column( Index("idx_agent_id", "agent_id"),
DateTime, default=datetime.utcnow, onupdate=datetime.utcnow Index("idx_project_path", "project_path"),
) Index("idx_created_at", "created_at"),
parent_id: Mapped[Optional[int]] = mapped_column(
Integer, ForeignKey("memory_entries.id"), nullable=True
) )
parent: Mapped[Optional["MemoryEntry"]] = relationship( def to_dict(self) -> dict[str, Any]:
"MemoryEntry", remote_side=[id], back_populates="children" return {
) "id": self.id,
children: Mapped[List["MemoryEntry"]] = relationship( "title": self.title,
"MemoryEntry", back_populates="parent" "content": self.content,
) "category": self.category.value,
"tags": self.tags,
"agent_id": self.agent_id,
"project_path": self.project_path,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
}
class Commit(Base): class Commit(Base):
__tablename__ = "commits" __tablename__ = "commits"
id: Mapped[int] = mapped_column(Integer, primary_key=True) id: Mapped[int] = mapped_column(primary_key=True)
hash: Mapped[str] = mapped_column(String(40), unique=True, nullable=False) hash: Mapped[str] = mapped_column(String(40), unique=True, nullable=False)
message: Mapped[str] = mapped_column(Text, nullable=False) message: Mapped[str] = mapped_column(Text, nullable=False)
agent_id: Mapped[str] = mapped_column(String(100), default="unknown") agent_id: Mapped[str] = mapped_column(String(100), nullable=False)
project_path: Mapped[str] = mapped_column(String(500), default=".") project_path: Mapped[str] = mapped_column(String(500), nullable=False)
created_at: Mapped[datetime] = mapped_column( snapshot: Mapped[list[dict[str, Any]]] = mapped_column(JSON, nullable=False)
DateTime, default=datetime.utcnow created_at: Mapped[datetime] = mapped_column(DateTime, default=func.now())
__table_args__ = (
Index("idx_commit_hash", "hash"),
Index("idx_commit_agent_id", "agent_id"),
Index("idx_commit_created_at", "created_at"),
) )
entries: Mapped[List["CommitEntry"]] = relationship( def to_dict(self) -> dict[str, Any]:
"CommitEntry", back_populates="commit", cascade="all, delete-orphan" return {
) "id": self.id,
"hash": self.hash,
"message": self.message,
"agent_id": self.agent_id,
"project_path": self.project_path,
"snapshot": self.snapshot,
"created_at": self.created_at.isoformat() if self.created_at else None,
}
class CommitEntry(Base): class Tag(Base):
__tablename__ = "commit_entries" __tablename__ = "tags"
id: Mapped[int] = mapped_column(Integer, primary_key=True) id: Mapped[int] = mapped_column(primary_key=True)
commit_id: Mapped[int] = mapped_column( name: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
Integer, ForeignKey("commits.id"), nullable=False created_at: Mapped[datetime] = mapped_column(DateTime, default=func.now())
)
memory_entry_id: Mapped[int] = mapped_column(
Integer, ForeignKey("memory_entries.id"), nullable=False
)
entry_snapshot: Mapped[str] = mapped_column(JSON, nullable=False)
commit: Mapped["Commit"] = relationship("Commit", back_populates="entries") def to_dict(self) -> dict[str, Any]:
memory_entry: Mapped["MemoryEntry"] = relationship("MemoryEntry") return {
"id": self.id,
"name": self.name,
"created_at": self.created_at.isoformat() if self.created_at else None,
}
async def init_db(db_path: str):
engine = create_async_engine(f"sqlite+aiosqlite:///{db_path}", echo=False)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
await conn.execute(text(
"CREATE VIRTUAL TABLE IF NOT EXISTS memory_entries_fts USING fts5("
"title, content, tags, category, agent_id, project_path, content='memory_entries', content_rowid='id')"
))
await conn.execute(text(
"CREATE TRIGGER IF NOT EXISTS memory_entries_ai AFTER INSERT ON memory_entries BEGIN "
"INSERT INTO memory_entries_fts(rowid, title, content, tags, category, agent_id, project_path) "
"VALUES (new.id, new.title, new.content, new.tags, new.category, new.agent_id, new.project_path); END"
))
await conn.execute(text(
"CREATE TRIGGER IF NOT EXISTS memory_entries_ad AFTER DELETE ON memory_entries BEGIN "
"INSERT INTO memory_entries_fts(memory_entries_fts, rowid, title, content, tags, category, agent_id, project_path) "
"VALUES ('delete', old.id, old.title, old.content, old.tags, old.category, old.agent_id, old.project_path); END"
))
await conn.execute(text(
"CREATE TRIGGER IF NOT EXISTS memory_entries_au AFTER UPDATE ON memory_entries BEGIN "
"INSERT INTO memory_entries_fts(memory_entries_fts, rowid, title, content, tags, category, agent_id, project_path) "
"VALUES ('delete', old.id, old.title, old.content, old.tags, old.category, old.agent_id, old.project_path); "
"INSERT INTO memory_entries_fts(rowid, title, content, tags, category, agent_id, project_path) "
"VALUES (new.id, new.title, new.content, new.tags, new.category, new.agent_id, new.project_path); END"
))
return engine