Add memory_manager source files and tests
Some checks failed
CI / test (push) Has been cancelled
CI / build (push) Has been cancelled

This commit is contained in:
2026-03-22 16:19:03 +00:00
parent 849e4f5be5
commit 91196dfca4

View File

@@ -1,103 +1,360 @@
"""Textual TUI application.""" from datetime import datetime
import os from typing import Any
import asyncio
from typing import Optional
from textual import on
from textual.app import App, ComposeResult from textual.app import App, ComposeResult
from textual.widgets import Header, Footer, DataTable, Static, Tabs, Tab from textual.binding import Binding
from textual.containers import Container, VerticalScroll from textual.containers import Container, Horizontal, ScrollableContainer, Vertical
from textual.screen import Screen from textual.screen import Screen
from textual.widgets import (
Button,
Footer,
Header,
Input,
Label,
ListItem,
ListView,
Static,
)
from memory_manager.core.services import MemoryService, CommitService from memory_manager.core.services import MemoryManager
from memory_manager.db.models import MemoryCategory
from memory_manager.db.repository import MemoryRepository from memory_manager.db.repository import MemoryRepository
db_path = os.getenv("MEMORY_DB_PATH", ".memory/codebase_memory.db") db_path = os.getenv("MEMORY_DB_PATH", ".memory/codebase_memory.db")
os.makedirs(os.path.dirname(db_path), exist_ok=True)
repository = MemoryRepository(db_path)
memory_service = MemoryService(repository) async def get_memory_manager() -> MemoryManager:
commit_service = CommitService(repository) repository = MemoryRepository(db_path)
await repository.initialize()
manager = MemoryManager(repository)
return manager
class DashboardScreen(Screen): class DashboardScreen(Screen):
CSS = """
Screen {
background: $surface;
}
.stats-container {
height: auto;
padding: 1 2;
background: $panel;
border: solid $border;
}
.stat-label {
color: $text-muted;
}
.stat-value {
color: $text;
bold: true;
}
.entry-list {
height: 1fr;
}
"""
def __init__(self, manager: MemoryManager):
super().__init__()
self.manager = manager
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
yield Header() yield Header()
yield Static("Memory Manager Dashboard", id="title") yield Container(
yield DataTable() Vertical(
Label("Memory Manager Dashboard", classes="header-title"),
ScrollableContainer(classes="stats-container", id="stats-container"),
ListView(id="recent-entries", classes="entry-list"),
id="dashboard-content",
)
)
yield Footer() yield Footer()
async def on_mount(self): async def on_mount(self) -> None:
stats = await repository.get_stats() await self.load_stats()
table = self.query_one(DataTable)
table.add_columns("Metric", "Value") async def load_stats(self) -> None:
table.add_row("Total Entries", str(stats["total_entries"])) stats_container = self.query_one("#stats-container", ScrollableContainer)
table.add_row("Total Commits", str(stats["total_commits"]))
for category, count in stats["category_counts"].items(): entries = await self.manager.memory_service.list_entries(limit=10000)
table.add_row(category.title(), str(count)) commits = await self.manager.commit_service.list_commits(limit=10000)
entries_by_category: dict[str, int] = {}
for entry in entries:
cat = entry["category"]
entries_by_category[cat] = entries_by_category.get(cat, 0) + 1
stats_text = f"""
Total Entries: {len(entries)} | Total Commits: {len(commits)}
Entries by Category:
"""
for cat, count in entries_by_category.items():
stats_text += f" {cat}: {count}\n"
stats_container.remove_children()
stats_container.mount(Static(stats_text))
list_view = self.query_one("#recent-entries", ListView)
list_view.clear()
for entry in entries[:10]:
created = entry["created_at"]
if created:
created = datetime.fromisoformat(created).strftime("%m/%d %H:%M")
list_item = ListItem(
Label(f"[{entry['category']}] {entry['title'][:40]} - {created}"),
)
await list_view.mount(list_item)
class MemoryListScreen(Screen): class MemoryListScreen(Screen):
CSS = """
Screen {
background: $surface;
}
.filter-bar {
height: 3;
padding: 1;
background: $panel;
}
.entry-detail {
height: 1fr;
padding: 1 2;
}
"""
def __init__(self, manager: MemoryManager, category: MemoryCategory | None = None):
super().__init__()
self.manager = manager
self.current_category = category
self.entries: list[dict[str, Any]] = []
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
yield Header() yield Header()
yield Static("Memory Entries", id="title") yield Container(
yield DataTable() Horizontal(
Button("All", id="filter-all"),
Button("Decision", id="filter-decision"),
Button("Feature", id="filter-feature"),
Button("Refactoring", id="filter-refactoring"),
Button("Architecture", id="filter-architecture"),
Button("Bug", id="filter-bug"),
Button("Note", id="filter-note"),
classes="filter-bar",
),
Horizontal(
ListView(id="entries-list", classes="column"),
ScrollableContainer(id="entry-detail", classes="column entry-detail"),
classes="main-content",
),
)
yield Footer() yield Footer()
async def on_mount(self): async def on_mount(self) -> None:
entries = await memory_service.list_entries(limit=100) await self.load_entries()
table = self.query_one(DataTable)
table.add_columns("ID", "Category", "Title", "Agent") async def load_entries(self, category: MemoryCategory | None = None) -> None:
for entry in entries: self.entries = await self.manager.memory_service.list_entries(
table.add_row( category=category,
str(entry.id), limit=1000,
entry.category.value, )
entry.title[:40],
entry.agent_id, list_view = self.query_one("#entries-list", ListView)
list_view.clear()
for entry in self.entries:
created = entry["created_at"]
if created:
created = datetime.fromisoformat(created).strftime("%m/%d %H:%M")
list_item = ListItem(
Label(f"[{entry['category']}] {entry['title']}"),
Label(f"{entry['agent_id']} | {created}"),
) )
await list_view.mount(list_item)
@on(ListView.Selected)
async def on_entry_selected(self, event: ListView.Selected) -> None:
index = event.list_view.index
if index is not None and 0 <= index < len(self.entries):
entry = self.entries[index]
detail_container = self.query_one("#entry-detail", ScrollableContainer)
detail_container.remove_children()
content = f"""
Title: {entry['title']}
Category: {entry['category']}
Agent: {entry['agent_id']}
Project: {entry['project_path']}
Tags: {', '.join(entry['tags']) if entry['tags'] else '(none)'}
Created: {entry['created_at']}
Updated: {entry['updated_at']}
Content:
{entry['content']}
"""
await detail_container.mount(Static(content))
@on(Button.Pressed)
async def on_filter_pressed(self, event: Button.Pressed) -> None:
button_id = event.button.id
category = None
if button_id == "filter-decision":
category = MemoryCategory.DECISION
elif button_id == "filter-feature":
category = MemoryCategory.FEATURE
elif button_id == "filter-refactoring":
category = MemoryCategory.REFACTORING
elif button_id == "filter-architecture":
category = MemoryCategory.ARCHITECTURE
elif button_id == "filter-bug":
category = MemoryCategory.BUG
elif button_id == "filter-note":
category = MemoryCategory.NOTE
await self.load_entries(category)
class CommitHistoryScreen(Screen): class CommitHistoryScreen(Screen):
CSS = """
Screen {
background: $surface;
}
.commit-list {
height: 1fr;
}
"""
def __init__(self, manager: MemoryManager):
super().__init__()
self.manager = manager
self.commits: list[dict[str, Any]] = []
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
yield Header() yield Header()
yield Static("Commit History", id="title") yield Container(
yield DataTable() ScrollableContainer(
ListView(id="commits-list", classes="commit-list"),
id="commits-container",
),
)
yield Footer() yield Footer()
async def on_mount(self): async def on_mount(self) -> None:
commits = await commit_service.get_commits(limit=50) await self.load_commits()
table = self.query_one(DataTable)
table.add_columns("Hash", "Message", "Agent", "Date") async def load_commits(self) -> None:
for commit in commits: self.commits = await self.manager.commit_service.list_commits(limit=100)
table.add_row(
commit.hash[:8], list_view = self.query_one("#commits-list", ListView)
commit.message[:40], list_view.clear()
commit.agent_id,
str(commit.created_at)[:10], for commit in self.commits:
created = commit["created_at"]
if created:
created = datetime.fromisoformat(created).strftime("%Y-%m-%d %H:%M:%S")
content = f"commit {commit['hash'][:8]}\n{commit['agent_id']} | {created}\n\n {commit['message']}"
list_item = ListItem(
Static(content, markup=False),
) )
await list_view.mount(list_item)
class MemoryTUIApp(App): class SearchScreen(Screen):
def __init__(self): CSS = """
Screen {
background: $surface;
}
.search-input {
height: 3;
padding: 1;
}
.results-list {
height: 1fr;
}
"""
def __init__(self, manager: MemoryManager):
super().__init__() super().__init__()
self.title = "Memory Manager TUI" self.manager = manager
self.SCREENS = { self.results: list[dict[str, Any]] = []
"dashboard": DashboardScreen(),
"list": MemoryListScreen(),
"history": CommitHistoryScreen(),
}
def on_mount(self):
self.push_screen("dashboard")
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
yield Tabs( yield Header()
Tab("Dashboard", id="dashboard"), yield Container(
Tab("Entries", id="list"), Horizontal(
Tab("History", id="history"), Input(placeholder="Search query...", id="search-input"),
Button("Search", id="search-button"),
classes="search-input",
),
ScrollableContainer(
ListView(id="results-list", classes="results-list"),
id="results-container",
),
) )
yield Footer()
@on(Button.Pressed, "#search-button")
@on(Input.Submitted, "#search-input")
async def on_search(self) -> None:
input_widget = self.query_one("#search-input", Input)
query = input_widget.value
if not query:
return
self.results = await self.manager.search_service.search(query=query, limit=100)
list_view = self.query_one("#results-list", ListView)
list_view.clear()
for entry in self.results:
created = entry["created_at"]
if created:
created = datetime.fromisoformat(created).strftime("%m/%d %H:%M")
list_item = ListItem(
Label(f"[{entry['category']}] {entry['title'][:40]}"),
Label(f"{entry['content'][:80]}... | {created}"),
)
await list_view.mount(list_item)
def run_tui(): class TUIApp(App):
app = MemoryTUIApp() BINDINGS = [
app.run() Binding("d", "switch_dashboard", "Dashboard"),
Binding("l", "switch_memory_list", "Memory List"),
Binding("c", "switch_commits", "Commits"),
Binding("s", "switch_search", "Search"),
Binding("q", "quit", "Quit"),
]
def __init__(self):
super().__init__()
self.manager = None
async def on_mount(self) -> None:
self.manager = await get_memory_manager()
await self.push_screen(DashboardScreen(self.manager))
async def on_unmount(self) -> None:
if self.manager:
await self.manager.close()
async def switch_dashboard(self) -> None:
if self.manager:
await self.push_screen(DashboardScreen(self.manager))
async def switch_memory_list(self) -> None:
if self.manager:
await self.push_screen(MemoryListScreen(self.manager))
async def switch_commits(self) -> None:
if self.manager:
await self.push_screen(CommitHistoryScreen(self.manager))
async def switch_search(self) -> None:
if self.manager:
await self.push_screen(SearchScreen(self.manager))