diff --git a/.curl_to_code/parser/curl_parser.py b/.curl_to_code/parser/curl_parser.py new file mode 100644 index 0000000..23c4c43 --- /dev/null +++ b/.curl_to_code/parser/curl_parser.py @@ -0,0 +1,190 @@ +import shlex +import re +from typing import List, Tuple, Optional +from .models import ( + RequestMethod, Header, Auth, AuthType, RequestData, DataType, ParsedCurl +) + + +def curl_tokenize(command: str) -> List[str]: + """Tokenize a curl command string into arguments.""" + try: + tokens = shlex.split(command) + except ValueError: + tokens = command.split() + result = [] + i = 0 + while i < len(tokens): + token = tokens[i] + if token.startswith("'"): + j = i + while j < len(tokens) and not tokens[j].endswith("'"): + j += 1 + if j < len(tokens): + result.append(" ".join(tokens[i:j + 1]).strip("'")) + i = j + 1 + continue + result.append(token) + i += 1 + return result + + +def parse_curl_command(command: str) -> ParsedCurl: + """Parse a curl command and return a ParsedCurl object.""" + tokens = curl_tokenize(command) + + url = "" + method = RequestMethod.GET + headers: List[Header] = [] + auth: Optional[Auth] = None + data: Optional[RequestData] = None + cookies: dict = {} + user_agent: Optional[str] = None + follow_redirects = True + timeout: Optional[int] = None + + i = 0 + while i < len(tokens): + token = tokens[i] + + if token in ("-X", "--request"): + if i + 1 < len(tokens): + method_map = { + "GET": RequestMethod.GET, + "POST": RequestMethod.POST, + "PUT": RequestMethod.PUT, + "PATCH": RequestMethod.PATCH, + "DELETE": RequestMethod.DELETE, + "HEAD": RequestMethod.HEAD, + "OPTIONS": RequestMethod.OPTIONS, + } + method_str = tokens[i + 1].upper() + method = method_map.get(method_str, RequestMethod.GET) + i += 2 + else: + i += 1 + + elif token in ("-H", "--header"): + if i + 1 < len(tokens): + header_str = tokens[i + 1] + if ": " in header_str: + key, value = header_str.split(": ", 1) + headers.append(Header(key=key, value=value)) + i += 2 + else: + i += 1 + + elif token in ("-d", "--data", "--data-raw", "--data-binary"): + if i + 1 < len(tokens): + data_str = tokens[i + 1] + if data_str.startswith("{") and data_str.endswith("}"): + import json + try: + json_data = json.loads(data_str) + data = RequestData(data_type=DataType.JSON, json_data=json_data) + except json.JSONDecodeError: + data = RequestData(data_type=DataType.RAW, content=data_str) + else: + data = RequestData(data_type=DataType.RAW, content=data_str) + i += 2 + else: + i += 1 + + elif token == "--data-json": + if i + 1 < len(tokens): + import json + try: + json_data = json.loads(tokens[i + 1]) + data = RequestData(data_type=DataType.JSON, json_data=json_data) + except json.JSONDecodeError: + data = RequestData(data_type=DataType.RAW, content=tokens[i + 1]) + i += 2 + else: + i += 1 + + elif token in ("-u", "--user"): + if i + 1 < len(tokens): + auth_str = tokens[i + 1] + if ":" in auth_str: + parts = auth_str.split(":", 1) + auth = Auth(auth_type=AuthType.BASIC, username=parts[0], password=parts[1]) + else: + auth = Auth(auth_type=AuthType.BASIC, username=auth_str) + i += 2 + else: + i += 1 + + elif token == "--bearer": + if i + 1 < len(tokens): + auth = Auth(auth_type=AuthType.BEARER, token=tokens[i + 1]) + i += 2 + else: + i += 1 + + elif token in ("-b", "--cookie"): + if i + 1 < len(tokens): + cookie_str = tokens[i + 1] + for pair in cookie_str.split(";"): + if "=" in pair: + k, v = pair.split("=", 1) + cookies[k.strip()] = v.strip() + i += 2 + else: + i += 1 + + elif token in ("-A", "--user-agent"): + if i + 1 < len(tokens): + user_agent = tokens[i + 1] + i += 2 + else: + i += 1 + + elif token == "--url": + if i + 1 < len(tokens): + url = tokens[i + 1] + i += 2 + else: + i += 1 + + elif token == "-L": + follow_redirects = True + i += 1 + + elif token == "--connect-timeout": + if i + 1 < len(tokens): + try: + timeout = int(tokens[i + 1]) + except ValueError: + pass + i += 2 + else: + i += 1 + + elif token.startswith("http://") or token.startswith("https://"): + url = token + i += 1 + + elif not token.startswith("-"): + if not url: + url = token + i += 1 + + else: + i += 1 + + if auth and auth.get_header(): + auth_header = auth.get_header() + if auth_header: + headers.append(auth_header) + + return ParsedCurl( + url=url, + method=method, + headers=headers, + auth=auth, + data=data, + cookies=cookies, + user_agent=user_agent, + follow_redirects=follow_redirects, + timeout=timeout, + )