416 lines
15 KiB
Python
416 lines
15 KiB
Python
"""
7000%AUTO Web Dashboard

Real-time monitoring dashboard for the autonomous AI system.
Read-only interface with SSE streaming for live updates.
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import AsyncGenerator, Optional, Dict, Any, List
|
|
from collections import deque
|
|
|
|
from fastapi import FastAPI, Request
|
|
|
|
# Module-level logger for SSE broadcasting
|
|
logger = logging.getLogger(__name__)
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import HTMLResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from sse_starlette.sse import EventSourceResponse
|
|
|
|
|
|
# =============================================================================
|
|
# Global Event System for SSE
|
|
# =============================================================================
|
|
|
|
class EventBroadcaster:
    """
    Broadcasts events to all connected SSE clients.

    Every subscriber owns a bounded ``asyncio.Queue``. An ``asyncio.Lock``
    serialises access to the subscriber list and the history buffer, so the
    class is safe to use from concurrent coroutines on a single event loop.
    It is NOT thread-safe: ``asyncio`` primitives must not be touched from
    other threads.
    """

    def __init__(self, max_history: int = 100, subscriber_queue_size: int = 100):
        """
        Args:
            max_history: Number of recent events retained and replayed to
                each new subscriber.
            subscriber_queue_size: Number of live events a subscriber may
                have pending *in addition to* the replayed history before it
                is considered dead and dropped.
        """
        self._subscribers: List[asyncio.Queue] = []
        self._history: deque = deque(maxlen=max_history)
        self._lock = asyncio.Lock()
        self._current_state: Dict[str, Any] = {}

        # The queue must be able to hold the full replayed history plus some
        # live events. Otherwise a fresh subscriber starts with a full queue
        # and is evicted by the very next broadcast. An unbounded history
        # (maxlen None) gets an unbounded queue (maxsize 0).
        history_cap = self._history.maxlen
        if history_cap is None:
            self._queue_maxsize = 0
        else:
            self._queue_maxsize = history_cap + max(1, subscriber_queue_size)

    async def subscribe(self) -> asyncio.Queue:
        """Subscribe to events. Returns a queue pre-filled with recent history."""
        queue: asyncio.Queue = asyncio.Queue(maxsize=self._queue_maxsize)
        async with self._lock:
            self._subscribers.append(queue)
            # Replay history without awaiting: the queue is sized to fit it,
            # and blocking here would stall every broadcaster on the lock.
            for event in self._history:
                queue.put_nowait(event)
        return queue

    async def unsubscribe(self, queue: asyncio.Queue):
        """Unsubscribe from events. Unknown queues are ignored."""
        async with self._lock:
            if queue in self._subscribers:
                self._subscribers.remove(queue)

    async def broadcast(self, event: Dict[str, Any]):
        """
        Broadcast an event to all subscribers.

        The passed dict is stamped in place with a ``timestamp`` key (naive
        UTC, ISO-8601), appended to the history and offered to every
        subscriber queue. Subscribers whose queue is full are dropped.
        """
        event["timestamp"] = datetime.utcnow().isoformat()
        event_type = event.get("type", "unknown")

        # Log agent_output events specifically for debugging
        if event_type == "agent_output":
            agent = event.get("agent", "unknown")
            message = event.get("message")
            msg_len = len(message) if message else 0
            msg_preview = message[:100] if message else "empty"
            logger.info(
                "[SSE_BROADCAST] agent_output: agent=%s, len=%s, preview=%s...",
                agent, msg_len, msg_preview,
            )

        async with self._lock:
            subscriber_count = len(self._subscribers)
            self._history.append(event)

            dead_subscribers = []
            sent_count = 0
            for q in self._subscribers:
                try:
                    q.put_nowait(event)
                    sent_count += 1
                except asyncio.QueueFull:
                    # Consumer is not draining its queue; drop it rather than
                    # block everyone else.
                    dead_subscribers.append(q)

            for dead in dead_subscribers:
                self._subscribers.remove(dead)

        if dead_subscribers:
            logger.warning(
                "[SSE_BROADCAST] Dropped %d subscriber(s) with full queues",
                len(dead_subscribers),
            )

        if event_type == "agent_output":
            logger.info(
                "[SSE_BROADCAST] Sent to %s/%s subscribers, %s dead",
                sent_count, subscriber_count, len(dead_subscribers),
            )

        # Update current state tracking
        if event_type in ("agent_started", "status", "heartbeat"):
            self._current_state.update(event)

    def get_history(self) -> List[Dict[str, Any]]:
        """Get recent event history as a new list (oldest first)."""
        return list(self._history)
|
|
|
|
|
|
# Global event broadcaster
|
|
# Single module-wide instance: the /api/stream endpoint subscribes to it and
# broadcast_event() publishes through it.
event_broadcaster: EventBroadcaster = EventBroadcaster()
|
|
|
|
|
|
# =============================================================================
|
|
# FastAPI Application
|
|
# =============================================================================
|
|
|
|
# Application object. docs_url/redoc_url are None, so the interactive
# Swagger UI and ReDoc pages are not served.
app = FastAPI(
    title="7000%AUTO Dashboard",
    description="Real-time monitoring dashboard for the AI automation system",
    version="1.0.0",
    docs_url=None,
    redoc_url=None,
)

# CORS middleware
# The API is GET-only and open to any origin. Credentialed requests are
# disabled because a wildcard origin combined with allow_credentials=True
# lets any site issue cookie-bearing requests on a visitor's behalf.
# NOTE(review): no endpoint in this module reads cookies or auth headers;
# if authentication is added later, list explicit origins instead of "*".
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["GET"],
    allow_headers=["*"],
)
|
|
|
|
|
|
# =============================================================================
|
|
# Agent Pipeline Definition
|
|
# =============================================================================
|
|
|
|
# Agent stages shown by the dashboard; returned verbatim by /api/agents and
# /api/status. NOTE(review): list order presumably mirrors execution order
# in the orchestrator -- confirm before reordering.
AGENT_PIPELINE = [
    {"id": "ideator", "name": "Ideator", "icon": "💡", "description": "Generates innovative project ideas"},
    {"id": "planner", "name": "Planner", "icon": "📋", "description": "Creates detailed implementation plans"},
    # MAN + ZERO WIDTH JOINER + LAPTOP; written with escapes so the invisible
    # joiner cannot be lost by editors or copy/paste (without it the icon
    # renders as two separate emoji).
    {"id": "developer", "name": "Developer", "icon": "\U0001F468\u200D\U0001F4BB", "description": "Implements code and solutions"},
    {"id": "tester", "name": "Tester", "icon": "🧪", "description": "Tests and validates implementations"},
    {"id": "uploader", "name": "Uploader", "icon": "🚀", "description": "Uploads to Gitea repository"},
    {"id": "evangelist", "name": "Evangelist", "icon": "📣", "description": "Promotes on social media"},
]
|
|
|
|
|
|
# =============================================================================
|
|
# Template Loading
|
|
# =============================================================================
|
|
|
|
def get_dashboard_html() -> str:
    """
    Load and return the dashboard HTML template.

    Reads ``templates/dashboard.html`` next to this module as UTF-8.

    Returns:
        The template text, or a minimal "not found" HTML page when the
        template is missing or cannot be read.
    """
    template_path = Path(__file__).parent / "templates" / "dashboard.html"
    try:
        # Read directly instead of exists()-then-read: avoids the race between
        # the check and the read, and handles a directory or unreadable file
        # at that path.
        return template_path.read_text(encoding="utf-8")
    except FileNotFoundError:
        pass  # No template deployed: expected, fall back quietly.
    except OSError as exc:
        logging.getLogger(__name__).warning(
            "Could not read dashboard template %s: %s", template_path, exc
        )
    return "<html><body><h1>Dashboard template not found</h1></body></html>"
|
|
|
|
|
|
# =============================================================================
|
|
# Helper Functions
|
|
# =============================================================================
|
|
|
|
def get_orchestrator():
    """
    Look up the ``orchestrator`` attribute of the ``main`` module.

    Returns:
        The attribute's value, or None when ``main`` cannot be imported or
        has no ``orchestrator`` attribute.
    """
    try:
        import main as entry_module
    except (ImportError, AttributeError):
        return None
    return getattr(entry_module, "orchestrator", None)
|
|
|
|
|
|
async def get_system_status_async() -> Dict[str, Any]:
    """
    Get current system status (async version).

    Returns:
        A dict with the keys ``orchestrator_running``, ``current_agent``,
        ``current_project``, ``dev_test_iterations`` and ``timestamp`` (naive
        UTC, ISO-8601). Without an orchestrator the defaults are returned
        unchanged. The project/agent fields are filled from the
        orchestrator's active state on a best-effort basis.
    """
    orchestrator = get_orchestrator()

    status: Dict[str, Any] = {
        "orchestrator_running": False,
        "current_agent": None,
        "current_project": None,
        "dev_test_iterations": 0,
        "timestamp": datetime.utcnow().isoformat(),
    }

    if not orchestrator:
        return status

    status["orchestrator_running"] = orchestrator.is_running

    if not hasattr(orchestrator, "state_manager"):
        return status

    try:
        active_state = await orchestrator.state_manager.get_active_state()
        if active_state:
            status["current_agent"] = active_state.current_agent
            status["current_project"] = {
                "id": active_state.project_id,
                "status": active_state.status,
                "name": active_state.plan.get("project_name") if active_state.plan else None,
                "current_agent": active_state.current_agent,
            }
            status["dev_test_iterations"] = active_state.dev_test_iterations
    except Exception:
        # Still best-effort (the dashboard must keep working), but record the
        # failure instead of silently discarding it.
        logging.getLogger(__name__).warning(
            "Failed to read active orchestrator state", exc_info=True
        )

    return status
|
|
|
|
|
|
def _serialize_log(log: Any) -> Dict[str, Any]:
    """Flatten one AgentLog row into a JSON-friendly dict."""
    return {
        "id": log.id,
        "project_id": log.project_id,
        "agent_name": log.agent_name,
        "message": log.message,
        "log_type": log.log_type,
        "timestamp": log.created_at.isoformat() if log.created_at else None,
    }


def _serialize_project(project: Any) -> Optional[Dict[str, Any]]:
    """Flatten one Project row into a JSON-friendly dict; falsy input gives None."""
    if not project:
        return None
    return {
        "id": project.id,
        "name": project.name,
        "status": project.status,
        "current_agent": project.current_agent,
        "dev_test_iterations": project.dev_test_iterations,
    }


async def get_database_stats() -> Dict[str, Any]:
    """
    Get statistics from database.

    Returns:
        A dict with ``stats`` (whatever ``get_stats`` returns),
        ``recent_logs`` (the 50 newest AgentLog rows, newest first) and
        ``active_project`` (the newest project whose status is neither
        COMPLETED nor FAILED, or None).

        On any failure, including the database modules not being importable,
        zeroed defaults are returned with an extra ``error`` key holding the
        exception text. The failure is also logged.
    """
    try:
        # Imported lazily so this module still loads when the database layer
        # is unavailable.
        from database.db import get_db, get_stats
        from database.models import Project, AgentLog, ProjectStatus
        from sqlalchemy import select, desc

        async with get_db() as session:
            # Get stats
            stats = await get_stats(session)

            # Get recent logs
            result = await session.execute(
                select(AgentLog)
                .order_by(desc(AgentLog.created_at))
                .limit(50)
            )
            logs = result.scalars().all()

            # Get active project
            active_result = await session.execute(
                select(Project)
                .where(Project.status.notin_([
                    ProjectStatus.COMPLETED.value,
                    ProjectStatus.FAILED.value,
                ]))
                .order_by(desc(Project.created_at))
                .limit(1)
            )
            active_project = active_result.scalar_one_or_none()

            return {
                "stats": stats,
                "recent_logs": [_serialize_log(log) for log in logs],
                "active_project": _serialize_project(active_project),
            }
    except Exception as e:
        logging.getLogger(__name__).exception("Failed to load dashboard data from database")
        # NOTE(review): the raw exception text is sent to API clients via the
        # "error" key; consider a generic message if this is ever exposed
        # beyond trusted networks.
        return {
            "stats": {"total_ideas": 0, "total_projects": 0, "completed_projects": 0},
            "recent_logs": [],
            "active_project": None,
            "error": str(e),
        }
|
|
|
|
|
|
# =============================================================================
|
|
# API Endpoints
|
|
# =============================================================================
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def serve_dashboard():
    """Return the dashboard page produced by get_dashboard_html() as an HTML response."""
    page = get_dashboard_html()
    return HTMLResponse(content=page)
|
|
|
|
|
|
@app.get("/health")
async def health_check():
    """Liveness probe: always reports "healthy" together with the current naive-UTC time."""
    now_iso = datetime.utcnow().isoformat()
    return {"status": "healthy", "timestamp": now_iso}
|
|
|
|
|
|
@app.get("/api/status")
async def get_status():
    """
    Return system status, database data and the agent pipeline in one dict.

    Keys from the database data override same-named keys from the system
    status; ``agents`` is always set to AGENT_PIPELINE.
    """
    payload = dict(await get_system_status_async())
    payload.update(await get_database_stats())
    payload["agents"] = AGENT_PIPELINE
    return payload
|
|
|
|
|
|
@app.get("/api/stream")
async def stream_events(request: Request):
    """
    SSE endpoint for real-time event streaming.

    On connect the client receives one ``status`` and one ``init`` event.
    After that, every event from the broadcaster is relayed under its own
    ``type`` (``message`` when it has none). Whenever no event arrives for
    5 seconds, a set of snapshot events is sent instead: ``status_update``,
    optionally ``project_update`` and ``agent_update``, then
    ``iteration_update`` and ``heartbeat``.
    """

    def sse(name: str, payload: Any) -> dict:
        # One SSE frame: event name plus JSON-encoded body.
        return {"event": name, "data": json.dumps(payload)}

    async def event_generator() -> AsyncGenerator[dict, None]:
        queue = await event_broadcaster.subscribe()

        try:
            # Initial snapshot: system status first, then database data.
            yield sse("status", await get_system_status_async())
            yield sse("init", await get_database_stats())

            # Relay until the client goes away.
            while not await request.is_disconnected():
                try:
                    # Wait for event with timeout
                    event = await asyncio.wait_for(queue.get(), timeout=5.0)
                    yield sse(event.get("type", "message"), event)
                except asyncio.TimeoutError:
                    # Quiet period: push one small event per data type.
                    status = await get_system_status_async()
                    db_data = await get_database_stats()

                    # 1. Orchestrator running state
                    yield sse("status_update", {
                        "orchestrator_running": status.get("orchestrator_running", False),
                        "timestamp": status.get("timestamp"),
                    })

                    project = db_data.get("active_project")
                    if project:
                        # 2. Current project as stored in the database
                        yield sse("project_update", project)

                        # 3. Current agent, when one is set
                        agent_name = project.get("current_agent")
                        if agent_name:
                            yield sse("agent_update", {"agent": agent_name})

                        iterations = project.get("dev_test_iterations", 0)
                    else:
                        iterations = 0

                    # 4. Dev/test iteration count (0 without an active project)
                    yield sse("iteration_update", {"iterations": iterations})

                    # 5. Heartbeat so the client can tell the stream is alive
                    yield sse("heartbeat", {"timestamp": datetime.utcnow().isoformat()})
        finally:
            # Runs on disconnect and on generator close alike.
            await event_broadcaster.unsubscribe(queue)

    return EventSourceResponse(event_generator())
|
|
|
|
|
|
@app.get("/api/logs")
async def get_logs():
    """Return the ``recent_logs`` list from get_database_stats() under the key ``logs`` (empty list if absent)."""
    snapshot = await get_database_stats()
    recent = snapshot.get("recent_logs", [])
    return {"logs": recent}
|
|
|
|
|
|
@app.get("/api/agents")
async def get_agents():
    """Return AGENT_PIPELINE under the key ``agents``."""
    return dict(agents=AGENT_PIPELINE)
|
|
|
|
|
|
# =============================================================================
|
|
# Event Broadcasting Helper
|
|
# =============================================================================
|
|
|
|
async def broadcast_event(event_type: str, agent: Optional[str] = None,
                          message: str = "", data: Optional[Dict] = None):
    """
    Publish one event to every SSE client through the module-level broadcaster.

    Args:
        event_type: Stored under the ``type`` key; the stream endpoint uses
            it as the SSE event name.
        agent: Stored under the ``agent`` key.
        message: Stored under the ``message`` key.
        data: Stored under the ``data`` key; a falsy value becomes ``{}``.
    """
    payload = {
        "type": event_type,
        "agent": agent,
        "message": message,
        "data": data if data else {},
    }
    await event_broadcaster.broadcast(payload)
|
|
|
|
|
|
# =============================================================================
|
|
# Static Files Mount (for images)
|
|
# =============================================================================
|
|
|
|
# Serve ./images (next to this module) under /images when it is present.
# is_dir() rather than exists(): StaticFiles needs a directory, and a regular
# file at this path would otherwise make the mount fail at import time.
images_path = Path(__file__).parent / "images"
if images_path.is_dir():
    app.mount("/images", StaticFiles(directory=str(images_path)), name="images")


# Expose for external use
__all__ = ["app", "event_broadcaster", "broadcast_event"]
|