Initial upload: testdata-cli with CI/CD workflow
Some checks failed
CI / test (push) Has been cancelled
CI / build (push) Has been cancelled

This commit is contained in:
2026-03-22 19:45:28 +00:00
parent fc8f90e773
commit 05c6e49f0f

View File

@@ -0,0 +1,428 @@
"""JSON Schema generator module."""
import math
import random
import string
from datetime import timezone
from typing import Any, Callable, Dict, List, Optional, Tuple

from faker import Faker
from jsonschema import Draft7Validator
from jsonschema.exceptions import SchemaError

from testdatagen.providers.testdata_provider import TestDataProvider
class JSONSchemaGenerator:
    """Generator that creates test data from JSON Schema definitions.

    Structural randomness (array lengths, ``enum``/``anyOf`` choices, regex
    expansion) is drawn from a per-instance ``random.Random`` created from
    ``seed``; leaf values such as names or e-mail addresses come from Faker.
    """

    # How often the same ``$ref`` may be expanded along one path before the
    # generator stops descending; keeps recursive schemas finite.
    _MAX_REF_REPEATS = 2
    # Tries per array slot when ``uniqueItems`` demands a not-yet-seen value.
    _MAX_UNIQUE_ATTEMPTS = 101
    # Extra repetitions allowed for open-ended quantifiers (``*``, ``+``,
    # ``{n,}``) so generated strings stay short.
    _UNBOUNDED_REPEAT_SPAN = 3
    # Width of the integer range used when a bound is missing.
    _DEFAULT_INT_SPAN = 10000
    # ``faker.text`` refuses to generate fewer characters than this.
    _MIN_FAKER_TEXT_CHARS = 5

    _WORD_CHARS = string.ascii_letters + string.digits + "_"
    # Candidate characters for the regex escapes the generator understands;
    # any other escaped character is emitted literally.
    _ESCAPE_POOLS = {
        "d": string.digits,
        "w": _WORD_CHARS,
        "s": " ",
        "D": string.ascii_letters,
        "W": "-",
        "S": string.ascii_letters + string.digits,
        "n": "\n",
        "t": "\t",
        "r": "\r",
    }
    # Universe a negated character class (``[^...]``) is drawn from.
    _NEGATION_POOL = (
        string.ascii_letters + string.digits + string.punctuation + " "
    )
    # JSON Schema ``format`` value -> name of the Faker method producing it.
    _FORMAT_PROVIDERS = {
        "email": "email",
        "date": "date",
        "time": "time",
        "uuid": "uuid4",
        "uri": "uri",
        "hostname": "hostname",
        "ipv4": "ipv4",
        "ipv6": "ipv6",
        "password": "password",
        "firstName": "first_name",
        "lastName": "last_name",
        "fullName": "name",
        "phoneNumber": "phone_number",
        "address": "address",
        "city": "city",
        "country": "country",
        "company": "company",
        "job": "job",
        "url": "url",
        "userName": "user_name",
    }

    def __init__(self, seed: Optional[int] = None):
        """Initialize the generator.

        Args:
            seed: Random seed for reproducible generation
        """
        self.seed = seed
        self.faker = Faker()
        if seed is not None:
            Faker.seed(seed)
        # Dedicated RNG: the global ``random`` module is not touched by
        # ``Faker.seed``, so using it would defeat ``seed``.
        self._random = random.Random(seed)
        # NOTE(review): ``providers`` presumably holds provider instances, so
        # this class-membership test looks always true on a fresh Faker --
        # confirm before relying on it for de-duplication.
        if TestDataProvider not in self.faker.providers:
            self.faker.add_provider(TestDataProvider)

    def generate(
        self,
        schema: Dict[str, Any],
        count: int = 1
    ) -> List[Dict[str, Any]]:
        """Generate test data records from a JSON Schema.

        Args:
            schema: JSON Schema definition
            count: Number of records to generate

        Returns:
            List of generated data records

        Raises:
            ValueError: If ``schema`` is not a valid Draft 7 schema
        """
        if not self._validate_schema(schema):
            raise ValueError("Invalid JSON Schema")
        # The top-level schema doubles as the root for ``$ref`` resolution.
        return [
            self._generate_from_schema(schema, schema) for _ in range(count)
        ]

    def _validate_schema(self, schema: Dict[str, Any]) -> bool:
        """Validate that the schema is a valid JSON Schema.

        Args:
            schema: Schema to validate

        Returns:
            True if valid, False otherwise
        """
        try:
            Draft7Validator.check_schema(schema)
        except SchemaError:
            return False
        return True

    def _generate_from_schema(
        self,
        schema: Dict[str, Any],
        root: Optional[Dict[str, Any]] = None,
        _ref_path: Tuple[str, ...] = (),
    ) -> Any:
        """Generate a single value from a schema.

        Args:
            schema: JSON Schema definition (or sub-schema)
            root: Document that ``$ref`` pointers are resolved against;
                defaults to ``schema`` itself
            _ref_path: References already expanded on the way to ``schema``;
                used internally to stop recursive schemas

        Returns:
            Generated value. ``None`` is returned in place of a reference
            that has already been expanded ``_MAX_REF_REPEATS`` times.
        """
        if root is None:
            root = schema
        if not isinstance(schema, dict):
            # Boolean schemas (``true``/``false``) carry no constraints the
            # generator can use; treat them like an empty schema.
            schema = {}

        if "$ref" in schema:
            ref = schema["$ref"]
            if _ref_path.count(ref) >= self._MAX_REF_REPEATS:
                return None
            resolved = self._resolve_ref(ref, root)
            return self._generate_from_schema(
                resolved, root, _ref_path + (ref,)
            )

        for keyword in ("anyOf", "oneOf"):
            if keyword in schema:
                chosen = self._random.choice(schema[keyword])
                return self._generate_from_schema(chosen, root, _ref_path)

        if "allOf" in schema:
            merged: Dict[str, Any] = {}
            for subschema in schema["allOf"]:
                part = self._generate_from_schema(subschema, root, _ref_path)
                if isinstance(part, dict):
                    merged.update(part)
            # Properties declared next to ``allOf`` belong to the same object.
            if "properties" in schema:
                merged.update(self._generate_object(schema, root, _ref_path))
            return merged

        # ``enum``/``const`` fix the value regardless of the declared type.
        if "enum" in schema or "const" in schema:
            return self._generate_value(schema)

        json_type = schema.get("type")
        if isinstance(json_type, list):
            # e.g. ["string", "null"]: commit to one of the allowed types.
            json_type = self._random.choice(json_type) if json_type else None
            schema = dict(schema, type=json_type)
        if json_type is None:
            if "properties" in schema:
                json_type = "object"
            elif "items" in schema:
                json_type = "array"

        if json_type == "object":
            return self._generate_object(schema, root, _ref_path)
        if json_type == "array":
            return self._generate_array(schema, root, _ref_path)
        return self._generate_value(schema)

    def _resolve_ref(self, ref: str, schema: Dict[str, Any]) -> Dict[str, Any]:
        """Resolve a $ref reference within a schema.

        Only same-document references (``#`` and ``#/...``) are supported.

        Args:
            ref: Reference string (e.g., #/definitions/Person)
            schema: Root schema containing definitions

        Returns:
            Resolved schema, or an empty schema if the reference is external,
            missing, or does not point at an object
        """
        if ref == "#":
            return schema
        if not ref.startswith("#/"):
            return {}
        current: Any = schema
        for raw_token in ref[2:].split("/"):
            # JSON Pointer escapes: "~1" is "/" and "~0" is "~".
            token = raw_token.replace("~1", "/").replace("~0", "~")
            if isinstance(current, dict):
                if token not in current:
                    return {}
                current = current[token]
            elif (
                isinstance(current, list)
                and token.isascii()
                and token.isdigit()
                and int(token) < len(current)
            ):
                current = current[int(token)]
            else:
                return {}
        return current if isinstance(current, dict) else {}

    def _generate_object(
        self,
        schema: Dict[str, Any],
        root: Optional[Dict[str, Any]] = None,
        _ref_path: Tuple[str, ...] = (),
    ) -> Dict[str, Any]:
        """Generate an object from an object-type schema.

        Args:
            schema: Object schema definition
            root: Document that ``$ref`` pointers are resolved against;
                defaults to ``schema`` itself
            _ref_path: References already expanded (recursion guard)

        Returns:
            Generated object
        """
        if root is None:
            root = schema
        required = set(schema.get("required", ()))
        result = {}
        for prop_name, prop_schema in schema.get("properties", {}).items():
            if (
                prop_name not in required
                and not self._should_generate_optional(prop_schema)
            ):
                continue
            result[prop_name] = self._generate_from_schema(
                prop_schema, root, _ref_path
            )
        return result

    def _should_generate_optional(self, prop_schema: Dict[str, Any]) -> bool:
        """Determine if an optional property should be generated.

        Args:
            prop_schema: Property schema

        Returns:
            True if property should be generated (currently always)
        """
        return True

    def _generate_array(
        self,
        schema: Dict[str, Any],
        root: Optional[Dict[str, Any]] = None,
        _ref_path: Tuple[str, ...] = (),
    ) -> List[Any]:
        """Generate an array from an array-type schema.

        Args:
            schema: Array schema definition
            root: Document that ``$ref`` pointers are resolved against;
                defaults to ``schema`` itself
            _ref_path: References already expanded (recursion guard)

        Returns:
            Generated array. With ``uniqueItems`` the array may be shorter
            than the drawn length when no further distinct item is found.

        Raises:
            ValueError: If ``minItems`` exceeds ``maxItems``
        """
        if root is None:
            root = schema
        items_schema = schema.get("items", {})
        if isinstance(items_schema, list):
            # Tuple validation: one schema per position.
            return [
                self._generate_from_schema(position, root, _ref_path)
                for position in items_schema
            ]

        min_items = schema.get("minItems")
        max_items = schema.get("maxItems")
        # Defaults are 1..10, widened/narrowed so that a lone bound never
        # produces an empty range.
        if min_items is None:
            min_items = 1 if max_items is None else min(1, max_items)
        if max_items is None:
            max_items = max(10, min_items)
        if min_items > max_items:
            raise ValueError(
                f"minItems ({min_items}) exceeds maxItems ({max_items})"
            )
        count = self._random.randint(min_items, max_items)

        if not schema.get("uniqueItems", False):
            return [
                self._generate_from_schema(items_schema, root, _ref_path)
                for _ in range(count)
            ]

        results: List[Any] = []
        seen = set()
        for _ in range(count):
            for _attempt in range(self._MAX_UNIQUE_ATTEMPTS):
                item = self._generate_from_schema(
                    items_schema, root, _ref_path
                )
                # repr() keeps 1 and "1" apart, unlike str().
                item_key = repr(item)
                if item_key not in seen:
                    seen.add(item_key)
                    results.append(item)
                    break
            else:
                # No fresh value found: the value space looks exhausted, so
                # stop instead of emitting duplicates.
                break
        return results

    def _generate_value(self, schema: Dict[str, Any]) -> Any:
        """Generate a scalar value from a schema.

        Args:
            schema: Value schema definition

        Returns:
            Generated value
        """
        if "enum" in schema:
            return self._random.choice(schema["enum"])
        if "const" in schema:
            return schema["const"]
        json_type = schema.get("type")
        if json_type == "null":
            return None
        if json_type == "boolean":
            return self.faker.pybool()
        if json_type == "integer":
            low, high = self._integer_bounds(schema)
            return self.faker.random_int(min=low, max=high)
        if json_type == "number":
            return self.faker.pyfloat(
                min_value=schema.get("minimum"),
                max_value=schema.get("maximum")
            )
        if json_type == "string":
            return self._generate_string(schema)
        return self.faker.word()

    def _integer_bounds(self, schema: Dict[str, Any]) -> Tuple[int, int]:
        """Compute the inclusive integer range allowed by a schema.

        Args:
            schema: Integer schema definition

        Returns:
            ``(low, high)`` honouring ``minimum``, ``maximum``,
            ``exclusiveMinimum`` and ``exclusiveMaximum``; a missing bound
            defaults to 0 / 10000 or is placed relative to the other bound
            when that default would leave the range empty

        Raises:
            ValueError: If the bounds admit no integer
        """
        lows = []
        if schema.get("minimum") is not None:
            lows.append(math.ceil(schema["minimum"]))
        if schema.get("exclusiveMinimum") is not None:
            lows.append(math.floor(schema["exclusiveMinimum"]) + 1)
        highs = []
        if schema.get("maximum") is not None:
            highs.append(math.floor(schema["maximum"]))
        if schema.get("exclusiveMaximum") is not None:
            highs.append(math.ceil(schema["exclusiveMaximum"]) - 1)

        # The tightest of the inclusive/exclusive variants wins.
        low = max(lows) if lows else None
        high = min(highs) if highs else None
        span = self._DEFAULT_INT_SPAN
        if low is None and high is None:
            return 0, span
        if low is None:
            low = 0 if high >= 0 else high - span
        if high is None:
            high = span if low <= span else low + span
        if low > high:
            raise ValueError(
                f"Integer bounds admit no value: [{low}, {high}]"
            )
        return low, high

    def _generate_string(self, schema: Dict[str, Any]) -> str:
        """Generate a string based on string schema constraints.

        ``format`` takes precedence over ``pattern``, which takes precedence
        over ``minLength``/``maxLength``.

        Args:
            schema: String schema definition

        Returns:
            Generated string
        """
        format_type = schema.get("format", "")
        if format_type == "date-time":
            # RFC 3339 date-times carry a UTC offset.
            return self.faker.iso8601(tzinfo=timezone.utc)
        if format_type == "regex":
            return self._generate_from_pattern(schema.get("pattern", ".*"))
        provider = self._FORMAT_PROVIDERS.get(format_type)
        if provider is not None:
            return getattr(self.faker, provider)()

        pattern = schema.get("pattern")
        if pattern:
            return self._generate_from_pattern(pattern)

        min_length = schema.get("minLength", 0)
        max_length = schema.get("maxLength")
        if max_length is None:
            max_length = max(100, min_length)
        # Contradictory bounds: honour the upper one rather than fail.
        min_length = min(min_length, max_length)
        if (
            min_length == max_length
            or max_length < self._MIN_FAKER_TEXT_CHARS
        ):
            # Exact or very short lengths are outside what faker.text can do.
            length = self._random.randint(min_length, max_length)
            return ''.join(
                self._random.choices(string.ascii_letters, k=length)
            )
        text = self.faker.text(max_nb_chars=max_length)
        shortfall = min_length - len(text)
        if shortfall > 0:
            text += ''.join(
                self._random.choices(string.ascii_letters, k=shortfall)
            )
        return text[:max_length]

    def _generate_from_pattern(self, pattern: str) -> str:
        """Generate a string matching a regex pattern.

        Supports literals, ``.``, the escapes in ``_ESCAPE_POOLS``, character
        classes with ranges and negation, (nested) groups, alternation,
        anchors and the quantifiers ``*``, ``+``, ``?``, ``{n}``, ``{n,}``
        and ``{n,m}``. Lookarounds and ``\\b`` contribute nothing;
        back-references and inline flags are not interpreted.

        Args:
            pattern: Regular expression pattern

        Returns:
            String matching the pattern (for the supported syntax)
        """
        branches = self._split_alternatives(pattern)
        if len(branches) > 1:
            return self._generate_from_pattern(self._random.choice(branches))

        pieces: List[str] = []
        pos = 0
        while pos < len(pattern):
            produce, pos = self._parse_atom(pattern, pos)
            low, high, pos = self._parse_quantifier(pattern, pos)
            repeats = low if low == high else self._random.randint(low, high)
            pieces.extend(produce() for _ in range(repeats))
        return ''.join(pieces)

    def _parse_atom(
        self, pattern: str, pos: int
    ) -> Tuple[Callable[[], str], int]:
        """Read one regex atom and return a producer for it.

        Args:
            pattern: Regular expression pattern
            pos: Index of the atom's first character

        Returns:
            ``(produce, next_pos)`` where ``produce()`` yields one random
            expansion of the atom and ``next_pos`` is the index after it
        """
        rng = self._random
        char = pattern[pos]

        if char == "\\":
            if pos + 1 >= len(pattern):
                return (lambda: ""), pos + 1
            code = pattern[pos + 1]
            if code in "bB":
                # Word-boundary assertions are zero-width.
                return (lambda: ""), pos + 2
            pool = self._ESCAPE_POOLS.get(code, code)
            return (lambda: rng.choice(pool)), pos + 2

        if char == "[":
            end = self._find_class_end(pattern, pos)
            if end != -1:
                pool = self._expand_char_class(pattern[pos + 1:end])
                if not pool:
                    return (lambda: ""), end + 1
                return (lambda: rng.choice(pool)), end + 1

        if char == "(":
            end = self._find_group_end(pattern, pos)
            if end != -1:
                body = pattern[pos + 1:end]
                if body.startswith(("?=", "?!", "?<=", "?<!")):
                    # Lookarounds consume nothing.
                    return (lambda: ""), end + 1
                if body.startswith("?:"):
                    body = body[2:]
                elif body.startswith(("?<", "?P<")) and ">" in body:
                    # Named group: drop the name.
                    body = body[body.index(">") + 1:]
                return (lambda: self._generate_from_pattern(body)), end + 1

        if char == ".":
            pool = string.ascii_letters + string.digits
            return (lambda: rng.choice(pool)), pos + 1
        if char in "^$*+?":
            # Anchors, and quantifiers with nothing to repeat, emit nothing.
            return (lambda: ""), pos + 1
        # Everything else (including unterminated "[" / "(") is a literal.
        return (lambda: char), pos + 1

    def _parse_quantifier(
        self, pattern: str, pos: int
    ) -> Tuple[int, int, int]:
        """Read an optional quantifier following an atom.

        Args:
            pattern: Regular expression pattern
            pos: Index right after the atom

        Returns:
            ``(low, high, next_pos)``; ``(1, 1, pos)`` when no quantifier
            starts at ``pos``
        """
        if pos >= len(pattern):
            return 1, 1, pos
        span = self._UNBOUNDED_REPEAT_SPAN
        char = pattern[pos]
        if char == "*":
            low, high, pos = 0, span, pos + 1
        elif char == "+":
            low, high, pos = 1, 1 + span, pos + 1
        elif char == "?":
            low, high, pos = 0, 1, pos + 1
        elif char == "{":
            end = pattern.find("}", pos)
            bounds = None
            if end != -1:
                bounds = self._parse_repeat_bounds(pattern[pos + 1:end])
            if bounds is None:
                # Not a quantifier; the brace is parsed as a literal next.
                return 1, 1, pos
            low, high = bounds
            pos = end + 1
        else:
            return 1, 1, pos
        # A trailing "?" only makes the quantifier lazy.
        if pos < len(pattern) and pattern[pos] == "?":
            pos += 1
        return low, high, pos

    def _parse_repeat_bounds(self, body: str) -> Optional[Tuple[int, int]]:
        """Parse the inside of a ``{...}`` quantifier.

        Args:
            body: Text between the braces

        Returns:
            ``(low, high)`` or ``None`` if ``body`` is not ``n``, ``n,``
            or ``n,m`` with ``n <= m``
        """
        low_text, comma, high_text = body.partition(",")
        if not (low_text.isascii() and low_text.isdigit()):
            return None
        low = int(low_text)
        if not comma:
            return low, low
        if not high_text:
            return low, low + self._UNBOUNDED_REPEAT_SPAN
        if not (high_text.isascii() and high_text.isdigit()):
            return None
        high = int(high_text)
        return (low, high) if low <= high else None

    def _expand_char_class(self, body: str) -> str:
        """Expand the inside of a ``[...]`` class into candidate characters.

        Args:
            body: Text between the brackets

        Returns:
            String of characters the class may produce (possibly empty)
        """
        negate = body.startswith("^")
        if negate:
            body = body[1:]
        chars: List[str] = []
        i = 0
        while i < len(body):
            char = body[i]
            if char == "\\" and i + 1 < len(body):
                chars.extend(self._ESCAPE_POOLS.get(body[i + 1], body[i + 1]))
                i += 2
                continue
            if i + 2 < len(body) and body[i + 1] == "-":
                first, last = ord(char), ord(body[i + 2])
                if first <= last:
                    chars.extend(chr(c) for c in range(first, last + 1))
                    i += 3
                    continue
            chars.append(char)
            i += 1
        if negate:
            excluded = set(chars)
            return ''.join(
                c for c in self._NEGATION_POOL if c not in excluded
            )
        return ''.join(chars)

    @staticmethod
    def _find_class_end(pattern: str, start: int) -> int:
        """Return the index of the ``]`` closing the class at ``start``.

        Args:
            pattern: Regular expression pattern
            start: Index of the opening ``[``

        Returns:
            Index of the first unescaped ``]`` after ``start``, or -1
        """
        i = start + 1
        while i < len(pattern):
            if pattern[i] == "\\":
                i += 2
                continue
            if pattern[i] == "]":
                return i
            i += 1
        return -1

    @staticmethod
    def _find_group_end(pattern: str, start: int) -> int:
        """Return the index of the ``)`` matching the ``(`` at ``start``.

        Args:
            pattern: Regular expression pattern
            start: Index of the opening ``(``

        Returns:
            Index of the matching parenthesis (nesting, escapes and
            character classes are respected), or -1 if unbalanced
        """
        depth = 0
        in_class = False
        i = start
        while i < len(pattern):
            char = pattern[i]
            if char == "\\":
                i += 2
                continue
            if in_class:
                if char == "]":
                    in_class = False
            elif char == "[":
                in_class = True
            elif char == "(":
                depth += 1
            elif char == ")":
                depth -= 1
                if depth == 0:
                    return i
            i += 1
        return -1

    @staticmethod
    def _split_alternatives(pattern: str) -> List[str]:
        """Split a pattern on its top-level ``|`` operators.

        Args:
            pattern: Regular expression pattern

        Returns:
            List of branches; a single-element list when the pattern has no
            alternation outside groups and character classes
        """
        branches: List[str] = []
        depth = 0
        in_class = False
        start = 0
        i = 0
        while i < len(pattern):
            char = pattern[i]
            if char == "\\":
                i += 2
                continue
            if in_class:
                if char == "]":
                    in_class = False
            elif char == "[":
                in_class = True
            elif char == "(":
                depth += 1
            elif char == ")":
                depth = max(depth - 1, 0)
            elif char == "|" and depth == 0:
                branches.append(pattern[start:i])
                start = i + 1
            i += 1
        branches.append(pattern[start:])
        return branches