This commit is contained in:
369
src/http_convert/parsers.py
Normal file
369
src/http_convert/parsers.py
Normal file
@@ -0,0 +1,369 @@
|
||||
import re
|
||||
from typing import Optional, Dict, List, Tuple
|
||||
from .models import HTTPRequest, HttpMethod
|
||||
|
||||
|
||||
class CurlParser:
|
||||
METHOD_MAP = {
|
||||
"-X": "method",
|
||||
"--request": "method",
|
||||
}
|
||||
|
||||
HEADER_MAP = {
|
||||
"-H": "header",
|
||||
"--header": "header",
|
||||
}
|
||||
|
||||
DATA_MAP = {
|
||||
"-d": "data",
|
||||
"--data": "data",
|
||||
"--data-raw": "data",
|
||||
"--data-binary": "data",
|
||||
}
|
||||
|
||||
URL_MAP = {
|
||||
"URL": "url",
|
||||
}
|
||||
|
||||
QUERY_PARAM_MAP = {
|
||||
"-G": "get_data",
|
||||
"--get": "get_data",
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def parse(curl_command: str) -> HTTPRequest:
|
||||
curl_command = curl_command.strip()
|
||||
|
||||
if not curl_command.startswith("curl"):
|
||||
raise ValueError("Input must be a cURL command")
|
||||
|
||||
tokens = CurlParser._tokenize(curl_command[4:].strip())
|
||||
|
||||
method = HttpMethod.GET
|
||||
url = ""
|
||||
headers: Dict[str, str] = {}
|
||||
params: Dict[str, str] = {}
|
||||
body = ""
|
||||
data_binary = False
|
||||
|
||||
i = 0
|
||||
while i < len(tokens):
|
||||
token = tokens[i]
|
||||
|
||||
if token in ["-X", "--request"]:
|
||||
if i + 1 < len(tokens):
|
||||
method = HttpMethod(tokens[i + 1].upper())
|
||||
i += 2
|
||||
continue
|
||||
|
||||
elif token in ["-H", "--header"]:
|
||||
if i + 1 < len(tokens):
|
||||
header_line = tokens[i + 1]
|
||||
if ": " in header_line:
|
||||
key, value = header_line.split(": ", 1)
|
||||
headers[key] = value
|
||||
i += 2
|
||||
continue
|
||||
|
||||
elif token in ["-d", "--data", "--data-raw", "--data-binary"]:
|
||||
if i + 1 < len(tokens):
|
||||
body = tokens[i + 1]
|
||||
data_binary = token == "--data-binary"
|
||||
i += 2
|
||||
continue
|
||||
|
||||
elif token in ["-G", "--get"]:
|
||||
if i + 1 < len(tokens):
|
||||
get_data = tokens[i + 1]
|
||||
if "=" in get_data:
|
||||
key, value = get_data.split("=", 1)
|
||||
params[key] = value
|
||||
i += 2
|
||||
continue
|
||||
|
||||
elif token.startswith("http://") or token.startswith("https://"):
|
||||
url = token
|
||||
i += 1
|
||||
continue
|
||||
|
||||
elif token.startswith("'"):
|
||||
potential_url = token.strip("'")
|
||||
if potential_url.startswith("http://") or potential_url.startswith("https://"):
|
||||
url = potential_url
|
||||
i += 1
|
||||
continue
|
||||
|
||||
elif token.startswith('"'):
|
||||
potential_url = token.strip('"')
|
||||
if potential_url.startswith("http://") or potential_url.startswith("https://"):
|
||||
url = potential_url
|
||||
i += 1
|
||||
continue
|
||||
|
||||
i += 1
|
||||
|
||||
if not url:
|
||||
raise ValueError("URL not found in cURL command")
|
||||
|
||||
return HTTPRequest(
|
||||
method=method,
|
||||
url=url,
|
||||
headers=headers,
|
||||
params=params,
|
||||
body=body if body else None
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _tokenize(command: str) -> List[str]:
|
||||
tokens = []
|
||||
current = ""
|
||||
in_quotes = False
|
||||
quote_char = None
|
||||
i = 0
|
||||
|
||||
while i < len(command):
|
||||
char = command[i]
|
||||
|
||||
if not in_quotes and char in ["'", '"']:
|
||||
in_quotes = True
|
||||
quote_char = char
|
||||
current += char
|
||||
elif in_quotes and char == quote_char:
|
||||
in_quotes = False
|
||||
quote_char = None
|
||||
current += char
|
||||
elif not in_quotes and char == " ":
|
||||
if current:
|
||||
tokens.append(current)
|
||||
current = ""
|
||||
else:
|
||||
current += char
|
||||
i += 1
|
||||
|
||||
if current:
|
||||
tokens.append(current)
|
||||
|
||||
return tokens
|
||||
|
||||
|
||||
class HTTPieParser:
|
||||
@staticmethod
|
||||
def parse(httpie_command: str) -> HTTPRequest:
|
||||
httpie_command = httpie_command.strip()
|
||||
|
||||
if not httpie_command.startswith("http"):
|
||||
raise ValueError("Input must be an HTTPie command")
|
||||
|
||||
parts = httpie_command.split()
|
||||
if len(parts) < 2:
|
||||
raise ValueError("Invalid HTTPie command")
|
||||
|
||||
method = HttpMethod.GET
|
||||
url = ""
|
||||
headers: Dict[str, str] = {}
|
||||
params: Dict[str, str] = {}
|
||||
body = ""
|
||||
body_json: Optional[Dict[str, str]] = None
|
||||
|
||||
i = 0
|
||||
while i < len(parts):
|
||||
part = parts[i]
|
||||
|
||||
if part.upper() in [m.value for m in HttpMethod]:
|
||||
method = HttpMethod(part.upper())
|
||||
i += 1
|
||||
continue
|
||||
|
||||
if part.startswith("http://") or part.startswith("https://"):
|
||||
url = part
|
||||
i += 1
|
||||
continue
|
||||
|
||||
if ":" in part and not part.startswith(":"):
|
||||
colon_idx = part.find(":")
|
||||
key = part[:colon_idx]
|
||||
value = part[colon_idx + 1:]
|
||||
headers[key] = value
|
||||
i += 1
|
||||
continue
|
||||
|
||||
if part.startswith(":") and i + 1 < len(parts):
|
||||
header_key = part[1:]
|
||||
header_value = parts[i + 1]
|
||||
headers[header_key] = header_value
|
||||
i += 2
|
||||
continue
|
||||
|
||||
if "==" in part:
|
||||
key, value = part.split("==", 1)
|
||||
params[key] = value
|
||||
i += 1
|
||||
continue
|
||||
|
||||
if part.startswith("@"):
|
||||
body = part[1:]
|
||||
i += 1
|
||||
continue
|
||||
|
||||
if part == ":" or (part == "JSON:" and i + 1 < len(parts)):
|
||||
if i + 1 < len(parts) and parts[i + 1].startswith("{"):
|
||||
body_json_str = parts[i + 1]
|
||||
j = i + 1
|
||||
while j < len(parts) and not parts[j].endswith("}"):
|
||||
j += 1
|
||||
if j < len(parts):
|
||||
body_json_str = " ".join(parts[i + 1:j + 1])
|
||||
try:
|
||||
import json
|
||||
body_json = json.loads(body_json_str)
|
||||
except:
|
||||
body = body_json_str
|
||||
i = j + 1
|
||||
continue
|
||||
|
||||
i += 1
|
||||
|
||||
if not url:
|
||||
raise ValueError("URL not found in HTTPie command")
|
||||
|
||||
return HTTPRequest(
|
||||
method=method,
|
||||
url=url,
|
||||
headers=headers,
|
||||
params=params,
|
||||
body=body if body else None,
|
||||
body_json=body_json
|
||||
)
|
||||
|
||||
|
||||
class FetchParser:
|
||||
@staticmethod
|
||||
def parse(fetch_code: str) -> HTTPRequest:
|
||||
fetch_code = fetch_code.strip()
|
||||
|
||||
if not ("fetch(" in fetch_code or "fetch (" in fetch_code):
|
||||
raise ValueError("Input must be a fetch() call")
|
||||
|
||||
method = HttpMethod.GET
|
||||
url = ""
|
||||
headers: Dict[str, str] = {}
|
||||
params: Dict[str, str] = {}
|
||||
body = ""
|
||||
|
||||
url_match = re.search(r'fetch\s*\(\s*["']([^"']+)["']', fetch_code)
|
||||
if not url_match:
|
||||
url_match = re.search(r'fetch\s*\(\s*`([^`]+)`', fetch_code)
|
||||
if not url_match:
|
||||
raise ValueError("URL not found in fetch call")
|
||||
|
||||
url = url_match.group(1)
|
||||
|
||||
method_match = re.search(r'method\s*:\s*["']([^"']+)["']', fetch_code, re.IGNORECASE)
|
||||
if not method_match:
|
||||
method_match = re.search(r'\.method\s*=\s*["']([^"']+)["']', fetch_code, re.IGNORECASE)
|
||||
if method_match:
|
||||
method = HttpMethod(method_match.group(1).upper())
|
||||
|
||||
headers_section_match = re.search(r'headers\s*:\s*\{([^}]*)\}', fetch_code, re.DOTALL)
|
||||
if headers_section_match:
|
||||
headers_str = headers_section_match.group(1)
|
||||
header_matches = re.findall(r'["']?(\w+)["']?\s*:\s*["']([^"']+)["']', headers_str)
|
||||
for key, value in header_matches:
|
||||
headers[key] = value
|
||||
|
||||
body_match = re.search(r'body\s*:\s*["']([^"']+)["']', fetch_code)
|
||||
if not body_match:
|
||||
body_match = re.search(r'\.body\s*=\s*["']([^"']+)["']', fetch_code)
|
||||
if body_match:
|
||||
body = body_match.group(1)
|
||||
|
||||
return HTTPRequest(
|
||||
method=method,
|
||||
url=url,
|
||||
headers=headers,
|
||||
params=params,
|
||||
body=body if body else None
|
||||
)
|
||||
|
||||
|
||||
class AxiosParser:
|
||||
@staticmethod
|
||||
def parse(axios_code: str) -> HTTPRequest:
|
||||
axios_code = axios_code.strip()
|
||||
|
||||
if not ("axios(" in axios_code or "axios.request(" in axios_code):
|
||||
raise ValueError("Input must be an axios call")
|
||||
|
||||
method = HttpMethod.GET
|
||||
url = ""
|
||||
headers: Dict[str, str] = {}
|
||||
params: Dict[str, str] = {}
|
||||
body = ""
|
||||
body_json: Optional[Dict[str, str]] = None
|
||||
|
||||
url_match = re.search(r'url\s*:\s*["']([^"']+)["']', axios_code)
|
||||
if not url_match:
|
||||
url_match = re.search(r'["'](https?://[^"']+)["']', axios_code)
|
||||
if url_match:
|
||||
url = url_match.group(1)
|
||||
|
||||
method_match = re.search(r'method\s*:\s*["']([^"']+)["']', axios_code, re.IGNORECASE)
|
||||
if method_match:
|
||||
method = HttpMethod(method_match.group(1).upper())
|
||||
|
||||
headers_match = re.search(r'headers\s*:\s*\{([^}]*)\}', axios_code, re.DOTALL)
|
||||
if headers_match:
|
||||
headers_str = headers_match.group(1)
|
||||
header_matches = re.findall(r'["']?(\w+)["']?\s*:\s*["']([^"']+)["']', headers_str)
|
||||
for key, value in header_matches:
|
||||
headers[key] = value
|
||||
|
||||
data_match = re.search(r'data\s*:\s*(\{[^}]+\})', axios_code, re.DOTALL)
|
||||
if data_match:
|
||||
body_json_str = data_match.group(1)
|
||||
try:
|
||||
import json
|
||||
body_json = json.loads(body_json_str.replace("'", '"'))
|
||||
except:
|
||||
body = body_json_str
|
||||
|
||||
return HTTPRequest(
|
||||
method=method,
|
||||
url=url,
|
||||
headers=headers,
|
||||
params=params,
|
||||
body=body if body else None,
|
||||
body_json=body_json
|
||||
)
|
||||
|
||||
|
||||
class Parser:
|
||||
@staticmethod
|
||||
def parse(input_str: str, format: str) -> HTTPRequest:
|
||||
format_lower = format.lower()
|
||||
|
||||
if format_lower == "curl":
|
||||
return CurlParser.parse(input_str)
|
||||
elif format_lower == "httpie":
|
||||
return HTTPieParser.parse(input_str)
|
||||
elif format_lower == "fetch":
|
||||
return FetchParser.parse(input_str)
|
||||
elif format_lower == "axios":
|
||||
return AxiosParser.parse(input_str)
|
||||
else:
|
||||
raise ValueError(f"Unsupported format: {format}")
|
||||
|
||||
@staticmethod
|
||||
def detect_format(input_str: str) -> str:
|
||||
input_str = input_str.strip()
|
||||
|
||||
if input_str.startswith("curl"):
|
||||
return "curl"
|
||||
elif input_str.startswith("http"):
|
||||
return "httpie"
|
||||
elif "fetch(" in input_str or "fetch (" in input_str:
|
||||
return "fetch"
|
||||
elif "axios(" in input_str or "axios.request(" in input_str:
|
||||
return "axios"
|
||||
else:
|
||||
raise ValueError("Could not detect input format")
|
||||
Reference in New Issue
Block a user