diff --git a/src/github_client.py b/src/github_client.py new file mode 100644 index 0000000..fbf1dd3 --- /dev/null +++ b/src/github_client.py @@ -0,0 +1,238 @@ +"""GitHub API client for Code Pattern Search CLI.""" + +import time +from datetime import datetime +from typing import Any, Optional +from dataclasses import dataclass + +import requests +from ghapi.core import GhApi + + +@dataclass +class RateLimitExceededError(Exception): + """Raised when GitHub API rate limit is exceeded.""" + + reset_time: datetime + message: str = "GitHub API rate limit exceeded" + + +class GitHubClient: + """Client for interacting with GitHub API.""" + + def __init__( + self, + token: Optional[str] = None, + verbose: bool = False, + timeout: int = 30, + ) -> None: + """Initialize the GitHub client.""" + self.token = token + self.verbose = verbose + self.timeout = timeout + self._api: Optional[GhApi] = None + self._rate_limit_remaining: int = 60 + + @property + def api(self) -> GhApi: + """Get or create the GhApi instance.""" + if self._api is None: + headers = {} + if self.token: + headers["Authorization"] = f"token {self.token}" + + self._api = GhApi( + headers=headers, + timeout=self.timeout, + ) + return self._api + + def _handle_rate_limit(self, response: requests.Response) -> None: + """Handle rate limit headers from response.""" + remaining = response.headers.get("X-RateLimit-Remaining") + reset = response.headers.get("X-RateLimit-Reset") + + if remaining: + self._rate_limit_remaining = int(remaining) + + if reset: + reset_time = datetime.fromtimestamp(int(reset)) + if self._rate_limit_remaining <= 0: + raise RateLimitExceededError( + reset_time=reset_time, + message="GitHub API rate limit exceeded. Please wait or provide a token.", + ) + + def _retry_with_backoff( + self, + func, + *args, + max_retries: int = 3, + backoff_factor: float = 0.5, + **kwargs, + ) -> Any: + """Execute function with exponential backoff on failure.""" + last_exception = None + + for attempt in range(max_retries): + try: + result = func(*args, **kwargs) + return result + except RateLimitExceededError: + raise + except requests.exceptions.RequestException as e: + last_exception = e + + if attempt < max_retries - 1: + sleep_time = backoff_factor * (2 ** attempt) + if self.verbose: + print(f"Retry {attempt + 1}/{max_retries} after {sleep_time}s: {e}") + time.sleep(sleep_time) + else: + if last_exception: + raise last_exception + raise requests.exceptions.RequestException("Unknown error occurred") from e + + if last_exception: + raise last_exception + raise requests.exceptions.RequestException("Unknown error occurred") + + def search_repositories( + self, + query: Optional[str] = None, + language: Optional[str] = None, + stars_min: Optional[int] = None, + stars_max: Optional[int] = None, + per_page: int = 10, + ) -> list[Any]: + """Search for repositories matching criteria.""" + search_query = [] + + if query: + search_query.append(query) + + if language: + search_query.append(f"language:{language}") + + if stars_min is not None: + search_query.append(f"stars:>={stars_min}") + + if stars_max is not None: + search_query.append(f"stars:<={stars_max}") + + search_query.append("sort:stars") + search_query.append("order:desc") + + full_query = " ".join(search_query) + + if self.verbose: + print(f"Search query: {full_query}") + + def _do_search() -> list[Any]: + response = self.api.search.repos( + q=full_query, + per_page=per_page, + page=1, + ) + + if hasattr(response, 'headers'): + self._handle_rate_limit(response) + + return response.get("items", []) + + return self._retry_with_backoff(_do_search) + + def get_file_tree( + self, + owner: str, + repo: str, + recursive: bool = True, + ) -> list[dict[str, Any]]: + """Get file tree for a repository.""" + def _do_fetch() -> list[dict[str, Any]]: + try: + response = self.api.git_trees.get( + owner=owner, + repo=repo, + recursive=1 if recursive else 0, + ) + + if hasattr(response, 'headers'): + self._handle_rate_limit(response) + + if response.truncated: + if self.verbose: + print(f"Warning: File tree truncated for {owner}/{repo}") + + return response.tree + + except Exception as e: + if self.verbose: + print(f"Error fetching file tree for {owner}/{repo}: {e}") + return [] + + return self._retry_with_backoff(_do_fetch) + + def get_file_content( + self, + owner: str, + repo: str, + path: str, + ) -> Optional[str]: + """Get the content of a file.""" + def _do_fetch() -> Optional[str]: + try: + response = self.api.repos.get_content( + owner=owner, + repo=repo, + path=path, + ) + + if hasattr(response, 'headers'): + self._handle_rate_limit(response) + + if isinstance(response, dict): + if response.get("encoding") == "base64": + import base64 + return base64.b64decode(response["content"]).decode("utf-8") + + return None + + except Exception as e: + if self.verbose: + print(f"Error fetching {path} from {owner}/{repo}: {e}") + return None + + return self._retry_with_backoff(_do_fetch) + + def get_repo_info( + self, + owner: str, + repo: str, + ) -> Optional[dict[str, Any]]: + """Get repository information.""" + def _do_fetch() -> Optional[dict[str, Any]]: + try: + response = self.api.repos.get( + owner=owner, + repo=repo, + ) + return response + except Exception as e: + if self.verbose: + print(f"Error fetching repo info for {owner}/{repo}: {e}") + return None + + return self._retry_with_backoff(_do_fetch) + + def get_rate_limit_status(self) -> dict[str, Any]: + """Get current rate limit status.""" + try: + response = self.api.rate_limit.get() + return { + "remaining": self._rate_limit_remaining, + "limit": response.get("resources", {}).get("core", {}).get("limit"), + "reset": response.get("resources", {}).get("core", {}).get("reset"), + } + except Exception as e: + return {"error": str(e)}