620 lines
21 KiB
Python
620 lines
21 KiB
Python
"""
|
|
Gitea MCP Server for 7000%AUTO
|
|
Provides Gitea repository management functionality
|
|
"""
|
|
|
|
import base64
|
|
import logging
|
|
import os
|
|
from typing import Optional, Dict, Any
|
|
|
|
import httpx
|
|
from mcp.server.fastmcp import FastMCP
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
mcp = FastMCP("Gitea Server")
|
|
|
|
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
|
|
GITEA_URL = os.getenv("GITEA_URL", "https://7000pct.gitea.bloupla.net")
|
|
GITEA_USERNAME = os.getenv("GITEA_USERNAME", "")
|
|
|
|
|
|
def get_api_base_url() -> str:
|
|
"""Get the Gitea API base URL"""
|
|
return f"{GITEA_URL.rstrip('/')}/api/v1"
|
|
|
|
|
|
def get_auth_headers() -> Dict[str, str]:
|
|
"""Get authentication headers for Gitea API"""
|
|
return {
|
|
"Authorization": f"token {GITEA_TOKEN}",
|
|
"Content-Type": "application/json",
|
|
"Accept": "application/json",
|
|
}
|
|
|
|
|
|
async def get_gitea_username() -> str:
|
|
"""Get Gitea username from env or fetch from API"""
|
|
if GITEA_USERNAME:
|
|
return GITEA_USERNAME
|
|
|
|
if not GITEA_TOKEN:
|
|
return ""
|
|
|
|
try:
|
|
async with httpx.AsyncClient(
|
|
base_url=get_api_base_url(),
|
|
headers=get_auth_headers(),
|
|
timeout=30.0,
|
|
) as client:
|
|
response = await client.get("/user")
|
|
if response.status_code == 200:
|
|
return response.json().get("login", "")
|
|
except Exception as e:
|
|
logger.error(f"Failed to get Gitea username: {e}")
|
|
|
|
return ""
|
|
|
|
|
|
@mcp.tool()
|
|
async def create_repo(name: str, description: str, private: bool = False) -> dict:
|
|
"""
|
|
Create a new Gitea repository.
|
|
|
|
Args:
|
|
name: Repository name (kebab-case recommended)
|
|
description: Repository description
|
|
private: Whether the repo should be private (default False)
|
|
|
|
Returns:
|
|
Dictionary with repository URL and details
|
|
"""
|
|
if not GITEA_TOKEN:
|
|
return {"success": False, "error": "Gitea token not configured"}
|
|
|
|
try:
|
|
async with httpx.AsyncClient(
|
|
base_url=get_api_base_url(),
|
|
headers=get_auth_headers(),
|
|
timeout=30.0,
|
|
) as client:
|
|
response = await client.post(
|
|
"/user/repos",
|
|
json={
|
|
"name": name,
|
|
"description": description,
|
|
"private": private,
|
|
"auto_init": True,
|
|
"default_branch": "main",
|
|
}
|
|
)
|
|
|
|
if response.status_code in (200, 201):
|
|
repo_data = response.json()
|
|
logger.info(f"Created repository: {repo_data.get('html_url')}")
|
|
|
|
return {
|
|
"success": True,
|
|
"repo": {
|
|
"name": repo_data.get("name"),
|
|
"full_name": repo_data.get("full_name"),
|
|
"url": repo_data.get("html_url"),
|
|
"clone_url": repo_data.get("clone_url"),
|
|
"description": repo_data.get("description"),
|
|
}
|
|
}
|
|
else:
|
|
error_msg = response.json().get("message", response.text)
|
|
logger.error(f"Gitea API error: {error_msg}")
|
|
return {"success": False, "error": error_msg}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating repo: {e}")
|
|
return {"success": False, "error": str(e)}
|
|
|
|
|
|
@mcp.tool()
|
|
async def push_files(repo: str, files: dict, message: str, branch: str = "main") -> dict:
|
|
"""
|
|
Push multiple files to a Gitea repository.
|
|
|
|
Args:
|
|
repo: Repository name (username/repo or just repo name)
|
|
files: Dictionary of {path: content} for files to push
|
|
message: Commit message
|
|
branch: Target branch (default "main")
|
|
|
|
Returns:
|
|
Dictionary with commit details
|
|
"""
|
|
if not GITEA_TOKEN:
|
|
return {"success": False, "error": "Gitea token not configured"}
|
|
|
|
try:
|
|
# Determine owner and repo name
|
|
if "/" in repo:
|
|
owner, repo_name = repo.split("/", 1)
|
|
else:
|
|
owner = await get_gitea_username()
|
|
repo_name = repo
|
|
|
|
if not owner:
|
|
return {"success": False, "error": "Could not determine repository owner"}
|
|
|
|
pushed_files = []
|
|
last_commit = None
|
|
|
|
async with httpx.AsyncClient(
|
|
base_url=get_api_base_url(),
|
|
headers=get_auth_headers(),
|
|
timeout=30.0,
|
|
) as client:
|
|
for file_path, content in files.items():
|
|
# Check if file exists to determine if we need to update or create
|
|
check_response = await client.get(f"/repos/{owner}/{repo_name}/contents/{file_path}")
|
|
|
|
file_data = {
|
|
"content": base64.b64encode(content.encode()).decode(),
|
|
"message": message,
|
|
"branch": branch,
|
|
}
|
|
|
|
if check_response.status_code == 200:
|
|
# File exists, need to include SHA for update
|
|
existing = check_response.json()
|
|
file_data["sha"] = existing.get("sha")
|
|
response = await client.put(
|
|
f"/repos/{owner}/{repo_name}/contents/{file_path}",
|
|
json=file_data
|
|
)
|
|
else:
|
|
# File doesn't exist, create it
|
|
response = await client.post(
|
|
f"/repos/{owner}/{repo_name}/contents/{file_path}",
|
|
json=file_data
|
|
)
|
|
|
|
if response.status_code in (200, 201):
|
|
result = response.json()
|
|
last_commit = result.get("commit", {})
|
|
pushed_files.append(file_path)
|
|
else:
|
|
error_msg = response.json().get("message", response.text)
|
|
logger.error(f"Failed to push {file_path}: {error_msg}")
|
|
|
|
if pushed_files:
|
|
logger.info(f"Pushed {len(pushed_files)} files to {owner}/{repo_name}")
|
|
return {
|
|
"success": True,
|
|
"commit": {
|
|
"sha": last_commit.get("sha", "") if last_commit else "",
|
|
"message": message,
|
|
"url": f"{GITEA_URL}/{owner}/{repo_name}/commit/{last_commit.get('sha', '')}" if last_commit else ""
|
|
},
|
|
"files_pushed": pushed_files
|
|
}
|
|
else:
|
|
return {"success": False, "error": "No files were pushed"}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error pushing files: {e}")
|
|
return {"success": False, "error": str(e)}
|
|
|
|
|
|
@mcp.tool()
|
|
async def create_release(
|
|
repo: str,
|
|
tag: str,
|
|
name: str,
|
|
body: str,
|
|
draft: bool = False,
|
|
prerelease: bool = False
|
|
) -> dict:
|
|
"""
|
|
Create a release on Gitea.
|
|
|
|
Args:
|
|
repo: Repository name
|
|
tag: Tag name (e.g., "v1.0.0")
|
|
name: Release name
|
|
body: Release notes/body
|
|
draft: Whether this is a draft release
|
|
prerelease: Whether this is a prerelease
|
|
|
|
Returns:
|
|
Dictionary with release URL
|
|
"""
|
|
if not GITEA_TOKEN:
|
|
return {"success": False, "error": "Gitea token not configured"}
|
|
|
|
try:
|
|
# Determine owner and repo name
|
|
if "/" in repo:
|
|
owner, repo_name = repo.split("/", 1)
|
|
else:
|
|
owner = await get_gitea_username()
|
|
repo_name = repo
|
|
|
|
if not owner:
|
|
return {"success": False, "error": "Could not determine repository owner"}
|
|
|
|
async with httpx.AsyncClient(
|
|
base_url=get_api_base_url(),
|
|
headers=get_auth_headers(),
|
|
timeout=30.0,
|
|
) as client:
|
|
response = await client.post(
|
|
f"/repos/{owner}/{repo_name}/releases",
|
|
json={
|
|
"tag_name": tag,
|
|
"name": name,
|
|
"body": body,
|
|
"draft": draft,
|
|
"prerelease": prerelease,
|
|
"target_commitish": "main",
|
|
}
|
|
)
|
|
|
|
if response.status_code in (200, 201):
|
|
release_data = response.json()
|
|
logger.info(f"Created release {tag} for {owner}/{repo_name}")
|
|
|
|
return {
|
|
"success": True,
|
|
"release": {
|
|
"tag": tag,
|
|
"name": name,
|
|
"url": release_data.get("html_url", f"{GITEA_URL}/{owner}/{repo_name}/releases/tag/{tag}"),
|
|
"id": release_data.get("id"),
|
|
}
|
|
}
|
|
else:
|
|
error_msg = response.json().get("message", response.text)
|
|
logger.error(f"Gitea API error: {error_msg}")
|
|
return {"success": False, "error": error_msg}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating release: {e}")
|
|
return {"success": False, "error": str(e)}
|
|
|
|
|
|
@mcp.tool()
|
|
async def setup_actions(repo: str, workflow_content: str, workflow_name: str = "ci.yml") -> dict:
|
|
"""
|
|
Set up Gitea Actions workflow.
|
|
|
|
Args:
|
|
repo: Repository name
|
|
workflow_content: YAML content for the workflow
|
|
workflow_name: Workflow file name (default "ci.yml")
|
|
|
|
Returns:
|
|
Dictionary with workflow path
|
|
"""
|
|
try:
|
|
workflow_path = f".gitea/workflows/{workflow_name}"
|
|
|
|
result = await push_files(
|
|
repo=repo,
|
|
files={workflow_path: workflow_content},
|
|
message=f"Add Gitea Actions workflow: {workflow_name}"
|
|
)
|
|
|
|
if result.get("success"):
|
|
return {
|
|
"success": True,
|
|
"workflow": {
|
|
"path": workflow_path,
|
|
"name": workflow_name
|
|
}
|
|
}
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error setting up actions: {e}")
|
|
return {"success": False, "error": str(e)}
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_workflow_runs(repo: str, status: str = None, branch: str = None, limit: int = 10) -> dict:
|
|
"""
|
|
Get workflow runs (Gitea Actions) for a repository.
|
|
|
|
Args:
|
|
repo: Repository name (username/repo or just repo name)
|
|
status: Filter by status (queued, in_progress, success, failure, cancelled, skipped, timedout)
|
|
branch: Filter by branch name
|
|
limit: Maximum number of runs to return (default 10, max 100)
|
|
|
|
Returns:
|
|
Dictionary with workflow runs list
|
|
"""
|
|
if not GITEA_TOKEN:
|
|
return {"success": False, "error": "Gitea token not configured"}
|
|
|
|
try:
|
|
# Determine owner and repo name
|
|
if "/" in repo:
|
|
owner, repo_name = repo.split("/", 1)
|
|
else:
|
|
owner = await get_gitea_username()
|
|
repo_name = repo
|
|
|
|
if not owner:
|
|
return {"success": False, "error": "Could not determine repository owner"}
|
|
|
|
async with httpx.AsyncClient(
|
|
base_url=get_api_base_url(),
|
|
headers=get_auth_headers(),
|
|
timeout=30.0,
|
|
) as client:
|
|
params = {"per_page": min(limit, 100)}
|
|
if status:
|
|
params["status"] = status
|
|
if branch:
|
|
params["branch"] = branch
|
|
|
|
response = await client.get(
|
|
f"/repos/{owner}/{repo_name}/actions/runs",
|
|
params=params
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
runs = data.get("workflow_runs", data) if isinstance(data, dict) else data
|
|
|
|
# Simplify the runs data
|
|
simplified_runs = []
|
|
for run in (runs if isinstance(runs, list) else []):
|
|
simplified_runs.append({
|
|
"id": run.get("id"),
|
|
"name": run.get("display_title") or run.get("name"),
|
|
"status": run.get("status"),
|
|
"conclusion": run.get("conclusion"),
|
|
"branch": run.get("head_branch"),
|
|
"commit_sha": run.get("head_sha", "")[:7],
|
|
"started_at": run.get("run_started_at"),
|
|
"url": f"{GITEA_URL}/{owner}/{repo_name}/actions/runs/{run.get('id')}"
|
|
})
|
|
|
|
return {
|
|
"success": True,
|
|
"repo": f"{owner}/{repo_name}",
|
|
"runs": simplified_runs,
|
|
"total": len(simplified_runs)
|
|
}
|
|
else:
|
|
error_msg = response.json().get("message", response.text)
|
|
return {"success": False, "error": error_msg}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting workflow runs: {e}")
|
|
return {"success": False, "error": str(e)}
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_latest_workflow_status(repo: str, branch: str = "main") -> dict:
|
|
"""
|
|
Get the status of the latest workflow run for a repository.
|
|
Use this to check if CI/CD passed or failed after uploading code.
|
|
|
|
Args:
|
|
repo: Repository name (username/repo or just repo name)
|
|
branch: Branch to check (default "main")
|
|
|
|
Returns:
|
|
Dictionary with latest run status (passed/failed/pending/none)
|
|
"""
|
|
if not GITEA_TOKEN:
|
|
return {"success": False, "error": "Gitea token not configured"}
|
|
|
|
try:
|
|
# Determine owner and repo name
|
|
if "/" in repo:
|
|
owner, repo_name = repo.split("/", 1)
|
|
else:
|
|
owner = await get_gitea_username()
|
|
repo_name = repo
|
|
|
|
if not owner:
|
|
return {"success": False, "error": "Could not determine repository owner"}
|
|
|
|
async with httpx.AsyncClient(
|
|
base_url=get_api_base_url(),
|
|
headers=get_auth_headers(),
|
|
timeout=30.0,
|
|
) as client:
|
|
response = await client.get(
|
|
f"/repos/{owner}/{repo_name}/actions/runs",
|
|
params={"branch": branch, "per_page": 1}
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
runs = data.get("workflow_runs", data) if isinstance(data, dict) else data
|
|
|
|
if not runs or (isinstance(runs, list) and len(runs) == 0):
|
|
return {
|
|
"success": True,
|
|
"status": "none",
|
|
"message": "No workflow runs found",
|
|
"repo": f"{owner}/{repo_name}"
|
|
}
|
|
|
|
latest_run = runs[0] if isinstance(runs, list) else runs
|
|
run_status = latest_run.get("status", "unknown")
|
|
conclusion = latest_run.get("conclusion")
|
|
|
|
# Determine overall status
|
|
if run_status in ("queued", "in_progress", "waiting"):
|
|
overall_status = "pending"
|
|
elif conclusion == "success":
|
|
overall_status = "passed"
|
|
elif conclusion in ("failure", "timedout", "action_required"):
|
|
overall_status = "failed"
|
|
elif conclusion in ("cancelled", "skipped"):
|
|
overall_status = "cancelled"
|
|
else:
|
|
overall_status = "unknown"
|
|
|
|
return {
|
|
"success": True,
|
|
"status": overall_status,
|
|
"run_status": run_status,
|
|
"conclusion": conclusion,
|
|
"run_id": latest_run.get("id"),
|
|
"run_name": latest_run.get("display_title") or latest_run.get("name"),
|
|
"branch": latest_run.get("head_branch"),
|
|
"commit_sha": latest_run.get("head_sha", "")[:7],
|
|
"url": f"{GITEA_URL}/{owner}/{repo_name}/actions/runs/{latest_run.get('id')}",
|
|
"repo": f"{owner}/{repo_name}"
|
|
}
|
|
elif response.status_code == 404:
|
|
return {
|
|
"success": True,
|
|
"status": "none",
|
|
"message": "Actions not enabled or no runs found",
|
|
"repo": f"{owner}/{repo_name}"
|
|
}
|
|
else:
|
|
error_msg = response.json().get("message", response.text)
|
|
return {"success": False, "error": error_msg}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting latest workflow status: {e}")
|
|
return {"success": False, "error": str(e)}
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_workflow_run_jobs(repo: str, run_id: int) -> dict:
|
|
"""
|
|
Get jobs and their status for a specific workflow run.
|
|
Use this to see which specific jobs failed in a CI/CD run.
|
|
|
|
Args:
|
|
repo: Repository name (username/repo or just repo name)
|
|
run_id: Workflow run ID
|
|
|
|
Returns:
|
|
Dictionary with job details including status and log URLs
|
|
"""
|
|
if not GITEA_TOKEN:
|
|
return {"success": False, "error": "Gitea token not configured"}
|
|
|
|
try:
|
|
# Determine owner and repo name
|
|
if "/" in repo:
|
|
owner, repo_name = repo.split("/", 1)
|
|
else:
|
|
owner = await get_gitea_username()
|
|
repo_name = repo
|
|
|
|
if not owner:
|
|
return {"success": False, "error": "Could not determine repository owner"}
|
|
|
|
async with httpx.AsyncClient(
|
|
base_url=get_api_base_url(),
|
|
headers=get_auth_headers(),
|
|
timeout=30.0,
|
|
) as client:
|
|
response = await client.get(
|
|
f"/repos/{owner}/{repo_name}/actions/runs/{run_id}/jobs"
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
jobs = data.get("jobs", data) if isinstance(data, dict) else data
|
|
|
|
simplified_jobs = []
|
|
for job in (jobs if isinstance(jobs, list) else []):
|
|
simplified_jobs.append({
|
|
"id": job.get("id"),
|
|
"name": job.get("name"),
|
|
"status": job.get("status"),
|
|
"conclusion": job.get("conclusion"),
|
|
"started_at": job.get("started_at"),
|
|
"completed_at": job.get("completed_at"),
|
|
"steps": [
|
|
{
|
|
"name": step.get("name"),
|
|
"status": step.get("status"),
|
|
"conclusion": step.get("conclusion")
|
|
}
|
|
for step in job.get("steps", [])
|
|
]
|
|
})
|
|
|
|
return {
|
|
"success": True,
|
|
"run_id": run_id,
|
|
"repo": f"{owner}/{repo_name}",
|
|
"jobs": simplified_jobs
|
|
}
|
|
else:
|
|
error_msg = response.json().get("message", response.text)
|
|
return {"success": False, "error": error_msg}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting workflow run jobs: {e}")
|
|
return {"success": False, "error": str(e)}
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_repo_info(repo: str) -> dict:
|
|
"""
|
|
Get repository information.
|
|
|
|
Args:
|
|
repo: Repository name
|
|
|
|
Returns:
|
|
Dictionary with repository details
|
|
"""
|
|
if not GITEA_TOKEN:
|
|
return {"success": False, "error": "Gitea token not configured"}
|
|
|
|
try:
|
|
# Determine owner and repo name
|
|
if "/" in repo:
|
|
owner, repo_name = repo.split("/", 1)
|
|
else:
|
|
owner = await get_gitea_username()
|
|
repo_name = repo
|
|
|
|
if not owner:
|
|
return {"success": False, "error": "Could not determine repository owner"}
|
|
|
|
async with httpx.AsyncClient(
|
|
base_url=get_api_base_url(),
|
|
headers=get_auth_headers(),
|
|
timeout=30.0,
|
|
) as client:
|
|
response = await client.get(f"/repos/{owner}/{repo_name}")
|
|
|
|
if response.status_code == 200:
|
|
repo_data = response.json()
|
|
return {
|
|
"success": True,
|
|
"repo": {
|
|
"name": repo_data.get("name"),
|
|
"full_name": repo_data.get("full_name"),
|
|
"url": repo_data.get("html_url"),
|
|
"description": repo_data.get("description"),
|
|
"stars": repo_data.get("stars_count", 0),
|
|
"forks": repo_data.get("forks_count", 0),
|
|
"default_branch": repo_data.get("default_branch"),
|
|
"language": repo_data.get("language"),
|
|
}
|
|
}
|
|
else:
|
|
error_msg = response.json().get("message", response.text)
|
|
return {"success": False, "error": error_msg}
|
|
|
|
except Exception as e:
|
|
return {"success": False, "error": str(e)}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
mcp.run()
|