Restore testdata-cli with proper CI/CD and tests
Some checks failed
CI / build (push) Has been skipped
CI / test (push) Failing after 12s

This commit is contained in:
Developer
2026-03-22 19:55:51 +00:00
commit da98b8b402
23 changed files with 2511 additions and 0 deletions

View File

@@ -0,0 +1,3 @@
"""TestDataGen - CLI tool for generating realistic test data."""
# Package version string (the CLI's ``--version`` option hard-codes the same
# "0.1.0" value, so the two must be bumped together).
__version__ = "0.1.0"

258
src/testdatagen/cli.py Normal file
View File

@@ -0,0 +1,258 @@
"""Main CLI module for TestDataGen."""
import sys
from pathlib import Path
import click
from testdatagen.formatters.csv_formatter import CSVFormatter
from testdatagen.formatters.json_formatter import JSONFormatter
from testdatagen.formatters.sql_formatter import SQLFormatter
from testdatagen.generators.json_schema_generator import JSONSchemaGenerator
@click.group()
@click.version_option(version="0.1.0")
def main():
    """TestDataGen - Generate realistic test data from schemas and types."""
    # Root click group: it carries no logic of its own and only hosts the
    # sub-commands registered with ``@main.command()``. The docstring above is
    # what click prints as the group's help text, so it is kept verbatim.
@main.command()
@click.option(
    "--schema", "-s",
    type=click.Path(exists=True, file_okay=True, dir_okay=False),
    required=True,
    help="Path to JSON Schema file"
)
@click.option(
    "--count", "-n",
    type=int,
    default=10,
    help="Number of records to generate (default: 10)"
)
@click.option(
    "--format", "-f",
    type=click.Choice(["json", "csv", "sql"], case_sensitive=False),
    default="json",
    help="Output format (default: json)"
)
@click.option(
    "--seed",
    type=int,
    default=None,
    help="Random seed for reproducibility"
)
@click.option(
    "--table",
    type=str,
    default="generated_table",
    help="Table name for SQL output (default: generated_table)"
)
@click.option(
    "--indent",
    type=int,
    default=None,
    help="Indentation level for JSON output (default: None)"
)
def generate(schema, count, format, seed, table, indent):
    """Generate test data from a JSON Schema file."""
    # The docstring above is click's help text, so implementation notes live
    # in comments.
    #
    # Flow: load the schema file, generate ``count`` records, render them with
    # the formatter selected by ``--format`` and echo the result to stdout.
    # Any failure is reported on stderr and the process exits with status 1.
    #
    # ``json`` must be bound before the ``try`` starts: the name is local to
    # this function, so if ``open()`` failed before an in-``try`` import ran,
    # evaluating ``except json.JSONDecodeError`` would itself raise
    # UnboundLocalError and mask the real error.
    import json

    try:
        # JSON text is UTF-8 by specification; do not depend on the locale.
        with open(Path(schema), "r", encoding="utf-8") as f:
            schema_data = json.load(f)

        generator = JSONSchemaGenerator(seed=seed)
        records = generator.generate(schema_data, count=count)

        fmt = format.lower()
        if fmt == "json":
            formatter = JSONFormatter(indent=indent)
        elif fmt == "csv":
            formatter = CSVFormatter()
        elif fmt == "sql":
            formatter = SQLFormatter(table_name=table)
        else:
            # Defensive only: click.Choice already rejects other values.
            click.echo(f"Error: Unsupported format '{format}'", err=True)
            sys.exit(1)

        click.echo(formatter.format(records))
    except json.JSONDecodeError as e:
        click.echo(f"Error: Invalid JSON in schema file: {e}", err=True)
        sys.exit(1)
    except FileNotFoundError:
        click.echo(f"Error: Schema file not found: {schema}", err=True)
        sys.exit(1)
    except Exception as e:
        # Top-level CLI boundary: turn any remaining failure (invalid schema,
        # invalid table name, ...) into a one-line error message.
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
@main.command()
@click.option(
    "--input", "-i",
    type=click.Path(exists=True, file_okay=True, dir_okay=False),
    required=True,
    help="Path to TypeScript file"
)
@click.option(
    "--count", "-n",
    type=int,
    default=10,
    help="Number of records to generate (default: 10)"
)
@click.option(
    "--format", "-f",
    type=click.Choice(["json", "csv", "sql"], case_sensitive=False),
    default="json",
    help="Output format (default: json)"
)
@click.option(
    "--seed",
    type=int,
    default=None,
    help="Random seed for reproducibility"
)
@click.option(
    "--table",
    type=str,
    default="generated_table",
    help="Table name for SQL output (default: generated_table)"
)
def from_ts(input, count, format, seed, table):
    """Generate test data from a TypeScript type definition."""
    # The docstring above is click's help text, so implementation notes live
    # in comments.
    #
    # Flow: run the TypeScript compiler through ``npx``, read the JSON file
    # expected next to the input (same name, ``.json`` suffix), generate
    # ``count`` records from it and echo them in the requested format.
    # Imports are bound before the ``try`` so the ``except`` clauses that
    # reference ``subprocess`` can always be evaluated.
    import json
    import subprocess

    try:
        # NOTE(review): ``--jsonSchemaManifest`` does not appear to be a
        # documented tsc compiler option; confirm which tool is meant to emit
        # the ``.json`` schema file that is read below.
        try:
            result = subprocess.run(
                ["npx", "tsc", "--declaration", "--emitDeclarationOnly", "--jsonSchemaManifest", input],
                capture_output=True,
                text=True,
                timeout=30
            )
        except FileNotFoundError:
            # click.Path(exists=True) already verified the input file, so a
            # FileNotFoundError raised by run() means the ``npx`` executable
            # itself is missing. Say so instead of blaming the input file.
            click.echo(
                "Error: 'npx' was not found on PATH. Install Node.js to use this command",
                err=True
            )
            sys.exit(1)

        if result.returncode != 0:
            click.echo(f"Error: TypeScript compilation failed: {result.stderr}", err=True)
            sys.exit(1)

        schema_path = Path(input).with_suffix(".json")
        if not schema_path.exists():
            click.echo("Error: Could not generate schema from TypeScript file", err=True)
            sys.exit(1)

        with open(schema_path, "r", encoding="utf-8") as f:
            schema_data = json.load(f)

        generator = JSONSchemaGenerator(seed=seed)
        records = generator.generate(schema_data, count=count)

        fmt = format.lower()
        if fmt == "json":
            formatter = JSONFormatter()
        elif fmt == "csv":
            formatter = CSVFormatter()
        elif fmt == "sql":
            formatter = SQLFormatter(table_name=table)
        else:
            # Defensive only: click.Choice already rejects other values.
            click.echo(f"Error: Unsupported format '{format}'", err=True)
            sys.exit(1)

        click.echo(formatter.format(records))
    except FileNotFoundError:
        click.echo("Error: TypeScript file not found", err=True)
        sys.exit(1)
    except subprocess.TimeoutExpired:
        click.echo("Error: TypeScript compilation timed out", err=True)
        sys.exit(1)
    except Exception as e:
        # Top-level CLI boundary: report anything else as a one-line error.
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
@main.command()
@click.option(
    "--input", "-i",
    type=click.Path(exists=True, file_okay=True, dir_okay=False),
    required=True,
    help="Path to sample data file (JSON or CSV)"
)
@click.option(
    "--count", "-n",
    type=int,
    default=10,
    help="Number of records to generate (default: 10)"
)
@click.option(
    "--format", "-f",
    type=click.Choice(["json", "csv", "sql"], case_sensitive=False),
    default="json",
    help="Output format (default: json)"
)
@click.option(
    "--seed",
    type=int,
    default=None,
    help="Random seed for reproducibility"
)
@click.option(
    "--table",
    type=str,
    default="generated_table",
    help="Table name for SQL output (default: generated_table)"
)
def from_sample(input, count, format, seed, table):
    """Generate test data from a sample data file."""
    # The docstring above is click's help text, so implementation notes live
    # in comments.
    #
    # Flow: read the sample (JSON, or CSV when the file name ends in ``.csv``),
    # infer a JSON Schema from it with genson, generate ``count`` records from
    # that schema and echo them in the requested format.
    #
    # ``json`` is bound before the ``try`` so ``except json.JSONDecodeError``
    # can always be evaluated, even when ``open()`` is what failed.
    import csv
    import json

    try:
        input_path = Path(input)
        # newline="" is what the csv module expects; it is harmless for JSON.
        with open(input_path, "r", encoding="utf-8", newline="") as f:
            if input_path.suffix.lower() == ".csv":
                # The option's help text advertises CSV samples: each row
                # becomes one sample object (all values are strings).
                sample_data = list(csv.DictReader(f))
            else:
                sample_data = json.load(f)

        try:
            from genson import SchemaBuilder
        except ImportError:
            click.echo("Error: genson not installed. Run: pip install genson", err=True)
            sys.exit(1)

        builder = SchemaBuilder()
        if isinstance(sample_data, list):
            for item in sample_data:
                builder.add_object(item)
        else:
            builder.add_object(sample_data)
        schema_data = builder.to_schema()

        generator = JSONSchemaGenerator(seed=seed)
        records = generator.generate(schema_data, count=count)

        fmt = format.lower()
        if fmt == "json":
            formatter = JSONFormatter()
        elif fmt == "csv":
            formatter = CSVFormatter()
        elif fmt == "sql":
            formatter = SQLFormatter(table_name=table)
        else:
            # Defensive only: click.Choice already rejects other values.
            click.echo(f"Error: Unsupported format '{format}'", err=True)
            sys.exit(1)

        click.echo(formatter.format(records))
    except json.JSONDecodeError:
        click.echo(f"Error: Invalid JSON in sample file: {input}", err=True)
        sys.exit(1)
    except FileNotFoundError:
        click.echo(f"Error: Sample file not found: {input}", err=True)
        sys.exit(1)
    except Exception as e:
        # Top-level CLI boundary: report anything else as a one-line error.
        click.echo(f"Error: {e}", err=True)
        sys.exit(1)
# Invoke the click command group when this module is executed as a script.
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1 @@
"""Formatters package for TestDataGen."""

View File

@@ -0,0 +1,129 @@
"""CSV output formatter."""
import csv
import io
import json
from typing import Any, Dict, List
class CSVFormatter:
    """Formatter that outputs data in CSV format.

    Nested dictionaries are flattened into dotted column names (``a.b``),
    lists of dictionaries into indexed names (``a[0].b``), and other lists
    are stored as a JSON string in a single cell.
    """

    def __init__(self, delimiter: str = ",", quotechar: str = '"'):
        """Initialize the CSV formatter.

        Args:
            delimiter: Column delimiter character
            quotechar: Quote character for fields containing delimiters
        """
        self.delimiter = delimiter
        self.quotechar = quotechar

    def format(self, records: List[Dict[str, Any]]) -> str:
        """Format records as CSV string.

        Every record is flattened exactly once and the header is derived from
        the flattened rows, so the header and the cells can never disagree.

        Args:
            records: List of data records to format

        Returns:
            CSV-formatted string (header row first), or "" for no records
        """
        if not records:
            return ""
        rows = [self._flatten_record(record) for record in records]
        # dict.fromkeys keeps the order of first appearance while de-duplicating.
        fieldnames = list(dict.fromkeys(key for row in rows for key in row))
        output = io.StringIO()
        writer = csv.DictWriter(
            output,
            fieldnames=fieldnames,
            delimiter=self.delimiter,
            quotechar=self.quotechar,
            quoting=csv.QUOTE_MINIMAL,
            extrasaction='ignore'
        )
        writer.writeheader()
        # Columns missing from a row are written as empty cells (restval "").
        writer.writerows(rows)
        return output.getvalue()

    def _extract_all_keys(self, records: List[Dict[str, Any]]) -> List[str]:
        """Extract all unique flattened keys from records.

        Args:
            records: List of records

        Returns:
            List of all unique keys in order of first appearance
        """
        ordered: Dict[str, None] = {}
        for record in records:
            for key in self._flatten_keys(record):
                ordered.setdefault(key, None)
        return list(ordered)

    def _flatten_keys(self, obj: Any, parent_key: str = "") -> List[str]:
        """Flatten nested structure and extract all keys.

        Args:
            obj: Object to extract keys from
            parent_key: Prefix for nested keys

        Returns:
            List of flattened keys; a non-dict ``obj`` yields ``[parent_key]``
            (or ``[]`` when there is no prefix)
        """
        if not isinstance(obj, dict):
            return [parent_key] if parent_key else []
        keys: List[str] = []
        for key, value in obj.items():
            column = f"{parent_key}.{key}" if parent_key else key
            if isinstance(value, dict):
                keys.extend(self._flatten_keys(value, column))
            elif isinstance(value, list) and value and isinstance(value[0], dict):
                for index, item in enumerate(value):
                    keys.extend(self._flatten_keys(item, f"{column}[{index}]"))
            else:
                keys.append(column)
        return keys

    def _flatten_record(self, record: Dict[str, Any], parent_key: str = "") -> Dict[str, Any]:
        """Flatten a record for CSV output.

        Args:
            record: Record to flatten
            parent_key: Prefix for nested keys

        Returns:
            Flattened dictionary mapping column name to cell value; ``None``
            values and empty lists become empty strings
        """
        flat: Dict[str, Any] = {}
        for key, value in record.items():
            column = f"{parent_key}.{key}" if parent_key else key
            if isinstance(value, dict):
                flat.update(self._flatten_record(value, column))
            elif isinstance(value, list):
                if not value:
                    flat[column] = ""
                elif isinstance(value[0], dict):
                    for index, item in enumerate(value):
                        indexed = f"{column}[{index}]"
                        if isinstance(item, dict):
                            flat.update(self._flatten_record(item, indexed))
                        elif isinstance(item, list):
                            flat[indexed] = json.dumps(item)
                        else:
                            # A list may mix dicts with scalars; such items get
                            # their own indexed column instead of crashing on
                            # ``item.items()``.
                            flat[indexed] = item if item is not None else ""
                else:
                    flat[column] = json.dumps(value)
            else:
                flat[column] = value if value is not None else ""
        return flat

View File

@@ -0,0 +1,57 @@
"""JSON output formatter."""
import json
from typing import Any, Dict, List, Optional
class JSONFormatter:
    """Formatter that outputs data in JSON format."""

    def __init__(self, indent: Optional[int] = None, ensure_ascii: bool = False):
        """Initialize the JSON formatter.

        Args:
            indent: Number of spaces for indentation (None for no indentation)
            ensure_ascii: Whether to escape non-ASCII characters
        """
        self.indent = indent
        self.ensure_ascii = ensure_ascii

    def format(self, records: List[Dict[str, Any]]) -> str:
        """Format records as JSON string.

        A list holding exactly one record is rendered as that single JSON
        value; any other list (including an empty one) is rendered as a JSON
        array.

        Args:
            records: List of data records to format

        Returns:
            JSON-formatted string
        """
        payload = records[0] if len(records) == 1 else records
        return json.dumps(
            payload,
            indent=self.indent,
            ensure_ascii=self.ensure_ascii,
            default=self._json_serializer
        )

    def _json_serializer(self, obj: Any) -> Any:
        """Custom JSON serializer for objects not serializable by default.

        Args:
            obj: Object to serialize

        Returns:
            ``obj.isoformat()`` for date/time-like objects, otherwise the
            object's ``__dict__`` when it has one, otherwise ``str(obj)``
        """
        # isoformat() must be tried first: Python-level subclasses of
        # date/datetime carry a (usually empty) __dict__ and would otherwise
        # be serialised as ``{}`` instead of their ISO string.
        if hasattr(obj, 'isoformat'):
            return obj.isoformat()
        if hasattr(obj, '__dict__'):
            return obj.__dict__
        return str(obj)

View File

@@ -0,0 +1,118 @@
"""SQL output formatter."""
import re
from typing import Any, Dict, List
class SQLFormatter:
    """Formatter that outputs data as SQL INSERT statements."""

    # Names matching this pattern are emitted as-is; anything else is quoted.
    _SIMPLE_IDENTIFIER = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$')

    def __init__(self, table_name: str = "generated_table"):
        """Initialize the SQL formatter.

        Args:
            table_name: Name of the table for INSERT statements

        Raises:
            ValueError: If the table name is rejected by validation
        """
        self.table_name = self._validate_table_name(table_name)

    def format(self, records: List[Dict[str, Any]]) -> str:
        """Format records as SQL INSERT statements.

        The column list is the union of the keys of all records, in order of
        first appearance, so records with differing keys do not silently lose
        data; a key missing from a record is written as NULL.

        Args:
            records: List of data records to format

        Returns:
            One INSERT statement per record joined by newlines, or "" when
            there are no records or no columns
        """
        if not records:
            return ""
        columns = list(dict.fromkeys(key for record in records for key in record))
        if not columns:
            return ""
        column_list = ", ".join(self._quote_identifier(column) for column in columns)
        statements = []
        for record in records:
            values_list = ", ".join(
                self._format_value(record.get(column)) for column in columns
            )
            statements.append(
                f"INSERT INTO {self.table_name} ({column_list}) VALUES ({values_list});"
            )
        return "\n".join(statements)

    def _quote_identifier(self, name: Any) -> str:
        """Return a column name that is safe to embed in a statement.

        Plain identifiers are returned unchanged; any other name is wrapped
        in double quotes with embedded double quotes doubled, so unusual keys
        cannot break out of the column list.
        """
        text = str(name)
        if self._SIMPLE_IDENTIFIER.match(text):
            return text
        return '"' + text.replace('"', '""') + '"'

    def _format_value(self, value: Any) -> str:
        """Format a value for SQL.

        Args:
            value: Value to format

        Returns:
            SQL literal: NULL, TRUE/FALSE, a bare number, or a single-quoted
            string (lists and dicts are stored as quoted JSON text)
        """
        import json
        import math

        if value is None:
            return "NULL"
        # bool is a subclass of int, so it has to be tested first.
        if isinstance(value, bool):
            return "TRUE" if value else "FALSE"
        if isinstance(value, float) and not math.isfinite(value):
            # "nan" / "inf" are not valid SQL numeric literals.
            return "NULL"
        if isinstance(value, (int, float)):
            return str(value)
        if isinstance(value, (list, dict)):
            text = json.dumps(value)
        else:
            text = str(value)
        # Single quotes are escaped by doubling them.
        return "'" + text.replace("'", "''") + "'"

    def _validate_table_name(self, table_name: str) -> str:
        """Validate and sanitize table name to prevent SQL injection.

        Args:
            table_name: Table name to validate

        Returns:
            Validated table name ("generated_table" when empty)

        Raises:
            ValueError: If the name has invalid characters, is one of the
                reserved words listed below, or is longer than 64 characters
        """
        if not table_name:
            return "generated_table"
        if not self._SIMPLE_IDENTIFIER.match(table_name):
            raise ValueError(
                f"Invalid table name '{table_name}'. "
                "Table name must start with a letter or underscore "
                "and contain only letters, numbers, and underscores."
            )
        reserved_words = {
            "SELECT", "INSERT", "UPDATE", "DELETE", "DROP", "CREATE",
            "ALTER", "TABLE", "DATABASE", "INDEX", "VIEW", "FROM",
            "WHERE", "AND", "OR", "NOT", "NULL", "TRUE", "FALSE"
        }
        if table_name.upper() in reserved_words:
            raise ValueError(
                f"Table name '{table_name}' is a reserved word. "
                "Please use a different table name."
            )
        if len(table_name) > 64:
            raise ValueError(
                f"Table name '{table_name}' is too long. "
                "Maximum length is 64 characters."
            )
        return table_name

View File

@@ -0,0 +1 @@
"""Generators package for TestDataGen."""

View File

@@ -0,0 +1,428 @@
"""JSON Schema generator module."""
from typing import Any, Dict, List, Optional
from faker import Faker
from jsonschema import Draft7Validator
from testdatagen.providers.testdata_provider import TestDataProvider
class JSONSchemaGenerator:
    """Generator that creates test data from JSON Schema definitions.

    Values come from Faker where a matching provider exists; every other
    random decision (enum picks, array lengths, pattern expansion, ...) uses
    a private ``random.Random`` seeded with the same seed, so a given seed
    reproduces the whole output.
    """

    # JSON Schema ``format`` value -> name of the Faker method producing it.
    _FORMAT_PROVIDERS = {
        "email": "email",
        "date-time": "iso8601",
        "date": "date",
        "time": "time",
        "uuid": "uuid4",
        "uri": "uri",
        "hostname": "hostname",
        "ipv4": "ipv4",
        "ipv6": "ipv6",
        "password": "password",
        "firstName": "first_name",
        "lastName": "last_name",
        "fullName": "name",
        "phoneNumber": "phone_number",
        "address": "address",
        "city": "city",
        "country": "country",
        "company": "company",
        "job": "job",
        "url": "url",
        "userName": "user_name",
    }
    # How many nested "$ref" expansions are followed before giving up, so
    # self-referencing schemas terminate.
    _MAX_REF_DEPTH = 5
    # Root schema of the current generate() call; "$ref" pointers resolve
    # against it.
    _root_schema: Optional[Dict[str, Any]] = None
    _ref_depth = 0

    def __init__(self, seed: Optional[int] = None):
        """Initialize the generator.

        Args:
            seed: Random seed for reproducible generation
        """
        import random

        self.seed = seed
        self.faker = Faker()
        if seed is not None:
            Faker.seed(seed)
        # Faker.seed() does not touch the ``random`` module, so the choices
        # made by this class need their own seeded source.
        self._rng = random.Random(seed)
        if TestDataProvider not in self.faker.providers:
            self.faker.add_provider(TestDataProvider)

    def generate(
        self,
        schema: Dict[str, Any],
        count: int = 1
    ) -> List[Dict[str, Any]]:
        """Generate test data records from a JSON Schema.

        Args:
            schema: JSON Schema definition
            count: Number of records to generate

        Returns:
            List of generated data records

        Raises:
            ValueError: If the schema is not a valid JSON Schema
        """
        if not self._validate_schema(schema):
            raise ValueError("Invalid JSON Schema")
        self._root_schema = schema
        return [self._generate_from_schema(schema) for _ in range(count)]

    def _validate_schema(self, schema: Dict[str, Any]) -> bool:
        """Validate that the schema is a valid JSON Schema.

        Args:
            schema: Schema to validate

        Returns:
            True if valid, False otherwise
        """
        from jsonschema.exceptions import SchemaError

        try:
            Draft7Validator.check_schema(schema)
        except SchemaError:
            # Report the failure instead of masking it: constructing a
            # validator for an unrelated empty schema always succeeds and
            # says nothing about ``schema``.
            return False
        return True

    def _generate_from_schema(self, schema: Dict[str, Any]) -> Any:
        """Generate a single value from a schema.

        Handles ``$ref``, ``anyOf``/``oneOf`` (one branch is picked),
        ``allOf`` (object results are merged) and then dispatches on ``type``.

        Args:
            schema: JSON Schema definition

        Returns:
            Generated data (a dict for object schemas, a list for array
            schemas, otherwise a scalar)
        """
        if not isinstance(schema, dict):
            # Boolean schemas (``true``/``false``) and other non-dict nodes
            # carry no constraints that can be generated from.
            schema = {}
        if "$ref" in schema:
            root = self._root_schema if self._root_schema is not None else schema
            resolved = self._resolve_ref(schema["$ref"], root)
            self._ref_depth += 1
            try:
                if self._ref_depth > self._MAX_REF_DEPTH:
                    return None  # recursive schema: stop expanding
                return self._generate_from_schema(resolved)
            finally:
                self._ref_depth -= 1
        if "anyOf" in schema:
            return self._generate_from_schema(self._rng.choice(schema["anyOf"]))
        if "oneOf" in schema:
            return self._generate_from_schema(self._rng.choice(schema["oneOf"]))
        if "allOf" in schema:
            merged: Dict[str, Any] = {}
            for subschema in schema["allOf"]:
                part = self._generate_from_schema(subschema)
                if isinstance(part, dict):
                    merged.update(part)
            return merged

        json_type = schema.get("type")
        if isinstance(json_type, list):
            # ``"type": ["string", "null"]`` style: pick one of the options.
            json_type = self._rng.choice(json_type) if json_type else None
            schema = {**schema, "type": json_type}
        if json_type is None and "properties" in schema:
            json_type = "object"
        elif json_type is None and "items" in schema:
            json_type = "array"

        if json_type == "object":
            return self._generate_object(schema)
        if json_type == "array":
            return self._generate_array(schema)
        return self._generate_value(schema)

    def _resolve_ref(self, ref: str, schema: Dict[str, Any]) -> Dict[str, Any]:
        """Resolve a $ref reference within a schema.

        Only local references (``#`` and ``#/...``) are supported.

        Args:
            ref: Reference string (e.g., #/definitions/Person)
            schema: Root schema containing definitions

        Returns:
            Resolved schema, or {} when the reference cannot be resolved
        """
        if ref == "#":
            return schema
        if not ref.startswith("#/"):
            return {}
        current: Any = schema
        for part in ref[2:].split("/"):
            if not isinstance(current, dict):
                return {}
            # JSON Pointer escapes: "~1" is "/" and "~0" is "~".
            key = part.replace("~1", "/").replace("~0", "~")
            current = current.get(key, {})
        return current

    def _generate_object(self, schema: Dict[str, Any]) -> Dict[str, Any]:
        """Generate an object from an object-type schema.

        Args:
            schema: Object schema definition

        Returns:
            Generated object with one entry per declared property
        """
        properties = schema.get("properties", {})
        return {
            name: self._generate_from_schema(prop_schema)
            for name, prop_schema in properties.items()
        }

    def _should_generate_optional(self, prop_schema: Dict[str, Any]) -> bool:
        """Determine if an optional property should be generated.

        Args:
            prop_schema: Property schema

        Returns:
            True if property should be generated (currently always)
        """
        return True

    def _generate_array(self, schema: Dict[str, Any]) -> List[Any]:
        """Generate an array from an array-type schema.

        Args:
            schema: Array schema definition

        Returns:
            Generated array whose length respects minItems/maxItems
        """
        items_schema = schema.get("items", {})
        max_given = schema.get("maxItems")
        min_items = schema.get("minItems")
        if min_items is None:
            min_items = 1 if max_given is None else min(1, max_given)
        # The default upper bound (10) must never fall below minItems,
        # otherwise randint() raises ValueError.
        max_items = max_given if max_given is not None else max(10, min_items)
        max_items = max(max_items, min_items)
        count = self._rng.randint(min_items, max_items)

        unique_items = schema.get("uniqueItems", False)
        results: List[Any] = []
        seen = set()
        for _ in range(count):
            item = self._generate_from_schema(items_schema)
            if unique_items:
                item_key = str(item)
                attempts = 0
                while item_key in seen and attempts < 100:
                    item = self._generate_from_schema(items_schema)
                    item_key = str(item)
                    attempts += 1
                if item_key in seen:
                    # No fresh value found: drop it rather than emit a
                    # duplicate that violates uniqueItems.
                    continue
                seen.add(item_key)
            results.append(item)
        return results

    def _integer_bounds(self, schema: Dict[str, Any]) -> tuple:
        """Return an inclusive ``(low, high)`` integer range for the schema.

        Combines minimum/maximum with exclusiveMinimum/exclusiveMaximum and
        falls back to the 0..10000 default window when a side is unbounded.
        """
        import math

        def bound(key: str) -> Any:
            value = schema.get(key)
            # Draft-4 style boolean exclusive* flags carry no number.
            return None if isinstance(value, bool) else value

        lows = []
        if bound("minimum") is not None:
            lows.append(math.ceil(bound("minimum")))
        if bound("exclusiveMinimum") is not None:
            lows.append(math.floor(bound("exclusiveMinimum")) + 1)
        highs = []
        if bound("maximum") is not None:
            highs.append(math.floor(bound("maximum")))
        if bound("exclusiveMaximum") is not None:
            highs.append(math.ceil(bound("exclusiveMaximum")) - 1)

        low = max(lows) if lows else None
        high = min(highs) if highs else None
        if low is None and high is None:
            return 0, 10000
        if low is None:
            low = 0 if high >= 0 else high - 10000
        elif high is None:
            high = 10000 if low <= 10000 else low + 10000
        return low, max(low, high)

    def _generate_value(self, schema: Dict[str, Any]) -> Any:
        """Generate a scalar value from a schema.

        ``enum`` and ``const`` take precedence over ``type``.

        Args:
            schema: Value schema definition

        Returns:
            Generated value (a Faker word when the type is unknown)
        """
        if "enum" in schema:
            return self._rng.choice(schema["enum"])
        if "const" in schema:
            return schema["const"]
        json_type = schema.get("type")
        if json_type == "null":
            return None
        if json_type == "boolean":
            return self.faker.pybool()
        if json_type == "integer":
            low, high = self._integer_bounds(schema)
            return self._rng.randint(low, high)
        if json_type == "number":
            return self.faker.pyfloat(
                min_value=schema.get("minimum"),
                max_value=schema.get("maximum")
            )
        if json_type == "string":
            return self._generate_string(schema)
        return self.faker.word()

    def _generate_string(self, schema: Dict[str, Any]) -> str:
        """Generate a string based on string schema constraints.

        Precedence: ``format`` (when recognised), then ``pattern``, then
        free text bounded by minLength/maxLength.

        Args:
            schema: String schema definition

        Returns:
            Generated string
        """
        import string

        format_type = schema.get("format", "")
        if format_type == "regex":
            return self._generate_from_pattern(schema.get("pattern", ".*"))
        provider = self._FORMAT_PROVIDERS.get(format_type)
        if provider is not None:
            return getattr(self.faker, provider)()

        pattern = schema.get("pattern")
        if pattern:
            return self._generate_from_pattern(pattern)

        min_length = schema.get("minLength", 0)
        max_length = max(schema.get("maxLength", 100), min_length)
        if max_length < 5 or min_length == max_length:
            # faker.text() rejects limits below 5 characters, and a fixed
            # length is easiest to hit with plain random letters.
            length = self._rng.randint(min_length, max_length)
            return ''.join(self._rng.choices(string.ascii_letters, k=length))
        text = self.faker.text(max_nb_chars=max_length)
        shortfall = min_length - len(text)
        if shortfall > 0:
            text += ''.join(self._rng.choices(string.ascii_letters, k=shortfall))
        return text

    def _generate_from_pattern(self, pattern: str) -> str:
        """Generate a string matching a regex pattern.

        Supports a practical subset of regex syntax: literals, ``.``,
        character classes with ranges and negation, the ``\\d \\w \\s \\D
        \\W \\S`` shorthands, groups with alternation, and the quantifiers
        ``* + ? {n} {n,} {n,m}`` (open-ended repetition is capped at a few
        repeats). Anchors and look-around groups produce no output.

        Args:
            pattern: Regular expression pattern

        Returns:
            Generated string, truncated to 100 characters; ten random
            letters when the pattern yields nothing
        """
        import string

        rng = self._rng
        word_chars = string.ascii_letters + string.digits + "_"
        shorthand = {
            "d": string.digits,
            "w": word_chars,
            "s": " ",
            "D": string.ascii_letters,
            "W": string.punctuation,
            "S": word_chars,
            "n": "\n",
            "t": "\t",
        }
        size = len(pattern)

        def class_end(start: int) -> int:
            """Index of the ']' closing the class opened at ``start``, or -1."""
            pos = start + 1
            while pos < size:
                if pattern[pos] == "\\":
                    pos += 2
                elif pattern[pos] == "]":
                    return pos
                else:
                    pos += 1
            return -1

        def class_pool(body: str) -> List[str]:
            """Expand the inside of ``[...]`` into its candidate characters."""
            negated = body.startswith("^")
            if negated:
                body = body[1:]
            pool: List[str] = []
            k = 0
            while k < len(body):
                ch = body[k]
                if ch == "\\" and k + 1 < len(body):
                    pool.extend(shorthand.get(body[k + 1], body[k + 1]))
                    k += 2
                elif k + 2 < len(body) and body[k + 1] == "-":
                    pool.extend(chr(c) for c in range(ord(ch), ord(body[k + 2]) + 1))
                    k += 3
                else:
                    pool.append(ch)
                    k += 1
            if negated:
                excluded = set(pool)
                pool = [c for c in word_chars if c not in excluded]
            return pool

        def parse_quantifier(pos: int):
            """Return ``(low, high, next_pos)`` for a quantifier at ``pos``."""
            if pos >= size:
                return 1, 1, pos
            ch = pattern[pos]
            if ch == "*":
                low, high, pos = 0, 3, pos + 1
            elif ch == "+":
                low, high, pos = 1, 3, pos + 1
            elif ch == "?":
                low, high, pos = 0, 1, pos + 1
            elif ch == "{":
                end = pattern.find("}", pos)
                spec = pattern[pos + 1:end] if end != -1 else ""
                low_text, comma, high_text = spec.partition(",")
                if not (low_text + high_text).isdigit():
                    return 1, 1, pos  # not a quantifier: '{' is a literal
                low = int(low_text) if low_text else 0
                if high_text:
                    high = int(high_text)
                else:
                    high = low + 3 if comma else low
                # The result is truncated to 100 characters anyway.
                low = min(low, 100)
                high = min(max(high, low), 100)
                pos = end + 1
            else:
                return 1, 1, pos
            if pos < size and pattern[pos] == "?":
                pos += 1  # lazy modifier: irrelevant for generation
            return low, high, pos

        def parse_group(inner: int):
            """Parse a group whose content starts at ``inner`` (after '(')."""
            silent = False
            if pattern.startswith("?", inner):
                if pattern.startswith(("?=", "?!", "?<=", "?<!"), inner):
                    silent = True  # look-around consumes no characters
                    inner += 3 if pattern[inner + 1] == "<" else 2
                else:
                    named = pattern.startswith(("?<", "?P<"), inner)
                    close = pattern.find(">", inner) if named else -1
                    inner = close + 1 if close != -1 else inner + 2
            emit, end = parse_alternation(inner)
            return (None if silent else emit), end + 1

        def parse_atom(pos: int):
            """Return ``(emitter or None, next_pos)`` for the atom at ``pos``."""
            ch = pattern[pos]
            if ch == "\\":
                if pos + 1 >= size:
                    return None, pos + 1
                code = pattern[pos + 1]
                if code in "bBAZz":
                    return None, pos + 2  # zero-width assertions
                pool = shorthand.get(code, code)
                return (lambda: rng.choice(pool)), pos + 2
            if ch == "[":
                end = class_end(pos)
                if end == -1:
                    return (lambda: "["), pos + 1
                pool = class_pool(pattern[pos + 1:end])
                return (lambda: rng.choice(pool) if pool else ""), end + 1
            if ch == "(":
                return parse_group(pos + 1)
            if ch == ".":
                pool = string.ascii_letters + string.digits
                return (lambda: rng.choice(pool)), pos + 1
            if ch in "^$*+?":
                return None, pos + 1  # anchors / dangling quantifiers
            return (lambda: ch), pos + 1

        def parse_alternation(pos: int):
            """Parse ``a|b|c`` up to ')' or the end; return ``(emit, pos)``."""
            branches: List[list] = []
            current: list = []
            while pos < size and pattern[pos] != ")":
                if pattern[pos] == "|":
                    branches.append(current)
                    current = []
                    pos += 1
                    continue
                atom, pos = parse_atom(pos)
                low, high, pos = parse_quantifier(pos)
                if atom is not None:
                    current.append((atom, low, high))
            branches.append(current)

            def emit() -> str:
                parts: List[str] = []
                for atom, low, high in rng.choice(branches):
                    parts.extend(atom() for _ in range(rng.randint(low, high)))
                return "".join(parts)

            return emit, pos

        pieces: List[str] = []
        pos = 0
        while pos < size:
            emit, pos = parse_alternation(pos)
            pieces.append(emit())
            pos += 1  # step over an unmatched ')'

        final_result = "".join(pieces)
        if len(final_result) > 100:
            final_result = final_result[:100]
        return final_result if final_result else ''.join(
            rng.choices(string.ascii_letters, k=10)
        )

View File

@@ -0,0 +1 @@
"""Providers package for TestDataGen."""

View File

@@ -0,0 +1,336 @@
"""TestDataProvider - Custom Faker provider for pattern-based and schema-based generation."""
import string
from typing import Any, Dict, List
from faker.providers import BaseProvider
class TestDataProvider(BaseProvider):
    """Custom Faker provider for JSON Schema-based test data generation.

    All random decisions are drawn from the Faker generator's own random
    source, so seeding Faker also makes this provider reproducible.
    """

    # JSON Schema ``format`` value -> name of the Faker method producing it.
    _FORMAT_PROVIDERS = {
        "email": "email",
        "date-time": "iso8601",
        "date": "date",
        "time": "time",
        "uuid": "uuid4",
        "uri": "uri",
        "hostname": "hostname",
        "ipv4": "ipv4",
        "ipv6": "ipv6",
        "json": "json",
        "password": "password",
        "firstName": "first_name",
        "lastName": "last_name",
        "fullName": "name",
        "phoneNumber": "phone_number",
        "address": "address",
        "city": "city",
        "country": "country",
        "company": "company",
        "job": "job",
        "url": "url",
        "userName": "user_name",
    }

    def _random_source(self, faker_instance: Any) -> Any:
        """Return the random source of ``faker_instance``.

        Falls back to this provider's generator when the object passed in
        exposes no ``random`` attribute.
        """
        return getattr(faker_instance, "random", None) or self.generator.random

    def json_schema_type(
        self,
        schema: Dict[str, Any],
        faker_instance: Any = None
    ) -> Any:
        """Generate data based on JSON Schema type definition.

        Precedence: ``anyOf``/``oneOf`` (one branch is picked), ``allOf``
        (object results are merged), ``enum``, ``const``, then ``type``.

        Args:
            schema: JSON Schema definition
            faker_instance: Faker instance to use for generation; defaults to
                the generator this provider is attached to

        Returns:
            Generated data matching the schema, or None for an unknown type
        """
        if faker_instance is None:
            # The provider itself only has BaseProvider helpers; methods such
            # as pybool() or email() live on the generator.
            faker_instance = self.generator
        rng = self._random_source(faker_instance)

        if "anyOf" in schema or "oneOf" in schema:
            schemas = schema.get("anyOf", []) or schema.get("oneOf", [])
            return self.json_schema_type(rng.choice(schemas), faker_instance)
        if "allOf" in schema:
            merged: Dict[str, Any] = {}
            for subschema in schema["allOf"]:
                part = self.json_schema_type(subschema, faker_instance)
                if isinstance(part, dict):
                    merged.update(part)
            return merged
        # enum/const restrict the value regardless of "type", so they must be
        # honoured before dispatching on the type.
        if "enum" in schema:
            return rng.choice(schema["enum"])
        if "const" in schema:
            return schema["const"]

        json_type = schema.get("type")
        if json_type == "null":
            return None
        if json_type == "boolean":
            return faker_instance.pybool()
        if json_type == "integer":
            low, high = self._integer_bounds(schema)
            return faker_instance.random_int(min=low, max=high)
        if json_type == "number":
            return faker_instance.pyfloat(
                min_value=schema.get("minimum"),
                max_value=schema.get("maximum")
            )
        if json_type == "string":
            return self._generate_string(schema, faker_instance)
        if json_type == "array":
            return self._generate_array(schema, faker_instance)
        if json_type == "object":
            return self._generate_object(schema, faker_instance)
        return None

    def _integer_bounds(self, schema: Dict[str, Any]) -> tuple:
        """Return an inclusive ``(low, high)`` integer range for the schema.

        Combines minimum/maximum with exclusiveMinimum/exclusiveMaximum and
        falls back to the 0..10000 default window when a side is unbounded.
        """
        import math

        def bound(key: str) -> Any:
            value = schema.get(key)
            # Draft-4 style boolean exclusive* flags carry no number.
            return None if isinstance(value, bool) else value

        lows = []
        if bound("minimum") is not None:
            lows.append(math.ceil(bound("minimum")))
        if bound("exclusiveMinimum") is not None:
            lows.append(math.floor(bound("exclusiveMinimum")) + 1)
        highs = []
        if bound("maximum") is not None:
            highs.append(math.floor(bound("maximum")))
        if bound("exclusiveMaximum") is not None:
            highs.append(math.ceil(bound("exclusiveMaximum")) - 1)

        low = max(lows) if lows else None
        high = min(highs) if highs else None
        if low is None and high is None:
            return 0, 10000
        if low is None:
            low = 0 if high >= 0 else high - 10000
        elif high is None:
            high = 10000 if low <= 10000 else low + 10000
        return low, max(low, high)

    def _generate_string(
        self,
        schema: Dict[str, Any],
        faker_instance: Any
    ) -> str:
        """Generate a string based on string-specific schema constraints.

        Precedence: ``format`` (when recognised), then ``pattern``, then
        free text bounded by minLength/maxLength.
        """
        format_type = schema.get("format", "")
        if format_type == "regex":
            return self._generate_from_pattern(schema.get("pattern", ".*"))
        provider = self._FORMAT_PROVIDERS.get(format_type)
        if provider is not None:
            return getattr(faker_instance, provider)()

        pattern = schema.get("pattern")
        if pattern:
            return self._generate_from_pattern(pattern)

        rng = self._random_source(faker_instance)
        min_length = schema.get("minLength", 0)
        max_length = max(schema.get("maxLength", 100), min_length)
        if max_length < 5 or min_length == max_length:
            # text() rejects limits below 5 characters, and a fixed length is
            # easiest to hit with plain random letters.
            length = rng.randint(min_length, max_length)
            return ''.join(rng.choices(string.ascii_letters, k=length))
        text = faker_instance.text(max_nb_chars=max_length)
        shortfall = min_length - len(text)
        if shortfall > 0:
            text += ''.join(rng.choices(string.ascii_letters, k=shortfall))
        return text

    def _generate_from_pattern(self, pattern: str) -> str:
        """Generate a string that matches the given regex pattern.

        Supports a practical subset of regex syntax: literals, ``.``,
        character classes with ranges and negation, the ``\\d \\w \\s \\D
        \\W \\S`` shorthands, groups with alternation, and the quantifiers
        ``* + ? {n} {n,} {n,m}`` (open-ended repetition is capped at a few
        repeats). Anchors and look-around groups produce no output.

        Args:
            pattern: Regular expression pattern

        Returns:
            Generated string, truncated to 100 characters; ten random
            letters when the pattern yields nothing
        """
        rng = self.generator.random
        word_chars = string.ascii_letters + string.digits + "_"
        shorthand = {
            "d": string.digits,
            "w": word_chars,
            "s": " ",
            "D": string.ascii_letters,
            "W": string.punctuation + " ",
            "S": word_chars,
            "n": "\n",
            "t": "\t",
        }
        size = len(pattern)

        def class_end(start: int) -> int:
            """Index of the ']' closing the class opened at ``start``, or -1."""
            pos = start + 1
            while pos < size:
                if pattern[pos] == "\\":
                    pos += 2
                elif pattern[pos] == "]":
                    return pos
                else:
                    pos += 1
            return -1

        def class_pool(body: str) -> List[str]:
            """Expand the inside of ``[...]`` into its candidate characters."""
            negated = body.startswith("^")
            if negated:
                body = body[1:]
            pool: List[str] = []
            k = 0
            while k < len(body):
                ch = body[k]
                if ch == "\\" and k + 1 < len(body):
                    pool.extend(shorthand.get(body[k + 1], body[k + 1]))
                    k += 2
                elif k + 2 < len(body) and body[k + 1] == "-":
                    pool.extend(chr(c) for c in range(ord(ch), ord(body[k + 2]) + 1))
                    k += 3
                else:
                    pool.append(ch)
                    k += 1
            if negated:
                excluded = set(pool)
                pool = [c for c in word_chars if c not in excluded]
            return pool

        def parse_quantifier(pos: int):
            """Return ``(low, high, next_pos)`` for a quantifier at ``pos``."""
            if pos >= size:
                return 1, 1, pos
            ch = pattern[pos]
            if ch == "*":
                low, high, pos = 0, 3, pos + 1
            elif ch == "+":
                low, high, pos = 1, 3, pos + 1
            elif ch == "?":
                low, high, pos = 0, 1, pos + 1
            elif ch == "{":
                end = pattern.find("}", pos)
                spec = pattern[pos + 1:end] if end != -1 else ""
                low_text, comma, high_text = spec.partition(",")
                if not (low_text + high_text).isdigit():
                    return 1, 1, pos  # not a quantifier: '{' is a literal
                low = int(low_text) if low_text else 0
                if high_text:
                    high = int(high_text)
                else:
                    high = low + 3 if comma else low
                # The result is truncated to 100 characters anyway.
                low = min(low, 100)
                high = min(max(high, low), 100)
                pos = end + 1
            else:
                return 1, 1, pos
            if pos < size and pattern[pos] == "?":
                pos += 1  # lazy modifier: irrelevant for generation
            return low, high, pos

        def parse_group(inner: int):
            """Parse a group whose content starts at ``inner`` (after '(')."""
            silent = False
            if pattern.startswith("?", inner):
                if pattern.startswith(("?=", "?!", "?<=", "?<!"), inner):
                    silent = True  # look-around consumes no characters
                    inner += 3 if pattern[inner + 1] == "<" else 2
                else:
                    named = pattern.startswith(("?<", "?P<"), inner)
                    close = pattern.find(">", inner) if named else -1
                    inner = close + 1 if close != -1 else inner + 2
            emit, end = parse_alternation(inner)
            return (None if silent else emit), end + 1

        def parse_atom(pos: int):
            """Return ``(emitter or None, next_pos)`` for the atom at ``pos``."""
            ch = pattern[pos]
            if ch == "\\":
                if pos + 1 >= size:
                    return None, pos + 1
                code = pattern[pos + 1]
                if code in "bBAZz":
                    return None, pos + 2  # zero-width assertions
                pool = shorthand.get(code, code)
                return (lambda: rng.choice(pool)), pos + 2
            if ch == "[":
                end = class_end(pos)
                if end == -1:
                    return (lambda: "["), pos + 1
                pool = class_pool(pattern[pos + 1:end])
                return (lambda: rng.choice(pool) if pool else ""), end + 1
            if ch == "(":
                return parse_group(pos + 1)
            if ch == ".":
                pool = string.ascii_letters + string.digits
                return (lambda: rng.choice(pool)), pos + 1
            if ch in "^$*+?":
                return None, pos + 1  # anchors / dangling quantifiers
            return (lambda: ch), pos + 1

        def parse_alternation(pos: int):
            """Parse ``a|b|c`` up to ')' or the end; return ``(emit, pos)``."""
            branches: List[list] = []
            current: list = []
            while pos < size and pattern[pos] != ")":
                if pattern[pos] == "|":
                    branches.append(current)
                    current = []
                    pos += 1
                    continue
                atom, pos = parse_atom(pos)
                low, high, pos = parse_quantifier(pos)
                if atom is not None:
                    current.append((atom, low, high))
            branches.append(current)

            def emit() -> str:
                parts: List[str] = []
                for atom, low, high in rng.choice(branches):
                    parts.extend(atom() for _ in range(rng.randint(low, high)))
                return "".join(parts)

            return emit, pos

        pieces: List[str] = []
        pos = 0
        while pos < size:
            emit, pos = parse_alternation(pos)
            pieces.append(emit())
            pos += 1  # step over an unmatched ')'

        final_result = ''.join(pieces)
        if len(final_result) > 100:
            final_result = final_result[:100]
        return final_result if final_result else ''.join(
            rng.choices(string.ascii_letters, k=10)
        )

    def _generate_array(
        self,
        schema: Dict[str, Any],
        faker_instance: Any
    ) -> List[Any]:
        """Generate an array based on array schema definition.

        The length respects minItems/maxItems; with ``uniqueItems`` a value
        that cannot be made unique after 100 attempts is dropped.
        """
        rng = self._random_source(faker_instance)
        items_schema = schema.get("items", {})
        max_given = schema.get("maxItems")
        min_items = schema.get("minItems")
        if min_items is None:
            min_items = 1 if max_given is None else min(1, max_given)
        # The default upper bound (10) must never fall below minItems,
        # otherwise randint() raises ValueError.
        max_items = max_given if max_given is not None else max(10, min_items)
        max_items = max(max_items, min_items)
        count = rng.randint(min_items, max_items)

        unique_items = schema.get("uniqueItems", False)
        results: List[Any] = []
        seen = set()
        for _ in range(count):
            item = self.json_schema_type(items_schema, faker_instance)
            if unique_items:
                item_key = str(item)
                attempts = 0
                while item_key in seen and attempts < 100:
                    item = self.json_schema_type(items_schema, faker_instance)
                    item_key = str(item)
                    attempts += 1
                if item_key in seen:
                    continue
                seen.add(item_key)
            results.append(item)
        return results

    def _generate_object(
        self,
        schema: Dict[str, Any],
        faker_instance: Any
    ) -> Dict[str, Any]:
        """Generate an object with one entry per declared property."""
        properties = schema.get("properties", {})
        return {
            name: self.json_schema_type(prop_schema, faker_instance)
            for name, prop_schema in properties.items()
        }

View File

@@ -0,0 +1 @@
"""Utils package for TestDataGen."""

View File

@@ -0,0 +1,336 @@
"""TestDataProvider - Custom Faker provider for pattern-based and schema-based generation."""
import string
from typing import Any, Dict, List
from faker.providers import BaseProvider
class TestDataProvider(BaseProvider):
"""Custom Faker provider for JSON Schema-based test data generation."""
def json_schema_type(
    self,
    schema: Dict[str, Any],
    faker_instance: Any = None
) -> Any:
    """Generate data based on JSON Schema type definition.

    Precedence: ``anyOf``/``oneOf`` (one branch is picked), ``allOf``
    (object results are merged), ``enum``, ``const``, then ``type``.

    Args:
        schema: JSON Schema definition
        faker_instance: Faker instance to use for generation; defaults to
            the generator this provider is attached to

    Returns:
        Generated data matching the schema, or None for an unknown type
    """
    import random

    if faker_instance is None:
        # The provider itself only has BaseProvider helpers; methods such as
        # pybool() live on the generator it is attached to.
        faker_instance = self.generator
    # Prefer the Faker instance's seeded random source so a seed reproduces
    # the picks; fall back to the random module when it exposes none.
    rng = getattr(faker_instance, "random", random)

    if "anyOf" in schema or "oneOf" in schema:
        schemas = schema.get("anyOf", []) or schema.get("oneOf", [])
        return self.json_schema_type(rng.choice(schemas), faker_instance)
    if "allOf" in schema:
        result = {}
        for subschema in schema["allOf"]:
            subschema_result = self.json_schema_type(subschema, faker_instance)
            if isinstance(subschema_result, dict):
                result.update(subschema_result)
        return result
    # enum/const restrict the value regardless of "type", so they must be
    # honoured before dispatching on the type.
    if "enum" in schema:
        return rng.choice(schema["enum"])
    if "const" in schema:
        return schema["const"]

    json_type = schema.get("type")
    if json_type == "null":
        return None
    if json_type == "boolean":
        return faker_instance.pybool()
    if json_type == "integer":
        minimum = schema.get("minimum")
        maximum = schema.get("maximum")
        exclusive_minimum = schema.get("exclusiveMinimum")
        exclusive_maximum = schema.get("exclusiveMaximum")
        min_val = (
            minimum if minimum is not None
            else (exclusive_minimum + 1 if exclusive_minimum is not None else 0)
        )
        max_val = (
            maximum if maximum is not None
            else (exclusive_maximum - 1 if exclusive_maximum is not None else 10000)
        )
        return faker_instance.random_int(min=min_val, max=max_val)
    if json_type == "number":
        return faker_instance.pyfloat(
            min_value=schema.get("minimum"),
            max_value=schema.get("maximum")
        )
    if json_type == "string":
        return self._generate_string(schema, faker_instance)
    if json_type == "array":
        return self._generate_array(schema, faker_instance)
    if json_type == "object":
        return self._generate_object(schema, faker_instance)
    return None
def _generate_string(
self,
schema: Dict[str, Any],
faker_instance: Any
) -> str:
"""Generate a string based on string-specific schema constraints."""
format_type = schema.get("format", "")
if format_type == "email":
return faker_instance.email()
if format_type == "date-time" or format_type == "date":
return faker_instance.iso8601()
if format_type == "time":
return faker_instance.time()
if format_type == "uuid":
return faker_instance.uuid4()
if format_type == "uri":
return faker_instance.uri()
if format_type == "hostname":
return faker_instance.hostname()
if format_type == "ipv4":
return faker_instance.ipv4()
if format_type == "ipv6":
return faker_instance.ipv6()
if format_type == "regex":
pattern = schema.get("pattern", ".*")
return self._generate_from_pattern(pattern)
if format_type == "json":
return faker_instance.json()
if format_type == "password":
return faker_instance.password()
if format_type == "firstName":
return faker_instance.first_name()
if format_type == "lastName":
return faker_instance.last_name()
if format_type == "fullName":
return faker_instance.name()
if format_type == "phoneNumber":
return faker_instance.phone_number()
if format_type == "address":
return faker_instance.address()
if format_type == "city":
return faker_instance.city()
if format_type == "country":
return faker_instance.country()
if format_type == "company":
return faker_instance.company()
if format_type == "job":
return faker_instance.job()
if format_type == "url":
return faker_instance.url()
if format_type == "userName":
return faker_instance.user_name()
pattern = schema.get("pattern")
if pattern:
return self._generate_from_pattern(pattern)
min_length = schema.get("minLength", 0)
max_length = schema.get("maxLength", 100)
if min_length == max_length and min_length > 0:
import random
return ''.join(random.choices(string.ascii_letters, k=min_length))
return faker_instance.text(max_nb_chars=max_length)
def _generate_from_pattern(self, pattern: str) -> str:
"""Generate a string that matches the given regex pattern.
Args:
pattern: Regular expression pattern
Returns:
String matching the pattern
"""
import random
result = []
i = 0
while i < len(pattern):
if pattern[i] == '\\' and i + 1 < len(pattern):
char = pattern[i + 1]
if char in 'd':
result.append(str(random.randint(0, 9)))
elif char in 'w':
result.append(
random.choice(string.ascii_letters + string.digits + '_')
)
elif char in 's':
result.append(' ')
elif char in 'D':
result.append(random.choice(string.ascii_letters))
elif char in 'W':
result.append(random.choice(string.punctuation + ' '))
elif char in 'n':
result.append('\n')
elif char in 't':
result.append('\t')
else:
result.append(char)
i += 2
elif pattern[i] == '[':
end = pattern.find(']', i)
if end != -1:
char_class = pattern[i + 1:end]
result.append(random.choice(char_class))
i = end + 1
else:
result.append(pattern[i])
i += 1
elif pattern[i] == '*':
if result and isinstance(result[-1], str):
last = result[-1]
if len(last) > 0:
result[-1] = last * random.randint(0, 3)
i += 1
elif pattern[i] == '+':
if result and isinstance(result[-1], str):
last = result[-1]
if len(last) > 0:
result[-1] = last * random.randint(1, 3)
i += 1
elif pattern[i] == '?':
if result and random.random() > 0.5:
if isinstance(result[-1], str) and len(result[-1]) > 0:
result[-1] = result[-1][:-1]
i += 1
elif pattern[i] == '(':
end = pattern.find(')', i)
if end != -1:
group_content = pattern[i + 1:end]
if '|' in group_content:
options = group_content.split('|')
result.append(random.choice(options))
else:
result.append(self._generate_from_pattern(group_content))
i = end + 1
else:
result.append(pattern[i])
i += 1
elif pattern[i] == '{':
end = pattern.find('}', i)
if end != -1:
count_str = pattern[i + 1:end]
if ',' in count_str:
min_count, max_count = count_str.split(',')
min_c = int(min_count) if min_count else 0
max_c = int(max_count) if max_count else min_c
else:
min_c = max_c = int(count_str)
if result and isinstance(result[-1], str):
result[-1] = result[-1] * random.randint(min_c, max_c)
i = end + 1
else:
result.append(pattern[i])
i += 1
elif pattern[i] == '.':
result.append(random.choice(string.ascii_letters + string.digits))
i += 1
elif pattern[i] in string.ascii_letters:
result.append(pattern[i])
i += 1
elif pattern[i] in string.digits:
result.append(pattern[i])
i += 1
else:
i += 1
final_result = ''.join(result)
if len(final_result) > 100:
final_result = final_result[:100]
return final_result if final_result else ''.join(
random.choices(string.ascii_letters, k=10)
)
def _generate_array(
self,
schema: Dict[str, Any],
faker_instance: Any
) -> List[Any]:
"""Generate an array based on array schema definition."""
import random
items_schema = schema.get("items", {})
min_items = schema.get("minItems", 1)
max_items = schema.get("maxItems", 10)
count = random.randint(min_items, max_items)
unique_items = schema.get("uniqueItems", False)
results = []
seen = set()
for _ in range(count):
item = self.json_schema_type(items_schema, faker_instance)
if unique_items:
item_key = str(item)
attempts = 0
while item_key in seen and attempts < 100:
item = self.json_schema_type(items_schema, faker_instance)
item_key = str(item)
attempts += 1
seen.add(item_key)
results.append(item)
return results
def _generate_object(
self,
schema: Dict[str, Any],
faker_instance: Any
) -> Dict[str, Any]:
"""Generate an object based on object schema definition."""
result = {}
properties = schema.get("properties", {})
for prop_name, prop_schema in properties.items():
result[prop_name] = self.json_schema_type(prop_schema, faker_instance)
return result