Initial upload with CI/CD workflow
Some checks failed
CI / test (push) Has been cancelled

This commit is contained in:
2026-02-02 18:27:15 +00:00
parent b2dc4e1bc7
commit dcb506867a

238
src/github_client.py Normal file
View File

@@ -0,0 +1,238 @@
"""GitHub API client for Code Pattern Search CLI."""
import time
from datetime import datetime
from typing import Any, Optional
from dataclasses import dataclass
import requests
from ghapi.core import GhApi
@dataclass
class RateLimitExceededError(Exception):
"""Raised when GitHub API rate limit is exceeded."""
reset_time: datetime
message: str = "GitHub API rate limit exceeded"
class GitHubClient:
"""Client for interacting with GitHub API."""
def __init__(
self,
token: Optional[str] = None,
verbose: bool = False,
timeout: int = 30,
) -> None:
"""Initialize the GitHub client."""
self.token = token
self.verbose = verbose
self.timeout = timeout
self._api: Optional[GhApi] = None
self._rate_limit_remaining: int = 60
@property
def api(self) -> GhApi:
"""Get or create the GhApi instance."""
if self._api is None:
headers = {}
if self.token:
headers["Authorization"] = f"token {self.token}"
self._api = GhApi(
headers=headers,
timeout=self.timeout,
)
return self._api
def _handle_rate_limit(self, response: requests.Response) -> None:
"""Handle rate limit headers from response."""
remaining = response.headers.get("X-RateLimit-Remaining")
reset = response.headers.get("X-RateLimit-Reset")
if remaining:
self._rate_limit_remaining = int(remaining)
if reset:
reset_time = datetime.fromtimestamp(int(reset))
if self._rate_limit_remaining <= 0:
raise RateLimitExceededError(
reset_time=reset_time,
message="GitHub API rate limit exceeded. Please wait or provide a token.",
)
def _retry_with_backoff(
self,
func,
*args,
max_retries: int = 3,
backoff_factor: float = 0.5,
**kwargs,
) -> Any:
"""Execute function with exponential backoff on failure."""
last_exception = None
for attempt in range(max_retries):
try:
result = func(*args, **kwargs)
return result
except RateLimitExceededError:
raise
except requests.exceptions.RequestException as e:
last_exception = e
if attempt < max_retries - 1:
sleep_time = backoff_factor * (2 ** attempt)
if self.verbose:
print(f"Retry {attempt + 1}/{max_retries} after {sleep_time}s: {e}")
time.sleep(sleep_time)
else:
if last_exception:
raise last_exception
raise requests.exceptions.RequestException("Unknown error occurred") from e
if last_exception:
raise last_exception
raise requests.exceptions.RequestException("Unknown error occurred")
def search_repositories(
self,
query: Optional[str] = None,
language: Optional[str] = None,
stars_min: Optional[int] = None,
stars_max: Optional[int] = None,
per_page: int = 10,
) -> list[Any]:
"""Search for repositories matching criteria."""
search_query = []
if query:
search_query.append(query)
if language:
search_query.append(f"language:{language}")
if stars_min is not None:
search_query.append(f"stars:>={stars_min}")
if stars_max is not None:
search_query.append(f"stars:<={stars_max}")
search_query.append("sort:stars")
search_query.append("order:desc")
full_query = " ".join(search_query)
if self.verbose:
print(f"Search query: {full_query}")
def _do_search() -> list[Any]:
response = self.api.search.repos(
q=full_query,
per_page=per_page,
page=1,
)
if hasattr(response, 'headers'):
self._handle_rate_limit(response)
return response.get("items", [])
return self._retry_with_backoff(_do_search)
def get_file_tree(
self,
owner: str,
repo: str,
recursive: bool = True,
) -> list[dict[str, Any]]:
"""Get file tree for a repository."""
def _do_fetch() -> list[dict[str, Any]]:
try:
response = self.api.git_trees.get(
owner=owner,
repo=repo,
recursive=1 if recursive else 0,
)
if hasattr(response, 'headers'):
self._handle_rate_limit(response)
if response.truncated:
if self.verbose:
print(f"Warning: File tree truncated for {owner}/{repo}")
return response.tree
except Exception as e:
if self.verbose:
print(f"Error fetching file tree for {owner}/{repo}: {e}")
return []
return self._retry_with_backoff(_do_fetch)
def get_file_content(
self,
owner: str,
repo: str,
path: str,
) -> Optional[str]:
"""Get the content of a file."""
def _do_fetch() -> Optional[str]:
try:
response = self.api.repos.get_content(
owner=owner,
repo=repo,
path=path,
)
if hasattr(response, 'headers'):
self._handle_rate_limit(response)
if isinstance(response, dict):
if response.get("encoding") == "base64":
import base64
return base64.b64decode(response["content"]).decode("utf-8")
return None
except Exception as e:
if self.verbose:
print(f"Error fetching {path} from {owner}/{repo}: {e}")
return None
return self._retry_with_backoff(_do_fetch)
def get_repo_info(
self,
owner: str,
repo: str,
) -> Optional[dict[str, Any]]:
"""Get repository information."""
def _do_fetch() -> Optional[dict[str, Any]]:
try:
response = self.api.repos.get(
owner=owner,
repo=repo,
)
return response
except Exception as e:
if self.verbose:
print(f"Error fetching repo info for {owner}/{repo}: {e}")
return None
return self._retry_with_backoff(_do_fetch)
def get_rate_limit_status(self) -> dict[str, Any]:
"""Get current rate limit status."""
try:
response = self.api.rate_limit.get()
return {
"remaining": self._rate_limit_remaining,
"limit": response.get("resources", {}).get("core", {}).get("limit"),
"reset": response.get("resources", {}).get("core", {}).get("reset"),
}
except Exception as e:
return {"error": str(e)}