Files

620 lines
21 KiB
Python

"""
Gitea MCP Server for 7000%AUTO
Provides Gitea repository management functionality
"""
import base64
import logging
import os
from typing import Dict, Optional

import httpx
from mcp.server.fastmcp import FastMCP
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# FastMCP server instance; the @mcp.tool() decorators in this file register tools on it.
mcp = FastMCP("Gitea Server")
# Configuration is read from the environment once, at import time.
# API token sent in the Authorization header; empty string when unset.
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
# Base URL of the Gitea instance; the literal below is used when the variable is unset.
GITEA_URL = os.getenv("GITEA_URL", "https://7000pct.gitea.bloupla.net")
# Optional explicit account name; when empty, get_gitea_username() queries the API instead.
GITEA_USERNAME = os.getenv("GITEA_USERNAME", "")
def get_api_base_url() -> str:
    """Return the root of the Gitea v1 REST API derived from GITEA_URL."""
    # Drop any trailing slash first so the joined URL never contains "//api".
    instance_root = GITEA_URL.rstrip("/")
    return instance_root + "/api/v1"
def get_auth_headers() -> Dict[str, str]:
    """Build the HTTP headers sent with every Gitea API request."""
    headers: Dict[str, str] = {}
    # Token-based authorization using the configured GITEA_TOKEN.
    headers["Authorization"] = "token " + GITEA_TOKEN
    # Requests and responses are both JSON.
    headers["Content-Type"] = "application/json"
    headers["Accept"] = "application/json"
    return headers
async def get_gitea_username() -> str:
    """
    Resolve the Gitea account name.

    Prefers the GITEA_USERNAME setting; otherwise asks the API's /user
    endpoint using the configured token.

    Returns:
        The login name, or an empty string when it cannot be determined
        (no token, non-200 response, or a request error, which is logged).
    """
    configured_name = GITEA_USERNAME
    if configured_name:
        return configured_name

    if GITEA_TOKEN:
        try:
            async with httpx.AsyncClient(
                base_url=get_api_base_url(),
                headers=get_auth_headers(),
                timeout=30.0,
            ) as api:
                user_response = await api.get("/user")
                if user_response.status_code == 200:
                    profile = user_response.json()
                    return profile.get("login", "")
        except Exception as e:
            logger.error(f"Failed to get Gitea username: {e}")

    return ""
@mcp.tool()
async def create_repo(name: str, description: str, private: bool = False) -> dict:
    """
    Create a new Gitea repository for the authenticated user.

    The repository is created with auto-initialisation enabled and "main"
    as its default branch.

    Args:
        name: Repository name (kebab-case recommended)
        description: Repository description
        private: Whether the repo should be private (default False)

    Returns:
        On success: {"success": True, "repo": {...}} with the name,
        full_name, url, clone_url and description reported by Gitea.
        On failure: {"success": False, "error": <message>}.
    """
    if not GITEA_TOKEN:
        return {"success": False, "error": "Gitea token not configured"}

    try:
        async with httpx.AsyncClient(
            base_url=get_api_base_url(),
            headers=get_auth_headers(),
            timeout=30.0,
        ) as client:
            response = await client.post(
                "/user/repos",
                json={
                    "name": name,
                    "description": description,
                    "private": private,
                    "auto_init": True,
                    "default_branch": "main",
                },
            )

        if response.status_code in (200, 201):
            repo_data = response.json()
            logger.info(f"Created repository: {repo_data.get('html_url')}")
            return {
                "success": True,
                "repo": {
                    "name": repo_data.get("name"),
                    "full_name": repo_data.get("full_name"),
                    "url": repo_data.get("html_url"),
                    "clone_url": repo_data.get("clone_url"),
                    "description": repo_data.get("description"),
                },
            }

        # Error bodies are not guaranteed to be JSON objects (e.g. an HTML
        # page from a reverse proxy); fall back to the raw text so the real
        # failure is reported instead of a JSON-decoding error.
        try:
            error_msg = response.json().get("message", response.text)
        except (ValueError, AttributeError):
            error_msg = response.text or f"HTTP {response.status_code}"
        logger.error(f"Gitea API error: {error_msg}")
        return {"success": False, "error": error_msg}
    except Exception as e:
        logger.error(f"Error creating repo: {e}")
        return {"success": False, "error": str(e)}
@mcp.tool()
async def push_files(repo: str, files: dict, message: str, branch: str = "main") -> dict:
    """
    Push multiple files to a Gitea repository.

    Each file is written with its own API call (one commit per file): an
    existing file on the target branch is updated, otherwise it is created.

    Args:
        repo: Repository name (username/repo or just repo name)
        files: Dictionary of {path: content} for files to push; content may
            be text (encoded with the default UTF-8 codec) or raw bytes
        message: Commit message
        branch: Target branch (default "main")

    Returns:
        On success (at least one file pushed): {"success": True,
        "commit": {...}, "files_pushed": [...], "files_failed": {...}},
        where "commit" describes the last commit made and "files_failed"
        maps each rejected path to its error message.
        On failure: {"success": False, "error": <message>}, plus
        "files_pushed" / "files_failed" when the loop had started.
    """
    if not GITEA_TOKEN:
        return {"success": False, "error": "Gitea token not configured"}

    # Tracked outside the try block so partial progress can still be
    # reported if a request raises midway through the loop.
    pushed_files = []
    failed_files = {}
    last_commit = None

    try:
        # Determine owner and repo name
        if "/" in repo:
            owner, repo_name = repo.split("/", 1)
        else:
            owner = await get_gitea_username()
            repo_name = repo
        if not owner:
            return {"success": False, "error": "Could not determine repository owner"}

        async with httpx.AsyncClient(
            base_url=get_api_base_url(),
            headers=get_auth_headers(),
            timeout=30.0,
        ) as client:
            for file_path, content in files.items():
                contents_url = f"/repos/{owner}/{repo_name}/contents/{file_path}"
                raw = content if isinstance(content, (bytes, bytearray)) else content.encode()
                file_data = {
                    "content": base64.b64encode(raw).decode(),
                    "message": message,
                    "branch": branch,
                }

                # Look the file up on the *target* branch; without "ref" the
                # API answers for the repository's default branch, which
                # yields the wrong SHA (or the wrong verb) on other branches.
                check_response = await client.get(contents_url, params={"ref": branch})

                if check_response.status_code == 200:
                    existing = check_response.json()
                    if not isinstance(existing, dict):
                        # A list response means the path is a directory.
                        failed_files[file_path] = "Path exists but is not a regular file"
                        logger.error(f"Failed to push {file_path}: {failed_files[file_path]}")
                        continue
                    # File exists, need to include SHA for update
                    file_data["sha"] = existing.get("sha")
                    response = await client.put(contents_url, json=file_data)
                else:
                    # File doesn't exist, create it
                    response = await client.post(contents_url, json=file_data)

                if response.status_code in (200, 201):
                    result = response.json()
                    last_commit = result.get("commit", {})
                    pushed_files.append(file_path)
                else:
                    # Error bodies may not be JSON; never let that abort the
                    # remaining files or hide the ones already pushed.
                    try:
                        error_msg = response.json().get("message", response.text)
                    except (ValueError, AttributeError):
                        error_msg = response.text or f"HTTP {response.status_code}"
                    failed_files[file_path] = error_msg
                    logger.error(f"Failed to push {file_path}: {error_msg}")

        if not pushed_files:
            return {
                "success": False,
                "error": "No files were pushed",
                "files_failed": failed_files,
            }

        logger.info(f"Pushed {len(pushed_files)} files to {owner}/{repo_name}")
        commit_sha = last_commit.get("sha", "") if last_commit else ""
        web_root = GITEA_URL.rstrip("/")
        return {
            "success": True,
            "commit": {
                "sha": commit_sha,
                "message": message,
                "url": f"{web_root}/{owner}/{repo_name}/commit/{commit_sha}" if last_commit else "",
            },
            "files_pushed": pushed_files,
            "files_failed": failed_files,
        }
    except Exception as e:
        logger.error(f"Error pushing files: {e}")
        return {
            "success": False,
            "error": str(e),
            "files_pushed": pushed_files,
            "files_failed": failed_files,
        }
@mcp.tool()
async def create_release(
    repo: str,
    tag: str,
    name: str,
    body: str,
    draft: bool = False,
    prerelease: bool = False,
    target: str = "main",
) -> dict:
    """
    Create a release on Gitea.

    Args:
        repo: Repository name (username/repo or just repo name)
        tag: Tag name (e.g., "v1.0.0")
        name: Release name
        body: Release notes/body
        draft: Whether this is a draft release
        prerelease: Whether this is a prerelease
        target: Branch or commit sent as the release's target_commitish
            (default "main")

    Returns:
        On success: {"success": True, "release": {"tag", "name", "url", "id"}}.
        On failure: {"success": False, "error": <message>}.
    """
    if not GITEA_TOKEN:
        return {"success": False, "error": "Gitea token not configured"}

    try:
        # Determine owner and repo name
        if "/" in repo:
            owner, repo_name = repo.split("/", 1)
        else:
            owner = await get_gitea_username()
            repo_name = repo
        if not owner:
            return {"success": False, "error": "Could not determine repository owner"}

        async with httpx.AsyncClient(
            base_url=get_api_base_url(),
            headers=get_auth_headers(),
            timeout=30.0,
        ) as client:
            response = await client.post(
                f"/repos/{owner}/{repo_name}/releases",
                json={
                    "tag_name": tag,
                    "name": name,
                    "body": body,
                    "draft": draft,
                    "prerelease": prerelease,
                    "target_commitish": target,
                },
            )

        if response.status_code in (200, 201):
            release_data = response.json()
            logger.info(f"Created release {tag} for {owner}/{repo_name}")
            # Used only when the API response carries no html_url; strip the
            # trailing slash so the fallback never contains "//".
            web_root = GITEA_URL.rstrip("/")
            fallback_url = f"{web_root}/{owner}/{repo_name}/releases/tag/{tag}"
            return {
                "success": True,
                "release": {
                    "tag": tag,
                    "name": name,
                    "url": release_data.get("html_url", fallback_url),
                    "id": release_data.get("id"),
                },
            }

        # Error bodies may not be JSON objects (e.g. a proxy error page).
        try:
            error_msg = response.json().get("message", response.text)
        except (ValueError, AttributeError):
            error_msg = response.text or f"HTTP {response.status_code}"
        logger.error(f"Gitea API error: {error_msg}")
        return {"success": False, "error": error_msg}
    except Exception as e:
        logger.error(f"Error creating release: {e}")
        return {"success": False, "error": str(e)}
@mcp.tool()
async def setup_actions(
    repo: str,
    workflow_content: str,
    workflow_name: str = "ci.yml",
    branch: str = "main",
) -> dict:
    """
    Set up a Gitea Actions workflow.

    The workflow is committed to ".gitea/workflows/<workflow_name>" by
    delegating to push_files.

    Args:
        repo: Repository name (username/repo or just repo name)
        workflow_content: YAML content for the workflow
        workflow_name: Workflow file name (default "ci.yml")
        branch: Branch to commit the workflow to (default "main")

    Returns:
        On success: {"success": True, "workflow": {"path", "name"}}.
        On failure: the failure dictionary returned by push_files, or
        {"success": False, "error": <message>} if an exception was raised.
    """
    try:
        workflow_path = f".gitea/workflows/{workflow_name}"
        result = await push_files(
            repo=repo,
            files={workflow_path: workflow_content},
            message=f"Add Gitea Actions workflow: {workflow_name}",
            branch=branch,
        )
        if not result.get("success"):
            # Surface push_files' own error payload unchanged.
            return result
        return {
            "success": True,
            "workflow": {
                "path": workflow_path,
                "name": workflow_name,
            },
        }
    except Exception as e:
        logger.error(f"Error setting up actions: {e}")
        return {"success": False, "error": str(e)}
@mcp.tool()
async def get_workflow_runs(
    repo: str,
    status: Optional[str] = None,
    branch: Optional[str] = None,
    limit: int = 10,
) -> dict:
    """
    Get workflow runs (Gitea Actions) for a repository.

    Args:
        repo: Repository name (username/repo or just repo name)
        status: Filter by status (queued, in_progress, success, failure, cancelled, skipped, timedout)
        branch: Filter by branch name
        limit: Maximum number of runs to return (default 10, clamped to 1..100)

    Returns:
        On success: {"success": True, "repo": "owner/name", "runs": [...],
        "total": <number of runs returned>}.
        On failure: {"success": False, "error": <message>}.
    """
    if not GITEA_TOKEN:
        return {"success": False, "error": "Gitea token not configured"}

    try:
        # Determine owner and repo name
        if "/" in repo:
            owner, repo_name = repo.split("/", 1)
        else:
            owner = await get_gitea_username()
            repo_name = repo
        if not owner:
            return {"success": False, "error": "Could not determine repository owner"}

        # Clamp so a zero/negative request never reaches the server.
        page_size = max(1, min(limit, 100))
        # Gitea paginates with "limit" (GitHub's name is "per_page").
        params = {"limit": page_size}
        if status:
            params["status"] = status
        if branch:
            params["branch"] = branch

        async with httpx.AsyncClient(
            base_url=get_api_base_url(),
            headers=get_auth_headers(),
            timeout=30.0,
        ) as client:
            response = await client.get(
                f"/repos/{owner}/{repo_name}/actions/runs",
                params=params,
            )

        if response.status_code != 200:
            # Error bodies may not be JSON objects (e.g. a proxy error page).
            try:
                error_msg = response.json().get("message", response.text)
            except (ValueError, AttributeError):
                error_msg = response.text or f"HTTP {response.status_code}"
            return {"success": False, "error": error_msg}

        data = response.json()
        runs = data.get("workflow_runs", data) if isinstance(data, dict) else data
        if not isinstance(runs, list):
            runs = []

        web_root = GITEA_URL.rstrip("/")
        # Simplify the runs data; entries that are not objects are skipped.
        simplified_runs = [
            {
                "id": run.get("id"),
                "name": run.get("display_title") or run.get("name"),
                "status": run.get("status"),
                "conclusion": run.get("conclusion"),
                "branch": run.get("head_branch"),
                # head_sha can be present but null; "or" guards the slice.
                "commit_sha": (run.get("head_sha") or "")[:7],
                # NOTE(review): Gitea appears to name this field "started_at"
                # while GitHub uses "run_started_at"; accept either.
                "started_at": run.get("run_started_at") or run.get("started_at"),
                "url": f"{web_root}/{owner}/{repo_name}/actions/runs/{run.get('id')}",
            }
            for run in runs
            if isinstance(run, dict)
        ]
        # Enforce the limit client-side too, in case the server ignored it.
        simplified_runs = simplified_runs[:page_size]

        return {
            "success": True,
            "repo": f"{owner}/{repo_name}",
            "runs": simplified_runs,
            "total": len(simplified_runs),
        }
    except Exception as e:
        logger.error(f"Error getting workflow runs: {e}")
        return {"success": False, "error": str(e)}
@mcp.tool()
async def get_latest_workflow_status(repo: str, branch: str = "main") -> dict:
    """
    Get the status of the latest workflow run for a repository.

    Use this to check if CI/CD passed or failed after uploading code.

    Args:
        repo: Repository name (username/repo or just repo name)
        branch: Branch to check (default "main")

    Returns:
        On success: {"success": True, "status": ...} where status is one of
        "passed", "failed", "pending", "cancelled", "unknown" or "none"
        (no runs found / Actions endpoint returned 404), plus run details
        when a run exists.
        On failure: {"success": False, "error": <message>}.
    """
    if not GITEA_TOKEN:
        return {"success": False, "error": "Gitea token not configured"}

    try:
        # Determine owner and repo name
        if "/" in repo:
            owner, repo_name = repo.split("/", 1)
        else:
            owner = await get_gitea_username()
            repo_name = repo
        if not owner:
            return {"success": False, "error": "Could not determine repository owner"}

        full_name = f"{owner}/{repo_name}"

        async with httpx.AsyncClient(
            base_url=get_api_base_url(),
            headers=get_auth_headers(),
            timeout=30.0,
        ) as client:
            response = await client.get(
                f"/repos/{owner}/{repo_name}/actions/runs",
                # Gitea paginates with "limit" (GitHub's name is "per_page").
                params={"branch": branch, "limit": 1},
            )

        if response.status_code == 404:
            return {
                "success": True,
                "status": "none",
                "message": "Actions not enabled or no runs found",
                "repo": full_name,
            }

        if response.status_code != 200:
            # Error bodies may not be JSON objects (e.g. a proxy error page).
            try:
                error_msg = response.json().get("message", response.text)
            except (ValueError, AttributeError):
                error_msg = response.text or f"HTTP {response.status_code}"
            return {"success": False, "error": error_msg}

        data = response.json()
        runs = data.get("workflow_runs", data) if isinstance(data, dict) else data
        if not runs:
            return {
                "success": True,
                "status": "none",
                "message": "No workflow runs found",
                "repo": full_name,
            }

        # The first entry is treated as the most recent run.
        latest_run = runs[0] if isinstance(runs, list) else runs
        if not isinstance(latest_run, dict):
            return {"success": False, "error": "Unexpected response format from Gitea API"}

        run_status = latest_run.get("status", "unknown")
        conclusion = latest_run.get("conclusion")

        # Determine overall status
        if run_status in ("queued", "in_progress", "waiting"):
            overall_status = "pending"
        elif conclusion == "success":
            overall_status = "passed"
        elif conclusion in ("failure", "timedout", "action_required"):
            overall_status = "failed"
        elif conclusion in ("cancelled", "skipped"):
            overall_status = "cancelled"
        else:
            overall_status = "unknown"

        web_root = GITEA_URL.rstrip("/")
        return {
            "success": True,
            "status": overall_status,
            "run_status": run_status,
            "conclusion": conclusion,
            "run_id": latest_run.get("id"),
            "run_name": latest_run.get("display_title") or latest_run.get("name"),
            "branch": latest_run.get("head_branch"),
            # head_sha can be present but null; "or" guards the slice.
            "commit_sha": (latest_run.get("head_sha") or "")[:7],
            "url": f"{web_root}/{owner}/{repo_name}/actions/runs/{latest_run.get('id')}",
            "repo": full_name,
        }
    except Exception as e:
        logger.error(f"Error getting latest workflow status: {e}")
        return {"success": False, "error": str(e)}
@mcp.tool()
async def get_workflow_run_jobs(repo: str, run_id: int) -> dict:
    """
    Get jobs and their status for a specific workflow run.

    Use this to see which specific jobs failed in a CI/CD run.

    Args:
        repo: Repository name (username/repo or just repo name)
        run_id: Workflow run ID

    Returns:
        On success: {"success": True, "run_id": ..., "repo": "owner/name",
        "jobs": [...]} where each job carries id, name, status, conclusion,
        started_at, completed_at and a list of its steps.
        On failure: {"success": False, "error": <message>}.
    """
    if not GITEA_TOKEN:
        return {"success": False, "error": "Gitea token not configured"}

    try:
        # Determine owner and repo name
        if "/" in repo:
            owner, repo_name = repo.split("/", 1)
        else:
            owner = await get_gitea_username()
            repo_name = repo
        if not owner:
            return {"success": False, "error": "Could not determine repository owner"}

        async with httpx.AsyncClient(
            base_url=get_api_base_url(),
            headers=get_auth_headers(),
            timeout=30.0,
        ) as client:
            response = await client.get(
                f"/repos/{owner}/{repo_name}/actions/runs/{run_id}/jobs"
            )

        if response.status_code != 200:
            # Error bodies may not be JSON objects (e.g. a proxy error page).
            try:
                error_msg = response.json().get("message", response.text)
            except (ValueError, AttributeError):
                error_msg = response.text or f"HTTP {response.status_code}"
            return {"success": False, "error": error_msg}

        data = response.json()
        jobs = data.get("jobs", data) if isinstance(data, dict) else data
        if not isinstance(jobs, list):
            jobs = []

        simplified_jobs = []
        for job in jobs:
            if not isinstance(job, dict):
                continue
            # "steps" may be present but null (e.g. a job that has not
            # started); "or []" keeps the comprehension from iterating None.
            steps = job.get("steps") or []
            simplified_jobs.append({
                "id": job.get("id"),
                "name": job.get("name"),
                "status": job.get("status"),
                "conclusion": job.get("conclusion"),
                "started_at": job.get("started_at"),
                "completed_at": job.get("completed_at"),
                "steps": [
                    {
                        "name": step.get("name"),
                        "status": step.get("status"),
                        "conclusion": step.get("conclusion"),
                    }
                    for step in steps
                    if isinstance(step, dict)
                ],
            })

        return {
            "success": True,
            "run_id": run_id,
            "repo": f"{owner}/{repo_name}",
            "jobs": simplified_jobs,
        }
    except Exception as e:
        logger.error(f"Error getting workflow run jobs: {e}")
        return {"success": False, "error": str(e)}
@mcp.tool()
async def get_repo_info(repo: str) -> dict:
    """
    Get repository information.

    Args:
        repo: Repository name (username/repo or just repo name)

    Returns:
        On success: {"success": True, "repo": {...}} with name, full_name,
        url, description, stars, forks, default_branch and language.
        On failure: {"success": False, "error": <message>}.
    """
    if not GITEA_TOKEN:
        return {"success": False, "error": "Gitea token not configured"}

    try:
        # Determine owner and repo name
        if "/" in repo:
            owner, repo_name = repo.split("/", 1)
        else:
            owner = await get_gitea_username()
            repo_name = repo
        if not owner:
            return {"success": False, "error": "Could not determine repository owner"}

        async with httpx.AsyncClient(
            base_url=get_api_base_url(),
            headers=get_auth_headers(),
            timeout=30.0,
        ) as client:
            response = await client.get(f"/repos/{owner}/{repo_name}")

        if response.status_code != 200:
            # Error bodies may not be JSON objects (e.g. a proxy error page).
            try:
                error_msg = response.json().get("message", response.text)
            except (ValueError, AttributeError):
                error_msg = response.text or f"HTTP {response.status_code}"
            return {"success": False, "error": error_msg}

        repo_data = response.json()
        return {
            "success": True,
            "repo": {
                "name": repo_data.get("name"),
                "full_name": repo_data.get("full_name"),
                "url": repo_data.get("html_url"),
                "description": repo_data.get("description"),
                "stars": repo_data.get("stars_count", 0),
                "forks": repo_data.get("forks_count", 0),
                "default_branch": repo_data.get("default_branch"),
                "language": repo_data.get("language"),
            },
        }
    except Exception as e:
        # Log like every other tool in this module so failures are traceable.
        logger.error(f"Error getting repo info: {e}")
        return {"success": False, "error": str(e)}
# Start the MCP server only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    mcp.run()