191 lines
5.9 KiB
Python
191 lines
5.9 KiB
Python
import shlex
|
|
import re
|
|
from typing import List, Tuple, Optional
|
|
from .models import (
|
|
RequestMethod, Header, Auth, AuthType, RequestData, DataType, ParsedCurl
|
|
)
|
|
|
|
|
|
def _join_continued_lines(command: str) -> str:
    """Remove shell line continuations (backslash + newline) from *command*.

    Outside single quotes a backslash immediately followed by a newline
    (``\\n`` or ``\\r\\n``) is deleted, as a POSIX shell would do.  Inside
    single quotes every character is literal, so nothing is removed there.
    """
    pieces: List[str] = []
    quote = ""  # currently open quote character, or "" when unquoted
    pos = 0
    size = len(command)
    while pos < size:
        char = command[pos]
        if quote == "'":
            # Single quotes are fully literal: only the closing quote matters.
            if char == "'":
                quote = ""
            pieces.append(char)
            pos += 1
            continue
        if char == "\\" and pos + 1 < size:
            if command.startswith("\r\n", pos + 1):
                pos += 3
            elif command[pos + 1] == "\n":
                pos += 2
            else:
                # Ordinary escape: copy both characters together so an
                # escaped quote does not toggle the quote state below.
                pieces.append(command[pos:pos + 2])
                pos += 2
            continue
        if char == '"':
            quote = "" if quote == '"' else '"'
        elif char == "'" and not quote:
            quote = "'"
        pieces.append(char)
        pos += 1
    return "".join(pieces)


def _rejoin_quoted(words: List[str]) -> List[str]:
    """Glue whitespace-split words that belong to one quoted argument.

    A word starting with ``'`` or ``"`` is joined (with single spaces) with
    the following words up to the first one ending in the same quote
    character, and exactly one quote is removed from each end.  A run that
    is never closed is left untouched.
    """
    merged: List[str] = []
    idx = 0
    while idx < len(words):
        word = words[idx]
        mark = word[:1]
        if mark in ("'", '"'):
            # A lone quote character cannot close itself; start the search
            # for the closing word one position later in that case.
            end = idx if len(word) > 1 else idx + 1
            while end < len(words) and not words[end].endswith(mark):
                end += 1
            if end < len(words):
                merged.append(" ".join(words[idx:end + 1])[1:-1])
                idx = end + 1
                continue
        merged.append(word)
        idx += 1
    return merged


def curl_tokenize(command: str) -> List[str]:
    """Tokenize a curl command string into a list of arguments.

    Line continuations (a backslash at the end of a line) are removed first,
    then the text is split with ``shlex.split`` so quoting and escaping follow
    POSIX shell rules.  If ``shlex`` rejects the input (for example because of
    an unbalanced quote), the command is split on whitespace instead and
    quoted runs are re-joined on a best-effort basis.

    Args:
        command: The raw command line, possibly spanning several lines.

    Returns:
        The argument list, including the leading executable name if present.
    """
    flattened = _join_continued_lines(command)
    try:
        # shlex already removes quotes correctly; post-processing its output
        # would strip quotes that are a literal part of an argument.
        return shlex.split(flattened)
    except ValueError:
        return _rejoin_quoted(flattened.split())
|
|
|
|
|
|
def _build_request_data(raw: str, assume_json: bool) -> RequestData:
    """Wrap a request body string in a RequestData object.

    The body is decoded as JSON when *assume_json* is true or when it is
    delimited by ``{`` and ``}``; if decoding fails, or in every other case,
    the body is stored unchanged as raw content.
    """
    import json  # local import: json is not imported at module level

    if assume_json or (raw.startswith("{") and raw.endswith("}")):
        try:
            decoded = json.loads(raw)
        except json.JSONDecodeError:
            pass  # not valid JSON after all: fall through to the raw form
        else:
            return RequestData(data_type=DataType.JSON, json_data=decoded)
    return RequestData(data_type=DataType.RAW, content=raw)


def parse_curl_command(command: str) -> ParsedCurl:
    """Parse a curl command line and return a ParsedCurl object.

    Recognized options:
        -X/--request, -H/--header, -d/--data/--data-raw/--data-binary,
        --data-json, -u/--user, --bearer, -b/--cookie, -A/--user-agent,
        --url, --connect-timeout, -L/--location and -G/--get.
    Unrecognized options are skipped without consuming a value.  An option
    that needs a value but is the last token is ignored.

    Behavior worth knowing:
        * A leading ``curl`` token is dropped so it is never taken as the URL.
        * A token starting with ``http://`` or ``https://`` always becomes the
          URL; any other non-option token is used only if no URL is set yet.
        * An unknown method name after -X falls back to GET.
        * When a body is supplied without -X and without -G, the method is
          POST, matching curl's own behavior for -d.
        * If credentials were given and ``auth.get_header()`` returns a
          header, that header is appended to the header list.

    Args:
        command: The curl command line as a single string.

    Returns:
        A ParsedCurl populated from the recognized options.
    """
    tokens = curl_tokenize(command)
    # Drop the executable name; otherwise "curl example.com" yields url="curl".
    if tokens and tokens[0] == "curl":
        tokens = tokens[1:]

    method_map = {
        "GET": RequestMethod.GET,
        "POST": RequestMethod.POST,
        "PUT": RequestMethod.PUT,
        "PATCH": RequestMethod.PATCH,
        "DELETE": RequestMethod.DELETE,
        "HEAD": RequestMethod.HEAD,
        "OPTIONS": RequestMethod.OPTIONS,
    }
    body_flags = ("-d", "--data", "--data-raw", "--data-binary", "--data-json")
    # Every option in this set consumes the token that follows it.
    value_flags = {
        "-X", "--request",
        "-H", "--header",
        "-u", "--user",
        "--bearer",
        "-b", "--cookie",
        "-A", "--user-agent",
        "--url",
        "--connect-timeout",
    }
    value_flags.update(body_flags)

    url = ""
    method = RequestMethod.GET
    method_explicit = False  # True once -X/--request has been seen
    force_get = False  # True once -G/--get has been seen
    headers: List[Header] = []
    auth: Optional[Auth] = None
    data: Optional[RequestData] = None
    cookies: dict = {}
    user_agent: Optional[str] = None
    # NOTE(review): curl itself does not follow redirects unless -L is given;
    # the default is kept as True here because callers may rely on it.
    follow_redirects = True
    timeout: Optional[int] = None

    i = 0
    while i < len(tokens):
        token = tokens[i]
        i += 1

        if token in value_flags:
            if i >= len(tokens):
                continue  # option without a value at the end: ignore it
            value = tokens[i]
            i += 1

            if token in ("-X", "--request"):
                method = method_map.get(value.upper(), RequestMethod.GET)
                method_explicit = True
            elif token in ("-H", "--header"):
                # Split on the first colon so "Name:value" (no space) works.
                name, colon, content = value.partition(":")
                if colon and name.strip():
                    headers.append(
                        Header(key=name.strip(), value=content.strip())
                    )
            elif token in body_flags:
                data = _build_request_data(
                    value, assume_json=(token == "--data-json")
                )
            elif token in ("-u", "--user"):
                username, colon, password = value.partition(":")
                if colon:
                    auth = Auth(
                        auth_type=AuthType.BASIC,
                        username=username,
                        password=password,
                    )
                else:
                    auth = Auth(auth_type=AuthType.BASIC, username=value)
            elif token == "--bearer":
                auth = Auth(auth_type=AuthType.BEARER, token=value)
            elif token in ("-b", "--cookie"):
                for pair in value.split(";"):
                    if "=" in pair:
                        cookie_name, cookie_value = pair.split("=", 1)
                        cookies[cookie_name.strip()] = cookie_value.strip()
            elif token in ("-A", "--user-agent"):
                user_agent = value
            elif token == "--url":
                url = value
            elif token == "--connect-timeout":
                try:
                    timeout = int(value)
                except ValueError:
                    pass  # non-integer timeout: keep the previous value
            continue

        if token in ("-L", "--location"):
            follow_redirects = True
        elif token in ("-G", "--get"):
            force_get = True
        elif token.startswith(("http://", "https://")):
            url = token
        elif not token.startswith("-") and not url and token.strip():
            # First bare word is taken as the URL; an empty or
            # whitespace-only token can never be one.
            url = token
        # Any other option is unknown and skipped.

    # curl switches to POST when a body is given and no method was chosen.
    if data is not None and not method_explicit and not force_get:
        method = RequestMethod.POST

    if auth:
        auth_header = auth.get_header()
        if auth_header:
            headers.append(auth_header)

    return ParsedCurl(
        url=url,
        method=method,
        headers=headers,
        auth=auth,
        data=data,
        cookies=cookies,
        user_agent=user_agent,
        follow_redirects=follow_redirects,
        timeout=timeout,
    )
|