141 lines
4.3 KiB
Python
141 lines
4.3 KiB
Python
import re
|
|
import csv
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
from .patterns import detect_pattern_type
|
|
|
|
|
|
def _split_pipe_cells(line: str, drop_leading: bool = True,
                      drop_trailing: bool = True) -> List[str]:
    """Split one pipe-delimited line into stripped cells, keeping empty ones."""
    body = line.strip()
    # Only the outer border pipes are removed; interior empty cells must
    # survive so that later values stay under the right header.
    if drop_leading and body.startswith('|'):
        body = body[1:]
    if drop_trailing and body.endswith('|'):
        body = body[:-1]
    return [cell.strip() for cell in body.split('|')]


def parse_table(text: str) -> List[Dict[str, str]]:
    """Parse tabulated CLI output into a list of dictionaries.

    Two layouts are handled, chosen by whether ``text`` contains a ``|``
    anywhere:

    * Pipe tables: cells are separated by ``|``. Lines without a pipe are
      ignored. Empty cells are preserved as ``''`` so columns stay aligned.
    * Whitespace tables: cells are separated by runs of whitespace. Blank
      lines and rules made of ten or more ``-``/``=`` are skipped.

    In both layouts the first line yielding at least two non-empty cells
    becomes the header row. Every later data line becomes one dict keyed by
    header; missing trailing cells are filled with ``''`` and cells beyond
    the header count are dropped.

    Args:
        text: Raw table text.

    Returns:
        One dict per data row, in input order. Empty when no header row or
        no data rows are found.
    """
    separator_pattern = re.compile(r'^[\s]*[+|][-+|]+[+.]$|^[\s]*[+|]([-+|]+)[+|].*$')
    space_separator_pattern = re.compile(r'^[\s]*[-=]{10,}[\s]*$')

    has_pipe = '|' in text
    headers: List[str] = []
    rows: List[Dict[str, str]] = []
    # Whether the header line carried outer border pipes; data rows are
    # trimmed the same way so that a borderless row such as " | x" keeps its
    # empty first cell.
    leading_border = True
    trailing_border = True

    for line in text.strip().split('\n'):
        # Border / rule lines such as "+----+----+" or "|----|----|".
        if separator_pattern.match(line):
            continue

        if has_pipe:
            if '|' not in line:
                continue
            if not headers:
                cells = _split_pipe_cells(line)
                # Count only non-empty cells so banner lines such as
                # "| Title |" are not mistaken for the header.
                if sum(1 for cell in cells if cell) >= 2:
                    headers = cells
                    stripped = line.strip()
                    leading_border = stripped.startswith('|')
                    trailing_border = stripped.endswith('|')
                continue
            cells = _split_pipe_cells(line, leading_border, trailing_border)
        else:
            if space_separator_pattern.match(line):
                continue
            cells = line.split()
            if not headers:
                if len(cells) >= 2:
                    headers = cells
                continue
            # A blank line carries no data; do not emit an all-empty row.
            if not cells:
                continue

        rows.append({
            header: cells[index] if index < len(cells) else ''
            for index, header in enumerate(headers)
        })

    return rows
|
|
|
|
|
|
def parse_key_value(text: str, delimiter: str = ':') -> List[Dict[str, str]]:
    """Parse ``key<delimiter>value`` lines into a single dictionary.

    A key must start with a letter or underscore and may continue with
    letters, digits, ``_``, ``-`` or ``.``. Lines that do not fit that shape
    are ignored. When a key occurs more than once, the last value wins.

    Args:
        text: Raw text containing the key/value lines.
        delimiter: Literal separator between key and value. It is matched
            verbatim (regex metacharacters are escaped). An empty delimiter
            falls back to ``'='``, which is how non-colon delimiters were
            treated before arbitrary delimiters were supported.

    Returns:
        A one-element list holding the collected mapping, or an empty list
        when no pair was found.

    NOTE(review): the whitespace allowed after the delimiter may include a
    line break, so a key with nothing after its delimiter takes the following
    line as its value. Confirm whether that is intended for block-style input.
    """
    separator = re.escape(delimiter) if delimiter else '='
    pattern = re.compile(
        r'^\s*([A-Za-z_][A-Za-z0-9_\-\.]*)\s*' + separator + r'\s*(.+)$',
        re.MULTILINE,
    )

    result = {
        match.group(1).strip(): match.group(2).strip()
        for match in pattern.finditer(text)
    }

    return [result] if result else []
|
|
|
|
|
|
def parse_delimited(text: str, delimiter: str = ',') -> List[Dict[str, str]]:
    """Parse delimited text (CSV, TSV, etc.) into a list of dictionaries.

    The first line supplies the column names. Rows shorter than the header
    are padded with ``''`` and fields beyond the header count are dropped, so
    every returned dict maps string keys to string values, matching how
    ``parse_table`` treats ragged rows.

    Args:
        text: Raw delimited text; surrounding whitespace is stripped first.
        delimiter: Single-character field separator passed to the csv reader.

    Returns:
        One dict per data row, in input order. Empty when there are no data
        rows.
    """
    lines = text.strip().split('\n')

    # restval='' pads short rows instead of leaving None values.
    reader = csv.DictReader(lines, delimiter=delimiter, restval='')

    # DictReader stores overflow fields as a list under the key None
    # (its default restkey); drop that entry to keep the mapping str -> str.
    return [
        {name: value for name, value in record.items() if name is not None}
        for record in reader
    ]
|
|
|
|
|
|
def parse_raw(text: str) -> List[Dict[str, str]]:
    """Fallback parser: wrap every non-blank line in a small record.

    Each record has two keys: ``'line'``, the 1-based position (an ``int``)
    of the line within the stripped text, and ``'content'``, the line with
    surrounding whitespace removed. Blank lines are skipped but still count
    towards the numbering.

    Args:
        text: Raw text in no recognised format.

    Returns:
        The records in input order; empty when the text holds no content.
    """
    # Numbering is relative to the stripped text, so leading blank lines in
    # the input do not shift the positions.
    numbered = enumerate(text.strip().split('\n'), start=1)
    return [
        {'line': position, 'content': raw.strip()}
        for position, raw in numbered
        if raw.strip()
    ]
|
|
|
|
|
|
def parse_text(text: str, pattern_type: Optional[str] = None) -> Tuple[List[Dict[str, str]], str]:
    """
    Turn unstructured CLI output into structured records.

    Args:
        text: The unstructured CLI output text
        pattern_type: Pattern type to use; when omitted (or otherwise falsy)
            it is obtained from detect_pattern_type(text)

    Returns:
        Tuple of (parsed data as list of dicts, pattern type that was used).
        Empty or whitespace-only input gives ([], 'empty'). Unrecognised
        pattern types are handled by parse_raw.
    """
    if not (text and text.strip()):
        return [], 'empty'

    kind = pattern_type if pattern_type else detect_pattern_type(text)

    field_separators = {
        'delimited_comma': ',',
        'delimited_tab': '\t',
        'delimited_semicolon': ';',
    }

    if kind == 'table':
        return parse_table(text), kind

    # Block-style key/value output is parsed with the colon form too.
    if kind == 'key_value_colon' or kind == 'key_value_block':
        return parse_key_value(text, ':'), kind

    if kind == 'key_value_equals':
        return parse_key_value(text, '='), kind

    if kind in field_separators:
        return parse_delimited(text, field_separators[kind]), kind

    if kind == 'json_like':
        # JSON-looking input is passed through unparsed under a single key.
        return [{'raw': text}], kind

    return parse_raw(text), kind
|