Initial upload: Agentic Codebase Memory Manager v0.1.0
Some checks failed
CI / test (push) Has been cancelled
CI / build (push) Has been cancelled

This commit is contained in:
2026-03-22 16:00:53 +00:00
parent 90ac99870a
commit bad9cf0042

View File

@@ -0,0 +1,133 @@
"""Core services for memory management."""
from typing import List, Optional
from memory_manager.db.models import MemoryCategory, MemoryEntry, Commit
from memory_manager.db.repository import MemoryRepository
class MemoryService:
def __init__(self, repository: MemoryRepository):
self.repository = repository
async def create_entry(
self,
title: str,
content: str,
category: str,
tags: List[str],
agent_id: str = "unknown",
project_path: str = ".",
parent_id: Optional[int] = None,
) -> MemoryEntry:
category_enum = MemoryCategory(category)
return await self.repository.create_entry(
title=title,
content=content,
category=category_enum,
tags=tags,
agent_id=agent_id,
project_path=project_path,
parent_id=parent_id,
)
async def get_entry(self, entry_id: int) -> Optional[MemoryEntry]:
return await self.repository.get_entry(entry_id)
async def list_entries(
self,
category: Optional[str] = None,
agent_id: Optional[str] = None,
limit: int = 100,
offset: int = 0,
) -> List[MemoryEntry]:
category_enum = MemoryCategory(category) if category else None
return await self.repository.list_entries(
category=category_enum,
agent_id=agent_id,
limit=limit,
offset=offset,
)
async def update_entry(
self,
entry_id: int,
title: Optional[str] = None,
content: Optional[str] = None,
category: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> Optional[MemoryEntry]:
category_enum = MemoryCategory(category) if category else None
return await self.repository.update_entry(
entry_id=entry_id,
title=title,
content=content,
category=category_enum,
tags=tags,
)
async def delete_entry(self, entry_id: int) -> bool:
return await self.repository.delete_entry(entry_id)
async def search_entries(
self,
query: str,
category: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> List[MemoryEntry]:
category_enum = MemoryCategory(category) if category else None
return await self.repository.search_entries(
query=query,
category=category_enum,
tags=tags,
)
class CommitService:
def __init__(self, repository: MemoryRepository):
self.repository = repository
async def create_commit(
self,
message: str,
agent_id: str = "unknown",
project_path: str = ".",
) -> Commit:
return await self.repository.create_commit(
message=message,
agent_id=agent_id,
project_path=project_path,
)
async def get_commits(self, limit: int = 50) -> List[Commit]:
return await self.repository.get_commits(limit=limit)
async def get_commit(self, commit_hash: str) -> Optional[Commit]:
return await self.repository.get_commit(commit_hash)
async def diff_commits(self, hash1: str, hash2: str) -> dict:
entries1 = await self.repository.get_commit_entries(hash1)
entries2 = await self.repository.get_commit_entries(hash2)
snapshot1 = {e.memory_entry_id: e.entry_snapshot for e in entries1}
snapshot2 = {e.memory_entry_id: e.entry_snapshot for e in entries2}
all_ids = set(snapshot1.keys()) | set(snapshot2.keys())
diff_result = {
"hash1": hash1,
"hash2": hash2,
"added": [],
"removed": [],
"modified": [],
}
for entry_id in all_ids:
if entry_id not in snapshot1:
diff_result["added"].append(snapshot2[entry_id])
elif entry_id not in snapshot2:
diff_result["removed"].append(snapshot1[entry_id])
elif snapshot1[entry_id] != snapshot2[entry_id]:
diff_result["modified"].append({
"before": snapshot1[entry_id],
"after": snapshot2[entry_id],
})
return diff_result