Initial upload: testdata-cli with CI/CD workflow
This commit is contained in:
129
src/testdatagen/formatters/csv_formatter.py
Normal file
129
src/testdatagen/formatters/csv_formatter.py
Normal file
@@ -0,0 +1,129 @@
|
|||||||
|
"""CSV output formatter."""
|
||||||
|
|
||||||
|
import csv
|
||||||
|
import io
|
||||||
|
import json
|
||||||
|
from typing import Any, Dict, List
|
||||||
|
|
||||||
|
|
||||||
|
class CSVFormatter:
|
||||||
|
"""Formatter that outputs data in CSV format."""
|
||||||
|
|
||||||
|
def __init__(self, delimiter: str = ",", quotechar: str = '"'):
|
||||||
|
"""Initialize the CSV formatter.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
delimiter: Column delimiter character
|
||||||
|
quotechar: Quote character for fields containing delimiters
|
||||||
|
"""
|
||||||
|
self.delimiter = delimiter
|
||||||
|
self.quotechar = quotechar
|
||||||
|
|
||||||
|
def format(self, records: List[Dict[str, Any]]) -> str:
|
||||||
|
"""Format records as CSV string.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
records: List of data records to format
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
CSV-formatted string
|
||||||
|
"""
|
||||||
|
if not records:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
all_keys = self._extract_all_keys(records)
|
||||||
|
|
||||||
|
output = io.StringIO()
|
||||||
|
writer = csv.DictWriter(
|
||||||
|
output,
|
||||||
|
fieldnames=all_keys,
|
||||||
|
delimiter=self.delimiter,
|
||||||
|
quotechar=self.quotechar,
|
||||||
|
quoting=csv.QUOTE_MINIMAL,
|
||||||
|
extrasaction='ignore'
|
||||||
|
)
|
||||||
|
|
||||||
|
writer.writeheader()
|
||||||
|
|
||||||
|
for record in records:
|
||||||
|
flattened = self._flatten_record(record)
|
||||||
|
writer.writerow(flattened)
|
||||||
|
|
||||||
|
return output.getvalue()
|
||||||
|
|
||||||
|
def _extract_all_keys(self, records: List[Dict[str, Any]]) -> List[str]:
|
||||||
|
"""Extract all unique keys from records.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
records: List of records
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of all unique keys in order of first appearance
|
||||||
|
"""
|
||||||
|
seen = set()
|
||||||
|
result = []
|
||||||
|
|
||||||
|
for record in records:
|
||||||
|
flattened = self._flatten_keys(record)
|
||||||
|
for key in flattened:
|
||||||
|
if key not in seen:
|
||||||
|
seen.add(key)
|
||||||
|
result.append(key)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _flatten_keys(self, obj: Any, parent_key: str = "") -> List[str]:
|
||||||
|
"""Flatten nested structure and extract all keys.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
obj: Object to extract keys from
|
||||||
|
parent_key: Prefix for nested keys
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of flattened keys
|
||||||
|
"""
|
||||||
|
if not isinstance(obj, dict):
|
||||||
|
return [parent_key] if parent_key else []
|
||||||
|
|
||||||
|
keys = []
|
||||||
|
for key, value in obj.items():
|
||||||
|
new_key = f"{parent_key}.{key}" if parent_key else key
|
||||||
|
if isinstance(value, dict):
|
||||||
|
keys.extend(self._flatten_keys(value, new_key))
|
||||||
|
elif isinstance(value, list) and value and isinstance(value[0], dict):
|
||||||
|
for i, item in enumerate(value):
|
||||||
|
keys.extend(self._flatten_keys(item, f"{new_key}[{i}]"))
|
||||||
|
else:
|
||||||
|
keys.append(new_key)
|
||||||
|
|
||||||
|
return keys
|
||||||
|
|
||||||
|
def _flatten_record(self, record: Dict[str, Any], parent_key: str = "") -> Dict[str, Any]:
|
||||||
|
"""Flatten a record for CSV output.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
record: Record to flatten
|
||||||
|
parent_key: Prefix for nested keys
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Flattened dictionary
|
||||||
|
"""
|
||||||
|
result = {}
|
||||||
|
|
||||||
|
for key, value in record.items():
|
||||||
|
new_key = f"{parent_key}.{key}" if parent_key else key
|
||||||
|
|
||||||
|
if isinstance(value, dict):
|
||||||
|
result.update(self._flatten_record(value, new_key))
|
||||||
|
elif isinstance(value, list):
|
||||||
|
if not value:
|
||||||
|
result[new_key] = ""
|
||||||
|
elif isinstance(value[0], dict):
|
||||||
|
for i, item in enumerate(value):
|
||||||
|
result.update(self._flatten_record(item, f"{new_key}[{i}]"))
|
||||||
|
else:
|
||||||
|
result[new_key] = json.dumps(value)
|
||||||
|
else:
|
||||||
|
result[new_key] = value if value is not None else ""
|
||||||
|
|
||||||
|
return result
|
||||||
Reference in New Issue
Block a user