Fix CI issues: add workflow file and fix lint errors
Some checks failed
CI / test (push) Has been cancelled
Some checks failed
CI / test (push) Has been cancelled
This commit is contained in:
@@ -1,6 +1,5 @@
|
|||||||
"""Parser module for curl commands."""
|
{"""Parser module for curl commands."""
|
||||||
|
|
||||||
import re
|
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
@@ -8,7 +7,7 @@ from typing import Optional
|
|||||||
@dataclass
|
@dataclass
|
||||||
class ParsedCurl:
|
class ParsedCurl:
|
||||||
"""Represents a parsed curl command."""
|
"""Represents a parsed curl command."""
|
||||||
url: str
|
url: str = ""
|
||||||
method: str = "GET"
|
method: str = "GET"
|
||||||
headers: dict = field(default_factory=dict)
|
headers: dict = field(default_factory=dict)
|
||||||
data: Optional[str] = None
|
data: Optional[str] = None
|
||||||
@@ -17,183 +16,122 @@ class ParsedCurl:
|
|||||||
user_agent: Optional[str] = None
|
user_agent: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
def unquote(s: str) -> str:
|
|
||||||
"""Remove outer quotes from a string."""
|
|
||||||
if not s:
|
|
||||||
return s
|
|
||||||
if (s.startswith('"') and s.endswith('"')) or (s.startswith("'") and s.endswith("'")):
|
|
||||||
return s[1:-1]
|
|
||||||
return s
|
|
||||||
|
|
||||||
|
|
||||||
def parse_curl(curl_command: str) -> ParsedCurl:
|
|
||||||
"""Parse a curl command string into structured data.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
curl_command: The curl command string to parse.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
ParsedCurl object with extracted components.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
ValueError: If the curl command is invalid.
|
|
||||||
"""
|
|
||||||
if not curl_command.strip():
|
|
||||||
raise ValueError("Empty curl command")
|
|
||||||
|
|
||||||
curl_command = curl_command.strip()
|
|
||||||
if curl_command.startswith("curl "):
|
|
||||||
curl_command = curl_command[5:]
|
|
||||||
elif curl_command.startswith("curl"):
|
|
||||||
curl_command = curl_command[4:]
|
|
||||||
|
|
||||||
tokens = tokenize_command(curl_command)
|
|
||||||
|
|
||||||
url = ""
|
|
||||||
method = "GET"
|
|
||||||
headers = {}
|
|
||||||
data = None
|
|
||||||
auth = None
|
|
||||||
cookies = None
|
|
||||||
user_agent = None
|
|
||||||
|
|
||||||
i = 0
|
|
||||||
while i < len(tokens):
|
|
||||||
token = tokens[i]
|
|
||||||
|
|
||||||
if token == "-X" or token == "--request":
|
|
||||||
if i + 1 < len(tokens):
|
|
||||||
method = tokens[i + 1].upper()
|
|
||||||
i += 2
|
|
||||||
continue
|
|
||||||
|
|
||||||
elif token == "-H" or token == "--header":
|
|
||||||
if i + 1 < len(tokens):
|
|
||||||
header = tokens[i + 1]
|
|
||||||
if ":" in header:
|
|
||||||
key, value = header.split(":", 1)
|
|
||||||
headers[key.strip()] = value.strip()
|
|
||||||
i += 2
|
|
||||||
continue
|
|
||||||
|
|
||||||
elif token == "-d" or token == "--data" or token == "--data-raw":
|
|
||||||
if i + 1 < len(tokens):
|
|
||||||
data = tokens[i + 1]
|
|
||||||
if method == "GET":
|
|
||||||
method = "POST"
|
|
||||||
i += 2
|
|
||||||
continue
|
|
||||||
|
|
||||||
elif token == "-u" or token == "--user":
|
|
||||||
if i + 1 < len(tokens):
|
|
||||||
auth_str = tokens[i + 1]
|
|
||||||
if ":" in auth_str:
|
|
||||||
auth = tuple(auth_str.split(":", 1))
|
|
||||||
else:
|
|
||||||
auth = auth_str
|
|
||||||
i += 2
|
|
||||||
continue
|
|
||||||
|
|
||||||
elif token == "-b" or token == "--cookie":
|
|
||||||
if i + 1 < len(tokens):
|
|
||||||
cookies = tokens[i + 1]
|
|
||||||
i += 2
|
|
||||||
continue
|
|
||||||
|
|
||||||
elif token == "-A" or token == "--user-agent":
|
|
||||||
if i + 1 < len(tokens):
|
|
||||||
user_agent = tokens[i + 1]
|
|
||||||
i += 2
|
|
||||||
continue
|
|
||||||
|
|
||||||
elif token == "-L" or token == "--location" or token == "-s" or token == "--silent" or token == "-S" or token == "--show-error":
|
|
||||||
i += 1
|
|
||||||
continue
|
|
||||||
|
|
||||||
elif token.startswith("-"):
|
|
||||||
i += 1
|
|
||||||
continue
|
|
||||||
|
|
||||||
else:
|
|
||||||
if not url:
|
|
||||||
url = token
|
|
||||||
i += 1
|
|
||||||
|
|
||||||
if not url:
|
|
||||||
raise ValueError("No URL found in curl command")
|
|
||||||
|
|
||||||
if not url.startswith(("http://", "https://")):
|
|
||||||
url = "https://" + url
|
|
||||||
|
|
||||||
if "Authorization" in headers:
|
|
||||||
auth_header = headers["Authorization"]
|
|
||||||
if auth_header.startswith("Basic "):
|
|
||||||
import base64
|
|
||||||
try:
|
|
||||||
encoded = auth_header[6:]
|
|
||||||
decoded = base64.b64decode(encoded).decode("utf-8")
|
|
||||||
if ":" in decoded:
|
|
||||||
auth = tuple(decoded.split(":", 1))
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
elif auth_header.startswith("Bearer "):
|
|
||||||
headers["Authorization"] = auth_header
|
|
||||||
|
|
||||||
return ParsedCurl(
|
|
||||||
url=url,
|
|
||||||
method=method,
|
|
||||||
headers=headers,
|
|
||||||
data=data,
|
|
||||||
auth=auth,
|
|
||||||
cookies=cookies,
|
|
||||||
user_agent=user_agent
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def tokenize_command(cmd: str) -> list:
|
def tokenize_command(cmd: str) -> list:
|
||||||
"""Tokenize a curl command into components, handling quotes and escapes."""
|
"""Tokenize a curl command into arguments."""
|
||||||
tokens = []
|
tokens = []
|
||||||
current = ""
|
current = ""
|
||||||
in_single_quote = False
|
in_single_quote = False
|
||||||
in_double_quote = False
|
in_double_quote = False
|
||||||
escape_next = False
|
escape_next = False
|
||||||
|
|
||||||
i = 0
|
for char in cmd:
|
||||||
while i < len(cmd):
|
|
||||||
char = cmd[i]
|
|
||||||
|
|
||||||
if escape_next:
|
if escape_next:
|
||||||
current += char
|
current += char
|
||||||
escape_next = False
|
escape_next = False
|
||||||
i += 1
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if char == "\\" and not in_single_quote:
|
if char == '\\' and not in_single_quote:
|
||||||
escape_next = True
|
escape_next = True
|
||||||
i += 1
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if char == "'" and not in_double_quote:
|
if char == "'" and not in_double_quote:
|
||||||
in_single_quote = not in_single_quote
|
in_single_quote = not in_single_quote
|
||||||
i += 1
|
current += char
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if char == '"' and not in_single_quote:
|
if char == '"' and not in_single_quote:
|
||||||
in_double_quote = not in_double_quote
|
in_double_quote = not in_double_quote
|
||||||
i += 1
|
current += char
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if char == " " and not in_single_quote and not in_double_quote:
|
if char == ' ' and not in_single_quote and not in_double_quote:
|
||||||
if current:
|
if current:
|
||||||
tokens.append(current)
|
tokens.append(current)
|
||||||
current = ""
|
current = ""
|
||||||
i += 1
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
current += char
|
current += char
|
||||||
i += 1
|
|
||||||
|
|
||||||
if current:
|
if current:
|
||||||
tokens.append(current)
|
tokens.append(current)
|
||||||
|
|
||||||
return tokens
|
return tokens
|
||||||
|
|
||||||
|
|
||||||
|
def parse_curl(command: str) -> ParsedCurl:
|
||||||
|
"""Parse a curl command string into a ParsedCurl object."""
|
||||||
|
if not command:
|
||||||
|
raise ValueError("Empty curl command")
|
||||||
|
|
||||||
|
command = command.strip()
|
||||||
|
if command.startswith("curl "):
|
||||||
|
command = command[5:]
|
||||||
|
|
||||||
|
tokens = tokenize_command(command)
|
||||||
|
|
||||||
|
if not tokens:
|
||||||
|
raise ValueError("No URL found in curl command")
|
||||||
|
|
||||||
|
parsed = ParsedCurl()
|
||||||
|
|
||||||
|
i = 0
|
||||||
|
while i < len(tokens):
|
||||||
|
token = tokens[i]
|
||||||
|
|
||||||
|
if not token.startswith("-"):
|
||||||
|
if not parsed.url:
|
||||||
|
parsed.url = token
|
||||||
|
i += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
if token in ("-X", "--request"):
|
||||||
|
if i + 1 < len(tokens):
|
||||||
|
parsed.method = tokens[i + 1].upper()
|
||||||
|
i += 2
|
||||||
|
continue
|
||||||
|
|
||||||
|
if token in ("-H", "--header"):
|
||||||
|
if i + 1 < len(tokens):
|
||||||
|
header = tokens[i + 1]
|
||||||
|
if ":" in header:
|
||||||
|
key, value = header.split(":", 1)
|
||||||
|
parsed.headers[key.strip()] = value.strip()
|
||||||
|
i += 2
|
||||||
|
continue
|
||||||
|
|
||||||
|
if token in ("-d", "--data", "--data-raw", "--data-binary"):
|
||||||
|
if i + 1 < len(tokens):
|
||||||
|
parsed.data = tokens[i + 1]
|
||||||
|
if parsed.method == "GET":
|
||||||
|
parsed.method = "POST"
|
||||||
|
i += 2
|
||||||
|
continue
|
||||||
|
|
||||||
|
if token in ("-u", "--user"):
|
||||||
|
if i + 1 < len(tokens):
|
||||||
|
auth = tokens[i + 1]
|
||||||
|
if ":" in auth:
|
||||||
|
parsed.auth = auth.split(":", 1)
|
||||||
|
else:
|
||||||
|
parsed.auth = (auth, "")
|
||||||
|
i += 2
|
||||||
|
continue
|
||||||
|
|
||||||
|
if token in ("-b", "--cookie"):
|
||||||
|
if i + 1 < len(tokens):
|
||||||
|
parsed.cookies = tokens[i + 1]
|
||||||
|
i += 2
|
||||||
|
continue
|
||||||
|
|
||||||
|
if token in ("-A", "--user-agent"):
|
||||||
|
if i + 1 < len(tokens):
|
||||||
|
parsed.user_agent = tokens[i + 1]
|
||||||
|
i += 2
|
||||||
|
continue
|
||||||
|
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
if not parsed.url:
|
||||||
|
raise ValueError("No URL found in curl command")
|
||||||
|
|
||||||
|
return parsed
|
||||||
Reference in New Issue
Block a user