1060 lines
42 KiB
Python
1060 lines
42 KiB
Python
"""
|
|
Workflow Orchestrator for 7000%AUTO
|
|
Manages the AI agent pipeline from ideation to promotion
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
from typing import Optional, Dict, Any, List, Callable
|
|
|
|
from .state import StateManager, ProjectState, AgentType
|
|
from .opencode_client import OpenCodeClient
|
|
|
|
# DevTest MCP communication imports
|
|
from database.db import (
|
|
get_project_implementation_status_json,
|
|
get_project_test_result_json,
|
|
clear_project_devtest_state,
|
|
get_project_ci_result_json,
|
|
get_project_upload_status_json,
|
|
clear_project_ci_state,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class WorkflowEventType(str, Enum):
    """Kinds of events the workflow publishes to SSE consumers.

    Members are ``str`` subclasses, so they compare equal to (and serialize
    as) their plain string values.
    """

    # Whole-pipeline lifecycle
    WORKFLOW_STARTED = "workflow_started"

    # Per-agent lifecycle
    AGENT_STARTED = "agent_started"
    AGENT_OUTPUT = "agent_output"
    AGENT_COMPLETED = "agent_completed"
    AGENT_ERROR = "agent_error"

    # Test / CI verdicts
    TEST_PASSED = "test_passed"
    TEST_FAILED = "test_failed"

    # Loop bookkeeping and terminal states
    ITERATION_STARTED = "iteration_started"
    WORKFLOW_COMPLETED = "workflow_completed"
    WORKFLOW_FAILED = "workflow_failed"

    # Free-form log line
    LOG = "log"
|
|
|
|
|
|
@dataclass
class WorkflowEvent:
    """A single notification produced while the workflow runs."""

    # Event category; serialized through its ``.value``.
    type: WorkflowEventType
    # Name of the agent the event relates to, if any.
    agent: Optional[str] = None
    # Human-readable text (for AGENT_OUTPUT this is the streamed content).
    message: str = ""
    # Optional structured payload attached to the event.
    data: Optional[Dict[str, Any]] = None
    # Creation time as a naive UTC datetime.
    # NOTE(review): datetime.utcnow is deprecated since Python 3.12; moving to
    # an aware timestamp would change the isoformat() output - confirm
    # consumers before switching.
    timestamp: datetime = field(default_factory=datetime.utcnow)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view of the event with an ISO-8601 timestamp."""
        payload: Dict[str, Any] = dict(
            type=self.type.value,
            agent=self.agent,
            message=self.message,
            data=self.data,
        )
        payload["timestamp"] = self.timestamp.isoformat()
        return payload
|
|
|
|
|
|
class WorkflowOrchestrator:
    """
    Main orchestrator for the AI agent pipeline.

    Manages the flow: Ideator -> Planner -> Developer <-> Tester -> Uploader -> Evangelist

    Each ``run_*`` method drives one agent: it opens a client session, sends a
    prompt while streaming the agent's output as AGENT_OUTPUT events, closes
    the session and then reads back whatever the agent stored in the database
    through its tools. Agent failures are reported as events/log entries and
    signalled through the return value rather than raised.
    """

    def __init__(self):
        """Create the state manager and client; the pipeline starts idle."""
        self.state_manager = StateManager()
        self.client = OpenCodeClient()
        self._running = False
        self._event_listeners: List[Callable[["WorkflowEvent"], None]] = []

    # ------------------------------------------------------------------
    # Event plumbing
    # ------------------------------------------------------------------

    def _create_output_callback(self, agent_name: str):
        """
        Create an async callback for streaming agent output.

        Args:
            agent_name: Name of the agent for the output event

        Returns:
            Async callback function that emits AGENT_OUTPUT events
        """
        async def output_callback(content: str):
            logger.info(f"[OUTPUT_CALLBACK] Agent '{agent_name}': Received content ({len(content) if content else 0} chars)")
            if not content:
                # Nothing to forward; an empty AGENT_OUTPUT carries no information.
                return
            preview = content[:100] if len(content) > 100 else content
            logger.info(f"[OUTPUT_CALLBACK] Agent '{agent_name}': Content preview: {preview}...")
            await self._emit_event(WorkflowEvent(
                type=WorkflowEventType.AGENT_OUTPUT,
                agent=agent_name,
                message=content,
                data={"streaming": True}
            ))
            logger.info(f"[OUTPUT_CALLBACK] Agent '{agent_name}': Event emitted")
        return output_callback

    def add_event_listener(self, listener: Callable[["WorkflowEvent"], None]):
        """Add event listener for SSE streaming"""
        self._event_listeners.append(listener)

    def remove_event_listener(self, listener: Callable[["WorkflowEvent"], None]):
        """Remove event listener (no-op when it is not registered)"""
        if listener in self._event_listeners:
            self._event_listeners.remove(listener)

    async def _emit_event(self, event: "WorkflowEvent"):
        """Emit event to all local listeners and to the web dashboard.

        Listener and broadcast failures are logged and never propagate, so
        event delivery cannot break the pipeline.
        """
        # Log all events for debugging
        if event.type == WorkflowEventType.AGENT_OUTPUT:
            preview = event.message[:100] if event.message else 'empty'
            logger.info(f"[EMIT_EVENT] AGENT_OUTPUT: agent={event.agent}, len={len(event.message) if event.message else 0}, preview={preview}...")
        else:
            logger.info(f"[EMIT_EVENT] {event.type.value}: agent={event.agent}, message={event.message[:50] if event.message else 'none'}...")

        # Iterate over a snapshot so a listener may unsubscribe itself while
        # being notified without causing other listeners to be skipped.
        for listener in list(self._event_listeners):
            try:
                outcome = listener(event)
                # Await whatever awaitable the listener handed back; this also
                # covers callables that are not plain ``async def`` functions.
                if asyncio.iscoroutine(outcome) or asyncio.isfuture(outcome):
                    await outcome
            except Exception as e:
                logger.error(f"[EMIT_EVENT] Local listener error: {e}")

        # Emit to web dashboard SSE broadcaster
        try:
            # Imported lazily: the web module is optional at runtime.
            from web.app import event_broadcaster
            event_data = {
                "type": event.type.value,
                "agent": event.agent,
                "message": event.message,
                "data": event.data,
            }
            logger.info(f"[EMIT_EVENT] Broadcasting to SSE: type={event.type.value}, agent={event.agent}")
            await event_broadcaster.broadcast(event_data)
            logger.info(f"[EMIT_EVENT] Broadcast SUCCESS: type={event.type.value}")
        except ImportError:
            logger.warning("[EMIT_EVENT] Web module not available, cannot broadcast")
        except Exception as e:
            logger.error(f"[EMIT_EVENT] Broadcast FAILED: {e}")

    async def _emit_agent_started(self, agent: str, message: str) -> None:
        """Emit an AGENT_STARTED event for *agent*."""
        await self._emit_event(WorkflowEvent(
            type=WorkflowEventType.AGENT_STARTED,
            agent=agent,
            message=message
        ))

    async def _emit_agent_completed(self, agent: str, message: str,
                                    data: Optional[Dict[str, Any]] = None) -> None:
        """Emit an AGENT_COMPLETED event for *agent* with an optional payload."""
        await self._emit_event(WorkflowEvent(
            type=WorkflowEventType.AGENT_COMPLETED,
            agent=agent,
            message=message,
            data=data
        ))

    async def _log(self, project_id: int, agent: str, message: str, log_type: str = "info"):
        """Log agent activity to database and emit a LOG event.

        A database failure is logged and does not prevent the event from
        being emitted.
        """
        try:
            from database import add_agent_log
            await add_agent_log(project_id, agent, message, log_type)
        except Exception as e:
            logger.error(f"Failed to log: {e}")

        # Emit workflow event
        await self._emit_event(WorkflowEvent(
            type=WorkflowEventType.LOG,
            agent=agent,
            message=message,
            data={"log_type": log_type, "project_id": project_id}
        ))

    # ------------------------------------------------------------------
    # Shared agent helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _format_agent_error(error: Exception) -> str:
        """Return ``str(error)``, with an API-key hint for API/auth failures."""
        error_msg = str(error)
        if "API Error" in error_msg or "Auth" in error_msg:
            error_msg = f"{error_msg} - Please check your OPENCODE_API_KEY environment variable"
        return error_msg

    async def _report_agent_error(self, project_id: int, agent: str, error: Exception,
                                  *, log_prefix: str = "Error") -> str:
        """Log *error* for *agent*, emit AGENT_ERROR and return the message used."""
        error_msg = self._format_agent_error(error)
        await self._log(project_id, agent, f"{log_prefix}: {error_msg}", "error")
        await self._emit_event(WorkflowEvent(
            type=WorkflowEventType.AGENT_ERROR,
            agent=agent,
            message=error_msg
        ))
        return error_msg

    async def _run_agent_session(self, agent: str, prompt: str) -> None:
        """Run one prompt in a fresh *agent* session, streaming its output.

        The session is closed after the message completes. If anything fails
        before that (including the close itself), one more best-effort close
        is attempted and the original exception propagates to the caller.
        """
        session_id = None
        try:
            session_id = await self.client.create_session(agent)
            await self.client.send_message(
                session_id,
                prompt,
                output_callback=self._create_output_callback(agent)
            )
            await self.client.close_session(session_id)
            session_id = None
        finally:
            if session_id:
                try:
                    await self.client.close_session(session_id)
                except Exception as close_error:
                    # Cleanup is best-effort; never mask the original error.
                    logger.debug(f"Failed to close {agent} session {session_id}: {close_error}")

    # ------------------------------------------------------------------
    # Individual agents
    # ------------------------------------------------------------------

    async def run_ideator(self, project_id: int) -> Optional[Dict[str, Any]]:
        """Run the Ideator agent to generate a project idea.

        Returns:
            The idea read back from the database, or None when the agent
            failed or did not submit an idea.
        """
        await self._emit_agent_started("ideator", "Starting idea generation")

        try:
            # Pass project_id so agent can use submit_idea tool with it
            prompt = f"""You are working on project_id={project_id}.

Search for trending topics and generate ONE innovative project idea.

Use the search tools to find inspiration from:
- arXiv papers
- Reddit programming communities
- Hacker News
- Product Hunt

Then check the database for existing ideas to avoid duplicates.

When you have finalized your idea, use the submit_idea tool with project_id={project_id} to save it."""

            await self._run_agent_session("ideator", prompt)

            # Get the submitted idea from database
            from database.db import get_project_idea_json
            idea = await get_project_idea_json(project_id)
            if not idea:
                raise RuntimeError("Ideator did not submit idea via submit_idea tool")

            title = idea.get('title', 'Unknown')
            logger.info(f"Ideator submitted idea: {title}")
            await self._log(project_id, "ideator", f"Generated idea: {title}", "output")
            await self._emit_agent_completed("ideator", "Idea generated successfully", idea)
            return idea

        except Exception as e:
            await self._report_agent_error(project_id, "ideator", e)
            return None

    async def run_planner(self, project_id: int, idea: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Run the Planner agent to create implementation plan.

        Returns:
            The plan read back from the database, or None when the agent
            failed or did not submit a plan.
        """
        await self._emit_agent_started("planner", "Creating implementation plan")

        try:
            # Pass project_id so agent can use submit_plan tool with it
            prompt = f"""You are working on project_id={project_id}.

Create a detailed implementation plan for this project idea:

{idea}

Research the best technologies and create a comprehensive plan including:
- Technology stack selection
- File structure
- Feature breakdown
- Implementation steps
- Testing strategy

When you have finalized your plan, use the submit_plan tool with project_id={project_id} to save it."""

            await self._run_agent_session("planner", prompt)

            # Get the submitted plan from database
            from database.db import get_project_plan_json
            plan = await get_project_plan_json(project_id)
            if not plan:
                raise RuntimeError("Planner did not submit plan via submit_plan tool")

            plan_name = plan.get('project_name', 'Unknown')
            logger.info(f"Planner submitted plan: {plan_name}")
            await self._log(project_id, "planner", f"Created plan for: {plan_name}", "output")
            await self._emit_agent_completed("planner", "Plan created successfully", plan)
            return plan

        except Exception as e:
            await self._report_agent_error(project_id, "planner", e)
            return None

    async def run_developer(self, project_id: int, plan: Dict[str, Any], is_fixing: bool = False) -> bool:
        """Run the Developer agent to implement the project.

        Args:
            project_id: Project the agent works on.
            plan: Implementation plan embedded in the prompt for a new build.
            is_fixing: When True the agent is asked to fix the bugs reported
                by the Tester instead of implementing from scratch.

        Returns:
            True when the agent run finished without raising (even if no
            implementation status was submitted), False on error.
        """
        await self._emit_agent_started(
            "developer",
            "Starting implementation" if not is_fixing else "Fixing bugs"
        )

        try:
            if is_fixing:
                # Developer will use get_test_result MCP tool to see bugs
                prompt = f"""You are working on project_id={project_id}.

The Tester found bugs in your implementation. Use the get_test_result tool with project_id={project_id} to see the detailed bug report.

Fix all the bugs reported by the Tester. After fixing, use the submit_implementation_status tool with:
- project_id={project_id}
- status="fixed"
- bugs_addressed: list the bugs you fixed
- ready_for_testing=True"""
            else:
                # New implementation
                prompt = f"""You are working on project_id={project_id}.

Implement this project according to the plan:

{plan}

Create all files, install dependencies, and ensure the project is complete and working.

When done, use the submit_implementation_status tool with:
- project_id={project_id}
- status="completed"
- files_created: list the files you created
- dependencies_installed: list the packages you installed
- ready_for_testing=True"""

            await self._run_agent_session("developer", prompt)

            # Verify implementation status was submitted
            impl_status = await get_project_implementation_status_json(project_id)

            if impl_status and impl_status.get("ready_for_testing"):
                status = impl_status.get("status", "completed")
                await self._log(project_id, "developer", f"Implementation {status}", "output")
                await self._emit_agent_completed("developer", f"Implementation {status}", impl_status)
            else:
                # Even without explicit status, consider success if no exception
                await self._log(project_id, "developer", "Implementation completed", "output")
                await self._emit_agent_completed("developer", "Implementation completed")
            return True

        except Exception as e:
            await self._report_agent_error(project_id, "developer", e)
            return False

    async def run_tester(self, project_id: int) -> Dict[str, Any]:
        """Run the Tester agent to validate the implementation.

        Returns:
            ``{"status": "PASS"}`` on success (also when no explicit result
            was submitted), ``{"status": "FAIL", "bugs": [...]}`` when the
            agent reported a failure, or ``{"status": "FAIL", "error": msg}``
            when the agent run itself raised.
        """
        await self._emit_agent_started("tester", "Running tests")

        try:
            prompt = f"""You are working on project_id={project_id}.

Test the implemented project:

1. Run linting and type checking
2. Run unit tests
3. Verify the build works
4. Check for obvious bugs

Run the actual test commands and report if they pass or fail.

When done, use the submit_test_result tool with:
- project_id={project_id}
- status="PASS" or "FAIL"
- summary: brief description of results
- checks_performed: list of checks you ran
- bugs: list of bugs found (if any)
- ready_for_upload: true only if all tests pass"""

            await self._run_agent_session("tester", prompt)

            # Get test result from database (submitted via MCP)
            test_result = await get_project_test_result_json(project_id)

            if not test_result:
                # No explicit test result - assume pass if no exception
                await self._log(project_id, "tester", "Tests completed", "output")
                await self._emit_event(WorkflowEvent(
                    type=WorkflowEventType.TEST_PASSED,
                    agent="tester",
                    message="Tests completed"
                ))
                return {"status": "PASS"}

            status = test_result.get("status", "PASS")
            # The stored value may be missing or null; treat both as "no bugs listed".
            bugs = test_result.get("bugs") or []

            if status == "PASS":
                await self._log(project_id, "tester", "All tests passed", "output")
                await self._emit_event(WorkflowEvent(
                    type=WorkflowEventType.TEST_PASSED,
                    agent="tester",
                    message="All tests passed",
                    data=test_result
                ))
                return {"status": "PASS"}

            failure_msg = f"Tests failed: {len(bugs)} bugs found"
            await self._log(project_id, "tester", failure_msg, "output")
            await self._emit_event(WorkflowEvent(
                type=WorkflowEventType.TEST_FAILED,
                agent="tester",
                message=failure_msg,
                data=test_result
            ))
            return {"status": "FAIL", "bugs": bugs}

        except Exception as e:
            error_msg = self._format_agent_error(e)
            await self._log(project_id, "tester", f"Error: {error_msg}", "error")
            # A crashed test run is reported as a failed test, not an agent error.
            await self._emit_event(WorkflowEvent(
                type=WorkflowEventType.TEST_FAILED,
                agent="tester",
                message="Tests failed",
                data={"error": error_msg}
            ))
            return {"status": "FAIL", "error": error_msg}

    async def run_uploader(self, project_id: int, project_name: str, is_fixing: bool = False) -> Optional[str]:
        """Run the Uploader agent to publish to Gitea.

        Args:
            project_id: Project the agent works on.
            project_name: Repository name to create or update.
            is_fixing: When True the agent re-uploads CI fixes to the
                existing repository instead of creating it.

        Returns:
            The repository URL (as submitted by the agent, otherwise built
            from the configured Gitea URL and username), or None on error.
        """
        await self._emit_agent_started(
            "uploader",
            "Uploading to Gitea" if not is_fixing else "Re-uploading fixes to Gitea"
        )

        try:
            if is_fixing:
                # Re-upload after CI fix
                prompt = f"""You are working on project_id={project_id}.

The Developer has fixed CI/CD issues. Use the get_ci_result tool to see what was fixed.

Re-upload the fixed code to the existing repository "{project_name}" on Gitea:

1. Push the updated/fixed files to the existing repository
2. Use a meaningful commit message describing the CI fixes

After pushing, use the submit_upload_status tool with:
- project_id={project_id}
- status="completed"
- repo_name="{project_name}"
- gitea_url: the repository URL
- files_pushed: list of files you pushed

Use the gitea tools to push the updates."""
            else:
                # Initial upload
                prompt = f"""You are working on project_id={project_id}.

Upload the project "{project_name}" to Gitea:

1. Create a new public repository named "{project_name}"
2. Write a comprehensive README
3. Set up Gitea Actions for CI/CD
4. Push all code
5. Create an initial release if appropriate

After uploading, use the submit_upload_status tool with:
- project_id={project_id}
- status="completed"
- repo_name="{project_name}"
- gitea_url: the full repository URL on Gitea
- files_pushed: list of files you pushed

Use the gitea tools to create and push the repository."""

            await self._run_agent_session("uploader", prompt)

            # Get actual Gitea URL from upload status submitted by agent
            upload_status = await get_project_upload_status_json(project_id)
            gitea_url = upload_status.get("gitea_url") if upload_status else None
            if not gitea_url:
                # Fallback: construct URL from project name
                from config import settings
                gitea_url = f"{settings.GITEA_URL.rstrip('/')}/{settings.GITEA_USERNAME}/{project_name}"

            await self._log(project_id, "uploader", f"Uploaded to Gitea: {gitea_url}", "output")
            await self._emit_agent_completed("uploader", "Upload completed", {"gitea_url": gitea_url})
            return gitea_url

        except Exception as e:
            await self._report_agent_error(project_id, "uploader", e)
            return None

    async def run_ci_tester(self, project_id: int, project_name: str, gitea_url: str) -> Dict[str, Any]:
        """Run the Tester agent to check Gitea Actions CI/CD status.

        Returns:
            ``{"status": "PASS"}`` (also when no explicit result was
            submitted), ``{"status": "PENDING"}``,
            ``{"status": "FAIL", "ci_result": {...}}`` when CI failed, or
            ``{"status": "FAIL", "error": msg}`` when the agent run raised.
        """
        await self._emit_agent_started("tester", "Checking Gitea Actions CI/CD status")

        try:
            prompt = f"""You are working on project_id={project_id}.

The Uploader has pushed code to Gitea. Check the Gitea Actions CI/CD status:

1. Use get_latest_workflow_status tool with repo="{project_name}" to check CI status
2. If status is "pending", wait a moment and check again (CI may still be running)
3. If status is "passed", CI is successful
4. If status is "failed", use get_workflow_run_jobs to see which jobs failed

After checking, use submit_ci_result tool with:
- project_id={project_id}
- status="PASS" or "FAIL" or "PENDING"
- repo_name="{project_name}"
- gitea_url="{gitea_url}"
- run_id, run_url, summary, failed_jobs (if applicable)
- error_logs: any relevant error messages from failed jobs

Be thorough in reporting what went wrong if CI failed."""

            await self._run_agent_session("tester", prompt)

            # Get CI result from database (submitted via MCP)
            ci_result = await get_project_ci_result_json(project_id)

            if not ci_result:
                # No explicit CI result - assume pass if no exception
                await self._log(project_id, "tester", "CI check completed (no explicit result)", "output")
                return {"status": "PASS"}

            status = ci_result.get("status", "PASS")

            if status == "PASS":
                await self._log(project_id, "tester", "CI/CD passed", "output")
                await self._emit_event(WorkflowEvent(
                    type=WorkflowEventType.TEST_PASSED,
                    agent="tester",
                    message="CI/CD passed",
                    data=ci_result
                ))
                return {"status": "PASS"}

            if status == "PENDING":
                await self._log(project_id, "tester", "CI/CD still pending", "output")
                return {"status": "PENDING"}

            # The stored value may be missing or null; treat both as "none listed".
            failed_jobs = ci_result.get("failed_jobs") or []
            await self._log(project_id, "tester", f"CI/CD failed: {len(failed_jobs)} jobs failed", "output")
            await self._emit_event(WorkflowEvent(
                type=WorkflowEventType.TEST_FAILED,
                agent="tester",
                message=f"CI/CD failed: {ci_result.get('summary', 'Unknown error')}",
                data=ci_result
            ))
            return {"status": "FAIL", "ci_result": ci_result}

        except Exception as e:
            error_msg = await self._report_agent_error(
                project_id, "tester", e, log_prefix="Error checking CI"
            )
            return {"status": "FAIL", "error": error_msg}

    async def run_ci_developer(self, project_id: int, plan: Dict[str, Any]) -> bool:
        """Run the Developer agent to fix CI/CD issues.

        ``plan`` is accepted for signature parity with ``run_developer`` but
        is not used in the prompt.

        Returns:
            True when the agent run finished without raising, False on error.
        """
        await self._emit_agent_started("developer", "Fixing CI/CD issues")

        try:
            prompt = f"""You are working on project_id={project_id}.

The Tester found that Gitea Actions CI/CD failed. Use get_ci_result tool with project_id={project_id} to see the detailed CI failure report.

Fix all the CI/CD issues reported. Common issues include:
- Test failures
- Linting errors
- Build errors
- Missing dependencies
- Configuration issues

After fixing, use submit_implementation_status tool with:
- project_id={project_id}
- status="fixed"
- bugs_addressed: list the CI issues you fixed
- ready_for_testing=True"""

            await self._run_agent_session("developer", prompt)

            # Verify implementation status was submitted
            impl_status = await get_project_implementation_status_json(project_id)

            if impl_status and impl_status.get("ready_for_testing"):
                await self._log(project_id, "developer", "CI fixes completed", "output")
                await self._emit_agent_completed("developer", "CI fixes completed", impl_status)
            else:
                await self._log(project_id, "developer", "CI fix attempt completed", "output")
                await self._emit_agent_completed("developer", "CI fix attempt completed")
            return True

        except Exception as e:
            await self._report_agent_error(project_id, "developer", e)
            return False

    async def run_evangelist(self, project_id: int, gitea_url: str, project_info: Dict[str, Any]) -> Optional[str]:
        """Run the Evangelist agent to promote on X/Twitter.

        Returns:
            The literal string ``"posted"`` when the agent run finished
            without raising (the post itself is not verified), None on error.
        """
        await self._emit_agent_started("evangelist", "Creating promotional post")

        try:
            prompt = f"""Create and post an engaging X/Twitter post to promote this project:

Project: {project_info.get('title', 'Unknown')}
Description: {project_info.get('description', '')}
Gitea Repository: {gitea_url}

IMPORTANT: Use the Gitea URL above (not GitHub). This project is hosted on Gitea.

Use the x_api tools to create and post a compelling tweet under 280 characters with emojis and hashtags."""

            await self._run_agent_session("evangelist", prompt)

            # Agent uses x_api MCP to post - consider it done
            await self._log(project_id, "evangelist", "Promotional post created", "output")
            await self._emit_agent_completed("evangelist", "Post created")
            return "posted"

        except Exception as e:
            await self._report_agent_error(project_id, "evangelist", e)
            return None

    # ------------------------------------------------------------------
    # Full pipeline
    # ------------------------------------------------------------------

    async def run_full_pipeline(self, *, max_ci_iterations: int = 5,
                                ci_poll_interval: float = 10) -> Dict[str, Any]:
        """
        Run the complete workflow pipeline.

        Developer-Tester loop runs INFINITELY until tests pass (or ``stop()``
        is called). The CI/CD verification loop is capped.

        Args:
            max_ci_iterations: Maximum number of CI fix iterations before the
                pipeline continues to promotion regardless of CI state.
            ci_poll_interval: Seconds to wait before re-checking a CI run
                that is still pending.

        Returns:
            On success a dict with keys: success, project_id, project_name,
            gitea_url, x_post_url, dev_test_iterations, ci_test_iterations,
            error. On failure a dict with keys: success, project_id, error.
        """
        self._running = True
        project_id: Optional[int] = None

        try:
            from database import (
                create_idea, create_project, update_project_status,
                mark_idea_used
            )

            await self._emit_event(WorkflowEvent(
                type=WorkflowEventType.WORKFLOW_STARTED,
                message="Starting new project pipeline"
            ))

            # Create initial project record
            idea_record = await create_idea(
                title="Pending",
                description="Generating idea...",
                source="system"
            )
            project = await create_project(
                idea_id=idea_record.id,
                name="pending"
            )

            project_id = project.id
            await self.state_manager.create_state(project_id)
            await self.state_manager.set_active_project(project_id)

            # 1. IDEATOR
            await update_project_status(project_id, "ideation", current_agent="ideator")
            idea = await self.run_ideator(project_id)
            if not idea:
                await update_project_status(project_id, "failed")
                return {"success": False, "project_id": project_id, "error": "Ideator failed to generate idea"}

            await self.state_manager.update_state(project_id, idea=idea)

            # 2. PLANNER
            await update_project_status(project_id, "planning", current_agent="planner")
            plan = await self.run_planner(project_id, idea)
            if not plan:
                await update_project_status(project_id, "failed")
                return {"success": False, "project_id": project_id, "error": "Planner failed to create plan"}

            await self.state_manager.update_state(project_id, plan=plan)

            project_name = plan.get("project_name", idea.get("title", "project"))
            await update_project_status(project_id, "planning", plan_json=plan, name=project_name)

            # 3. DEVELOPER -> TESTER LOOP (INFINITE)
            iteration = 0
            is_fixing = False

            while self._running:
                iteration += 1

                await self._emit_event(WorkflowEvent(
                    type=WorkflowEventType.ITERATION_STARTED,
                    message=f"Developer-Tester iteration {iteration}",
                    data={"iteration": iteration}
                ))

                # Clear previous devtest state for new iteration
                # NOTE(review): if this also clears the stored test result, a
                # fixing Developer cannot read the bug report it is told to
                # fetch - confirm against clear_project_devtest_state.
                await clear_project_devtest_state(project_id)

                await update_project_status(
                    project_id, "development",
                    current_agent="developer",
                    dev_test_iterations=iteration
                )

                # Developer (uses MCP to get test results if is_fixing)
                success = await self.run_developer(project_id, plan, is_fixing)
                if not success:
                    # NOTE(review): retries immediately and without limit; a
                    # persistent failure (e.g. bad credentials) spins here.
                    continue  # Try again

                # Tester (uses MCP to submit test results)
                await update_project_status(project_id, "testing", current_agent="tester")
                test_result = await self.run_tester(project_id)

                if test_result.get("status") == "PASS":
                    break  # Exit loop - tests passed!

                # Tests failed - next iteration will be a fix
                is_fixing = True

                await self.state_manager.update_state(
                    project_id,
                    dev_test_iterations=iteration,
                    error=f"Test failed at iteration {iteration}"
                )

            if not self._running:
                await update_project_status(project_id, "failed")
                return {"success": False, "project_id": project_id, "error": "Pipeline was stopped"}

            # 4. UPLOADER -> CI TESTER -> DEVELOPER LOOP
            # Initial upload
            await update_project_status(project_id, "uploading", current_agent="uploader")
            gitea_url = await self.run_uploader(project_id, project_name, is_fixing=False)
            if not gitea_url:
                await update_project_status(project_id, "failed")
                return {"success": False, "project_id": project_id, "error": "Uploader failed to upload to Gitea"}

            await update_project_status(project_id, "uploading", gitea_url=gitea_url)

            # CI/CD verification loop: Uploader -> Tester -> Developer -> Uploader ...
            ci_iteration = 0
            ci_passed = False

            while self._running and ci_iteration < max_ci_iterations:
                ci_iteration += 1

                await self._emit_event(WorkflowEvent(
                    type=WorkflowEventType.ITERATION_STARTED,
                    message=f"CI/CD verification iteration {ci_iteration}",
                    data={"ci_iteration": ci_iteration}
                ))

                # Clear CI state for new iteration
                await clear_project_ci_state(project_id)

                # Tester checks CI/CD status
                await update_project_status(
                    project_id, "testing",
                    current_agent="tester",
                    ci_test_iterations=ci_iteration
                )

                ci_result = await self.run_ci_tester(project_id, project_name, gitea_url)
                ci_status = ci_result.get("status")

                if ci_status == "PASS":
                    logger.info(f"CI/CD passed after {ci_iteration} iteration(s)")
                    ci_passed = True
                    break  # CI passed - proceed to evangelist

                if ci_status == "PENDING":
                    # CI still running - wait and retry
                    # NOTE(review): pending checks are not counted, so a CI run
                    # that never leaves "pending" keeps this loop alive until stop().
                    await asyncio.sleep(ci_poll_interval)
                    ci_iteration -= 1  # Don't count this as a fix iteration
                    continue

                # CI failed - Developer fixes, then Uploader re-uploads
                await update_project_status(project_id, "development", current_agent="developer")
                fix_success = await self.run_ci_developer(project_id, plan)
                if not fix_success:
                    continue  # Try again

                # Re-upload the fixes
                await update_project_status(project_id, "uploading", current_agent="uploader")
                reuploaded_url = await self.run_uploader(project_id, project_name, is_fixing=True)
                if not reuploaded_url:
                    # Keep the last known repository URL; a failed re-upload
                    # must not erase it for later CI checks and promotion.
                    continue  # Try again

                gitea_url = reuploaded_url
                await update_project_status(project_id, "uploading", gitea_url=gitea_url)

            if not self._running:
                await update_project_status(project_id, "failed")
                return {"success": False, "project_id": project_id, "error": "Pipeline was stopped"}

            if not ci_passed:
                logger.warning(f"CI/CD loop reached max iterations ({max_ci_iterations})")
                # Continue anyway - let the project complete

            # 5. EVANGELIST
            await update_project_status(project_id, "promoting", current_agent="evangelist")
            # NOTE(review): run_evangelist returns the marker "posted", not a
            # URL, so that marker is what gets stored as x_post_url.
            x_post_url = await self.run_evangelist(project_id, gitea_url, idea)

            if x_post_url:
                await update_project_status(project_id, "promoting", x_post_url=x_post_url)

            # COMPLETED!
            await update_project_status(project_id, "completed", current_agent=None)
            await mark_idea_used(idea_record.id)

            await self._emit_event(WorkflowEvent(
                type=WorkflowEventType.WORKFLOW_COMPLETED,
                message="Project completed successfully!",
                data={
                    "project_id": project_id,
                    "project_name": project_name,
                    "gitea_url": gitea_url,
                    "dev_test_iterations": iteration,
                    "ci_test_iterations": ci_iteration
                }
            ))

            return {
                "success": True,
                "project_id": project_id,
                "project_name": project_name,
                "gitea_url": gitea_url,
                "x_post_url": x_post_url,
                "dev_test_iterations": iteration,
                "ci_test_iterations": ci_iteration,
                "error": None
            }

        except Exception as e:
            logger.exception(f"Pipeline failed: {e}")
            if project_id is not None:
                # project_id is only set after the database import succeeded,
                # so update_project_status is bound here.
                try:
                    await update_project_status(project_id, "failed")
                except Exception as status_error:
                    logger.error(f"Failed to mark project {project_id} as failed: {status_error}")
            await self._emit_event(WorkflowEvent(
                type=WorkflowEventType.WORKFLOW_FAILED,
                message=str(e)
            ))
            return {
                "success": False,
                "project_id": project_id,
                "error": str(e)
            }

        finally:
            # The pipeline is over whichever way it ended; without this reset
            # is_running would report True forever.
            self._running = False
            await self.state_manager.set_active_project(None)

    @property
    def is_running(self) -> bool:
        """Check if the pipeline is currently running"""
        return self._running

    async def stop(self):
        """Stop the running pipeline and close the HTTP client.

        The loops in ``run_full_pipeline`` observe the flag at their next
        iteration check.
        """
        self._running = False
        # Close the HTTP client
        if self.client:
            await self.client.close()
|