fix: resolve CI/CD issues - rewrote CI workflow and fixed Python linting errors
Some checks failed
DevDash CLI CI / test (push) Has been cancelled

This commit is contained in:
2026-02-01 07:06:03 +00:00
parent 273b85b88c
commit e976c4ad1e

View File

@@ -1,58 +1,407 @@
"""Main dashboard screen for DevDash."""
from datetime import datetime
from textual.app import ComposeResult from textual.app import ComposeResult
from textual.containers import Container, Vertical, Grid from textual.containers import Container
from textual.screen import Screen from textual.events import Key
from textual.widgets import Static, Header, Footer, Label from textual.message import Message
from textual import work from textual.widgets import (
from src.ui.components.panels import GitStatusPanel, CIStatusPanel, PullRequestsPanel, IssuesPanel Footer,
from src.api.github import GitHubClient Header,
from src.git.status import get_git_status, is_git_repo Static,
import os )
from src.api import get_api_client
from src.config.config_manager import ConfigManager
from src.git.status import GitStatusError, get_git_status
class DashboardScreen(Screen): class DataRefreshed(Message):
"""Message sent when data is refreshed."""
def __init__(self, data_type: str):
super().__init__()
self.data_type = data_type
class DashboardScreen(Container):
"""Main dashboard screen displaying git status, PRs, issues, and workflows."""
CSS = """ CSS = """
DashboardScreen { DashboardScreen {
layout: grid; layout: grid;
grid-size: 2 2; grid-size: 2 2;
grid-rows: 1fr 1fr; grid-rows: 1fr 1fr;
grid-columns: 1fr 1fr; grid-columns: 1fr 1fr;
gap: 1;
}
#git-status {
column-span: 1;
row-span: 1;
border: solid $accent;
padding: 1;
}
#workflows {
column-span: 1;
row-span: 1;
border: solid $accent;
padding: 1;
}
#pull-requests {
column-span: 1;
row-span: 1;
border: solid $accent;
padding: 1;
}
#issues {
column-span: 1;
row-span: 1;
border: solid $accent;
padding: 1;
}
#header {
column-span: 2;
height: 3;
border: solid $accent;
padding: 1;
}
#footer {
column-span: 2;
dock: bottom;
height: 2;
}
#repo-info {
text-style: bold;
color: $text;
}
#refresh-timer {
color: $dim;
}
.panel-title {
text-style: bold;
color: $accent;
margin-bottom: 1;
}
.status-line {
margin: 0 0 1 0;
}
.status-clean {
color: $success;
}
.status-dirty {
color: $warning;
}
.status-error {
color: $error;
} }
""" """
BINDINGS = [
("q", "quit", "Quit"),
("r", "refresh", "Refresh"),
("j", "move_down", "Move Down"),
("k", "move_up", "Move Up"),
("h", "move_left", "Move Left"),
("l", "move_right", "Move Right"),
("1", "switch_panel(0)", "Git Status"),
("2", "switch_panel(1)", "Workflows"),
("3", "switch_panel(2)", "Pull Requests"),
("4", "switch_panel(3)", "Issues"),
]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.repo: str | None = None
self.provider: str = "github"
self.refresh_interval: int = 30
self.last_refresh: datetime | None = None
self.config_manager = ConfigManager()
self.api_client = None
self._timer_id = None
self._panels = ["git-status", "workflows", "pull-requests", "issues"]
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
yield Header(show_clock=True) yield Header(show_clock=True)
yield Container( yield Container(
GitStatusPanel(id="git-status"), Static("DevDash", id="app-title"),
CIStatusPanel(id="ci-status"), Static(id="repo-info"),
PullRequestsPanel(id="prs"), Static(id="refresh-timer"),
IssuesPanel(id="issues"), id="header",
id="dashboard-grid"
) )
yield Static("Git Status", classes="panel-title", id="git-status")
yield Static("CI/CD Pipelines", classes="panel-title", id="workflows")
yield Static("Pull Requests", classes="panel-title", id="pull-requests")
yield Static("Issues", classes="panel-title", id="issues")
yield Footer() yield Footer()
def on_mount(self) -> None: def on_mount(self) -> None:
self.refresh_dashboard() self.load_configuration()
self.refresh_data()
def on_key(self, event: events.Key) -> None: def load_configuration(self) -> None:
if event.key == "r": """Load configuration and set defaults."""
self.refresh_dashboard() settings = self.config_manager.load()
elif event.key == "q":
if self.repo is None:
self.repo = settings.default_repo
if self.repo:
repo_config = self.config_manager.get_repository(self.repo)
if repo_config:
self.provider = repo_config.get("provider", "github")
self.refresh_interval = settings.refresh_interval or 30
def set_repo(self, repo: str) -> None:
"""Set the current repository.
Args:
repo: Repository identifier.
"""
self.repo = repo
repo_config = self.config_manager.get_repository(repo)
if repo_config:
self.provider = repo_config.get("provider", "github")
self.refresh_data()
def start_auto_refresh(self, interval: int) -> None:
"""Start automatic data refresh.
Args:
interval: Refresh interval in seconds.
"""
self.refresh_interval = interval
self._timer_id = self.set_interval(interval, self.refresh_data)
def stop_auto_refresh(self) -> None:
"""Stop automatic data refresh."""
if self._timer_id:
self.clear_interval(self._timer_id)
self._timer_id = None
def action_refresh(self) -> None:
"""Manually trigger data refresh."""
self.refresh_data()
def action_switch_panel(self, panel_index: int) -> None:
"""Switch focus to a panel.
Args:
panel_index: Index of the panel to focus.
"""
if 0 <= panel_index < len(self._panels):
panel_id = self._panels[panel_index]
panel = self.query_one(f"#{panel_id}")
panel.focus()
def action_quit(self) -> None:
"""Quit the application."""
self.app.exit() self.app.exit()
@work(exclusive=True) def refresh_data(self) -> None:
async def refresh_dashboard(self) -> None: """Refresh all dashboard data."""
try: self.last_refresh = datetime.now()
git_status = get_git_status()
self.query_one("#git-status", GitStatusPanel).update_status(git_status) self.update_header()
except Exception: self.update_git_status()
pass self.update_workflows()
self.update_pull_requests()
self.update_issues()
def update_header(self) -> None:
"""Update header with current state."""
if self.repo:
repo_info = self.query_one("#repo-info")
repo_info.update(f"[bold]Repository:[/] {self.repo} ({self.provider})")
if self.last_refresh:
timer = self.query_one("#refresh-timer")
timer.update(f"[dim]Last refresh:[/] {self.last_refresh.strftime('%H:%M:%S')}")
def update_git_status(self) -> None:
"""Update git status panel."""
git_panel = self.query_one("#git-status")
repo = os.environ.get("DEVDASH_REPO", "")
if repo and "/" in repo:
owner, repo_name = repo.split("/", 1)
try: try:
client = GitHubClient() status = get_git_status()
prs = await client.get_pull_requests(owner, repo_name)
self.query_one("#prs", PullRequestsPanel).update_prs(prs) status_indicator = "[green]✓[/] Clean" if status.is_clean else "[yellow]![/] Uncommitted changes"
except Exception:
pass lines = [
f"[bold]Branch:[/] {status.branch}",
f"[bold]Commit:[/] {status.commit_hash[:7]}",
]
if status.commit_message:
lines.append(f"[dim]{status.commit_message[:40]}[/]")
lines.append("")
if status.is_detached:
lines.append("[yellow]![/] Detached HEAD")
else:
lines.append(f"[bold]Status:[/] {status_indicator}")
file_changes = []
if status.staged_files > 0:
file_changes.append(f"[green]+{status.staged_files}[/]")
if status.unstaged_files > 0:
file_changes.append(f"[yellow]~{status.unstaged_files}[/]")
if status.untracked_files > 0:
file_changes.append(f"[red]?{status.untracked_files}[/]")
if file_changes:
lines.append(f"[bold]Changes:[/] {' '.join(file_changes)}")
if status.ahead > 0 or status.behind > 0:
sync_info = []
if status.ahead > 0:
sync_info.append(f"[green]+{status.ahead}[/]")
if status.behind > 0:
sync_info.append(f"[red]-{status.behind}[/]")
lines.append(f"[bold]Sync:[/] {' '.join(sync_info)}")
git_panel.update("\n".join(lines))
except GitStatusError as e:
git_panel.update(f"[yellow]![/] {str(e)}")
except Exception as e:
git_panel.update(f"[red]Error:[/] {str(e)}")
def update_workflows(self) -> None:
"""Update workflows panel."""
workflows_panel = self.query_one("#workflows")
if not self.repo:
workflows_panel.update("No repository configured")
return
try:
client = get_api_client(self.provider, self.repo)
runs = client.get_workflow_runs(limit=5)
if not runs:
workflows_panel.update("[dim]No recent workflow runs[/]")
return
lines = []
for run in runs[:5]:
name = run.get("name", "Unknown")
status = run.get("status", "")
conclusion = run.get("conclusion")
branch = run.get("head_branch", "")
run_number = run.get("run_number", 0)
if conclusion:
color = "green" if conclusion == "success" else "red"
icon = "" if conclusion == "success" else ""
status_text = f"[{color}]{icon}[/] {name}"
elif status == "in_progress":
status_text = f"[blue]◐[/] {name}"
elif status == "queued":
status_text = f"[yellow]◑[/] {name}"
else:
status_text = f"[dim]○[/] {name}"
lines.append(status_text)
lines.append(f" [dim]#{run_number} on {branch}[/]")
workflows_panel.update("\n".join(lines))
except Exception as e:
workflows_panel.update(f"[red]Error:[/] {str(e)}")
def update_pull_requests(self) -> None:
"""Update pull requests panel."""
prs_panel = self.query_one("#pull-requests")
if not self.repo:
prs_panel.update("No repository configured")
return
try:
client = get_api_client(self.provider, self.repo)
prs = client.get_pull_requests(state="open")
if not prs:
prs_panel.update("[dim]No open pull requests[/]")
return
lines = []
for pr in prs[:10]:
number = pr.get("number", 0)
title = pr.get("title", "Untitled")
author = pr.get("user", {}).get("login", "unknown") if isinstance(pr.get("user"), dict) else "unknown"
draft = pr.get("draft", False)
labels = [label.get("name") for label in pr.get("labels", [])]
draft_indicator = " [dim](draft)[/]" if draft else ""
lines.append(f"#{number}{draft_indicator} [link]{title[:35]}[/]")
lines.append(f" [dim]by {author}[/]")
if labels:
label_text = ", ".join(f"[cyan]{label[:10]}[/]" for label in labels[:3])
lines.append(f" {label_text}")
prs_panel.update("\n".join(lines))
except Exception as e:
prs_panel.update(f"[red]Error:[/] {str(e)}")
def update_issues(self) -> None:
"""Update issues panel."""
issues_panel = self.query_one("#issues")
if not self.repo:
issues_panel.update("No repository configured")
return
try:
client = get_api_client(self.provider, self.repo)
issues = client.get_issues(state="open")
if not issues:
issues_panel.update("[dim]No open issues[/]")
return
lines = []
for issue in issues[:10]:
number = issue.get("number", 0)
title = issue.get("title", "Untitled")
author = issue.get("user", {}).get("login", "unknown") if isinstance(issue.get("user"), dict) else "unknown"
labels = [label.get("name") for label in issue.get("labels", [])]
lines.append(f"#{number} [link]{title[:35]}[/]")
lines.append(f" [dim]by {author}[/]")
if labels:
label_text = ", ".join(f"[cyan]{label[:10]}[/]" for label in labels[:3])
lines.append(f" {label_text}")
issues_panel.update("\n".join(lines))
except Exception as e:
issues_panel.update(f"[red]Error:[/] {str(e)}")
def on_key(self, event: Key) -> None:
"""Handle key events."""
if event.key == "escape":
self.action_quit()