parser.py (new file, 188 lines)
@@ -0,0 +1,188 @@
"""
cURL command parser module.
"""

import re
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ParsedCurl:
    """Structured representation of a parsed cURL command."""

    url: str = ""
    method: str = "GET"
    headers: dict = field(default_factory=dict)
    data: Optional[str] = None
    auth: Optional[tuple] = None
    content_type: Optional[str] = None
    insecure: bool = False


class CurlParser:
    """Parses a cURL command line into a ParsedCurl result."""

    def __init__(self, curl_command: str):
        self.curl_command = curl_command

    def parse(self) -> ParsedCurl:
        """Parse a cURL command and return structured data."""
        result = ParsedCurl()

        command = self._normalize_command()
        tokens = self._tokenize(command)

        i = 0
        while i < len(tokens):
            token = tokens[i]

            if token == 'curl':
                i += 1
                continue

            if token.startswith('-'):
                if token in ['-X', '--request']:
                    i += 1
                    result.method = tokens[i].upper() if i < len(tokens) else 'GET'
                elif token in ['-H', '--header']:
                    i += 1
                    header = tokens[i] if i < len(tokens) else ""
                    # Re-join a header value that was split into separate
                    # tokens (e.g. an unquoted "Accept: application/json").
                    if header.endswith(':'):
                        while (i + 1 < len(tokens)
                               and not tokens[i + 1].startswith('-')
                               and '://' not in tokens[i + 1]):
                            i += 1
                            header += ' ' + tokens[i]
                    if header:
                        self._parse_header(header, result)
                elif token in ['-d', '--data', '--data-raw', '--data-binary']:
                    i += 1
                    result.data = tokens[i] if i < len(tokens) else ''
                    # curl defaults to POST when a request body is supplied
                    # and no explicit method was given.
                    if not result.method or result.method == 'GET':
                        result.method = 'POST'
                elif token in ['-u', '--user']:
                    i += 1
                    if i < len(tokens) and ':' in tokens[i]:
                        user, password = tokens[i].split(':', 1)
                        result.auth = (user, password)
                elif token == '--url':
                    i += 1
                    result.url = tokens[i] if i < len(tokens) else ''
                elif token in ['-b', '--cookie']:
                    i += 1
                    cookie = tokens[i] if i < len(tokens) else ''
                    # An argument containing '=' is a cookie string; anything
                    # else is a cookie file, which is not supported here.
                    if '=' in cookie:
                        result.headers['Cookie'] = cookie
                elif token == '--cookie-jar':
                    # --cookie-jar names an output file and does not affect
                    # the request itself; skip it and its argument.
                    i += 1
                elif token in ['-A', '--user-agent']:
                    i += 1
                    result.headers['User-Agent'] = tokens[i] if i < len(tokens) else ''
                elif token == '--compressed':
                    result.headers['Accept-Encoding'] = 'gzip, deflate'
                elif token in ['-k', '--insecure']:
                    result.insecure = True
                # Advance past the flag (value-taking flags already advanced
                # to their value above). Unrecognized flags are skipped, so a
                # following value may be misread as the URL.
                i += 1
            else:
                # The first bare token that is not a flag value is taken as
                # the URL.
                if not result.url:
                    result.url = token
                i += 1

        return result

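    # Illustrative sketch of the intended behavior (hypothetical command and
    # values, not taken from a test suite):
    #
    #   CurlParser("curl -X POST https://api.example.com/items "
    #              "-H 'Content-Type: application/json' "
    #              "-d '{\"name\": \"demo\"}'").parse()
    #
    # would be expected to yield a ParsedCurl with url
    # 'https://api.example.com/items', method 'POST', a Content-Type header
    # (and content_type) of 'application/json', and the JSON string as data.
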
    def _normalize_command(self) -> str:
        """Normalize the cURL command for parsing."""
        command = self.curl_command.strip()
        # Remove shell line continuations (backslash-newline).
        command = re.sub(r'\\\n', '', command)
        # Collapse runs of whitespace into single spaces. Note that this also
        # applies inside quoted arguments.
        command = re.sub(r'\s+', ' ', command)
        return command

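    # For example (illustrative), a multi-line command such as
    #   curl https://example.com \
    #        -H 'Accept: text/html'
    # is normalized to the single line
    #   curl https://example.com -H 'Accept: text/html'
    # before tokenization.
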
    def _tokenize(self, command: str) -> list:
        """Tokenize the cURL command, honoring single and double quotes."""
        tokens = []
        current = ""
        in_quote = False
        quote_char = None

        i = 0
        while i < len(command):
            char = command[i]

            # Opening quote: remember which character must close it.
            if char in ['"', "'"] and not in_quote:
                in_quote = True
                quote_char = char
                i += 1
                continue
            # Matching closing quote.
            elif char == quote_char and in_quote:
                in_quote = False
                quote_char = None
                i += 1
                continue

            # Inside quotes, keep backslash escapes verbatim; they are
            # unescaped after tokenization.
            if in_quote and char == '\\' and i + 1 < len(command):
                current += char + command[i + 1]
                i += 2
                continue

            # Unquoted spaces delimit tokens; quoted spaces are kept.
            if char == ' ' and not in_quote:
                if current:
                    tokens.append(current)
                    current = ""
                i += 1
                continue

            current += char
            i += 1

        if current:
            tokens.append(current)

        # Undo the escapes preserved while inside quotes.
        unescaped_tokens = []
        for token in tokens:
            token = token.replace('\\:', ':')
            token = token.replace('\\"', '"')
            token = token.replace("\\'", "'")
            token = token.replace('\\n', '\n')
            unescaped_tokens.append(token)

        return unescaped_tokens

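    # For example (illustrative), the command
    #   curl -H 'Accept: application/json' https://example.com
    # tokenizes to ['curl', '-H', 'Accept: application/json',
    # 'https://example.com']: the quoted header stays a single token because
    # the space inside the quotes is preserved.
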
    def _parse_header(self, header: str, result: ParsedCurl):
        """Parse a 'Name: value' header string and update the result."""
        if ':' in header:
            key, value = header.split(':', 1)
            key = key.strip()
            value = value.strip()
            result.headers[key] = value

            if key.lower() == 'content-type':
                result.content_type = value

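    # For example, _parse_header('Content-Type: application/json', result)
    # stores the header under 'Content-Type' and also records it as
    # result.content_type.
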

if __name__ == "__main__":
    import sys

    # Re-join the command-line arguments into a single command string.
    parser = CurlParser(' '.join(sys.argv[1:]))
    result = parser.parse()
    print(f"URL: {result.url}")
    print(f"Method: {result.method}")
    print(f"Headers: {result.headers}")
    print(f"Data: {result.data}")
    print(f"Auth: {result.auth}")
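# Example invocation (illustrative): quote the whole command so the shell does
# not strip the inner quotes before the script sees them, e.g.
#   python parser.py "curl -H 'Accept: application/json' https://example.com"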