Files
curl-to-code-cli/.curl_to_code/parser/curl_parser.py

191 lines
5.9 KiB
Python

import shlex
import re
from typing import List, Tuple, Optional
from .models import (
RequestMethod, Header, Auth, AuthType, RequestData, DataType, ParsedCurl
)
def curl_tokenize(command: str) -> List[str]:
"""Tokenize a curl command string into arguments."""
try:
tokens = shlex.split(command)
except ValueError:
tokens = command.split()
result = []
i = 0
while i < len(tokens):
token = tokens[i]
if token.startswith("'"):
j = i
while j < len(tokens) and not tokens[j].endswith("'"):
j += 1
if j < len(tokens):
result.append(" ".join(tokens[i:j + 1]).strip("'"))
i = j + 1
continue
result.append(token)
i += 1
return result
def parse_curl_command(command: str) -> ParsedCurl:
"""Parse a curl command and return a ParsedCurl object."""
tokens = curl_tokenize(command)
url = ""
method = RequestMethod.GET
headers: List[Header] = []
auth: Optional[Auth] = None
data: Optional[RequestData] = None
cookies: dict = {}
user_agent: Optional[str] = None
follow_redirects = True
timeout: Optional[int] = None
i = 0
while i < len(tokens):
token = tokens[i]
if token in ("-X", "--request"):
if i + 1 < len(tokens):
method_map = {
"GET": RequestMethod.GET,
"POST": RequestMethod.POST,
"PUT": RequestMethod.PUT,
"PATCH": RequestMethod.PATCH,
"DELETE": RequestMethod.DELETE,
"HEAD": RequestMethod.HEAD,
"OPTIONS": RequestMethod.OPTIONS,
}
method_str = tokens[i + 1].upper()
method = method_map.get(method_str, RequestMethod.GET)
i += 2
else:
i += 1
elif token in ("-H", "--header"):
if i + 1 < len(tokens):
header_str = tokens[i + 1]
if ": " in header_str:
key, value = header_str.split(": ", 1)
headers.append(Header(key=key, value=value))
i += 2
else:
i += 1
elif token in ("-d", "--data", "--data-raw", "--data-binary"):
if i + 1 < len(tokens):
data_str = tokens[i + 1]
if data_str.startswith("{") and data_str.endswith("}"):
import json
try:
json_data = json.loads(data_str)
data = RequestData(data_type=DataType.JSON, json_data=json_data)
except json.JSONDecodeError:
data = RequestData(data_type=DataType.RAW, content=data_str)
else:
data = RequestData(data_type=DataType.RAW, content=data_str)
i += 2
else:
i += 1
elif token == "--data-json":
if i + 1 < len(tokens):
import json
try:
json_data = json.loads(tokens[i + 1])
data = RequestData(data_type=DataType.JSON, json_data=json_data)
except json.JSONDecodeError:
data = RequestData(data_type=DataType.RAW, content=tokens[i + 1])
i += 2
else:
i += 1
elif token in ("-u", "--user"):
if i + 1 < len(tokens):
auth_str = tokens[i + 1]
if ":" in auth_str:
parts = auth_str.split(":", 1)
auth = Auth(auth_type=AuthType.BASIC, username=parts[0], password=parts[1])
else:
auth = Auth(auth_type=AuthType.BASIC, username=auth_str)
i += 2
else:
i += 1
elif token == "--bearer":
if i + 1 < len(tokens):
auth = Auth(auth_type=AuthType.BEARER, token=tokens[i + 1])
i += 2
else:
i += 1
elif token in ("-b", "--cookie"):
if i + 1 < len(tokens):
cookie_str = tokens[i + 1]
for pair in cookie_str.split(";"):
if "=" in pair:
k, v = pair.split("=", 1)
cookies[k.strip()] = v.strip()
i += 2
else:
i += 1
elif token in ("-A", "--user-agent"):
if i + 1 < len(tokens):
user_agent = tokens[i + 1]
i += 2
else:
i += 1
elif token == "--url":
if i + 1 < len(tokens):
url = tokens[i + 1]
i += 2
else:
i += 1
elif token == "-L":
follow_redirects = True
i += 1
elif token == "--connect-timeout":
if i + 1 < len(tokens):
try:
timeout = int(tokens[i + 1])
except ValueError:
pass
i += 2
else:
i += 1
elif token.startswith("http://") or token.startswith("https://"):
url = token
i += 1
elif not token.startswith("-"):
if not url:
url = token
i += 1
else:
i += 1
if auth and auth.get_header():
auth_header = auth.get_header()
if auth_header:
headers.append(auth_header)
return ParsedCurl(
url=url,
method=method,
headers=headers,
auth=auth,
data=data,
cookies=cookies,
user_agent=user_agent,
follow_redirects=follow_redirects,
timeout=timeout,
)