diff --git a/src/http_convert/parsers.py b/src/http_convert/parsers.py new file mode 100644 index 0000000..19b98d2 --- /dev/null +++ b/src/http_convert/parsers.py @@ -0,0 +1,369 @@ +import re +from typing import Optional, Dict, List, Tuple +from .models import HTTPRequest, HttpMethod + + +class CurlParser: + METHOD_MAP = { + "-X": "method", + "--request": "method", + } + + HEADER_MAP = { + "-H": "header", + "--header": "header", + } + + DATA_MAP = { + "-d": "data", + "--data": "data", + "--data-raw": "data", + "--data-binary": "data", + } + + URL_MAP = { + "URL": "url", + } + + QUERY_PARAM_MAP = { + "-G": "get_data", + "--get": "get_data", + } + + @staticmethod + def parse(curl_command: str) -> HTTPRequest: + curl_command = curl_command.strip() + + if not curl_command.startswith("curl"): + raise ValueError("Input must be a cURL command") + + tokens = CurlParser._tokenize(curl_command[4:].strip()) + + method = HttpMethod.GET + url = "" + headers: Dict[str, str] = {} + params: Dict[str, str] = {} + body = "" + data_binary = False + + i = 0 + while i < len(tokens): + token = tokens[i] + + if token in ["-X", "--request"]: + if i + 1 < len(tokens): + method = HttpMethod(tokens[i + 1].upper()) + i += 2 + continue + + elif token in ["-H", "--header"]: + if i + 1 < len(tokens): + header_line = tokens[i + 1] + if ": " in header_line: + key, value = header_line.split(": ", 1) + headers[key] = value + i += 2 + continue + + elif token in ["-d", "--data", "--data-raw", "--data-binary"]: + if i + 1 < len(tokens): + body = tokens[i + 1] + data_binary = token == "--data-binary" + i += 2 + continue + + elif token in ["-G", "--get"]: + if i + 1 < len(tokens): + get_data = tokens[i + 1] + if "=" in get_data: + key, value = get_data.split("=", 1) + params[key] = value + i += 2 + continue + + elif token.startswith("http://") or token.startswith("https://"): + url = token + i += 1 + continue + + elif token.startswith("'"): + potential_url = token.strip("'") + if potential_url.startswith("http://") or potential_url.startswith("https://"): + url = potential_url + i += 1 + continue + + elif token.startswith('"'): + potential_url = token.strip('"') + if potential_url.startswith("http://") or potential_url.startswith("https://"): + url = potential_url + i += 1 + continue + + i += 1 + + if not url: + raise ValueError("URL not found in cURL command") + + return HTTPRequest( + method=method, + url=url, + headers=headers, + params=params, + body=body if body else None + ) + + @staticmethod + def _tokenize(command: str) -> List[str]: + tokens = [] + current = "" + in_quotes = False + quote_char = None + i = 0 + + while i < len(command): + char = command[i] + + if not in_quotes and char in ["'", '"']: + in_quotes = True + quote_char = char + current += char + elif in_quotes and char == quote_char: + in_quotes = False + quote_char = None + current += char + elif not in_quotes and char == " ": + if current: + tokens.append(current) + current = "" + else: + current += char + i += 1 + + if current: + tokens.append(current) + + return tokens + + +class HTTPieParser: + @staticmethod + def parse(httpie_command: str) -> HTTPRequest: + httpie_command = httpie_command.strip() + + if not httpie_command.startswith("http"): + raise ValueError("Input must be an HTTPie command") + + parts = httpie_command.split() + if len(parts) < 2: + raise ValueError("Invalid HTTPie command") + + method = HttpMethod.GET + url = "" + headers: Dict[str, str] = {} + params: Dict[str, str] = {} + body = "" + body_json: Optional[Dict[str, str]] = None + + i = 0 + while i < len(parts): + part = parts[i] + + if part.upper() in [m.value for m in HttpMethod]: + method = HttpMethod(part.upper()) + i += 1 + continue + + if part.startswith("http://") or part.startswith("https://"): + url = part + i += 1 + continue + + if ":" in part and not part.startswith(":"): + colon_idx = part.find(":") + key = part[:colon_idx] + value = part[colon_idx + 1:] + headers[key] = value + i += 1 + continue + + if part.startswith(":") and i + 1 < len(parts): + header_key = part[1:] + header_value = parts[i + 1] + headers[header_key] = header_value + i += 2 + continue + + if "==" in part: + key, value = part.split("==", 1) + params[key] = value + i += 1 + continue + + if part.startswith("@"): + body = part[1:] + i += 1 + continue + + if part == ":" or (part == "JSON:" and i + 1 < len(parts)): + if i + 1 < len(parts) and parts[i + 1].startswith("{"): + body_json_str = parts[i + 1] + j = i + 1 + while j < len(parts) and not parts[j].endswith("}"): + j += 1 + if j < len(parts): + body_json_str = " ".join(parts[i + 1:j + 1]) + try: + import json + body_json = json.loads(body_json_str) + except: + body = body_json_str + i = j + 1 + continue + + i += 1 + + if not url: + raise ValueError("URL not found in HTTPie command") + + return HTTPRequest( + method=method, + url=url, + headers=headers, + params=params, + body=body if body else None, + body_json=body_json + ) + + +class FetchParser: + @staticmethod + def parse(fetch_code: str) -> HTTPRequest: + fetch_code = fetch_code.strip() + + if not ("fetch(" in fetch_code or "fetch (" in fetch_code): + raise ValueError("Input must be a fetch() call") + + method = HttpMethod.GET + url = "" + headers: Dict[str, str] = {} + params: Dict[str, str] = {} + body = "" + + url_match = re.search(r'fetch\s*\(\s*["']([^"']+)["']', fetch_code) + if not url_match: + url_match = re.search(r'fetch\s*\(\s*`([^`]+)`', fetch_code) + if not url_match: + raise ValueError("URL not found in fetch call") + + url = url_match.group(1) + + method_match = re.search(r'method\s*:\s*["']([^"']+)["']', fetch_code, re.IGNORECASE) + if not method_match: + method_match = re.search(r'\.method\s*=\s*["']([^"']+)["']', fetch_code, re.IGNORECASE) + if method_match: + method = HttpMethod(method_match.group(1).upper()) + + headers_section_match = re.search(r'headers\s*:\s*\{([^}]*)\}', fetch_code, re.DOTALL) + if headers_section_match: + headers_str = headers_section_match.group(1) + header_matches = re.findall(r'["']?(\w+)["']?\s*:\s*["']([^"']+)["']', headers_str) + for key, value in header_matches: + headers[key] = value + + body_match = re.search(r'body\s*:\s*["']([^"']+)["']', fetch_code) + if not body_match: + body_match = re.search(r'\.body\s*=\s*["']([^"']+)["']', fetch_code) + if body_match: + body = body_match.group(1) + + return HTTPRequest( + method=method, + url=url, + headers=headers, + params=params, + body=body if body else None + ) + + +class AxiosParser: + @staticmethod + def parse(axios_code: str) -> HTTPRequest: + axios_code = axios_code.strip() + + if not ("axios(" in axios_code or "axios.request(" in axios_code): + raise ValueError("Input must be an axios call") + + method = HttpMethod.GET + url = "" + headers: Dict[str, str] = {} + params: Dict[str, str] = {} + body = "" + body_json: Optional[Dict[str, str]] = None + + url_match = re.search(r'url\s*:\s*["']([^"']+)["']', axios_code) + if not url_match: + url_match = re.search(r'["'](https?://[^"']+)["']', axios_code) + if url_match: + url = url_match.group(1) + + method_match = re.search(r'method\s*:\s*["']([^"']+)["']', axios_code, re.IGNORECASE) + if method_match: + method = HttpMethod(method_match.group(1).upper()) + + headers_match = re.search(r'headers\s*:\s*\{([^}]*)\}', axios_code, re.DOTALL) + if headers_match: + headers_str = headers_match.group(1) + header_matches = re.findall(r'["']?(\w+)["']?\s*:\s*["']([^"']+)["']', headers_str) + for key, value in header_matches: + headers[key] = value + + data_match = re.search(r'data\s*:\s*(\{[^}]+\})', axios_code, re.DOTALL) + if data_match: + body_json_str = data_match.group(1) + try: + import json + body_json = json.loads(body_json_str.replace("'", '"')) + except: + body = body_json_str + + return HTTPRequest( + method=method, + url=url, + headers=headers, + params=params, + body=body if body else None, + body_json=body_json + ) + + +class Parser: + @staticmethod + def parse(input_str: str, format: str) -> HTTPRequest: + format_lower = format.lower() + + if format_lower == "curl": + return CurlParser.parse(input_str) + elif format_lower == "httpie": + return HTTPieParser.parse(input_str) + elif format_lower == "fetch": + return FetchParser.parse(input_str) + elif format_lower == "axios": + return AxiosParser.parse(input_str) + else: + raise ValueError(f"Unsupported format: {format}") + + @staticmethod + def detect_format(input_str: str) -> str: + input_str = input_str.strip() + + if input_str.startswith("curl"): + return "curl" + elif input_str.startswith("http"): + return "httpie" + elif "fetch(" in input_str or "fetch (" in input_str: + return "fetch" + elif "axios(" in input_str or "axios.request(" in input_str: + return "axios" + else: + raise ValueError("Could not detect input format")