Initial upload: cmdparse CLI tool with comprehensive documentation and CI/CD
This commit is contained in:
105
cmdparse/extractors.py
Normal file
105
cmdparse/extractors.py
Normal file
@@ -0,0 +1,105 @@
|
||||
"""Field extraction syntax implementation with dot notation support."""
|
||||
|
||||
import re
|
||||
from typing import Any, List, Optional, Tuple
|
||||
|
||||
|
||||
# Matches one path segment that ends in bracket indices, e.g. "items[0]" or
# "grid[1][2]". Group 1 is the (possibly empty) name, group 2 the brackets.
_INDEXED_SEGMENT = re.compile(r'([^\[\]]*)((?:\[\d+\])+)')


def _descend(container: Any, key: str) -> Tuple[bool, Any]:
    """Take one plain step into *container* and return ``(found, value)``.

    A dict is looked up by key; a list is indexed by ``int(key)`` and only
    non-negative, in-range indices are accepted. Anything else is "not found".
    The flag is needed because a stored ``None`` is a legitimate value.
    """
    if isinstance(container, dict):
        if key in container:
            return True, container[key]
        return False, None
    if isinstance(container, list):
        try:
            index = int(key)
        except ValueError:
            return False, None
        if 0 <= index < len(container):
            return True, container[index]
    return False, None


def get_nested_value(data: Any, path: str) -> Any:
    """Get a value from a nested data structure using dot notation.

    Each dot-separated segment of *path* is applied in turn: dicts are looked
    up by key and lists by numeric index (``"items.0.name"``). A segment may
    also carry bracket indices (``"items[0].name"``, ``"grid[1][2]"``), which
    is the key format ``flatten_dict`` produces.

    Args:
        data: The structure to walk (typically nested dicts and lists).
        path: Dot-separated path, optionally with ``[n]`` indices.

    Returns:
        The value found at *path*, or ``None`` if *data* is ``None`` or any
        segment cannot be resolved (missing key, out-of-range or non-numeric
        index, or stepping into a non-container).
    """
    if data is None:
        return None

    current = data
    for part in path.split('.'):
        found, value = _descend(current, part)
        if not found:
            # Fall back to bracket syntax only when the literal segment is
            # absent, so a dict key that really is "items[0]" still wins.
            match = _INDEXED_SEGMENT.fullmatch(part)
            if match is None:
                return None
            name, brackets = match.groups()
            value = current
            if name:
                found, value = _descend(value, name)
                if not found:
                    return None
            for digits in re.findall(r'\d+', brackets):
                index = int(digits)
                # Bracket indices are only meaningful on lists.
                if not isinstance(value, list) or index >= len(value):
                    return None
                value = value[index]
        current = value

    return current
|
||||
|
||||
|
||||
def extract_array_index(path: str) -> Tuple[str, Optional[int], Optional[str]]:
    """Split a path such as 'items[0]' or 'items[0].name' at its first index.

    Returns:
        ``(base, index, rest)`` when *path* contains a ``[digits]`` index:
        *base* is the text before the bracket, *index* the integer inside it,
        and *rest* whatever follows, with one leading dot removed (an empty
        string when nothing follows). When there is no such index the result
        is ``(path, None, None)``.
    """
    parsed = re.match(r'([^\[]+)\[(\d+)\](.*)', path)
    if parsed is None:
        return path, None, None

    base, raw_index, tail = parsed.groups()
    # Drop the separator dot so the tail can be used directly as a sub-path.
    if tail.startswith('.'):
        tail = tail[1:]
    return base, int(raw_index), tail
|
||||
|
||||
|
||||
def extract_fields(data: List[dict], fields: List[str]) -> List[dict]:
    """
    Extract specific fields from parsed data.

    Each field is either a plain dot-notation path (``"user.name"``) or a path
    with an array index (``"items[0]"``, ``"items[0].name"``). The part before
    the index may itself be dotted (``"meta.tags[0]"``): it is first tried as
    a literal key of the row and otherwise resolved as a nested path.

    Args:
        data: List of dictionaries containing parsed data
        fields: List of field paths to extract (supports dot notation)

    Returns:
        List of dictionaries with only the requested fields, keyed by the
        field path exactly as requested. Fields that resolve to ``None`` (or
        cannot be resolved) are omitted from that row. If ``fields`` or
        ``data`` is empty, ``data`` is returned unchanged.
    """
    if not fields or not data:
        return data

    # Field paths do not depend on the row, so parse each one only once.
    parsed_fields = [(field, *extract_array_index(field)) for field in fields]

    result = []
    for row in data:
        extracted_row = {}
        for field, base, index, rest in parsed_fields:
            if index is None:
                value = get_nested_value(row, field)
            else:
                # A literal key wins (keeps keys that contain dots working);
                # otherwise walk the dotted base so "a.b[0]" can be resolved.
                if base in row:
                    array_value = row[base]
                else:
                    array_value = get_nested_value(row, base)

                if isinstance(array_value, list) and 0 <= index < len(array_value):
                    item = array_value[index]
                    value = get_nested_value(item, rest) if rest else item
                else:
                    value = None

            if value is not None:
                extracted_row[field] = value

        result.append(extracted_row)

    return result
|
||||
|
||||
|
||||
def flatten_dict(d: dict, parent_key: str = '', sep: str = '.') -> dict:
    """Flatten a nested dictionary into a single-level dictionary.

    Nested dict keys are joined with *sep* (``{'a': {'b': 1}}`` gives
    ``{'a.b': 1}``) and list items get a bracketed index
    (``{'a': [{'b': 1}, 2]}`` gives ``{'a[0].b': 1, 'a[1]': 2}``). Lists
    nested directly inside lists are flattened as well
    (``{'m': [[1, 2]]}`` gives ``{'m[0][0]': 1, 'm[0][1]': 2}``).

    Args:
        d: The dictionary to flatten.
        parent_key: Prefix prepended to every produced key.
        sep: Separator placed between a parent key and a child key.

    Returns:
        A new flat dictionary. Empty dicts and empty lists contribute no keys.
    """
    items = {}

    def _collect(value, key):
        # Recurse through containers; only scalar leaves are stored.
        if isinstance(value, dict):
            items.update(flatten_dict(value, key, sep))
        elif isinstance(value, list):
            for i, item in enumerate(value):
                _collect(item, f"{key}[{i}]")
        else:
            items[key] = value

    for k, v in d.items():
        _collect(v, f"{parent_key}{sep}{k}" if parent_key else k)

    return items
|
||||
Reference in New Issue
Block a user