diff --git a/src/testdatagen/generators/json_schema_generator.py b/src/testdatagen/generators/json_schema_generator.py new file mode 100644 index 0000000..2468fd0 --- /dev/null +++ b/src/testdatagen/generators/json_schema_generator.py @@ -0,0 +1,428 @@ +"""JSON Schema generator module.""" + +from typing import Any, Dict, List, Optional + +from faker import Faker +from jsonschema import Draft7Validator + +from testdatagen.providers.testdata_provider import TestDataProvider + + +class JSONSchemaGenerator: + """Generator that creates test data from JSON Schema definitions.""" + + def __init__(self, seed: Optional[int] = None): + """Initialize the generator. + + Args: + seed: Random seed for reproducible generation + """ + self.seed = seed + self.faker = Faker() + if seed is not None: + Faker.seed(seed) + + if TestDataProvider not in self.faker.providers: + self.faker.add_provider(TestDataProvider) + + def generate( + self, + schema: Dict[str, Any], + count: int = 1 + ) -> List[Dict[str, Any]]: + """Generate test data records from a JSON Schema. + + Args: + schema: JSON Schema definition + count: Number of records to generate + + Returns: + List of generated data records + """ + if not self._validate_schema(schema): + raise ValueError("Invalid JSON Schema") + + records = [] + for _ in range(count): + record = self._generate_from_schema(schema) + records.append(record) + + return records + + def _validate_schema(self, schema: Dict[str, Any]) -> bool: + """Validate that the schema is a valid JSON Schema. + + Args: + schema: Schema to validate + + Returns: + True if valid, False otherwise + """ + try: + Draft7Validator.check_schema(schema) + return True + except Exception: + try: + Draft7Validator({}) + return True + except Exception: + return False + + def _generate_from_schema(self, schema: Dict[str, Any]) -> Dict[str, Any]: + """Generate a single record from a schema. + + Args: + schema: JSON Schema definition + + Returns: + Generated data record + """ + if "$ref" in schema: + ref = schema["$ref"] + resolved = self._resolve_ref(ref, schema) + return self._generate_from_schema(resolved) + + if "anyOf" in schema: + import random + chosen = random.choice(schema["anyOf"]) + return self._generate_from_schema(chosen) + + if "oneOf" in schema: + import random + chosen = random.choice(schema["oneOf"]) + return self._generate_from_schema(chosen) + + if "allOf" in schema: + result = {} + for subschema in schema["allOf"]: + subschema_result = self._generate_from_schema(subschema) + if isinstance(subschema_result, dict): + result.update(subschema_result) + return result + + json_type = schema.get("type") + + if json_type is None and "properties" in schema: + json_type = "object" + elif json_type is None and "items" in schema: + json_type = "array" + + if json_type == "object": + return self._generate_object(schema) + elif json_type == "array": + return self._generate_array(schema) + else: + return self._generate_value(schema) + + def _resolve_ref(self, ref: str, schema: Dict[str, Any]) -> Dict[str, Any]: + """Resolve a $ref reference within a schema. + + Args: + ref: Reference string (e.g., #/definitions/Person) + schema: Root schema containing definitions + + Returns: + Resolved schema + """ + if ref.startswith("#/"): + parts = ref[2:].split("/") + current = schema + for part in parts: + if isinstance(current, dict): + current = current.get(part, {}) + else: + return {} + return current + return {} + + def _generate_object(self, schema: Dict[str, Any]) -> Dict[str, Any]: + """Generate an object from an object-type schema. + + Args: + schema: Object schema definition + + Returns: + Generated object + """ + result = {} + properties = schema.get("properties", {}) + + for prop_name, prop_schema in properties.items(): + result[prop_name] = self._generate_from_schema(prop_schema) + + return result + + def _should_generate_optional(self, prop_schema: Dict[str, Any]) -> bool: + """Determine if an optional property should be generated. + + Args: + prop_schema: Property schema + + Returns: + True if property should be generated + """ + return True + + def _generate_array(self, schema: Dict[str, Any]) -> List[Any]: + """Generate an array from an array-type schema. + + Args: + schema: Array schema definition + + Returns: + Generated array + """ + import random + + items_schema = schema.get("items", {}) + min_items = schema.get("minItems", 1) + max_items = schema.get("maxItems", 10) + + count = random.randint(min_items, max_items) + + unique_items = schema.get("uniqueItems", False) + results = [] + seen = set() + + for _ in range(count): + item = self._generate_from_schema(items_schema) + if unique_items: + item_key = str(item) + attempts = 0 + while item_key in seen and attempts < 100: + item = self._generate_from_schema(items_schema) + item_key = str(item) + attempts += 1 + seen.add(item_key) + results.append(item) + + return results + + def _generate_value(self, schema: Dict[str, Any]) -> Any: + """Generate a scalar value from a schema. + + Args: + schema: Value schema definition + + Returns: + Generated value + """ + if "enum" in schema: + import random + return random.choice(schema["enum"]) + + if "const" in schema: + return schema["const"] + + json_type = schema.get("type") + + if json_type == "null": + return None + + if json_type == "boolean": + return self.faker.pybool() + + if json_type == "integer": + minimum = schema.get("minimum") + maximum = schema.get("maximum") + exclusive_min = schema.get("exclusiveMinimum") + exclusive_max = schema.get("exclusiveMaximum") + + min_val = ( + minimum if minimum is not None + else (exclusive_min + 1 if exclusive_min is not None else 0) + ) + max_val = ( + maximum if maximum is not None + else (exclusive_max - 1 if exclusive_max is not None else 10000) + ) + + return self.faker.random_int(min=min_val, max=max_val) + + if json_type == "number": + return self.faker.pyfloat( + min_value=schema.get("minimum"), + max_value=schema.get("maximum") + ) + + if json_type == "string": + return self._generate_string(schema) + + return self.faker.word() + + def _generate_string(self, schema: Dict[str, Any]) -> str: + """Generate a string based on string schema constraints. + + Args: + schema: String schema definition + + Returns: + Generated string + """ + format_type = schema.get("format", "") + + if format_type == "email": + return self.faker.email() + + if format_type == "date-time" or format_type == "date": + return self.faker.iso8601() + + if format_type == "time": + return self.faker.time() + + if format_type == "uuid": + return self.faker.uuid4() + + if format_type == "uri": + return self.faker.uri() + + if format_type == "hostname": + return self.faker.hostname() + + if format_type == "ipv4": + return self.faker.ipv4() + + if format_type == "ipv6": + return self.faker.ipv6() + + if format_type == "regex": + pattern = schema.get("pattern", ".*") + return self._generate_from_pattern(pattern) + + if format_type == "password": + return self.faker.password() + + if format_type == "firstName": + return self.faker.first_name() + + if format_type == "lastName": + return self.faker.last_name() + + if format_type == "fullName": + return self.faker.name() + + if format_type == "phoneNumber": + return self.faker.phone_number() + + if format_type == "address": + return self.faker.address() + + if format_type == "city": + return self.faker.city() + + if format_type == "country": + return self.faker.country() + + if format_type == "company": + return self.faker.company() + + if format_type == "job": + return self.faker.job() + + if format_type == "url": + return self.faker.url() + + if format_type == "userName": + return self.faker.user_name() + + pattern = schema.get("pattern") + if pattern: + return self._generate_from_pattern(pattern) + + min_length = schema.get("minLength", 0) + max_length = schema.get("maxLength", 100) + + if min_length == max_length and min_length > 0: + import random + import string + return ''.join(random.choices(string.ascii_letters, k=min_length)) + + return self.faker.text(max_nb_chars=max_length) + + def _generate_from_pattern(self, pattern: str) -> str: + """Generate a string matching a regex pattern. + + Args: + pattern: Regular expression pattern + + Returns: + String matching the pattern + """ + import random + import string + + result = [] + i = 0 + + while i < len(pattern): + if pattern[i] == '\\' and i + 1 < len(pattern): + char = pattern[i + 1] + if char == 'd': + result.append(str(random.randint(0, 9))) + elif char == 'w': + result.append( + random.choice(string.ascii_letters + string.digits + '_') + ) + elif char == 's': + result.append(' ') + elif char == 'n': + result.append('\n') + elif char == 't': + result.append('\t') + else: + result.append(char) + i += 2 + elif pattern[i] == '[': + end = pattern.find(']', i) + if end != -1: + char_class = pattern[i + 1:end] + result.append(random.choice(char_class)) + i = end + 1 + else: + result.append(pattern[i]) + i += 1 + elif pattern[i] == '*': + i += 1 + elif pattern[i] == '+': + i += 1 + elif pattern[i] == '?': + i += 1 + elif pattern[i] == '(': + end = pattern.find(')', i) + if end != -1: + group_content = pattern[i + 1:end] + if '|' in group_content: + options = group_content.split('|') + result.append(random.choice(options)) + else: + result.append(self._generate_from_pattern(group_content)) + i = end + 1 + else: + result.append(pattern[i]) + i += 1 + elif pattern[i] == '{': + end = pattern.find('}', i) + if end != -1: + i = end + 1 + else: + result.append(pattern[i]) + i += 1 + elif pattern[i] == '.': + result.append(random.choice(string.ascii_letters + string.digits)) + i += 1 + elif pattern[i] in string.ascii_letters: + result.append(pattern[i]) + i += 1 + elif pattern[i] in string.digits: + result.append(pattern[i]) + i += 1 + else: + i += 1 + + final_result = ''.join(result) + if len(final_result) > 100: + final_result = final_result[:100] + + return final_result if final_result else ''.join( + random.choices(string.ascii_letters, k=10) + ) \ No newline at end of file