"""CSV output formatter."""

import csv
import io
import json
from typing import Any, Dict, List


class CSVFormatter:
    """Formatter that outputs data in CSV format.

    Nested dictionaries are flattened into dotted column names
    (``parent.child``). Lists whose first element is a dictionary are
    expanded into indexed columns (``parent[0].child``); any other
    non-empty list is stored in a single cell as a JSON string.
    """

    def __init__(self, delimiter: str = ",", quotechar: str = '"'):
        """Initialize the CSV formatter.

        Args:
            delimiter: Column delimiter character
            quotechar: Quote character for fields containing delimiters
        """
        self.delimiter = delimiter
        self.quotechar = quotechar

    def format(self, records: List[Dict[str, Any]]) -> str:
        """Format records as CSV string.

        The header is the union of the flattened keys of all records, in
        order of first appearance. Records lacking a column get an empty
        cell for it (the ``csv.DictWriter`` default ``restval``).

        Args:
            records: List of data records to format

        Returns:
            CSV-formatted string, or an empty string when ``records`` is
            empty
        """
        if not records:
            return ""

        all_keys = self._extract_all_keys(records)

        output = io.StringIO()
        writer = csv.DictWriter(
            output,
            fieldnames=all_keys,
            delimiter=self.delimiter,
            quotechar=self.quotechar,
            quoting=csv.QUOTE_MINIMAL,
            extrasaction="ignore",
        )

        writer.writeheader()
        for record in records:
            writer.writerow(self._flatten_record(record))

        return output.getvalue()

    def _extract_all_keys(self, records: List[Dict[str, Any]]) -> List[str]:
        """Extract all unique keys from records.

        Args:
            records: List of records

        Returns:
            List of all unique flattened keys in order of first appearance
        """
        seen = set()
        result = []

        for record in records:
            for key in self._flatten_keys(record):
                if key not in seen:
                    seen.add(key)
                    result.append(key)

        return result

    def _flatten_keys(self, obj: Any, parent_key: str = "") -> List[str]:
        """Flatten nested structure and extract all keys.

        Must stay in step with ``_flatten_record``: every key produced
        there has to be listed here, otherwise the writer (configured
        with ``extrasaction="ignore"``) silently drops the value.

        Args:
            obj: Object to extract keys from
            parent_key: Prefix for nested keys

        Returns:
            List of flattened keys (may contain duplicates)
        """
        if not isinstance(obj, dict):
            # A non-dict leaf (e.g. a scalar inside a list of dicts)
            # occupies the column named by its prefix.
            return [parent_key] if parent_key else []

        keys = []
        for key, value in obj.items():
            new_key = f"{parent_key}.{key}" if parent_key else key
            if isinstance(value, dict):
                keys.extend(self._flatten_keys(value, new_key))
            elif isinstance(value, list) and value and isinstance(value[0], dict):
                for i, item in enumerate(value):
                    keys.extend(self._flatten_keys(item, f"{new_key}[{i}]"))
            else:
                keys.append(new_key)

        return keys

    @staticmethod
    def _to_cell(value: Any) -> Any:
        """Convert a leaf value into something the CSV writer can emit.

        Args:
            value: Leaf value (anything that is not expanded into columns)

        Returns:
            ``""`` for ``None`` and for empty lists, a JSON string for
            non-empty lists, and the value itself otherwise
        """
        if value is None:
            return ""
        if isinstance(value, list):
            if not value:
                return ""
            # default=str is deliberate: the csv module already applies
            # str() to non-string scalar cells, so values such as dates
            # or Decimals inside a list are rendered the same way instead
            # of raising TypeError.
            return json.dumps(value, default=str)
        return value

    def _flatten_record(
        self, record: Dict[str, Any], parent_key: str = ""
    ) -> Dict[str, Any]:
        """Flatten a record for CSV output.

        Args:
            record: Record to flatten
            parent_key: Prefix for nested keys

        Returns:
            Flattened dictionary mapping column names to cell values
        """
        result = {}

        for key, value in record.items():
            new_key = f"{parent_key}.{key}" if parent_key else key

            if isinstance(value, dict):
                result.update(self._flatten_record(value, new_key))
            elif isinstance(value, list) and value and isinstance(value[0], dict):
                for i, item in enumerate(value):
                    item_key = f"{new_key}[{i}]"
                    if isinstance(item, dict):
                        result.update(self._flatten_record(item, item_key))
                    else:
                        # Mixed list: _flatten_keys reports a column for
                        # this index, so write the item there rather than
                        # recursing into a non-dict.
                        result[item_key] = self._to_cell(item)
            else:
                result[new_key] = self._to_cell(value)

        return result