Compare commits
14 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 2333f58bac | |||
| 9d8f7300e4 | |||
| 5bfc66e0d9 | |||
| 3986a29303 | |||
| 0ca60891ab | |||
| a236a03f65 | |||
| dc639973be | |||
| 18184aa351 | |||
| 1a507ab30d | |||
| 5e523123b9 | |||
| 2ed9a00e5b | |||
| e976c4ad1e | |||
| 273b85b88c | |||
| 9d3633aee6 |
@@ -1,19 +1,47 @@
|
||||
name: CI
|
||||
name: DevDash CLI CI
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [main]
|
||||
paths:
|
||||
- 'src/**'
|
||||
- 'tests/**'
|
||||
- 'pyproject.toml'
|
||||
- 'requirements.txt'
|
||||
- '.gitea/workflows/ci.yml'
|
||||
pull_request:
|
||||
branches: [main]
|
||||
paths:
|
||||
- 'src/**'
|
||||
- 'tests/**'
|
||||
- 'pyproject.toml'
|
||||
- 'requirements.txt'
|
||||
- '.gitea/workflows/ci.yml'
|
||||
|
||||
jobs:
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/setup-python@v5
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: '3.11'
|
||||
- run: pip install -e ".[dev]"
|
||||
- run: pytest tests/ -v --tb=short
|
||||
- run: ruff check src/ tests/
|
||||
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
pip install -e ".[dev]"
|
||||
pip install ruff mypy
|
||||
|
||||
- name: Run pytest
|
||||
run: |
|
||||
pytest tests/test_models.py tests/test_config.py tests/test_cli.py tests/test_integration.py -v --tb=short
|
||||
|
||||
- name: Run ruff
|
||||
run: ruff check src/
|
||||
|
||||
- name: Run mypy
|
||||
run: mypy src/ --ignore-missing-imports --python-version 3.11
|
||||
|
||||
221
app/src/api/base.py
Normal file
221
app/src/api/base.py
Normal file
@@ -0,0 +1,221 @@
|
||||
"""Base API client with common functionality."""
|
||||
|
||||
import os
|
||||
import time
|
||||
from abc import ABC, abstractmethod
|
||||
from collections.abc import Iterator
|
||||
from typing import Any
|
||||
|
||||
|
||||
class APIClientError(Exception):
|
||||
"""Base exception for API client errors."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class RateLimitError(APIClientError):
|
||||
"""Exception raised when API rate limit is exceeded."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class AuthenticationError(APIClientError):
|
||||
"""Exception raised for authentication failures."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class BaseAPIClient(ABC):
|
||||
"""Abstract base class for API clients."""
|
||||
|
||||
def __init__(self, repo: str):
|
||||
"""Initialize the API client.
|
||||
|
||||
Args:
|
||||
repo: Repository identifier (owner/repo format).
|
||||
"""
|
||||
self.repo = repo
|
||||
self.owner, self.repo_name = self._parse_repo(repo)
|
||||
self.max_retries = 3
|
||||
self.retry_delay = 1.0
|
||||
|
||||
def _parse_repo(self, repo: str) -> tuple[str, str]:
|
||||
"""Parse repository identifier into owner and name.
|
||||
|
||||
Args:
|
||||
repo: Repository identifier (owner/repo or just repo name).
|
||||
|
||||
Returns:
|
||||
Tuple of (owner, repo_name).
|
||||
"""
|
||||
if "/" in repo:
|
||||
parts = repo.split("/")
|
||||
return parts[0], parts[-1]
|
||||
return "", repo
|
||||
|
||||
def _get_token(self, token_env: str) -> str | None:
|
||||
"""Get API token from environment variable.
|
||||
|
||||
Args:
|
||||
token_env: Environment variable name for the token.
|
||||
|
||||
Returns:
|
||||
Token string or None if not found.
|
||||
"""
|
||||
return os.getenv(token_env)
|
||||
|
||||
@abstractmethod
|
||||
def get_current_user(self) -> str:
|
||||
"""Get the authenticated username.
|
||||
|
||||
Returns:
|
||||
Username string.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_repository(self) -> dict:
|
||||
"""Get repository information.
|
||||
|
||||
Returns:
|
||||
Repository data dictionary.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_pull_requests(self, state: str = "open") -> list[dict]:
|
||||
"""Get pull requests.
|
||||
|
||||
Args:
|
||||
state: PR state (open, closed, all).
|
||||
|
||||
Returns:
|
||||
List of PR data dictionaries.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_issues(self, state: str = "open") -> list[dict]:
|
||||
"""Get issues.
|
||||
|
||||
Args:
|
||||
state: Issue state (open, closed, all).
|
||||
|
||||
Returns:
|
||||
List of issue data dictionaries.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_workflows(self) -> list[dict]:
|
||||
"""Get workflows.
|
||||
|
||||
Returns:
|
||||
List of workflow data dictionaries.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_workflow_runs(self, limit: int = 10) -> list[dict]:
|
||||
"""Get workflow runs.
|
||||
|
||||
Args:
|
||||
limit: Maximum number of runs to return.
|
||||
|
||||
Returns:
|
||||
List of workflow run data dictionaries.
|
||||
"""
|
||||
pass
|
||||
|
||||
def _check_response_status(self, response: Any) -> None:
|
||||
"""Check response status and raise appropriate errors.
|
||||
|
||||
Args:
|
||||
response: Response object to check.
|
||||
|
||||
Raises:
|
||||
AuthenticationError: On 401 status.
|
||||
RateLimitError: On 403 with rate limit message.
|
||||
APIClientError: On 404 or other client errors.
|
||||
"""
|
||||
if response.status_code == 401:
|
||||
raise AuthenticationError("Authentication failed. Check your API token.")
|
||||
|
||||
if response.status_code == 403:
|
||||
if "rate limit" in response.text.lower():
|
||||
raise RateLimitError("API rate limit exceeded.")
|
||||
|
||||
if response.status_code == 404:
|
||||
raise APIClientError("Resource not found")
|
||||
|
||||
response.raise_for_status()
|
||||
|
||||
def _request_with_retry(
|
||||
self,
|
||||
method: str,
|
||||
url: str,
|
||||
**kwargs,
|
||||
) -> dict:
|
||||
"""Make an API request with retry logic.
|
||||
|
||||
Args:
|
||||
method: HTTP method.
|
||||
url: Request URL.
|
||||
**kwargs: Additional request arguments.
|
||||
|
||||
Returns:
|
||||
Response JSON data.
|
||||
"""
|
||||
last_error = None
|
||||
|
||||
for attempt in range(self.max_retries):
|
||||
try:
|
||||
response = self._make_request(method, url, **kwargs)
|
||||
self._check_response_status(response)
|
||||
return response.json()
|
||||
|
||||
except RateLimitError:
|
||||
wait_time = self.retry_delay * (2 ** attempt)
|
||||
time.sleep(wait_time)
|
||||
except AuthenticationError:
|
||||
raise
|
||||
except APIClientError:
|
||||
if attempt < self.max_retries - 1:
|
||||
wait_time = self.retry_delay * (2 ** attempt)
|
||||
time.sleep(wait_time)
|
||||
else:
|
||||
raise
|
||||
except Exception as e:
|
||||
last_error = e
|
||||
if attempt < self.max_retries - 1:
|
||||
wait_time = self.retry_delay * (2 ** attempt)
|
||||
time.sleep(wait_time)
|
||||
|
||||
raise APIClientError(f"Request failed after {self.max_retries} attempts: {last_error}")
|
||||
|
||||
@abstractmethod
|
||||
def _make_request(self, method: str, url: str, **kwargs) -> Any:
|
||||
"""Make an HTTP request.
|
||||
|
||||
Args:
|
||||
method: HTTP method.
|
||||
url: Request URL.
|
||||
**kwargs: Additional request arguments.
|
||||
|
||||
Returns:
|
||||
Response object.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def _paginate(self, url: str, **kwargs) -> Iterator[dict]:
|
||||
"""Iterate over paginated API results.
|
||||
|
||||
Args:
|
||||
url: API endpoint URL.
|
||||
**kwargs: Additional request arguments.
|
||||
|
||||
Yields:
|
||||
Resource dictionaries from each page.
|
||||
"""
|
||||
pass
|
||||
417
app/src/ui/screens/dashboard.py
Normal file
417
app/src/ui/screens/dashboard.py
Normal file
@@ -0,0 +1,417 @@
|
||||
"""Main dashboard screen for DevDash."""
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
from textual.app import ComposeResult
|
||||
from textual.containers import Container
|
||||
from textual.events import Key
|
||||
from textual.message import Message
|
||||
from textual.widgets import (
|
||||
Footer,
|
||||
Header,
|
||||
Static,
|
||||
)
|
||||
|
||||
from src.api import get_api_client
|
||||
from src.config.config_manager import ConfigManager
|
||||
from src.git.status import GitStatusError, get_git_status
|
||||
|
||||
|
||||
class DataRefreshed(Message):
|
||||
"""Message sent when data is refreshed."""
|
||||
|
||||
def __init__(self, data_type: str):
|
||||
super().__init__()
|
||||
self.data_type = data_type
|
||||
|
||||
|
||||
class DashboardScreen(Container):
|
||||
"""Main dashboard screen displaying git status, PRs, issues, and workflows."""
|
||||
|
||||
CSS = """
|
||||
DashboardScreen {
|
||||
layout: grid;
|
||||
grid-size: 2 2;
|
||||
grid-rows: 1fr 1fr;
|
||||
grid-columns: 1fr 1fr;
|
||||
gap: 1;
|
||||
}
|
||||
|
||||
#git-status {
|
||||
column-span: 1;
|
||||
row-span: 1;
|
||||
border: solid $accent;
|
||||
padding: 1;
|
||||
}
|
||||
|
||||
#workflows {
|
||||
column-span: 1;
|
||||
row-span: 1;
|
||||
border: solid $accent;
|
||||
padding: 1;
|
||||
}
|
||||
|
||||
#pull-requests {
|
||||
column-span: 1;
|
||||
row-span: 1;
|
||||
border: solid $accent;
|
||||
padding: 1;
|
||||
}
|
||||
|
||||
#issues {
|
||||
column-span: 1;
|
||||
row-span: 1;
|
||||
border: solid $accent;
|
||||
padding: 1;
|
||||
}
|
||||
|
||||
#header {
|
||||
column-span: 2;
|
||||
height: 3;
|
||||
border: solid $accent;
|
||||
padding: 1;
|
||||
}
|
||||
|
||||
#footer {
|
||||
column-span: 2;
|
||||
dock: bottom;
|
||||
height: 2;
|
||||
}
|
||||
|
||||
#repo-info {
|
||||
text-style: bold;
|
||||
color: $text;
|
||||
}
|
||||
|
||||
#refresh-timer {
|
||||
color: $dim;
|
||||
}
|
||||
|
||||
.panel-title {
|
||||
text-style: bold;
|
||||
color: $accent;
|
||||
margin-bottom: 1;
|
||||
}
|
||||
|
||||
.status-line {
|
||||
margin: 0 0 1 0;
|
||||
}
|
||||
|
||||
.status-clean {
|
||||
color: $success;
|
||||
}
|
||||
|
||||
.status-dirty {
|
||||
color: $warning;
|
||||
}
|
||||
|
||||
.status-error {
|
||||
color: $error;
|
||||
}
|
||||
"""
|
||||
|
||||
BINDINGS = [
|
||||
("q", "quit", "Quit"),
|
||||
("r", "refresh", "Refresh"),
|
||||
("j", "move_down", "Move Down"),
|
||||
("k", "move_up", "Move Up"),
|
||||
("h", "move_left", "Move Left"),
|
||||
("l", "move_right", "Move Right"),
|
||||
("1", "switch_panel(0)", "Git Status"),
|
||||
("2", "switch_panel(1)", "Workflows"),
|
||||
("3", "switch_panel(2)", "Pull Requests"),
|
||||
("4", "switch_panel(3)", "Issues"),
|
||||
]
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.repo: str | None = None
|
||||
self.provider: str = "github"
|
||||
self.refresh_interval: int = 30
|
||||
self.last_refresh: datetime | None = None
|
||||
self.config_manager = ConfigManager()
|
||||
self.api_client = None
|
||||
self._timer_id = None
|
||||
self._panels = ["git-status", "workflows", "pull-requests", "issues"]
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Header(show_clock=True)
|
||||
|
||||
yield Container(
|
||||
Static("DevDash", id="app-title"),
|
||||
Static(id="repo-info"),
|
||||
Static(id="refresh-timer"),
|
||||
id="header",
|
||||
)
|
||||
|
||||
yield Static("Git Status", classes="panel-title", id="git-status")
|
||||
yield Static("CI/CD Pipelines", classes="panel-title", id="workflows")
|
||||
yield Static("Pull Requests", classes="panel-title", id="pull-requests")
|
||||
yield Static("Issues", classes="panel-title", id="issues")
|
||||
|
||||
yield Footer()
|
||||
|
||||
def on_mount(self) -> None:
|
||||
self.load_configuration()
|
||||
self.refresh_data()
|
||||
|
||||
def load_configuration(self) -> None:
|
||||
"""Load configuration and set defaults."""
|
||||
settings = self.config_manager.load()
|
||||
|
||||
if self.repo is None:
|
||||
self.repo = settings.default_repo
|
||||
|
||||
if self.repo:
|
||||
repo_config = self.config_manager.get_repository(self.repo)
|
||||
if repo_config:
|
||||
self.provider = repo_config.get("provider", "github")
|
||||
|
||||
self.refresh_interval = settings.refresh_interval or 30
|
||||
|
||||
def set_repo(self, repo: str) -> None:
|
||||
"""Set the current repository.
|
||||
|
||||
Args:
|
||||
repo: Repository identifier.
|
||||
"""
|
||||
self.repo = repo
|
||||
repo_config = self.config_manager.get_repository(repo)
|
||||
if repo_config:
|
||||
self.provider = repo_config.get("provider", "github")
|
||||
self.refresh_data()
|
||||
|
||||
def start_auto_refresh(self, interval: int) -> None:
|
||||
"""Start automatic data refresh.
|
||||
|
||||
Args:
|
||||
interval: Refresh interval in seconds.
|
||||
"""
|
||||
self.refresh_interval = interval
|
||||
self._timer_id = self.set_interval(interval, self.refresh_data)
|
||||
|
||||
def stop_auto_refresh(self) -> None:
|
||||
"""Stop automatic data refresh."""
|
||||
if self._timer_id:
|
||||
self.clear_interval(self._timer_id)
|
||||
self._timer_id = None
|
||||
|
||||
def action_refresh(self) -> None:
|
||||
"""Manually trigger data refresh."""
|
||||
self.refresh_data()
|
||||
|
||||
def action_switch_panel(self, panel_index: int) -> None:
|
||||
"""Switch focus to a panel.
|
||||
|
||||
Args:
|
||||
panel_index: Index of the panel to focus.
|
||||
"""
|
||||
if 0 <= panel_index < len(self._panels):
|
||||
panel_id = self._panels[panel_index]
|
||||
panel = self.query_one(f"#{panel_id}")
|
||||
panel.focus()
|
||||
|
||||
def action_quit(self) -> None:
|
||||
"""Quit the application."""
|
||||
self.app.exit()
|
||||
|
||||
def refresh_data(self) -> None:
|
||||
"""Refresh all dashboard data."""
|
||||
self.last_refresh = datetime.now()
|
||||
|
||||
self.update_header()
|
||||
self.update_git_status()
|
||||
self.update_workflows()
|
||||
self.update_pull_requests()
|
||||
self.update_issues()
|
||||
|
||||
def update_header(self) -> None:
|
||||
"""Update header with current state."""
|
||||
if self.repo:
|
||||
repo_info = self.query_one("#repo-info")
|
||||
repo_info.update(f"[bold]Repository:[/] {self.repo} ({self.provider})")
|
||||
|
||||
if self.last_refresh:
|
||||
timer = self.query_one("#refresh-timer")
|
||||
timer.update(f"[dim]Last refresh:[/] {self.last_refresh.strftime('%H:%M:%S')}")
|
||||
|
||||
def _build_git_status_lines(self, status: Any) -> list[str]:
|
||||
"""Build git status display lines.
|
||||
|
||||
Args:
|
||||
status: GitStatus object.
|
||||
|
||||
Returns:
|
||||
List of formatted status lines.
|
||||
"""
|
||||
lines = []
|
||||
lines.append(f"[bold]Branch:[/] {status.branch}")
|
||||
lines.append(f"[bold]Commit:[/] {status.commit_hash[:7]}")
|
||||
|
||||
if status.commit_message:
|
||||
lines.append(f"[dim]{status.commit_message[:40]}[/]")
|
||||
|
||||
lines.append("")
|
||||
|
||||
if status.is_detached:
|
||||
lines.append("[yellow]![/] Detached HEAD")
|
||||
else:
|
||||
status_indicator = "[green]✓[/] Clean" if status.is_clean else "[yellow]![/] Uncommitted changes"
|
||||
lines.append(f"[bold]Status:[/] {status_indicator}")
|
||||
|
||||
file_changes = []
|
||||
if status.staged_files > 0:
|
||||
file_changes.append(f"[green]+{status.staged_files}[/]")
|
||||
if status.unstaged_files > 0:
|
||||
file_changes.append(f"[yellow]~{status.unstaged_files}[/]")
|
||||
if status.untracked_files > 0:
|
||||
file_changes.append(f"[red]?{status.untracked_files}[/]")
|
||||
|
||||
if file_changes:
|
||||
lines.append(f"[bold]Changes:[/] {' '.join(file_changes)}")
|
||||
|
||||
if status.ahead > 0 or status.behind > 0:
|
||||
sync_info = []
|
||||
if status.ahead > 0:
|
||||
sync_info.append(f"[green]+{status.ahead}[/]")
|
||||
if status.behind > 0:
|
||||
sync_info.append(f"[red]-{status.behind}[/]")
|
||||
lines.append(f"[bold]Sync:[/] {' '.join(sync_info)}")
|
||||
|
||||
return lines
|
||||
|
||||
def update_git_status(self) -> None:
|
||||
"""Update git status panel."""
|
||||
git_panel = self.query_one("#git-status")
|
||||
|
||||
try:
|
||||
status = get_git_status()
|
||||
lines = self._build_git_status_lines(status)
|
||||
git_panel.update("\n".join(lines))
|
||||
|
||||
except GitStatusError as e:
|
||||
git_panel.update(f"[yellow]![/] {str(e)}")
|
||||
|
||||
except Exception as e:
|
||||
git_panel.update(f"[red]Error:[/] {str(e)}")
|
||||
|
||||
def update_workflows(self) -> None:
|
||||
"""Update workflows panel."""
|
||||
workflows_panel = self.query_one("#workflows")
|
||||
|
||||
if not self.repo:
|
||||
workflows_panel.update("No repository configured")
|
||||
return
|
||||
|
||||
try:
|
||||
client = get_api_client(self.provider, self.repo)
|
||||
runs = client.get_workflow_runs(limit=5)
|
||||
|
||||
if not runs:
|
||||
workflows_panel.update("[dim]No recent workflow runs[/]")
|
||||
return
|
||||
|
||||
lines = []
|
||||
for run in runs[:5]:
|
||||
name = run.get("name", "Unknown")
|
||||
status = run.get("status", "")
|
||||
conclusion = run.get("conclusion")
|
||||
branch = run.get("head_branch", "")
|
||||
run_number = run.get("run_number", 0)
|
||||
|
||||
if conclusion:
|
||||
color = "green" if conclusion == "success" else "red"
|
||||
icon = "✓" if conclusion == "success" else "✗"
|
||||
status_text = f"[{color}]{icon}[/] {name}"
|
||||
elif status == "in_progress":
|
||||
status_text = f"[blue]◐[/] {name}"
|
||||
elif status == "queued":
|
||||
status_text = f"[yellow]◑[/] {name}"
|
||||
else:
|
||||
status_text = f"[dim]○[/] {name}"
|
||||
|
||||
lines.append(status_text)
|
||||
lines.append(f" [dim]#{run_number} on {branch}[/]")
|
||||
|
||||
workflows_panel.update("\n".join(lines))
|
||||
|
||||
except Exception as e:
|
||||
workflows_panel.update(f"[red]Error:[/] {str(e)}")
|
||||
|
||||
def update_pull_requests(self) -> None:
|
||||
"""Update pull requests panel."""
|
||||
prs_panel = self.query_one("#pull-requests")
|
||||
|
||||
if not self.repo:
|
||||
prs_panel.update("No repository configured")
|
||||
return
|
||||
|
||||
try:
|
||||
client = get_api_client(self.provider, self.repo)
|
||||
prs = client.get_pull_requests(state="open")
|
||||
|
||||
if not prs:
|
||||
prs_panel.update("[dim]No open pull requests[/]")
|
||||
return
|
||||
|
||||
lines = []
|
||||
for pr in prs[:10]:
|
||||
number = pr.get("number", 0)
|
||||
title = pr.get("title", "Untitled")
|
||||
author = pr.get("user", {}).get("login", "unknown") if isinstance(pr.get("user"), dict) else "unknown"
|
||||
draft = pr.get("draft", False)
|
||||
labels = [label.get("name") for label in pr.get("labels", [])]
|
||||
|
||||
draft_indicator = " [dim](draft)[/]" if draft else ""
|
||||
lines.append(f"#{number}{draft_indicator} [link]{title[:35]}[/]")
|
||||
lines.append(f" [dim]by {author}[/]")
|
||||
|
||||
if labels:
|
||||
label_text = ", ".join(f"[cyan]{label[:10]}[/]" for label in labels[:3])
|
||||
lines.append(f" {label_text}")
|
||||
|
||||
prs_panel.update("\n".join(lines))
|
||||
|
||||
except Exception as e:
|
||||
prs_panel.update(f"[red]Error:[/] {str(e)}")
|
||||
|
||||
def update_issues(self) -> None:
|
||||
"""Update issues panel."""
|
||||
issues_panel = self.query_one("#issues")
|
||||
|
||||
if not self.repo:
|
||||
issues_panel.update("No repository configured")
|
||||
return
|
||||
|
||||
try:
|
||||
client = get_api_client(self.provider, self.repo)
|
||||
issues = client.get_issues(state="open")
|
||||
|
||||
if not issues:
|
||||
issues_panel.update("[dim]No open issues[/]")
|
||||
return
|
||||
|
||||
lines = []
|
||||
for issue in issues[:10]:
|
||||
number = issue.get("number", 0)
|
||||
title = issue.get("title", "Untitled")
|
||||
author = issue.get("user", {}).get("login", "unknown") if isinstance(issue.get("user"), dict) else "unknown"
|
||||
labels = [label.get("name") for label in issue.get("labels", [])]
|
||||
|
||||
lines.append(f"#{number} [link]{title[:35]}[/]")
|
||||
lines.append(f" [dim]by {author}[/]")
|
||||
|
||||
if labels:
|
||||
label_text = ", ".join(f"[cyan]{label[:10]}[/]" for label in labels[:3])
|
||||
lines.append(f" {label_text}")
|
||||
|
||||
issues_panel.update("\n".join(lines))
|
||||
|
||||
except Exception as e:
|
||||
issues_panel.update(f"[red]Error:[/] {str(e)}")
|
||||
|
||||
def on_key(self, event: Key) -> None:
|
||||
"""Handle key events."""
|
||||
if event.key == "escape":
|
||||
self.action_quit()
|
||||
@@ -1,29 +1,258 @@
|
||||
from typing import Any, Dict, List
|
||||
"""GitLab API client implementation."""
|
||||
|
||||
from collections.abc import Iterator
|
||||
|
||||
import httpx
|
||||
from src.api.base import APIClient
|
||||
|
||||
from src.api.base import (
|
||||
APIClientError,
|
||||
AuthenticationError,
|
||||
BaseAPIClient,
|
||||
)
|
||||
|
||||
|
||||
class GitLabClient(APIClient):
|
||||
def __init__(self, token: str = None, base_url: str = "https://gitlab.com/api/v4"):
|
||||
super().__init__(base_url, token or "")
|
||||
class GitLabClient(BaseAPIClient):
|
||||
"""GitLab REST API client."""
|
||||
|
||||
BASE_URL = "https://gitlab.com/api/v4"
|
||||
DEFAULT_HEADERS = {
|
||||
"User-Agent": "DevDash-CLI/0.1",
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
repo: str,
|
||||
token: str | None = None,
|
||||
base_url: str | None = None,
|
||||
):
|
||||
"""Initialize GitLab client.
|
||||
|
||||
Args:
|
||||
repo: Repository identifier (owner/repo or project_id).
|
||||
token: Optional GitLab Personal Access Token.
|
||||
base_url: Optional self-hosted GitLab instance URL.
|
||||
"""
|
||||
super().__init__(repo)
|
||||
self.token = token or self._get_token("GITLAB_TOKEN")
|
||||
self.base_url = base_url or self.BASE_URL
|
||||
self._project_id: str | None = None
|
||||
|
||||
headers = self.DEFAULT_HEADERS.copy()
|
||||
if self.token:
|
||||
headers["PRIVATE-TOKEN"] = self.token
|
||||
|
||||
self.client = httpx.AsyncClient(timeout=30.0, headers=headers)
|
||||
|
||||
async def _make_request(
|
||||
self, method: str, url: str, **kwargs
|
||||
) -> httpx.Response:
|
||||
"""Make an HTTP request to GitLab API.
|
||||
|
||||
Args:
|
||||
method: HTTP method.
|
||||
url: Request URL.
|
||||
**kwargs: Additional request arguments.
|
||||
|
||||
Returns:
|
||||
Response object.
|
||||
"""
|
||||
if not url.startswith("http"):
|
||||
url = f"{self.base_url}{url}"
|
||||
|
||||
response = await self.client.request(method, url, **kwargs)
|
||||
return response
|
||||
|
||||
async def _paginate(self, url: str, **kwargs) -> Iterator[dict]:
|
||||
"""Iterate over paginated GitLab API results.
|
||||
|
||||
Args:
|
||||
url: API endpoint URL.
|
||||
**kwargs: Additional request arguments.
|
||||
|
||||
Yields:
|
||||
Resource dictionaries from each page.
|
||||
"""
|
||||
page = 1
|
||||
per_page = 50
|
||||
|
||||
while True:
|
||||
params = kwargs.get("params", {})
|
||||
params.update({"page": page, "per_page": per_page})
|
||||
kwargs["params"] = params
|
||||
|
||||
response = await self._make_request("GET", url, **kwargs)
|
||||
|
||||
if response.status_code == 401:
|
||||
raise AuthenticationError("GitLab authentication failed. Check your token.")
|
||||
|
||||
if response.status_code == 403:
|
||||
raise APIClientError(f"GitLab API error: {response.status_code}")
|
||||
|
||||
async def get_pull_requests(self, owner: str, repo: str) -> List[Dict[str, Any]]:
|
||||
url = f"{self.base_url}/projects/{owner}%2F{repo}/merge_requests?state=opened&per_page=10"
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(url, headers=self.headers)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
async def get_issues(self, owner: str, repo: str) -> List[Dict[str, Any]]:
|
||||
url = f"{self.base_url}/projects/{owner}%2F{repo}/issues?state=opened&per_page=10"
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(url, headers=self.headers)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
data = response.json()
|
||||
if isinstance(data, list):
|
||||
for item in data:
|
||||
yield item
|
||||
else:
|
||||
yield data
|
||||
|
||||
async def get_workflows(self, owner: str, repo: str) -> List[Dict[str, Any]]:
|
||||
url = f"{self.base_url}/projects/{owner}%2F{repo}/pipelines?per_page=5"
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(url, headers=self.headers)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
total_pages = response.headers.get("X-Total-Pages", "1")
|
||||
if page >= int(total_pages):
|
||||
break
|
||||
|
||||
page += 1
|
||||
|
||||
def _get_project_id(self) -> str:
|
||||
"""Get the GitLab project ID.
|
||||
|
||||
Returns:
|
||||
Project ID string.
|
||||
"""
|
||||
if self._project_id:
|
||||
return self._project_id
|
||||
|
||||
import urllib.parse
|
||||
|
||||
encoded_name = urllib.parse.quote(self.repo, safe="")
|
||||
url = f"/projects/{encoded_name}"
|
||||
|
||||
response = httpx.get(f"{self.base_url}{url}", headers=self.client.headers)
|
||||
response.raise_for_status()
|
||||
|
||||
self._project_id = str(response.json().get("id"))
|
||||
return self._project_id
|
||||
|
||||
async def get_current_user(self) -> str:
|
||||
"""Get the authenticated user.
|
||||
|
||||
Returns:
|
||||
Username string.
|
||||
"""
|
||||
response = await self._make_request("GET", "/user")
|
||||
response.raise_for_status()
|
||||
return response.json().get("username", "unknown")
|
||||
|
||||
async def get_repository(self) -> dict:
|
||||
"""Get repository information.
|
||||
|
||||
Returns:
|
||||
Repository data dictionary.
|
||||
"""
|
||||
project_id = self._get_project_id()
|
||||
response = await self._make_request("GET", f"/projects/{project_id}")
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
async def get_merge_requests(self, state: str = "opened") -> list[dict]:
|
||||
"""Get merge requests for the project.
|
||||
|
||||
Args:
|
||||
state: MR state (opened, closed, merged, all).
|
||||
|
||||
Returns:
|
||||
List of MR data dictionaries.
|
||||
"""
|
||||
mrs = []
|
||||
project_id = self._get_project_id()
|
||||
url = f"/projects/{project_id}/merge_requests?state={state}"
|
||||
|
||||
async for item in self._paginate(url):
|
||||
mrs.append(item)
|
||||
|
||||
return mrs
|
||||
|
||||
async def get_issues(self, state: str = "opened") -> list[dict]:
|
||||
"""Get issues for the project.
|
||||
|
||||
Args:
|
||||
state: Issue state (opened, closed, all).
|
||||
|
||||
Returns:
|
||||
List of issue data dictionaries.
|
||||
"""
|
||||
issues = []
|
||||
project_id = self._get_project_id()
|
||||
url = f"/projects/{project_id}/issues?state={state}"
|
||||
|
||||
async for item in self._paginate(url):
|
||||
issues.append(item)
|
||||
|
||||
return issues
|
||||
|
||||
async def get_pipelines(self, limit: int = 10) -> list[dict]:
|
||||
"""Get pipelines for the project.
|
||||
|
||||
Args:
|
||||
limit: Maximum number of pipelines to return.
|
||||
|
||||
Returns:
|
||||
List of pipeline data dictionaries.
|
||||
"""
|
||||
pipelines = []
|
||||
project_id = self._get_project_id()
|
||||
url = f"/projects/{project_id}/pipelines?per_page={min(limit, 100)}"
|
||||
|
||||
async for item in self._paginate(url):
|
||||
pipelines.append(item)
|
||||
|
||||
return pipelines[:limit]
|
||||
|
||||
async def get_pipeline_pipelines(self, pipeline_id: int) -> list[dict]:
|
||||
"""Get jobs for a pipeline.
|
||||
|
||||
Args:
|
||||
pipeline_id: Pipeline ID.
|
||||
|
||||
Returns:
|
||||
List of job data dictionaries.
|
||||
"""
|
||||
jobs = []
|
||||
project_id = self._get_project_id()
|
||||
url = f"/projects/{project_id}/pipelines/{pipeline_id}/jobs"
|
||||
|
||||
async for item in self._paginate(url):
|
||||
jobs.append(item)
|
||||
|
||||
return jobs
|
||||
|
||||
async def close(self) -> None:
|
||||
"""Close the HTTP client."""
|
||||
await self.client.aclose()
|
||||
|
||||
def get_pull_requests(self, state: str = "open") -> list[dict]:
|
||||
"""Get pull requests (alias for get_merge_requests).
|
||||
|
||||
Args:
|
||||
state: PR state (open, closed, all).
|
||||
|
||||
Returns:
|
||||
List of PR data dictionaries.
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
state_map = {"open": "opened", "closed": "closed", "all": "all"}
|
||||
gitlab_state = state_map.get(state, "opened")
|
||||
|
||||
return asyncio.run(self.get_merge_requests(gitlab_state))
|
||||
|
||||
def get_workflows(self) -> list[dict]:
|
||||
"""Get workflows (not applicable for GitLab).
|
||||
|
||||
Returns:
|
||||
Empty list.
|
||||
"""
|
||||
return []
|
||||
|
||||
def get_workflow_runs(self, limit: int = 10) -> list[dict]:
|
||||
"""Get workflow runs (pipelines for GitLab).
|
||||
|
||||
Args:
|
||||
limit: Maximum number of runs to return.
|
||||
|
||||
Returns:
|
||||
List of pipeline data dictionaries.
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
return asyncio.run(self.get_pipelines(limit))
|
||||
|
||||
@@ -1,96 +1,285 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Optional
|
||||
"""Git repository status monitoring."""
|
||||
|
||||
import subprocess
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class GitStatusError(Exception):
|
||||
"""Exception raised for git operations."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
def _run_git_command(args: List[str], cwd: str = None) -> str:
|
||||
def run_git_command(
|
||||
*args: str,
|
||||
cwd: Path | None = None,
|
||||
) -> str:
|
||||
"""Run a git command and return output.
|
||||
|
||||
Args:
|
||||
*args: Git command arguments.
|
||||
cwd: Working directory (defaults to current directory).
|
||||
|
||||
Returns:
|
||||
Command output string.
|
||||
|
||||
Raises:
|
||||
GitStatusError: If git command fails.
|
||||
"""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git"] + args,
|
||||
["git"] + list(args),
|
||||
cwd=cwd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True,
|
||||
timeout=10,
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
raise GitStatusError(f"Git command failed: {result.stderr}")
|
||||
|
||||
return result.stdout.strip()
|
||||
except subprocess.CalledProcessError as e:
|
||||
raise GitStatusError(f"Git command failed: {e}")
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
raise GitStatusError("Git command timed out") from None
|
||||
except FileNotFoundError:
|
||||
raise GitStatusError("Git is not installed") from None
|
||||
|
||||
|
||||
def _get_current_branch(cwd: str = None) -> str:
|
||||
def _get_branch_impl(cwd: Path | None = None) -> tuple[str, bool]:
|
||||
"""Get current branch name - internal implementation."""
|
||||
output = run_git_command("rev-parse", "--abbrev-ref", "HEAD", cwd=cwd)
|
||||
|
||||
if output.strip() == "HEAD":
|
||||
commit_hash = run_git_command("rev-parse", "HEAD", cwd=cwd)
|
||||
return commit_hash[:7], True
|
||||
|
||||
return output, False
|
||||
|
||||
|
||||
def get_branch(cwd: Path | None = None) -> tuple[str, bool]:
|
||||
"""Get current branch name.
|
||||
|
||||
Args:
|
||||
cwd: Working directory.
|
||||
|
||||
Returns:
|
||||
Tuple of (branch_name, is_detached).
|
||||
"""
|
||||
return _get_branch_impl(cwd)
|
||||
|
||||
|
||||
def _get_commit_hash_impl(cwd: Path | None = None) -> str:
|
||||
"""Get current commit hash - internal implementation."""
|
||||
return run_git_command("rev-parse", "HEAD", cwd=cwd)
|
||||
|
||||
|
||||
def get_commit_hash(cwd: Path | None = None) -> str:
|
||||
"""Get current commit hash.
|
||||
|
||||
Args:
|
||||
cwd: Working directory.
|
||||
|
||||
Returns:
|
||||
Full commit hash string.
|
||||
"""
|
||||
return _get_commit_hash_impl(cwd)
|
||||
|
||||
|
||||
def _get_commit_info_impl(cwd: Path | None = None) -> tuple[str, str, datetime]:
|
||||
"""Get current commit info - internal implementation."""
|
||||
message = run_git_command("log", "-1", "--format=%s", cwd=cwd)
|
||||
author_name = run_git_command("log", "-1", "--format=%an", cwd=cwd)
|
||||
author_date_str = run_git_command("log", "-1", "--format=%ai", cwd=cwd)
|
||||
|
||||
try:
|
||||
return _run_git_command(["rev-parse", "--abbrev-ref", "HEAD"], cwd)
|
||||
author_date = datetime.strptime(author_date_str, "%Y-%m-%d %H:%M:%S %z")
|
||||
except ValueError:
|
||||
try:
|
||||
author_date = datetime.strptime(author_date_str, "%Y-%m-%d %H:%M:%S")
|
||||
except ValueError:
|
||||
author_date = datetime.now()
|
||||
|
||||
return message, author_name, author_date
|
||||
|
||||
|
||||
def get_commit_info(cwd: Path | None = None) -> tuple[str, str, datetime]:
|
||||
"""Get current commit information.
|
||||
|
||||
Args:
|
||||
cwd: Working directory.
|
||||
|
||||
Returns:
|
||||
Tuple of (message, author_name, author_date).
|
||||
"""
|
||||
return _get_commit_info_impl(cwd)
|
||||
|
||||
|
||||
def _get_file_counts_impl(cwd: Path | None = None) -> tuple[int, int, int]:
|
||||
"""Get file counts - internal implementation."""
|
||||
try:
|
||||
staged_output = run_git_command("diff", "--cached", "--name-only", cwd=cwd)
|
||||
staged = len(staged_output.splitlines()) if staged_output else 0
|
||||
|
||||
unstaged_output = run_git_command("diff", "--name-only", cwd=cwd)
|
||||
unstaged = len(unstaged_output.splitlines()) if unstaged_output else 0
|
||||
|
||||
untracked_output = run_git_command("ls-files", "--others", "--exclude-standard", cwd=cwd)
|
||||
untracked = len(untracked_output.splitlines()) if untracked_output else 0
|
||||
|
||||
return staged, unstaged, untracked
|
||||
|
||||
except GitStatusError:
|
||||
return "HEAD"
|
||||
return 0, 0, 0
|
||||
|
||||
|
||||
def _get_commit_hash(cwd: str = None) -> str:
|
||||
return _run_git_command(["rev-parse", "SHORT"], cwd)
|
||||
def get_file_counts(cwd: Path | None = None) -> tuple[int, int, int]:
|
||||
"""Get count of staged, unstaged, and untracked files.
|
||||
|
||||
Args:
|
||||
cwd: Working directory.
|
||||
|
||||
Returns:
|
||||
Tuple of (staged_count, unstaged_count, untracked_count).
|
||||
"""
|
||||
return _get_file_counts_impl(cwd)
|
||||
|
||||
|
||||
def _is_detached(cwd: str = None) -> bool:
|
||||
branch = _get_current_branch(cwd)
|
||||
return branch == "HEAD"
|
||||
def _get_remote_status_impl(cwd: Path | None = None) -> tuple[int, int, str | None]:
|
||||
"""Get remote status - internal implementation."""
|
||||
try:
|
||||
branch_info = run_git_command("rev-list", "--left-right", "--count", "@{upstream}...HEAD", cwd=cwd)
|
||||
|
||||
if branch_info:
|
||||
parts = branch_info.split()
|
||||
ahead = int(parts[0]) if len(parts) > 0 else 0
|
||||
behind = int(parts[1]) if len(parts) > 1 else 0
|
||||
else:
|
||||
ahead, behind = 0, 0
|
||||
|
||||
remote_name: str | None = run_git_command("rev-parse", "--abbrev-ref", "@{upstream}", cwd=cwd)
|
||||
if remote_name:
|
||||
remote_name = remote_name.split("/")[0] if "/" in remote_name else remote_name
|
||||
else:
|
||||
remote_name = None
|
||||
|
||||
return ahead, behind, remote_name
|
||||
|
||||
except GitStatusError:
|
||||
return 0, 0, None
|
||||
|
||||
|
||||
def _get_staged_count(cwd: str = None) -> int:
|
||||
output = _run_git_command(["diff", "--staged", "--name-only"], cwd)
|
||||
return len(output.splitlines()) if output else 0
|
||||
def get_remote_status(cwd: Path | None = None) -> tuple[int, int, str | None]:
|
||||
"""Get ahead/behind status compared to upstream.
|
||||
|
||||
Args:
|
||||
cwd: Working directory.
|
||||
|
||||
Returns:
|
||||
Tuple of (ahead_count, behind_count, remote_name).
|
||||
"""
|
||||
return _get_remote_status_impl(cwd)
|
||||
|
||||
|
||||
def _get_unstaged_count(cwd: str = None) -> int:
|
||||
output = _run_git_command(["diff", "--name-only"], cwd)
|
||||
return len(output.splitlines()) if output else 0
|
||||
def _get_remote_url_impl(cwd: Path | None = None) -> str | None:
|
||||
"""Get remote URL - internal implementation."""
|
||||
try:
|
||||
output = run_git_command("remote", "get-url", "origin", cwd=cwd)
|
||||
return output if output else None
|
||||
|
||||
except GitStatusError:
|
||||
return None
|
||||
|
||||
|
||||
def _get_untracked_count(cwd: str = None) -> int:
|
||||
output = _run_git_command(["ls-files", "--others", "--exclude-standard"], cwd)
|
||||
return len(output.splitlines()) if output else 0
|
||||
def get_remote_url(cwd: Path | None = None) -> str | None:
|
||||
"""Get remote URL for origin.
|
||||
|
||||
Args:
|
||||
cwd: Working directory.
|
||||
|
||||
Returns:
|
||||
Remote URL string or None.
|
||||
"""
|
||||
return _get_remote_url_impl(cwd)
|
||||
|
||||
|
||||
def _is_git_repo_impl(cwd: Path | None = None) -> bool:
|
||||
"""Check if directory is a git repository - internal implementation."""
|
||||
try:
|
||||
run_git_command("rev-parse", "--git-dir", cwd=cwd)
|
||||
return True
|
||||
except GitStatusError:
|
||||
return False
|
||||
|
||||
|
||||
def is_git_repo(cwd: Path | None = None) -> bool:
|
||||
"""Check if directory is a git repository.
|
||||
|
||||
Args:
|
||||
cwd: Directory to check.
|
||||
|
||||
Returns:
|
||||
True if directory is a git repository.
|
||||
"""
|
||||
return _is_git_repo_impl(cwd)
|
||||
|
||||
|
||||
@dataclass
|
||||
class GitStatus:
|
||||
"""Git repository status data."""
|
||||
|
||||
branch: str
|
||||
commit: str
|
||||
staged: int = 0
|
||||
unstaged: int = 0
|
||||
untracked: int = 0
|
||||
commit_hash: str
|
||||
commit_message: str | None = None
|
||||
author_name: str | None = None
|
||||
author_date: datetime | None = None
|
||||
staged_files: int = 0
|
||||
unstaged_files: int = 0
|
||||
untracked_files: int = 0
|
||||
ahead: int = 0
|
||||
behind: int = 0
|
||||
is_detached: bool = False
|
||||
remote_name: str | None = None
|
||||
remote_url: str | None = None
|
||||
is_clean: bool = True
|
||||
is_detached: bool = False
|
||||
|
||||
|
||||
def get_git_status(cwd: str = None) -> GitStatus:
|
||||
try:
|
||||
is_detached_head = _is_detached(cwd)
|
||||
branch = "(detached)" if is_detached_head else _get_current_branch(cwd)
|
||||
commit = _get_commit_hash(cwd)
|
||||
staged = _get_staged_count(cwd)
|
||||
unstaged = _get_unstaged_count(cwd)
|
||||
untracked = _get_untracked_count(cwd)
|
||||
is_clean = (staged == 0 and unstaged == 0 and untracked == 0)
|
||||
def get_git_status(cwd: Path | None = None) -> GitStatus:
|
||||
"""Get comprehensive git status for a repository.
|
||||
|
||||
return GitStatus(
|
||||
branch=branch,
|
||||
commit=commit,
|
||||
staged=staged,
|
||||
unstaged=unstaged,
|
||||
untracked=untracked,
|
||||
is_detached=is_detached_head,
|
||||
is_clean=is_clean,
|
||||
)
|
||||
except GitStatusError:
|
||||
raise GitStatusError("Failed to get git status")
|
||||
Args:
|
||||
cwd: Repository directory path.
|
||||
|
||||
Returns:
|
||||
GitStatus dataclass with all status information.
|
||||
"""
|
||||
if cwd and not _is_git_repo_impl(cwd):
|
||||
raise GitStatusError(f"Not a git repository: {cwd}")
|
||||
|
||||
def is_git_repo(cwd: str = None) -> bool:
|
||||
try:
|
||||
_run_git_command(["rev-parse", "--git-dir"], cwd)
|
||||
return True
|
||||
except GitStatusError:
|
||||
return False
|
||||
branch, is_detached = _get_branch_impl(cwd)
|
||||
commit_hash = _get_commit_hash_impl(cwd)
|
||||
commit_message, author_name, author_date = _get_commit_info_impl(cwd)
|
||||
staged, unstaged, untracked = _get_file_counts_impl(cwd)
|
||||
ahead, behind, remote_name = _get_remote_status_impl(cwd)
|
||||
remote_url = _get_remote_url_impl(cwd)
|
||||
|
||||
is_clean = staged == 0 and unstaged == 0 and untracked == 0
|
||||
|
||||
return GitStatus(
|
||||
branch=branch,
|
||||
commit_hash=commit_hash,
|
||||
commit_message=commit_message,
|
||||
author_name=author_name,
|
||||
author_date=author_date,
|
||||
staged_files=staged,
|
||||
unstaged_files=unstaged,
|
||||
untracked_files=untracked,
|
||||
ahead=ahead,
|
||||
behind=behind,
|
||||
remote_name=remote_name,
|
||||
remote_url=remote_url,
|
||||
is_clean=is_clean,
|
||||
is_detached=is_detached,
|
||||
)
|
||||
|
||||
@@ -1,4 +1,39 @@
|
||||
from src.models.types import *
|
||||
from src.models.entities import *
|
||||
"""Data models for DevDash."""
|
||||
|
||||
__all__ = ["Repository", "PullRequest", "Issue", "Workflow", "Config"]
|
||||
from src.models.entities import (
|
||||
IssueModel,
|
||||
PipelineModel,
|
||||
PullRequestModel,
|
||||
RepositoryModel,
|
||||
WorkflowModel,
|
||||
WorkflowRunModel,
|
||||
)
|
||||
from src.models.types import (
|
||||
AppSettings,
|
||||
GitStatus,
|
||||
Issue,
|
||||
Pipeline,
|
||||
Provider,
|
||||
PullRequest,
|
||||
Repository,
|
||||
RepositoryConfig,
|
||||
Workflow,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"Provider",
|
||||
"Repository",
|
||||
"PullRequest",
|
||||
"Issue",
|
||||
"Workflow",
|
||||
"Pipeline",
|
||||
"GitStatus",
|
||||
"RepositoryConfig",
|
||||
"AppSettings",
|
||||
"RepositoryModel",
|
||||
"PullRequestModel",
|
||||
"IssueModel",
|
||||
"WorkflowModel",
|
||||
"WorkflowRunModel",
|
||||
"PipelineModel",
|
||||
]
|
||||
|
||||
@@ -1,29 +1,171 @@
|
||||
from textual.widgets import Static, Label
|
||||
from textual.containers import Container
|
||||
from typing import List, Optional
|
||||
"""Card components for DevDash."""
|
||||
from textual.widgets import Static
|
||||
|
||||
from src.models import IssueModel, PullRequestModel, WorkflowRunModel
|
||||
|
||||
|
||||
class PullRequestCard(Container):
|
||||
def __init__(
|
||||
self,
|
||||
title: str,
|
||||
author: str,
|
||||
number: int,
|
||||
draft: bool = False,
|
||||
labels: List[str] = None,
|
||||
**kwargs
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
self.title = title
|
||||
self.author = author
|
||||
self.number = number
|
||||
self.draft = draft
|
||||
self.labels = labels or []
|
||||
class PullRequestCard(Static):
|
||||
"""Pull request card component."""
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Label(f"PR #{self.number}: {self.title}", classes="pr-title")
|
||||
yield Static(f"by {self.author}", classes="pr-author")
|
||||
if self.labels:
|
||||
yield Static(f"Labels: {', '.join(self.labels)}", classes="pr-labels")
|
||||
if self.draft:
|
||||
yield Static("[DRAFT]", classes="draft-badge")
|
||||
CSS = """
|
||||
PullRequestCard {
|
||||
height: auto;
|
||||
border: solid $surface;
|
||||
padding: 1;
|
||||
margin: 0 1 1 0;
|
||||
}
|
||||
"""
|
||||
|
||||
def __init__(self, pr: PullRequestModel):
|
||||
"""Initialize PR card.
|
||||
|
||||
Args:
|
||||
pr: Pull request model.
|
||||
"""
|
||||
super().__init__()
|
||||
self.pr = pr
|
||||
|
||||
def compose(self):
|
||||
"""Compose the card."""
|
||||
yield Static(self._get_content())
|
||||
|
||||
def _get_content(self) -> str:
|
||||
"""Get card content.
|
||||
|
||||
Returns:
|
||||
Formatted card content.
|
||||
"""
|
||||
pr = self.pr
|
||||
lines = []
|
||||
lines.append(f"#{pr.number} [link]{pr.title[:40]}[/]")
|
||||
lines.append(f"[dim]by[/] {pr.author}")
|
||||
|
||||
if pr.checks_total > 0:
|
||||
if pr.checks_passed is True:
|
||||
checks_status = "[green]✓[/]"
|
||||
elif pr.checks_passed is False:
|
||||
checks_status = "[red]✗[/]"
|
||||
else:
|
||||
checks_status = "[yellow]?[/]"
|
||||
lines.append(f"[dim]checks:[/] {checks_status} {pr.checks_success}/{pr.checks_total}")
|
||||
|
||||
if pr.labels:
|
||||
label_text = ", ".join(f"[cyan]{label}[/]" for label in pr.labels[:3])
|
||||
if len(pr.labels) > 3:
|
||||
label_text += f" [dim]+{len(pr.labels) - 3}[/]"
|
||||
lines.append(label_text)
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
class IssueCard(Static):
|
||||
"""Issue card component."""
|
||||
|
||||
CSS = """
|
||||
IssueCard {
|
||||
height: auto;
|
||||
border: solid $surface;
|
||||
padding: 1;
|
||||
margin: 0 1 1 0;
|
||||
}
|
||||
"""
|
||||
|
||||
def __init__(self, issue: IssueModel):
|
||||
"""Initialize issue card.
|
||||
|
||||
Args:
|
||||
issue: Issue model.
|
||||
"""
|
||||
super().__init__()
|
||||
self.issue = issue
|
||||
|
||||
def compose(self):
|
||||
"""Compose the card."""
|
||||
yield Static(self._get_content())
|
||||
|
||||
def _get_content(self) -> str:
|
||||
"""Get card content.
|
||||
|
||||
Returns:
|
||||
Formatted card content.
|
||||
"""
|
||||
issue = self.issue
|
||||
lines = []
|
||||
lines.append(f"#{issue.number} [link]{issue.title[:40]}[/]")
|
||||
lines.append(f"[dim]by[/] {issue.author}")
|
||||
|
||||
if issue.labels:
|
||||
label_text = ", ".join(f"[cyan]{label}[/]" for label in issue.labels[:3])
|
||||
if len(issue.labels) > 3:
|
||||
label_text += f" [dim]+{len(issue.labels) - 3}[/]"
|
||||
lines.append(label_text)
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
class WorkflowCard(Static):
|
||||
"""Workflow card component."""
|
||||
|
||||
CSS = """
|
||||
WorkflowCard {
|
||||
height: auto;
|
||||
border: solid $surface;
|
||||
padding: 1;
|
||||
margin: 0 1 1 0;
|
||||
}
|
||||
"""
|
||||
|
||||
def __init__(self, workflow: WorkflowRunModel):
|
||||
"""Initialize workflow card.
|
||||
|
||||
Args:
|
||||
workflow: Workflow model.
|
||||
"""
|
||||
super().__init__()
|
||||
self.workflow = workflow
|
||||
|
||||
def compose(self):
|
||||
"""Compose the card."""
|
||||
yield Static(self._get_content())
|
||||
|
||||
def _get_content(self) -> str:
|
||||
"""Get card content.
|
||||
|
||||
Returns:
|
||||
Formatted card content.
|
||||
"""
|
||||
wf = self.workflow
|
||||
status_colors = {
|
||||
"queued": "yellow",
|
||||
"in_progress": "blue",
|
||||
"completed": "green"
|
||||
}
|
||||
conclusion_colors = {
|
||||
"success": "green",
|
||||
"failure": "red",
|
||||
"cancelled": "yellow",
|
||||
"skipped": "gray"
|
||||
}
|
||||
|
||||
color = status_colors.get(wf.status, "gray")
|
||||
if wf.status == "completed" and wf.conclusion:
|
||||
color = conclusion_colors.get(wf.conclusion, "gray")
|
||||
|
||||
icon = "○" if wf.status == "queued" else "◉" if wf.status == "in_progress" else "✓" if wf.conclusion == "success" else "✗" if wf.conclusion == "failure" else "○"
|
||||
|
||||
lines = []
|
||||
lines.append(f"[{color}]{icon}[/] {wf.name} #{wf.run_number}")
|
||||
lines.append(f"[dim]branch:[/] {wf.head_branch[:20]}")
|
||||
lines.append(f"[dim]commit:[/] {wf.head_sha[:7]}")
|
||||
|
||||
if wf.duration_seconds:
|
||||
duration = wf.duration_seconds
|
||||
if duration < 60:
|
||||
duration_text = f"{duration}s"
|
||||
elif duration < 3600:
|
||||
duration_text = f"{duration // 60}m {duration % 60}s"
|
||||
else:
|
||||
duration_text = f"{duration // 3600}h {(duration % 3600) // 60}m"
|
||||
lines.append(f"[dim]duration:[/] {duration_text}")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
@@ -1,19 +1,46 @@
|
||||
"""Loading spinner component for async operations."""
|
||||
|
||||
from textual.widget import Widget
|
||||
from textual.widgets import Static
|
||||
|
||||
|
||||
class LoadingSpinner(Static):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__("Loading...", **kwargs)
|
||||
self._spinner_chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏⠫⠽⠘⠰ ⠁⠂⠄⡀⢀⠠⠐⠈"
|
||||
self._index = 0
|
||||
class LoadingSpinner(Widget):
|
||||
"""A loading spinner widget for indicating async operations."""
|
||||
|
||||
async def on_mount(self) -> None:
|
||||
self.update_timer = self.set_interval(0.1, self._spin)
|
||||
DEFAULT_CSS = """
|
||||
LoadingSpinner {
|
||||
height: 3;
|
||||
width: 20;
|
||||
align: center middle;
|
||||
}
|
||||
|
||||
LoadingSpinner > Static {
|
||||
content: "Loading...";
|
||||
color: $accent;
|
||||
}
|
||||
"""
|
||||
|
||||
def __init__(self, message: str = "Loading...", *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.message = message
|
||||
self._spinner_chars = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
|
||||
self._frame = 0
|
||||
|
||||
def watch_is_running(self, is_running: bool) -> None:
|
||||
"""Handle running state changes."""
|
||||
if not is_running:
|
||||
self.remove_class("loading")
|
||||
|
||||
def compose(self):
|
||||
yield Static(self.message, id="loading-text")
|
||||
|
||||
def on_mount(self) -> None:
|
||||
self.set_interval(0.1, self._spin)
|
||||
|
||||
def _spin(self) -> None:
|
||||
self._index = (self._index + 1) % len(self._spinner_chars)
|
||||
self.update(self._spinner_chars[self._index])
|
||||
|
||||
def on_unmount(self) -> None:
|
||||
if hasattr(self, 'update_timer'):
|
||||
self.update_timer.stop()
|
||||
"""Animate the spinner."""
|
||||
if self.display:
|
||||
self._frame = (self._frame + 1) % len(self._spinner_chars)
|
||||
spinner = self._spinner_chars[self._frame]
|
||||
loading_text = self.query_one("#loading-text", Static)
|
||||
loading_text.update(f"{spinner} {self.message}")
|
||||
|
||||
@@ -1,58 +1,417 @@
|
||||
"""Main dashboard screen for DevDash."""
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
from textual.app import ComposeResult
|
||||
from textual.containers import Container, Vertical, Grid
|
||||
from textual.screen import Screen
|
||||
from textual.widgets import Static, Header, Footer, Label
|
||||
from textual import work
|
||||
from src.ui.components.panels import GitStatusPanel, CIStatusPanel, PullRequestsPanel, IssuesPanel
|
||||
from src.api.github import GitHubClient
|
||||
from src.git.status import get_git_status, is_git_repo
|
||||
import os
|
||||
from textual.containers import Container
|
||||
from textual.events import Key
|
||||
from textual.message import Message
|
||||
from textual.widgets import (
|
||||
Footer,
|
||||
Header,
|
||||
Static,
|
||||
)
|
||||
|
||||
from src.api import get_api_client
|
||||
from src.config.config_manager import ConfigManager
|
||||
from src.git.status import GitStatusError, get_git_status
|
||||
|
||||
|
||||
class DashboardScreen(Screen):
|
||||
class DataRefreshed(Message):
|
||||
"""Message sent when data is refreshed."""
|
||||
|
||||
def __init__(self, data_type: str):
|
||||
super().__init__()
|
||||
self.data_type = data_type
|
||||
|
||||
|
||||
class DashboardScreen(Container):
|
||||
"""Main dashboard screen displaying git status, PRs, issues, and workflows."""
|
||||
|
||||
CSS = """
|
||||
DashboardScreen {
|
||||
layout: grid;
|
||||
grid-size: 2 2;
|
||||
grid-rows: 1fr 1fr;
|
||||
grid-columns: 1fr 1fr;
|
||||
gap: 1;
|
||||
}
|
||||
|
||||
#git-status {
|
||||
column-span: 1;
|
||||
row-span: 1;
|
||||
border: solid $accent;
|
||||
padding: 1;
|
||||
}
|
||||
|
||||
#workflows {
|
||||
column-span: 1;
|
||||
row-span: 1;
|
||||
border: solid $accent;
|
||||
padding: 1;
|
||||
}
|
||||
|
||||
#pull-requests {
|
||||
column-span: 1;
|
||||
row-span: 1;
|
||||
border: solid $accent;
|
||||
padding: 1;
|
||||
}
|
||||
|
||||
#issues {
|
||||
column-span: 1;
|
||||
row-span: 1;
|
||||
border: solid $accent;
|
||||
padding: 1;
|
||||
}
|
||||
|
||||
#header {
|
||||
column-span: 2;
|
||||
height: 3;
|
||||
border: solid $accent;
|
||||
padding: 1;
|
||||
}
|
||||
|
||||
#footer {
|
||||
column-span: 2;
|
||||
dock: bottom;
|
||||
height: 2;
|
||||
}
|
||||
|
||||
#repo-info {
|
||||
text-style: bold;
|
||||
color: $text;
|
||||
}
|
||||
|
||||
#refresh-timer {
|
||||
color: $dim;
|
||||
}
|
||||
|
||||
.panel-title {
|
||||
text-style: bold;
|
||||
color: $accent;
|
||||
margin-bottom: 1;
|
||||
}
|
||||
|
||||
.status-line {
|
||||
margin: 0 0 1 0;
|
||||
}
|
||||
|
||||
.status-clean {
|
||||
color: $success;
|
||||
}
|
||||
|
||||
.status-dirty {
|
||||
color: $warning;
|
||||
}
|
||||
|
||||
.status-error {
|
||||
color: $error;
|
||||
}
|
||||
"""
|
||||
|
||||
BINDINGS = [
|
||||
("q", "quit", "Quit"),
|
||||
("r", "refresh", "Refresh"),
|
||||
("j", "move_down", "Move Down"),
|
||||
("k", "move_up", "Move Up"),
|
||||
("h", "move_left", "Move Left"),
|
||||
("l", "move_right", "Move Right"),
|
||||
("1", "switch_panel(0)", "Git Status"),
|
||||
("2", "switch_panel(1)", "Workflows"),
|
||||
("3", "switch_panel(2)", "Pull Requests"),
|
||||
("4", "switch_panel(3)", "Issues"),
|
||||
]
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.repo: str | None = None
|
||||
self.provider: str = "github"
|
||||
self.refresh_interval: int = 30
|
||||
self.last_refresh: datetime | None = None
|
||||
self.config_manager = ConfigManager()
|
||||
self.api_client = None
|
||||
self._timer_id = None
|
||||
self._panels = ["git-status", "workflows", "pull-requests", "issues"]
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Header(show_clock=True)
|
||||
|
||||
yield Container(
|
||||
GitStatusPanel(id="git-status"),
|
||||
CIStatusPanel(id="ci-status"),
|
||||
PullRequestsPanel(id="prs"),
|
||||
IssuesPanel(id="issues"),
|
||||
id="dashboard-grid"
|
||||
Static("DevDash", id="app-title"),
|
||||
Static(id="repo-info"),
|
||||
Static(id="refresh-timer"),
|
||||
id="header",
|
||||
)
|
||||
|
||||
yield Static("Git Status", classes="panel-title", id="git-status")
|
||||
yield Static("CI/CD Pipelines", classes="panel-title", id="workflows")
|
||||
yield Static("Pull Requests", classes="panel-title", id="pull-requests")
|
||||
yield Static("Issues", classes="panel-title", id="issues")
|
||||
|
||||
yield Footer()
|
||||
|
||||
def on_mount(self) -> None:
|
||||
self.refresh_dashboard()
|
||||
self.load_configuration()
|
||||
self.refresh_data()
|
||||
|
||||
def on_key(self, event: events.Key) -> None:
|
||||
if event.key == "r":
|
||||
self.refresh_dashboard()
|
||||
elif event.key == "q":
|
||||
self.app.exit()
|
||||
def load_configuration(self) -> None:
|
||||
"""Load configuration and set defaults."""
|
||||
settings = self.config_manager.load()
|
||||
|
||||
if self.repo is None:
|
||||
self.repo = settings.default_repo
|
||||
|
||||
if self.repo:
|
||||
repo_config = self.config_manager.get_repository(self.repo)
|
||||
if repo_config:
|
||||
self.provider = repo_config.get("provider", "github")
|
||||
|
||||
self.refresh_interval = settings.refresh_interval or 30
|
||||
|
||||
def set_repo(self, repo: str) -> None:
|
||||
"""Set the current repository.
|
||||
|
||||
Args:
|
||||
repo: Repository identifier.
|
||||
"""
|
||||
self.repo = repo
|
||||
repo_config = self.config_manager.get_repository(repo)
|
||||
if repo_config:
|
||||
self.provider = repo_config.get("provider", "github")
|
||||
self.refresh_data()
|
||||
|
||||
def start_auto_refresh(self, interval: int) -> None:
|
||||
"""Start automatic data refresh.
|
||||
|
||||
Args:
|
||||
interval: Refresh interval in seconds.
|
||||
"""
|
||||
self.refresh_interval = interval
|
||||
self._timer_id = self.set_interval(interval, self.refresh_data)
|
||||
|
||||
def stop_auto_refresh(self) -> None:
|
||||
"""Stop automatic data refresh."""
|
||||
if self._timer_id:
|
||||
self._timer_id.stop()
|
||||
self._timer_id = None
|
||||
|
||||
def action_refresh(self) -> None:
|
||||
"""Manually trigger data refresh."""
|
||||
self.refresh_data()
|
||||
|
||||
def action_switch_panel(self, panel_index: int) -> None:
|
||||
"""Switch focus to a panel.
|
||||
|
||||
Args:
|
||||
panel_index: Index of the panel to focus.
|
||||
"""
|
||||
if 0 <= panel_index < len(self._panels):
|
||||
panel_id = self._panels[panel_index]
|
||||
panel = self.query_one(f"#{panel_id}")
|
||||
panel.focus()
|
||||
|
||||
def action_quit(self) -> None:
|
||||
"""Quit the application."""
|
||||
self.app.exit()
|
||||
|
||||
def refresh_data(self) -> None:
|
||||
"""Refresh all dashboard data."""
|
||||
self.last_refresh = datetime.now()
|
||||
|
||||
self.update_header()
|
||||
self.update_git_status()
|
||||
self.update_workflows()
|
||||
self.update_pull_requests()
|
||||
self.update_issues()
|
||||
|
||||
def update_header(self) -> None:
|
||||
"""Update header with current state."""
|
||||
if self.repo:
|
||||
repo_info: Static = self.query_one("#repo-info")
|
||||
repo_info.update(f"[bold]Repository:[/] {self.repo} ({self.provider})")
|
||||
|
||||
if self.last_refresh:
|
||||
timer: Static = self.query_one("#refresh-timer")
|
||||
timer.update(f"[dim]Last refresh:[/] {self.last_refresh.strftime('%H:%M:%S')}")
|
||||
|
||||
def _build_git_status_lines(self, status: Any) -> list[str]:
|
||||
"""Build git status display lines.
|
||||
|
||||
Args:
|
||||
status: GitStatus object.
|
||||
|
||||
Returns:
|
||||
List of formatted status lines.
|
||||
"""
|
||||
lines = []
|
||||
lines.append(f"[bold]Branch:[/] {status.branch}")
|
||||
lines.append(f"[bold]Commit:[/] {status.commit_hash[:7]}")
|
||||
|
||||
if status.commit_message:
|
||||
lines.append(f"[dim]{status.commit_message[:40]}[/]")
|
||||
|
||||
lines.append("")
|
||||
|
||||
if status.is_detached:
|
||||
lines.append("[yellow]![/] Detached HEAD")
|
||||
else:
|
||||
status_indicator = "[green]✓[/] Clean" if status.is_clean else "[yellow]![/] Uncommitted changes"
|
||||
lines.append(f"[bold]Status:[/] {status_indicator}")
|
||||
|
||||
file_changes = []
|
||||
if status.staged_files > 0:
|
||||
file_changes.append(f"[green]+{status.staged_files}[/]")
|
||||
if status.unstaged_files > 0:
|
||||
file_changes.append(f"[yellow]~{status.unstaged_files}[/]")
|
||||
if status.untracked_files > 0:
|
||||
file_changes.append(f"[red]?{status.untracked_files}[/]")
|
||||
|
||||
if file_changes:
|
||||
lines.append(f"[bold]Changes:[/] {' '.join(file_changes)}")
|
||||
|
||||
if status.ahead > 0 or status.behind > 0:
|
||||
sync_info = []
|
||||
if status.ahead > 0:
|
||||
sync_info.append(f"[green]+{status.ahead}[/]")
|
||||
if status.behind > 0:
|
||||
sync_info.append(f"[red]-{status.behind}[/]")
|
||||
lines.append(f"[bold]Sync:[/] {' '.join(sync_info)}")
|
||||
|
||||
return lines
|
||||
|
||||
def update_git_status(self) -> None:
|
||||
"""Update git status panel."""
|
||||
git_panel = self.query_one("#git-status")
|
||||
|
||||
@work(exclusive=True)
|
||||
async def refresh_dashboard(self) -> None:
|
||||
try:
|
||||
git_status = get_git_status()
|
||||
self.query_one("#git-status", GitStatusPanel).update_status(git_status)
|
||||
except Exception:
|
||||
pass
|
||||
status = get_git_status()
|
||||
lines = self._build_git_status_lines(status)
|
||||
git_panel.update("\n".join(lines))
|
||||
|
||||
repo = os.environ.get("DEVDASH_REPO", "")
|
||||
if repo and "/" in repo:
|
||||
owner, repo_name = repo.split("/", 1)
|
||||
try:
|
||||
client = GitHubClient()
|
||||
prs = await client.get_pull_requests(owner, repo_name)
|
||||
self.query_one("#prs", PullRequestsPanel).update_prs(prs)
|
||||
except Exception:
|
||||
pass
|
||||
except GitStatusError as e:
|
||||
git_panel.update(f"[yellow]![/] {str(e)}")
|
||||
|
||||
except Exception as e:
|
||||
git_panel.update(f"[red]Error:[/] {str(e)}")
|
||||
|
||||
def update_workflows(self) -> None:
|
||||
"""Update workflows panel."""
|
||||
workflows_panel = self.query_one("#workflows")
|
||||
|
||||
if not self.repo:
|
||||
workflows_panel.update("No repository configured")
|
||||
return
|
||||
|
||||
try:
|
||||
client = get_api_client(self.provider, self.repo)
|
||||
runs = client.get_workflow_runs(limit=5)
|
||||
|
||||
if not runs:
|
||||
workflows_panel.update("[dim]No recent workflow runs[/]")
|
||||
return
|
||||
|
||||
lines = []
|
||||
for run in runs[:5]:
|
||||
name = run.get("name", "Unknown")
|
||||
status = run.get("status", "")
|
||||
conclusion = run.get("conclusion")
|
||||
branch = run.get("head_branch", "")
|
||||
run_number = run.get("run_number", 0)
|
||||
|
||||
if conclusion:
|
||||
color = "green" if conclusion == "success" else "red"
|
||||
icon = "✓" if conclusion == "success" else "✗"
|
||||
status_text = f"[{color}]{icon}[/] {name}"
|
||||
elif status == "in_progress":
|
||||
status_text = f"[blue]◐[/] {name}"
|
||||
elif status == "queued":
|
||||
status_text = f"[yellow]◑[/] {name}"
|
||||
else:
|
||||
status_text = f"[dim]○[/] {name}"
|
||||
|
||||
lines.append(status_text)
|
||||
lines.append(f" [dim]#{run_number} on {branch}[/]")
|
||||
|
||||
workflows_panel.update("\n".join(lines))
|
||||
|
||||
except Exception as e:
|
||||
workflows_panel.update(f"[red]Error:[/] {str(e)}")
|
||||
|
||||
def update_pull_requests(self) -> None:
|
||||
"""Update pull requests panel."""
|
||||
prs_panel = self.query_one("#pull-requests")
|
||||
|
||||
if not self.repo:
|
||||
prs_panel.update("No repository configured")
|
||||
return
|
||||
|
||||
try:
|
||||
client = get_api_client(self.provider, self.repo)
|
||||
prs = client.get_pull_requests(state="open")
|
||||
|
||||
if not prs:
|
||||
prs_panel.update("[dim]No open pull requests[/]")
|
||||
return
|
||||
|
||||
lines = []
|
||||
for pr in prs[:10]:
|
||||
number = pr.get("number", 0)
|
||||
title = pr.get("title", "Untitled")
|
||||
author = pr.get("user", {}).get("login", "unknown") if isinstance(pr.get("user"), dict) else "unknown"
|
||||
draft = pr.get("draft", False)
|
||||
labels = [label.get("name") for label in pr.get("labels", [])]
|
||||
|
||||
draft_indicator = " [dim](draft)[/]" if draft else ""
|
||||
lines.append(f"#{number}{draft_indicator} [link]{title[:35]}[/]")
|
||||
lines.append(f" [dim]by {author}[/]")
|
||||
|
||||
if labels:
|
||||
label_text = ", ".join(f"[cyan]{label[:10]}[/]" for label in labels[:3])
|
||||
lines.append(f" {label_text}")
|
||||
|
||||
prs_panel.update("\n".join(lines))
|
||||
|
||||
except Exception as e:
|
||||
prs_panel.update(f"[red]Error:[/] {str(e)}")
|
||||
|
||||
def update_issues(self) -> None:
|
||||
"""Update issues panel."""
|
||||
issues_panel = self.query_one("#issues")
|
||||
|
||||
if not self.repo:
|
||||
issues_panel.update("No repository configured")
|
||||
return
|
||||
|
||||
try:
|
||||
client = get_api_client(self.provider, self.repo)
|
||||
issues = client.get_issues(state="open")
|
||||
|
||||
if not issues:
|
||||
issues_panel.update("[dim]No open issues[/]")
|
||||
return
|
||||
|
||||
lines = []
|
||||
for issue in issues[:10]:
|
||||
number = issue.get("number", 0)
|
||||
title = issue.get("title", "Untitled")
|
||||
author = issue.get("user", {}).get("login", "unknown") if isinstance(issue.get("user"), dict) else "unknown"
|
||||
labels = [label.get("name") for label in issue.get("labels", [])]
|
||||
|
||||
lines.append(f"#{number} [link]{title[:35]}[/]")
|
||||
lines.append(f" [dim]by {author}[/]")
|
||||
|
||||
if labels:
|
||||
label_text = ", ".join(f"[cyan]{label[:10]}[/]" for label in labels[:3])
|
||||
lines.append(f" {label_text}")
|
||||
|
||||
issues_panel.update("\n".join(lines))
|
||||
|
||||
except Exception as e:
|
||||
issues_panel.update(f"[red]Error:[/] {str(e)}")
|
||||
|
||||
def on_key(self, event: Key) -> None:
|
||||
"""Handle key events."""
|
||||
if event.key == "escape":
|
||||
self.action_quit()
|
||||
|
||||
Reference in New Issue
Block a user