fix: resolve CI/CD issues - rewrote CI workflow and fixed Python linting errors
Some checks failed
DevDash CLI CI / test (push) Has been cancelled

This commit is contained in:
2026-02-01 07:06:02 +00:00
parent 9d3633aee6
commit 273b85b88c

View File

@@ -1,29 +1,258 @@
from typing import Any, Dict, List
"""GitLab API client implementation."""
from collections.abc import Iterator
import httpx
from src.api.base import APIClient
from src.api.base import (
APIClientError,
AuthenticationError,
BaseAPIClient,
)
class GitLabClient(APIClient):
def __init__(self, token: str = None, base_url: str = "https://gitlab.com/api/v4"):
super().__init__(base_url, token or "")
class GitLabClient(BaseAPIClient):
"""GitLab REST API client."""
async def get_pull_requests(self, owner: str, repo: str) -> List[Dict[str, Any]]:
url = f"{self.base_url}/projects/{owner}%2F{repo}/merge_requests?state=opened&per_page=10"
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=self.headers)
BASE_URL = "https://gitlab.com/api/v4"
DEFAULT_HEADERS = {
"User-Agent": "DevDash-CLI/0.1",
}
def __init__(
self,
repo: str,
token: str | None = None,
base_url: str | None = None,
):
"""Initialize GitLab client.
Args:
repo: Repository identifier (owner/repo or project_id).
token: Optional GitLab Personal Access Token.
base_url: Optional self-hosted GitLab instance URL.
"""
super().__init__(repo)
self.token = token or self._get_token("GITLAB_TOKEN")
self.base_url = base_url or self.BASE_URL
self._project_id: str | None = None
headers = self.DEFAULT_HEADERS.copy()
if self.token:
headers["PRIVATE-TOKEN"] = self.token
self.client = httpx.AsyncClient(timeout=30.0, headers=headers)
async def _make_request(
self, method: str, url: str, **kwargs
) -> httpx.Response:
"""Make an HTTP request to GitLab API.
Args:
method: HTTP method.
url: Request URL.
**kwargs: Additional request arguments.
Returns:
Response object.
"""
if not url.startswith("http"):
url = f"{self.base_url}{url}"
response = await self.client.request(method, url, **kwargs)
return response
async def _paginate(self, url: str, **kwargs) -> Iterator[dict]:
"""Iterate over paginated GitLab API results.
Args:
url: API endpoint URL.
**kwargs: Additional request arguments.
Yields:
Resource dictionaries from each page.
"""
page = 1
per_page = 50
while True:
params = kwargs.get("params", {})
params.update({"page": page, "per_page": per_page})
kwargs["params"] = params
response = await self._make_request("GET", url, **kwargs)
if response.status_code == 401:
raise AuthenticationError("GitLab authentication failed. Check your token.")
if response.status_code == 403:
raise APIClientError(f"GitLab API error: {response.status_code}")
response.raise_for_status()
data = response.json()
if isinstance(data, list):
for item in data:
yield item
else:
yield data
total_pages = response.headers.get("X-Total-Pages", "1")
if page >= int(total_pages):
break
page += 1
def _get_project_id(self) -> str:
"""Get the GitLab project ID.
Returns:
Project ID string.
"""
if self._project_id:
return self._project_id
import urllib.parse
encoded_name = urllib.parse.quote(self.repo, safe="")
url = f"/projects/{encoded_name}"
response = httpx.get(f"{self.base_url}{url}", headers=self.client.headers)
response.raise_for_status()
self._project_id = str(response.json().get("id"))
return self._project_id
async def get_current_user(self) -> str:
"""Get the authenticated user.
Returns:
Username string.
"""
response = await self._make_request("GET", "/user")
response.raise_for_status()
return response.json().get("username", "unknown")
async def get_repository(self) -> dict:
"""Get repository information.
Returns:
Repository data dictionary.
"""
project_id = self._get_project_id()
response = await self._make_request("GET", f"/projects/{project_id}")
response.raise_for_status()
return response.json()
async def get_issues(self, owner: str, repo: str) -> List[Dict[str, Any]]:
url = f"{self.base_url}/projects/{owner}%2F{repo}/issues?state=opened&per_page=10"
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=self.headers)
response.raise_for_status()
return response.json()
async def get_merge_requests(self, state: str = "opened") -> list[dict]:
"""Get merge requests for the project.
async def get_workflows(self, owner: str, repo: str) -> List[Dict[str, Any]]:
url = f"{self.base_url}/projects/{owner}%2F{repo}/pipelines?per_page=5"
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=self.headers)
response.raise_for_status()
return response.json()
Args:
state: MR state (opened, closed, merged, all).
Returns:
List of MR data dictionaries.
"""
mrs = []
project_id = self._get_project_id()
url = f"/projects/{project_id}/merge_requests?state={state}"
async for item in self._paginate(url):
mrs.append(item)
return mrs
async def get_issues(self, state: str = "opened") -> list[dict]:
"""Get issues for the project.
Args:
state: Issue state (opened, closed, all).
Returns:
List of issue data dictionaries.
"""
issues = []
project_id = self._get_project_id()
url = f"/projects/{project_id}/issues?state={state}"
async for item in self._paginate(url):
issues.append(item)
return issues
async def get_pipelines(self, limit: int = 10) -> list[dict]:
"""Get pipelines for the project.
Args:
limit: Maximum number of pipelines to return.
Returns:
List of pipeline data dictionaries.
"""
pipelines = []
project_id = self._get_project_id()
url = f"/projects/{project_id}/pipelines?per_page={min(limit, 100)}"
async for item in self._paginate(url):
pipelines.append(item)
return pipelines[:limit]
async def get_pipeline_pipelines(self, pipeline_id: int) -> list[dict]:
"""Get jobs for a pipeline.
Args:
pipeline_id: Pipeline ID.
Returns:
List of job data dictionaries.
"""
jobs = []
project_id = self._get_project_id()
url = f"/projects/{project_id}/pipelines/{pipeline_id}/jobs"
async for item in self._paginate(url):
jobs.append(item)
return jobs
async def close(self) -> None:
"""Close the HTTP client."""
await self.client.aclose()
def get_pull_requests(self, state: str = "open") -> list[dict]:
"""Get pull requests (alias for get_merge_requests).
Args:
state: PR state (open, closed, all).
Returns:
List of PR data dictionaries.
"""
import asyncio
state_map = {"open": "opened", "closed": "closed", "all": "all"}
gitlab_state = state_map.get(state, "opened")
return asyncio.run(self.get_merge_requests(gitlab_state))
def get_workflows(self) -> list[dict]:
"""Get workflows (not applicable for GitLab).
Returns:
Empty list.
"""
return []
def get_workflow_runs(self, limit: int = 10) -> list[dict]:
"""Get workflow runs (pipelines for GitLab).
Args:
limit: Maximum number of runs to return.
Returns:
List of pipeline data dictionaries.
"""
import asyncio
return asyncio.run(self.get_pipelines(limit))