From f367f7de1116db9cb60ab0ee98754866cc72a082 Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Sun, 1 Feb 2026 21:45:47 +0000 Subject: [PATCH] Initial upload: Devtoolbelt v1.0.0 - unified CLI toolkit for developers --- devtoolbelt/commands/api.py | 496 ++++++++++++++++++++++++++++++++++++ 1 file changed, 496 insertions(+) create mode 100644 devtoolbelt/commands/api.py diff --git a/devtoolbelt/commands/api.py b/devtoolbelt/commands/api.py new file mode 100644 index 0000000..e7237d4 --- /dev/null +++ b/devtoolbelt/commands/api.py @@ -0,0 +1,496 @@ +"""API testing commands for Devtoolbelt.""" + +import json +import time +from typing import Any, Dict, List, Optional + +import click +import requests +from rich import print as rprint +from rich.progress import Progress, SpinnerColumn, TextColumn +from rich.table import Table + +from ..config import get_config +from ..utils import console, format_duration, format_json + + +@click.group() +def api(): + """API testing and inspection commands.""" + pass + + +@api.command("list") +@click.option( + "--config", "-c", + type=click.Path(exists=True), + help="Path to configuration file." +) +def list_endpoints(config: Optional[str]): + """List configured API endpoints.""" + cfg = get_config(config) + endpoints = cfg.get_api_endpoints() + + if not endpoints: + rprint("[yellow]No API endpoints configured.[/yellow]") + rprint("Add API endpoints to your config file or use 'api add' command.") + return + + table = Table(title="Configured API Endpoints") + table.add_column("Name", style="cyan") + table.add_column("URL", style="green") + + for name, url in endpoints.items(): + table.add_row(name, url) + + console.print(table) + + +@api.command("get") +@click.argument("url_or_name") +@click.option( + "--headers", "-H", + multiple=True, + help="Headers to include (format: 'Key: Value')." +) +@click.option( + "--auth", "-a", + help="Authentication token or 'user:pass' format." +) +@click.option( + "--timeout", "-t", + type=int, + default=30, + help="Request timeout in seconds." +) +@click.option( + "--config", "-c", + type=click.Path(exists=True), + help="Path to configuration file." +) +@click.option( + "--verbose", "-v", + is_flag=True, + help="Show response headers." +) +def api_get( + url_or_name: str, + headers: tuple, + auth: Optional[str], + timeout: int, + config: Optional[str], + verbose: bool +): + """Make a GET request to an API endpoint.""" + url = _resolve_url(url_or_name, config) + + request_headers = _parse_headers(headers) + auth_token = _parse_auth(auth) + + try: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + transient=True + ) as progress: + progress.add_task("Making request...", total=None) + + response = requests.get( + url, + headers=request_headers, + auth=auth_token, + timeout=timeout + ) + + _display_response(response, verbose) + + except requests.RequestException as e: + rprint(f"[red]Request failed: {e}[/red]") + + +@api.command("post") +@click.argument("url_or_name") +@click.option( + "--data", "-d", + help="JSON data to send in request body." +) +@click.option( + "--data-file", "-D", + type=click.Path(exists=True), + help="File containing JSON data." +) +@click.option( + "--headers", "-H", + multiple=True, + help="Headers to include (format: 'Key: Value')." +) +@click.option( + "--auth", "-a", + help="Authentication token or 'user:pass' format." +) +@click.option( + "--timeout", "-t", + type=int, + default=30, + help="Request timeout in seconds." +) +@click.option( + "--config", "-c", + type=click.Path(exists=True), + help="Path to configuration file." +) +@click.option( + "--verbose", "-v", + is_flag=True, + help="Show response headers." +) +def api_post( + url_or_name: str, + data: Optional[str], + data_file: Optional[str], + headers: tuple, + auth: Optional[str], + timeout: int, + config: Optional[str], + verbose: bool +): + """Make a POST request to an API endpoint.""" + url = _resolve_url(url_or_name, config) + + body_data = None + if data_file: + with open(data_file, 'r') as f: + body_data = json.load(f) + elif data: + body_data = json.loads(data) + + request_headers = _parse_headers(headers) + if body_data and "Content-Type" not in request_headers: + request_headers["Content-Type"] = "application/json" + + auth_token = _parse_auth(auth) + + try: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + transient=True + ) as progress: + progress.add_task("Making request...", total=None) + + response = requests.post( + url, + json=body_data, + headers=request_headers, + auth=auth_token, + timeout=timeout + ) + + _display_response(response, verbose) + + except requests.RequestException as e: + rprint(f"[red]Request failed: {e}[/red]") + except json.JSONDecodeError as e: + rprint(f"[red]Invalid JSON data: {e}[/red]") + + +@api.command("test") +@click.argument("url_or_name") +@click.option( + "--method", "-m", + type=click.Choice(["GET", "POST", "PUT", "DELETE", "PATCH"]), + default="GET", + help="HTTP method." +) +@click.option( + "--data", "-d", + help="JSON data to send (for POST, PUT, PATCH)." +) +@click.option( + "--headers", "-H", + multiple=True, + help="Headers to include." +) +@click.option( + "--auth", "-a", + help="Authentication token or 'user:pass' format." +) +@click.option( + "--timeout", "-t", + type=int, + default=30, + help="Request timeout." +) +@click.option( + "--config", "-c", + type=click.Path(exists=True), + help="Path to configuration file." +) +@click.option( + "--expect-status", "-e", + type=int, + default=200, + help="Expected HTTP status code." +) +def api_test( + url_or_name: str, + method: str, + data: Optional[str], + headers: tuple, + auth: Optional[str], + timeout: int, + config: Optional[str], + expect_status: int +): + """Test an API endpoint with assertions.""" + url = _resolve_url(url_or_name, config) + + body_data = None + if data: + body_data = json.loads(data) + + request_headers = _parse_headers(headers) + if body_data and "Content-Type" not in request_headers: + request_headers["Content-Type"] = "application/json" + + auth_token = _parse_auth(auth) + + start_time = time.time() + + try: + response = requests.request( + method=method, + url=url, + json=body_data, + headers=request_headers, + auth=auth_token, + timeout=timeout + ) + + duration = time.time() - start_time + + results_table = Table(title="API Test Results") + results_table.add_column("Metric", style="cyan") + results_table.add_column("Value", style="green") + + status_color = "green" if response.status_code == expect_status else "red" + results_table.add_row( + "Status", + f"[{status_color}]{response.status_code}[/{status_color}]" + ) + results_table.add_row( + "Expected", + str(expect_status) + ) + results_table.add_row( + "Duration", + format_duration(duration) + ) + results_table.add_row( + "Content-Type", + response.headers.get("Content-Type", "unknown") + ) + + console.print(results_table) + + if response.status_code == expect_status: + rprint("[green]✓ Test PASSED[/green]") + else: + rprint(f"[red]✗ Test FAILED (expected {expect_status})[/red]") + + if response.headers.get("Content-Type", "").startswith("application/json"): + try: + rprint("\n[bold cyan]Response Body:[/bold cyan]") + rprint(format_json(response.json())) + except json.JSONDecodeError: + rprint("\n[yellow]Response is not valid JSON[/yellow]") + + except requests.RequestException as e: + rprint(f"[red]Request failed: {e}[/red]") + + +@api.command("add") +@click.argument("name") +@click.argument("url") +@click.option( + "--config", "-c", + type=click.Path(exists=True), + help="Path to configuration file." +) +def add_endpoint(name: str, url: str, config: Optional[str]): + """Add an API endpoint configuration.""" + cfg = get_config(config) + endpoints = cfg.get_api_endpoints() + + if name in endpoints: + rprint(f"[yellow]Endpoint '{name}' already exists. Overwriting.[/yellow]") + + endpoints[name] = url + cfg.set("api_endpoints", endpoints) + cfg.save() + + rprint(f"[green]API endpoint '{name}' added successfully.[/green]") + + +@api.command("benchmark") +@click.argument("url_or_name") +@click.option( + "--requests", "-n", + type=int, + default=100, + help="Number of requests to make." +) +@click.option( + "--concurrency", "-c", + type=int, + default=1, + help="Number of concurrent workers." +) +@click.option( + "--method", "-m", + type=click.Choice(["GET", "POST"]), + default="GET", + help="HTTP method." +) +@click.option( + "--config", "-c", + type=click.Path(exists=True), + help="Path to configuration file." +) +def api_benchmark( + url_or_name: str, + requests_count: int, + concurrency: int, + method: str, + config: Optional[str] +): + """Benchmark an API endpoint.""" + from concurrent.futures import ThreadPoolExecutor, as_completed + + url = _resolve_url(url_or_name, config) + + results: List[Dict[str, Any]] = [] + + def make_request() -> Dict[str, Any]: + start = time.time() + try: + response = requests.request(method=method, url=url, timeout=30) + return { + "status": response.status_code, + "duration": time.time() - start, + "success": response.status_code < 400 + } + except Exception as e: + return { + "status": 0, + "duration": time.time() - start, + "success": False, + "error": str(e) + } + + with Progress(transient=True) as progress: + task = progress.add_task("Running benchmark...", total=requests_count) + + with ThreadPoolExecutor(max_workers=concurrency) as executor: + futures = [executor.submit(make_request) for _ in range(requests_count)] + for future in as_completed(futures): + results.append(future.result()) + progress.advance(task) + + successes = sum(1 for r in results if r.get("success")) + durations = [r["duration"] for r in results] + avg_duration = sum(durations) / len(durations) if durations else 0 + + benchmark_table = Table(title=f"Benchmark Results: {url}") + benchmark_table.add_column("Metric", style="cyan") + benchmark_table.add_column("Value", style="green") + + benchmark_table.add_row("Total Requests", str(len(results))) + benchmark_table.add_row("Successful", f"{successes} ({successes/len(results)*100:.1f}%)") + benchmark_table.add_row("Failed", str(len(results) - successes)) + benchmark_table.add_row("Avg Response Time", format_duration(avg_duration)) + benchmark_table.add_row("Min Response Time", format_duration(min(durations)) if durations else "N/A") + benchmark_table.add_row("Max Response Time", format_duration(max(durations)) if durations else "N/A") + + console.print(benchmark_table) + + +def _resolve_url(url_or_name: str, config: Optional[str]) -> str: + """Resolve URL from name or direct URL.""" + if url_or_name.startswith(("http://", "https://")): + return url_or_name + + cfg = get_config(config) + endpoints = cfg.get_api_endpoints() + + if url_or_name in endpoints: + return endpoints[url_or_name] + + rprint(f"[yellow]Endpoint '{url_or_name}' not found. Using as URL.[/yellow]") + return url_or_name + + +def _parse_headers(headers: tuple) -> Dict[str, str]: + """Parse headers from command line.""" + result = {} + for header in headers: + if ":" in header: + key, value = header.split(":", 1) + result[key.strip()] = value.strip() + return result + + +def _parse_auth(auth: Optional[str]) -> Optional[tuple]: + """Parse authentication from command line.""" + if not auth: + return None + if ":" in auth: + parts = auth.split(":", 1) + return (parts[0], parts[1]) + return (auth, "") + + +def _display_response(response: requests.Response, verbose: bool = False): + """Display API response.""" + status_color = "green" if response.status_code < 400 else "red" + + info_table = Table(show_header=False) + info_table.add_column("Key", style="cyan") + info_table.add_column("Value", style="green") + + info_table.add_row( + "Status", + f"[{status_color}]{response.status_code} {response.reason}[/{status_color}]" + ) + info_table.add_row("URL", response.url) + info_table.add_row( + "Time", + format_duration(response.elapsed.total_seconds()) + ) + info_table.add_row( + "Size", + f"{len(response.content)} bytes" + ) + + console.print(info_table) + + if verbose: + headers_table = Table(title="Response Headers") + headers_table.add_column("Header", style="cyan") + headers_table.add_column("Value", style="green") + + for key, value in response.headers.items(): + headers_table.add_row(key, value) + + console.print(headers_table) + + content_type = response.headers.get("Content-Type", "") + + if content_type.startswith("application/json"): + try: + rprint("\n[bold cyan]Response Body:[/bold cyan]") + rprint(format_json(response.json())) + except json.JSONDecodeError: + rprint("\n[yellow]Response is not valid JSON[/yellow]") + elif content_type.startswith("text/"): + rprint(f"\n[bold cyan]Response:[/bold cyan]\n{response.text[:2000]}")