Initial upload: Agentic Codebase Memory Manager v0.1.0
Some checks failed
CI / test (push) Has been cancelled
CI / build (push) Has been cancelled

This commit is contained in:
2026-03-22 16:00:58 +00:00
parent 5d886c86db
commit a99e36abb6

View File

@@ -0,0 +1,194 @@
"""FastAPI application for memory manager API."""
import os
from contextlib import asynccontextmanager
from typing import List, Optional
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from memory_manager.core.services import MemoryService, CommitService
from memory_manager.db.repository import MemoryRepository
from memory_manager.api.schemas import (
MemoryEntryCreate,
MemoryEntryUpdate,
MemoryEntryResponse,
CommitCreate,
CommitResponse,
DiffResponse,
StatsResponse,
)
repository: Optional[MemoryRepository] = None
memory_service: Optional[MemoryService] = None
commit_service: Optional[CommitService] = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global repository, memory_service, commit_service
db_path = os.getenv("MEMORY_DB_PATH", ".memory/codebase_memory.db")
os.makedirs(os.path.dirname(db_path), exist_ok=True)
repository = MemoryRepository(db_path)
await repository.init_db()
memory_service = MemoryService(repository)
commit_service = CommitService(repository)
yield
if repository:
await repository.close()
app = FastAPI(
title="Agentic Codebase Memory Manager API",
description="A centralized memory store for AI coding agents",
version="0.1.0",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def entry_to_response(entry) -> MemoryEntryResponse:
return MemoryEntryResponse(
id=entry.id,
title=entry.title,
content=entry.content,
category=entry.category.value,
tags=entry.tags or [],
agent_id=entry.agent_id,
project_path=entry.project_path,
created_at=entry.created_at,
updated_at=entry.updated_at,
parent_id=entry.parent_id,
)
@app.get("/")
def root():
return {"message": "Agentic Codebase Memory Manager API", "version": "0.1.0"}
@app.get("/api/memory", response_model=List[MemoryEntryResponse])
async def list_memory(
category: Optional[str] = Query(None),
agent_id: Optional[str] = Query(None),
limit: int = Query(100, ge=1, le=1000),
offset: int = Query(0, ge=0),
):
entries = await memory_service.list_entries(
category=category,
agent_id=agent_id,
limit=limit,
offset=offset,
)
return [entry_to_response(e) for e in entries]
@app.post("/api/memory", response_model=MemoryEntryResponse)
async def create_memory(entry: MemoryEntryCreate):
new_entry = await memory_service.create_entry(
title=entry.title,
content=entry.content,
category=entry.category,
tags=entry.tags,
agent_id=entry.agent_id,
project_path=entry.project_path,
parent_id=entry.parent_id,
)
return entry_to_response(new_entry)
@app.get("/api/memory/{entry_id}", response_model=MemoryEntryResponse)
async def get_memory(entry_id: int):
entry = await memory_service.get_entry(entry_id)
if not entry:
raise HTTPException(status_code=404, detail="Entry not found")
return entry_to_response(entry)
@app.put("/api/memory/{entry_id}", response_model=MemoryEntryResponse)
async def update_memory(entry_id: int, entry: MemoryEntryUpdate):
updated = await memory_service.update_entry(
entry_id=entry_id,
title=entry.title,
content=entry.content,
category=entry.category,
tags=entry.tags,
)
if not updated:
raise HTTPException(status_code=404, detail="Entry not found")
return entry_to_response(updated)
@app.delete("/api/memory/{entry_id}")
async def delete_memory(entry_id: int):
deleted = await memory_service.delete_entry(entry_id)
if not deleted:
raise HTTPException(status_code=404, detail="Entry not found")
return {"message": "Entry deleted successfully"}
@app.get("/api/memory/search/query", response_model=List[MemoryEntryResponse])
async def search_memory(
q: str = Query(..., min_length=1),
category: Optional[str] = Query(None),
tags: Optional[str] = Query(None),
):
tag_list = tags.split(",") if tags else None
entries = await memory_service.search_entries(
query=q,
category=category,
tags=tag_list,
)
return [entry_to_response(e) for e in entries]
@app.post("/api/memory/commit", response_model=CommitResponse)
async def create_commit(commit: CommitCreate):
new_commit = await commit_service.create_commit(
message=commit.message,
agent_id=commit.agent_id,
project_path=commit.project_path,
)
return CommitResponse(
id=new_commit.id,
hash=new_commit.hash,
message=new_commit.message,
agent_id=new_commit.agent_id,
project_path=new_commit.project_path,
created_at=new_commit.created_at,
)
@app.get("/api/memory/log", response_model=List[CommitResponse])
async def get_log(limit: int = Query(50, ge=1, le=1000)):
commits = await commit_service.get_commits(limit=limit)
return [
CommitResponse(
id=c.id,
hash=c.hash,
message=c.message,
agent_id=c.agent_id,
project_path=c.project_path,
created_at=c.created_at,
)
for c in commits
]
@app.get("/api/memory/diff/{hash1}/{hash2}", response_model=DiffResponse)
async def diff_commits(hash1: str, hash2: str):
diff_result = await commit_service.diff_commits(hash1, hash2)
return DiffResponse(**diff_result)
@app.get("/api/memory/stats", response_model=StatsResponse)
async def get_stats():
stats = await repository.get_stats()
return StatsResponse(**stats)