Initial upload: ScaffoldForge CLI tool with full codebase, tests, and CI/CD
This commit is contained in:
340
scaffoldforge/parsers/issue_parser.py
Normal file
340
scaffoldforge/parsers/issue_parser.py
Normal file
@@ -0,0 +1,340 @@
|
|||||||
|
"""GitHub issue parsing functionality."""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import Any, Dict, List, Optional
|
||||||
|
|
||||||
|
from github import Github
|
||||||
|
from github.Issue import Issue
|
||||||
|
from github.Label import Label
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
class ChecklistItem:
    """Represents a checklist item from a GitHub issue."""

    # Item text with the leading "- [ ]" / "- [x]" marker stripped.
    text: str
    # True when the checkbox was checked ("[x]" or "[X]").
    completed: bool
    # Zero-based index of the line within the issue body, if known.
    line_number: Optional[int] = None
    # Text of the markdown heading the item appeared under, if any.
    category: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
class IssueData:
    """Structured data extracted from a GitHub issue."""

    # Core issue identity and content as fetched from the API.
    number: int
    title: str
    body: str
    body_html: str
    labels: List[str]
    state: str
    url: str
    repository: str
    author: str
    created_at: str
    updated_at: str
    # Derived structures parsed out of the body.
    checklist: List[ChecklistItem] = field(default_factory=list)
    requirements: List[str] = field(default_factory=list)
    acceptance_criteria: List[str] = field(default_factory=list)
    suggested_files: List[str] = field(default_factory=list)
    suggested_directories: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def get_todo_items(self) -> List[str]:
        """Return the text of every unchecked checklist item."""
        pending = []
        for entry in self.checklist:
            if not entry.completed:
                pending.append(entry.text)
        return pending

    def get_completed_items(self) -> List[str]:
        """Return the text of every checked checklist item."""
        done = []
        for entry in self.checklist:
            if entry.completed:
                done.append(entry.text)
        return done

    def generate_todo_comments(self) -> str:
        """Render unchecked checklist items as '# TODO' comment lines.

        Returns:
            A newline-joined comment banner plus one numbered TODO line
            per pending item, or an empty string when nothing is pending.
        """
        pending = self.get_todo_items()
        if not pending:
            return ""
        header = ["", "# TODO Items from GitHub Issue", ""]
        entries = [f"# TODO #{idx}: {item}" for idx, item in enumerate(pending, 1)]
        return "\n".join(header + entries)
|
||||||
|
|
||||||
|
|
||||||
|
class IssueParser:
    """Parser for GitHub issues.

    Fetches issues through the GitHub API (PyGithub) and extracts
    structured data from the issue body: checklists, requirements,
    acceptance criteria, and suggested file/directory paths.
    """

    # Canonical language name -> label/keyword spellings that indicate it.
    # Iteration order matters: earlier entries win when several match.
    LABEL_LANGUAGE_MAP = {
        "python": ["python", "py", "python3"],
        "javascript": ["javascript", "js", "node", "nodejs"],
        "go": ["go", "golang"],
        "rust": ["rust", "rs"],
    }

    def __init__(self, token: Optional[str] = None):
        """Initialize the issue parser.

        Args:
            token: GitHub personal access token for API access. Falls back
                to the GITHUB_TOKEN environment variable; with no token an
                unauthenticated (heavily rate-limited) client is used.
        """
        self.token = token or os.environ.get("GITHUB_TOKEN")
        self.github = Github(self.token) if self.token else Github()

    def parse_issue(
        self, owner: str, repo: str, issue_number: int, max_retries: int = 3
    ) -> IssueData:
        """Parse a GitHub issue and extract structured data.

        Args:
            owner: Repository owner.
            repo: Repository name.
            issue_number: Issue number.
            max_retries: Maximum number of attempts when rate limited.

        Returns:
            IssueData object with extracted information.

        Raises:
            ValueError: If max_retries is less than 1.
            Exception: Whatever PyGithub raised on the final failed attempt.
        """
        if max_retries < 1:
            # Previously the loop body never ran and the method silently
            # returned None, violating the declared return type.
            raise ValueError("max_retries must be >= 1")
        for attempt in range(max_retries):
            try:
                repository = self.github.get_repo(f"{owner}/{repo}")
                issue = repository.get_issue(issue_number)
                return self._extract_issue_data(issue, f"{owner}/{repo}")
            except Exception as e:
                # PyGithub errors are detected by message text; back off
                # with a linearly increasing delay before retrying.
                if "rate limit" in str(e).lower() and attempt < max_retries - 1:
                    time.sleep(60 * (attempt + 1))
                else:
                    raise
        raise RuntimeError("unreachable: every attempt returns or raises")

    def _extract_issue_data(self, issue: Issue, repository: str) -> IssueData:
        """Extract structured data from a GitHub issue.

        Args:
            issue: PyGithub Issue object.
            repository: Repository identifier (owner/repo).

        Returns:
            IssueData object with extracted information.
        """
        labels = [label.name for label in issue.labels]

        # issue.body may be None for empty bodies; the parsers treat
        # falsy bodies as "no content".
        body = issue.body or ""
        checklist = self._parse_checklist(body)
        requirements = self._parse_requirements(body)
        acceptance_criteria = self._parse_acceptance_criteria(body)
        suggested_files = self._parse_file_paths(body)
        suggested_directories = self._parse_directory_paths(body)

        return IssueData(
            number=issue.number,
            title=issue.title,
            body=body,
            body_html=issue.body_html or "",
            labels=labels,
            state=issue.state,
            url=issue.html_url,
            repository=repository,
            author=issue.user.login if issue.user else "unknown",
            created_at=issue.created_at.isoformat() if issue.created_at else "",
            updated_at=issue.updated_at.isoformat() if issue.updated_at else "",
            checklist=checklist,
            requirements=requirements,
            acceptance_criteria=acceptance_criteria,
            suggested_files=suggested_files,
            suggested_directories=suggested_directories,
        )

    def _parse_checklist(self, body: str) -> List[ChecklistItem]:
        """Parse markdown checklist items from issue body.

        Items are tagged with the most recent markdown heading as their
        category and with their zero-based line index.

        Args:
            body: Issue body text.

        Returns:
            List of ChecklistItem objects.
        """
        checklist: List[ChecklistItem] = []
        if not body:
            return checklist

        current_category = None
        for i, line in enumerate(body.split("\n")):
            category_match = re.match(r"^\s*(?:###|##|#)\s+(.+)", line)
            if category_match:
                current_category = category_match.group(1)
                continue

            checklist_match = re.match(r"^\s*[-*]\s+\[([ xX])\]\s+(.+)$", line)
            if checklist_match:
                checklist.append(
                    ChecklistItem(
                        text=checklist_match.group(2).strip(),
                        completed=checklist_match.group(1).lower() == "x",
                        line_number=i,
                        category=current_category,
                    )
                )

        return checklist

    @staticmethod
    def _section_lines(body: str, header_pattern: str) -> List[str]:
        """Return the lines of the first markdown section whose heading
        matches ``header_pattern`` (case-insensitive).

        The section ends at the next line starting with '##' or at the
        end of the body.
        """
        collected: List[str] = []
        in_section = False
        for line in body.split("\n"):
            if re.match(header_pattern, line, re.IGNORECASE):
                in_section = True
                continue
            if in_section:
                if line.startswith("##"):
                    break
                collected.append(line)
        return collected

    def _parse_requirements(self, body: str) -> List[str]:
        """Parse requirements from issue body.

        Collects bullet items under a "Requirements" heading.

        Args:
            body: Issue body text.

        Returns:
            List of requirement strings.
        """
        if not body:
            return []
        requirements = []
        for line in self._section_lines(body, r"^##?\s*Requirements\s*$"):
            req_match = re.match(r"^[-*]\s+(.+)$", line)
            if req_match:
                requirements.append(req_match.group(1))
        return requirements

    def _parse_acceptance_criteria(self, body: str) -> List[str]:
        """Parse acceptance criteria from issue body.

        Collects checkbox bullet items under an "Acceptance Criteria"
        (or "AC") heading, regardless of checked state.

        Args:
            body: Issue body text.

        Returns:
            List of acceptance criteria strings.
        """
        if not body:
            return []
        criteria = []
        header = r"^##?\s*(Acceptance Criteria|AC)\s*$"
        for line in self._section_lines(body, header):
            crit_match = re.match(r"^[-*]\s+\[([ xX])\]\s*(.+)$", line)
            if crit_match:
                criteria.append(crit_match.group(2).strip())
        return criteria

    def _parse_file_paths(self, body: str) -> List[str]:
        """Parse suggested file paths from issue body.

        Args:
            body: Issue body text.

        Returns:
            List of file path strings, deduplicated in discovery order.
        """
        if not body:
            return []

        patterns = [
            # Backticked bare filenames with a known extension. The
            # extension alternation must be non-capturing: with a second
            # capture group re.findall returns (name, ext) tuples, which
            # previously polluted the result list.
            r"`([^`/]+\.(?:py|js|go|rs|ts|json|yaml|yml|toml))`",
            r"file:\s*([^\s]+)",
            r"(src/[^\s]+)",
            r"(lib/[^\s]+)",
        ]

        files: List[str] = []
        for pattern in patterns:
            files.extend(re.findall(pattern, body, re.IGNORECASE))

        # dict.fromkeys dedupes while keeping discovery order; set() order
        # is nondeterministic across runs.
        return list(dict.fromkeys(files))

    def _parse_directory_paths(self, body: str) -> List[str]:
        """Parse suggested directory paths from issue body.

        Args:
            body: Issue body text.

        Returns:
            List of directory path strings, deduplicated in discovery order.
        """
        if not body:
            return []

        patterns = [
            r"directory:\s*([^\s]+)",
            r"(?:src|lib|tests?|docs?|examples?)/[^\s]*",
        ]

        directories: List[str] = []
        for pattern in patterns:
            directories.extend(re.findall(pattern, body, re.IGNORECASE))

        return list(dict.fromkeys(directories))

    def detect_language(self, issue_data: IssueData) -> Optional[str]:
        """Detect the programming language from issue labels and content.

        Labels are checked first with exact (case-insensitive) matching;
        the body is then scanned for whole-word keyword occurrences.

        Args:
            issue_data: IssueData object.

        Returns:
            Detected language string or None.
        """
        labels_lower = [label.lower() for label in issue_data.labels]
        for lang, keywords in self.LABEL_LANGUAGE_MAP.items():
            if any(kw in labels_lower for kw in keywords):
                return lang

        body_lower = issue_data.body.lower()
        for lang, keywords in self.LABEL_LANGUAGE_MAP.items():
            # Whole-word matching: a plain substring test falsely matched
            # e.g. "rs" inside "parsers" or "go" inside "category".
            if any(
                re.search(rf"\b{re.escape(kw)}\b", body_lower) for kw in keywords
            ):
                return lang

        return None

    def detect_project_type(self, issue_data: IssueData) -> str:
        """Detect project type from issue content.

        Uses whole-word keyword heuristics on the issue body; the first
        matching category wins.

        Args:
            issue_data: IssueData object.

        Returns:
            Project type string: "cli", "api", "web", "library", or the
            default "application".
        """
        body_lower = issue_data.body.lower()

        def has_any(keywords: List[str]) -> bool:
            # Whole-word matching avoids false hits like "cli" in "client"
            # or "tool" in "toolchain".
            return any(
                re.search(rf"\b{re.escape(kw)}\b", body_lower) for kw in keywords
            )

        if has_any(["cli", "command", "tool"]):
            return "cli"
        if has_any(["api", "rest", "endpoint"]):
            return "api"
        if has_any(["web", "frontend", "ui"]):
            return "web"
        if has_any(["library", "package", "module"]):
            return "library"

        return "application"
|
||||||
Reference in New Issue
Block a user