"""Mock data generator that respects JSON Schema constraints."""

import math
import random
import re
import string
from dataclasses import dataclass
from fractions import Fraction
from typing import Any, Callable, Dict, List, Optional, Tuple

from faker import Faker

from schema2mock.core.schema_parser import SchemaParser


@dataclass
class GeneratorConfig:
    """Configuration for mock data generation.

    Attributes:
        locale: Locale passed to the ``Faker`` constructor.
        seed: When not ``None``, used to seed both Faker and ``random``.
        min_string_length: Default ``minLength`` for strings.
        max_string_length: Default ``maxLength`` for strings.
        min_array_items: Default ``minItems`` for arrays.
        max_array_items: Default ``maxItems`` for arrays.
        default_minimum: Lower bound for numbers without any bound.
        default_maximum: Upper bound for numbers without any bound.
        null_probability: Chance that a nullable schema (``nullable: true``
            or a ``type`` list containing ``"null"``) yields ``None``.
    """

    locale: str = "en_US"
    seed: Optional[int] = None
    min_string_length: int = 1
    max_string_length: int = 100
    min_array_items: int = 1
    max_array_items: int = 10
    default_minimum: float = 0
    default_maximum: float = 1000
    null_probability: float = 0.0


class MockGenerator:
    """Generate realistic mock data from JSON Schema."""

    # JSON Schema ``format`` -> name of the Faker method producing it.
    # Every method listed here returns a string.
    FORMAT_MAPPING = {
        "date-time": "iso8601",
        "date": "date",
        "time": "time",
        "email": "email",
        "idn-email": "email",
        "hostname": "hostname",
        "idn-hostname": "hostname",
        "ipv4": "ipv4",
        "ipv6": "ipv6",
        "uri": "uri",
        "uri-reference": "uri",
        "uuid": "uuid4",
    }

    TYPE_MAPPING = {
        "string": "text",
        "number": "pyfloat",
        "integer": "random_int",
        "boolean": "boolean",
        "array": "word",
        "object": "pydict",
        "null": "null_object",
    }

    # Alphabets for the string formats that are plain random characters.
    _RANDOM_FORMAT_ALPHABETS = {
        "binary": string.digits + string.ascii_letters,
        "byte": string.ascii_letters + string.digits + "+/",
        "password": string.ascii_letters + string.digits + "!@#$%^&*",
    }

    # Characters to draw from for a regex escape; unknown escapes stand for
    # the escaped character itself.
    _ESCAPE_ALPHABETS = {
        "d": string.digits,
        "w": string.ascii_letters + string.digits,
        "s": " ",
        "D": string.ascii_letters,
        # "_" is a word character, so it must not be produced for \W.
        "W": string.punctuation.replace("_", ""),
        "n": "\n",
        "t": "\t",
        "r": "\r",
    }

    # Repetition ranges used for the open-ended regex quantifiers.
    _SIMPLE_QUANTIFIERS = {"*": (0, 3), "+": (1, 3), "?": (0, 1)}
    _BRACE_RE = re.compile(r"\{(\d+)(?:(,)(\d*))?\}")
    _GROUP_PREFIX_RE = re.compile(r"\?(?::|P<\w+>)")

    _STRING_KEYWORDS = ("minLength", "maxLength", "pattern")
    _NUMBER_KEYWORDS = (
        "minimum",
        "maximum",
        "exclusiveMinimum",
        "exclusiveMaximum",
        "multipleOf",
    )
    _COMPOSITION_KEYWORDS = ("const", "enum", "anyOf", "oneOf", "not")

    # Upper bound on retries wherever generation is rejection-sampled.
    _MAX_ATTEMPTS = 100

    def __init__(self, config: Optional[GeneratorConfig] = None):
        """Create a generator.

        Args:
            config: Generation settings; defaults to ``GeneratorConfig()``.

        Note:
            A configured seed is applied through ``Faker.seed`` and
            ``random.seed``, i.e. to state shared with the rest of the
            process, not only to this instance.
        """
        self.config = config or GeneratorConfig()
        self.faker = Faker(self.config.locale)
        if self.config.seed is not None:
            Faker.seed(self.config.seed)
            random.seed(self.config.seed)
        self._custom_providers: Dict[str, Callable] = {}

    def register_provider(self, name: str, func: Callable) -> None:
        """Register a zero-argument callable used for properties named *name*."""
        self._custom_providers[name] = func

    # ------------------------------------------------------------------
    # Dispatch
    # ------------------------------------------------------------------

    def generate(self, schema: Dict[str, Any]) -> Any:
        """Generate mock data from a schema.

        Args:
            schema: A JSON Schema with all references already resolved.
                Boolean schemas are treated as the empty schema.

        Returns:
            A value built according to the schema's keywords.

        Raises:
            ValueError: If the schema still contains a ``$ref``.
        """
        if not isinstance(schema, dict):
            # ``true``/``false`` schemas carry no keywords to follow.
            schema = {}
        if "$ref" in schema:
            raise ValueError("Schema must be resolved before generation")

        if "const" in schema:
            return schema["const"]
        if "enum" in schema:
            return self.generate_enum(schema["enum"])

        if "allOf" in schema:
            # Keywords next to ``allOf`` constrain the value as well, so they
            # take part in the merge as one more branch.
            branches = list(schema["allOf"])
            siblings = self._without(schema, "allOf")
            if siblings:
                branches.insert(0, siblings)
            return self.handle_allof(branches)

        for keyword, handler in (
            ("anyOf", self.handle_anyof),
            ("oneOf", self.handle_oneof),
        ):
            if keyword in schema:
                branches = schema[keyword]
                siblings = self._without(schema, keyword)
                if siblings:
                    # The chosen branch must hold together with the siblings.
                    branches = [{"allOf": [siblings, b]} for b in branches]
                return handler(branches)

        if "not" in schema:
            siblings = self._without(schema, "not")
            if (
                siblings.get("type") is not None
                or self._infer_type(siblings) is not None
            ):
                # The positive constraints describe what to build; the
                # negated part cannot be checked here.
                return self.generate(siblings)
            return self.handle_not(schema["not"])

        schema_type = schema.get("type")
        multi_type = isinstance(schema_type, list)

        nullable = schema.get("nullable") is True or (
            multi_type and "null" in schema_type
        )
        chance = self.config.null_probability
        if nullable and chance > 0 and random.random() < chance:
            return None

        if multi_type:
            concrete = [t for t in schema_type if t != "null"]
            schema_type = random.choice(concrete) if concrete else "null"
        if schema_type is None:
            schema_type = self._infer_type(schema)

        if schema_type == "null":
            return None
        generators = {
            "object": self.generate_object,
            "array": self.generate_array,
            "string": self.generate_string,
            "number": self.generate_number,
            "integer": self.generate_integer,
            "boolean": self.generate_boolean,
        }
        handler = (
            generators.get(schema_type) if isinstance(schema_type, str) else None
        )
        if handler is None:
            # Unknown or missing type: any word is as good as anything else.
            return self.faker.word()
        return handler(schema)

    @staticmethod
    def _without(schema: Dict[str, Any], keyword: str) -> Dict[str, Any]:
        """Return a shallow copy of *schema* lacking *keyword*."""
        return {key: value for key, value in schema.items() if key != keyword}

    @classmethod
    def _infer_type(cls, schema: Dict[str, Any]) -> Optional[str]:
        """Guess the type of a schema without ``type`` from its keywords."""
        if "properties" in schema:
            return "object"
        if "items" in schema:
            return "array"
        if any(key in schema for key in cls._STRING_KEYWORDS):
            return "string"
        if any(key in schema for key in cls._NUMBER_KEYWORDS):
            return "number"
        return None

    # ------------------------------------------------------------------
    # Strings
    # ------------------------------------------------------------------

    def generate_string(self, schema: Dict[str, Any]) -> str:
        """Generate a string respecting constraints.

        ``pattern`` takes precedence over ``format``; without either, Faker
        text is produced.  ``minLength``/``maxLength`` default to the
        configured string lengths.
        """
        max_length = schema.get("maxLength", self.config.max_string_length)
        # The configured default minimum must not override an explicit,
        # smaller ``maxLength``.
        default_min = min(self.config.min_string_length, max_length)
        min_length = schema.get("minLength", default_min)
        max_length = max(min_length, max_length)

        pattern = schema.get("pattern")
        if pattern:
            return self._generate_from_pattern(pattern, min_length, max_length)

        format_type = schema.get("format")
        if format_type and format_type in self.FORMAT_MAPPING:
            faker_method = self.FORMAT_MAPPING[format_type]
            if hasattr(self.faker, faker_method):
                result = getattr(self.faker, faker_method)()
                if isinstance(result, str) and result:
                    return self._fit_length(result, min_length, max_length)

        alphabet = self._RANDOM_FORMAT_ALPHABETS.get(format_type)
        if alphabet is not None:
            length = random.randint(min_length, max_length)
            return "".join(random.choices(alphabet, k=length))

        if max_length < 5:
            # Faker's text() refuses to produce fewer than 5 characters.
            return self._fallback_string(min_length, max_length)
        result = self.faker.text(max_nb_chars=max_length)
        if not result:
            return self._fallback_string(min_length, max_length)
        return self._fit_length(result, min_length, max_length)

    @staticmethod
    def _fit_length(text: str, min_length: int, max_length: int) -> str:
        """Repeat and/or truncate non-empty *text* into the length range."""
        if len(text) < min_length:
            text = text * ((min_length // len(text)) + 1)
        return text[:max_length]

    def _generate_from_pattern(
        self, pattern: str, min_length: int, max_length: int
    ) -> str:
        """Generate a string matching a regex pattern.

        Only a subset of regex syntax is understood.  Candidates are checked
        against the compiled pattern; if none fits within the attempt budget
        (or the pattern does not compile) a random alphanumeric string of a
        permitted length is returned instead.
        """
        body = self._strip_anchors(pattern)
        try:
            regex = re.compile(body)
        except re.error:
            return self._fallback_string(min_length, max_length)

        if body.startswith("["):
            # A lone character class with an optional quantifier can be
            # sized directly from the length bounds.
            end = self._find_class_end(body, 0)
            if end != -1:
                span = self._repetition_span(
                    body[end + 1:], min_length, max_length
                )
                if span is not None:
                    return self._generate_from_character_class(
                        body[:end + 1], span[0], span[1]
                    )

        for _ in range(self._MAX_ATTEMPTS):
            candidate = self._generate_from_regex_simple(body)
            if (
                candidate
                and min_length <= len(candidate) <= max_length
                and regex.fullmatch(candidate)
            ):
                return candidate

        return self._fallback_string(min_length, max_length)

    @staticmethod
    def _strip_anchors(pattern: str) -> str:
        """Drop a leading ``^`` and an unescaped trailing ``$``."""
        if pattern.startswith("^"):
            pattern = pattern[1:]
        if pattern.endswith("$"):
            stem = pattern[:-1]
            backslashes = len(stem) - len(stem.rstrip("\\"))
            # An odd run of backslashes means the ``$`` itself is escaped.
            if backslashes % 2 == 0:
                pattern = stem
        return pattern

    @classmethod
    def _repetition_span(
        cls, quantifier: str, min_length: int, max_length: int
    ) -> Optional[Tuple[int, int]]:
        """Combine a quantifier with length bounds.

        Returns:
            ``(low, high)`` repetition counts, or ``None`` when *quantifier*
            is not exactly one quantifier (or empty).
        """
        unbounded = None
        if quantifier == "":
            q_low, q_high = 1, 1
        elif quantifier == "*":
            q_low, q_high = 0, unbounded
        elif quantifier == "+":
            q_low, q_high = 1, unbounded
        elif quantifier == "?":
            q_low, q_high = 0, 1
        else:
            brace = cls._BRACE_RE.fullmatch(quantifier)
            if brace is None:
                return None
            q_low = int(brace.group(1))
            if brace.group(2) is None:
                q_high = q_low
            elif brace.group(3):
                q_high = int(brace.group(3))
            else:
                q_high = unbounded

        low = max(q_low, min_length)
        high = max_length if q_high is None else min(q_high, max_length)
        if low > high:
            # Pattern and length bounds contradict each other; honour the
            # pattern, staying as close to the length range as it allows.
            low = high = q_low if q_low > max_length else q_high
        return low, high

    def _generate_from_character_class(
        self, char_class: str, min_length: int, max_length: int
    ) -> str:
        """Generate a string from a character class pattern.

        Args:
            char_class: The class, with or without its enclosing brackets.
                Ranges (``a-z``), escapes (``\\d``) and negation (``^``)
                are understood; a negated class draws from ASCII letters
                and digits.
            min_length: Smallest length to produce.
            max_length: Largest length to produce.
        """
        body = char_class
        if body.startswith("["):
            body = body[1:]
        if body.endswith("]"):
            body = body[:-1]

        negated = body.startswith("^")
        if negated:
            body = body[1:]

        chars: List[str] = []
        i = 0
        while i < len(body):
            current = body[i]
            if current == "\\" and i + 1 < len(body):
                escaped = body[i + 1]
                chars.extend(self._ESCAPE_ALPHABETS.get(escaped, escaped))
                i += 2
            elif i + 2 < len(body) and body[i + 1] == "-":
                # range() is empty for a reversed range, which is skipped.
                chars.extend(
                    chr(code)
                    for code in range(ord(current), ord(body[i + 2]) + 1)
                )
                i += 3
            else:
                chars.append(current)
                i += 1

        if negated:
            excluded = set(chars)
            pool = string.ascii_letters + string.digits
            chars = [c for c in pool if c not in excluded]

        if not chars:
            chars = ["a"]

        length = random.randint(min_length, max_length)
        return "".join(random.choice(chars) for _ in range(length))

    def _generate_from_regex_simple(self, pattern: str) -> str:
        """Simple regex pattern generation.

        Understands literals, escapes, character classes, groups,
        alternation, ``.`` and the ``* + ? {m} {m,} {m,n}`` quantifiers.
        The result is a best effort and is not guaranteed to match; callers
        are expected to verify it.
        """
        branches = self._split_alternatives(pattern)
        branch = random.choice(branches) if len(branches) > 1 else branches[0]

        pieces: List[str] = []
        i = 0
        while i < len(branch):
            atom, i = self._read_atom(branch, i)
            (low, high), i = self._read_quantifier(branch, i)
            count = low if low >= high else random.randint(low, high)
            # Each repetition is generated afresh rather than copied.
            pieces.extend(atom() for _ in range(count))
        return "".join(pieces)

    @staticmethod
    def _split_alternatives(pattern: str) -> List[str]:
        """Split *pattern* on ``|`` outside groups, classes and escapes."""
        branches: List[str] = []
        current: List[str] = []
        depth = 0
        in_class = False
        i = 0
        while i < len(pattern):
            ch = pattern[i]
            if ch == "\\" and i + 1 < len(pattern):
                current.append(pattern[i:i + 2])
                i += 2
                continue
            if in_class:
                if ch == "]":
                    in_class = False
            elif ch == "[":
                in_class = True
            elif ch == "(":
                depth += 1
            elif ch == ")":
                depth = max(0, depth - 1)
            elif ch == "|" and depth == 0:
                branches.append("".join(current))
                current = []
                i += 1
                continue
            current.append(ch)
            i += 1
        branches.append("".join(current))
        return branches

    @staticmethod
    def _find_class_end(pattern: str, start: int) -> int:
        """Return the index of the ``]`` closing the class at *start*, or -1."""
        i = start + 1
        while i < len(pattern):
            if pattern[i] == "\\":
                i += 2
                continue
            if pattern[i] == "]":
                return i
            i += 1
        return -1

    @staticmethod
    def _find_group_end(pattern: str, start: int) -> int:
        """Return the index of the ``)`` matching the ``(`` at *start*, or -1."""
        depth = 0
        in_class = False
        i = start
        while i < len(pattern):
            ch = pattern[i]
            if ch == "\\":
                i += 2
                continue
            if in_class:
                if ch == "]":
                    in_class = False
            elif ch == "[":
                in_class = True
            elif ch == "(":
                depth += 1
            elif ch == ")":
                depth -= 1
                if depth == 0:
                    return i
            i += 1
        return -1

    def _read_atom(
        self, pattern: str, index: int
    ) -> Tuple[Callable[[], str], int]:
        """Read one regex atom starting at *index*.

        Returns:
            A zero-argument callable producing one fresh occurrence of the
            atom, and the index just past it.
        """
        ch = pattern[index]

        if ch == "\\" and index + 1 < len(pattern):
            escaped = pattern[index + 1]
            alphabet = self._ESCAPE_ALPHABETS.get(escaped, escaped)
            return (lambda: random.choice(alphabet)), index + 2

        if ch == "[":
            end = self._find_class_end(pattern, index)
            if end != -1:
                char_class = pattern[index:end + 1]
                return (
                    lambda: self._generate_from_character_class(
                        char_class, 1, 1
                    )
                ), end + 1

        elif ch == "(":
            end = self._find_group_end(pattern, index)
            if end != -1:
                inner = pattern[index + 1:end]
                prefix = self._GROUP_PREFIX_RE.match(inner)
                if prefix is not None:
                    inner = inner[prefix.end():]
                elif inner.startswith("?"):
                    # Lookarounds and inline flags contribute no characters.
                    return (lambda: ""), end + 1
                return (
                    lambda: self._generate_from_regex_simple(inner)
                ), end + 1

        elif ch == ".":
            return (
                lambda: random.choice(string.ascii_letters + string.digits)
            ), index + 1

        elif ch in "^$":
            # Anchors match a position, not a character.
            return (lambda: ""), index + 1

        return (lambda: ch), index + 1

    def _read_quantifier(
        self, pattern: str, index: int
    ) -> Tuple[Tuple[int, int], int]:
        """Read an optional quantifier at *index*.

        Returns:
            ``((low, high), next_index)``; ``(1, 1)`` with *index* unchanged
            when there is no quantifier.
        """
        if index >= len(pattern):
            return (1, 1), index

        ch = pattern[index]
        bounds = self._SIMPLE_QUANTIFIERS.get(ch)
        if bounds is not None:
            index += 1
        elif ch == "{":
            brace = self._BRACE_RE.match(pattern, index)
            if brace is None:
                return (1, 1), index
            low = int(brace.group(1))
            if brace.group(2) is None:
                high = low
            elif brace.group(3):
                high = max(low, int(brace.group(3)))
            else:
                # ``{m,}`` is open-ended; keep the output short.
                high = low + 3
            bounds = (low, high)
            index = brace.end()
        else:
            return (1, 1), index

        if index < len(pattern) and pattern[index] == "?":
            # Lazy marker: irrelevant for generation.
            index += 1
        return bounds, index

    def _fallback_string(self, min_length: int, max_length: int) -> str:
        """Generate a random alphanumeric string within the length range."""
        length = random.randint(min_length, max_length)
        return "".join(
            random.choices(string.ascii_letters + string.digits, k=length)
        )

    # ------------------------------------------------------------------
    # Numbers
    # ------------------------------------------------------------------

    @staticmethod
    def _positive_step(multiple_of: Any) -> Optional[Any]:
        """Return *multiple_of* if it is a usable positive number."""
        if isinstance(multiple_of, bool):
            return None
        if not isinstance(multiple_of, (int, float)):
            return None
        if not math.isfinite(multiple_of) or multiple_of <= 0:
            return None
        return multiple_of

    def _numeric_bounds(
        self, schema: Dict[str, Any]
    ) -> Tuple[float, float, bool, bool]:
        """Work out the effective numeric range of *schema*.

        Both the numeric form of ``exclusiveMinimum``/``exclusiveMaximum``
        and the older boolean form (a flag modifying ``minimum``/
        ``maximum``) are accepted.  A missing bound falls back to the
        configured default, moved out of the way if it would contradict
        the bound that is present.

        Returns:
            ``(low, high, low_is_exclusive, high_is_exclusive)``.
        """
        low = schema.get("minimum")
        high = schema.get("maximum")
        exclusive_low = schema.get("exclusiveMinimum")
        exclusive_high = schema.get("exclusiveMaximum")
        low_open = high_open = False

        if isinstance(exclusive_low, bool):
            low_open = exclusive_low and low is not None
        elif exclusive_low is not None and (low is None or exclusive_low >= low):
            low, low_open = exclusive_low, True

        if isinstance(exclusive_high, bool):
            high_open = exclusive_high and high is not None
        elif exclusive_high is not None and (
            high is None or exclusive_high <= high
        ):
            high, high_open = exclusive_high, True

        default_low = self.config.default_minimum
        default_high = self.config.default_maximum
        span = abs(default_high - default_low) or 1

        if low is None and high is None:
            low, high = default_low, default_high
        elif low is None:
            low = default_low if default_low < high else high - span
        elif high is None:
            high = default_high if default_high > low else low + span

        if low > high:
            # Contradictory bounds cannot be satisfied; collapse the range.
            high = low
        return low, high, low_open, high_open

    def generate_number(self, schema: Dict[str, Any]) -> float:
        """Generate a number respecting constraints.

        Honours ``minimum``, ``maximum``, their exclusive variants and
        ``multipleOf``.  The result is rounded to 10 decimal places.
        """
        low, high, low_open, high_open = self._numeric_bounds(schema)
        step = self._positive_step(schema.get("multipleOf"))

        if step is not None:
            # Choose the multiplier inside the range instead of rounding a
            # random value, which could step outside the bounds.
            k_low = math.ceil(low / step)
            k_high = math.floor(high / step)
            if low_open and k_low * step <= low:
                k_low += 1
            if high_open and k_high * step >= high:
                k_high -= 1
            if k_low > k_high:
                k_high = k_low
            return round(random.randint(k_low, k_high) * step, 10)

        value = random.uniform(low, high)
        if (low_open and value <= low) or (high_open and value >= high):
            value = (low + high) / 2
        return round(value, 10)

    def generate_integer(self, schema: Dict[str, Any]) -> int:
        """Generate an integer respecting constraints.

        Honours ``minimum``, ``maximum``, their exclusive variants and
        ``multipleOf``.  If no integer satisfies the bounds, the smallest
        integer above the lower bound is returned.
        """
        low, high, low_open, high_open = self._numeric_bounds(schema)
        int_low = math.floor(low) + 1 if low_open else math.ceil(low)
        int_high = math.ceil(high) - 1 if high_open else math.floor(high)

        step = self._positive_step(schema.get("multipleOf"))
        if step is not None:
            # The integers that are multiples of p/q (in lowest terms) are
            # exactly the multiples of p.
            int_step = Fraction(str(step)).numerator
            k_low = -(-int_low // int_step)
            k_high = int_high // int_step
            if k_low > k_high:
                k_high = k_low
            return random.randint(k_low, k_high) * int_step

        if int_low > int_high:
            return int_low
        return random.randint(int_low, int_high)

    def generate_boolean(self, schema: Dict[str, Any]) -> bool:
        """Generate a boolean value; *schema* is not consulted."""
        return self.faker.boolean()

    # ------------------------------------------------------------------
    # Arrays and objects
    # ------------------------------------------------------------------

    @staticmethod
    def _dedupe(values: List[Any]) -> List[Any]:
        """Drop repeated values, keeping first occurrences in order.

        Values are compared by ``repr`` so unhashable items work and the
        outcome does not depend on hash ordering.
        """
        seen = set()
        distinct = []
        for value in values:
            key = repr(value)
            if key not in seen:
                seen.add(key)
                distinct.append(value)
        return distinct

    def generate_array(self, schema: Dict[str, Any]) -> List[Any]:
        """Generate an array respecting constraints.

        Honours ``items``, ``minItems``, ``maxItems`` and ``uniqueItems``.
        When ``items`` is a list of schemas, one value per position is
        produced.  With ``uniqueItems``, fewer than the chosen number of
        items may be returned if the item schema cannot supply enough
        distinct values.
        """
        items_schema = schema.get("items", {})
        if isinstance(items_schema, list):
            return [self.generate(item) for item in items_schema]

        max_items = schema.get("maxItems", self.config.max_array_items)
        # The configured default minimum must not override an explicit,
        # smaller ``maxItems``.
        default_min = min(self.config.min_array_items, max_items)
        min_items = schema.get("minItems", default_min)
        max_items = max(min_items, max_items)
        unique = schema.get("uniqueItems", False)

        length = random.randint(min_items, max_items)

        if isinstance(items_schema, dict) and "enum" in items_schema:
            choices = items_schema["enum"]
            if not unique:
                return random.choices(choices, k=length)
            distinct = self._dedupe(choices)
            return random.sample(distinct, min(length, len(distinct)))

        results = [self.generate(items_schema) for _ in range(length)]
        if not unique or len(results) <= 1:
            return results

        results = self._dedupe(results)
        seen = {repr(item) for item in results}
        attempts = 0
        # Bounded: a schema with a small value space (booleans, const)
        # would otherwise never reach the requested length.
        while len(results) < length and attempts < self._MAX_ATTEMPTS:
            attempts += 1
            candidate = self.generate(items_schema)
            key = repr(candidate)
            if key not in seen:
                seen.add(key)
                results.append(candidate)
        return results

    def generate_object(self, schema: Dict[str, Any]) -> Dict[str, Any]:
        """Generate an object respecting constraints.

        Every required property is present; optional ones are included at
        random.  A registered custom provider with the property's name
        replaces schema-driven generation for that property.
        ``additionalProperties`` and ``patternProperties`` are not
        generated.
        """
        properties = schema.get("properties", {})
        required = schema.get("required", [])
        if not isinstance(required, (list, tuple, set)):
            required = []

        result: Dict[str, Any] = {}
        for prop_name, prop_schema in properties.items():
            if prop_name in required or self._should_include_optional():
                result[prop_name] = self._generate_property(
                    prop_name, prop_schema
                )

        # A required name without a declared schema must still be present.
        for prop_name in required:
            if prop_name not in result:
                result[prop_name] = self._generate_property(prop_name, {})

        return result

    def _generate_property(self, name: str, prop_schema: Any) -> Any:
        """Produce one property value, preferring a custom provider."""
        provider = self._custom_providers.get(name)
        if provider is not None:
            return provider()
        return self.generate(prop_schema)

    def _should_include_optional(self, probability: float = 0.7) -> bool:
        """Determine if an optional property should be included."""
        return random.random() < probability

    def generate_enum(self, enum_values: List[Any]) -> Any:
        """Pick one of *enum_values* at random."""
        return random.choice(enum_values)

    # ------------------------------------------------------------------
    # Composition
    # ------------------------------------------------------------------

    def _merge_schemas(self, schemas: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Shallow-merge *schemas* into a single schema.

        ``properties`` are combined, ``required`` lists are unioned in
        order, nested ``allOf`` lists are flattened, and for every other
        keyword the last schema wins.

        Raises:
            ValueError: If any schema still contains a ``$ref``.
        """
        merged: Dict[str, Any] = {}
        properties: Dict[str, Any] = {}
        required: List[Any] = []

        for schema in schemas:
            resolved = self._resolve_composition_schema(schema)
            if not isinstance(resolved, dict):
                continue

            parts = [resolved]
            if "allOf" in resolved:
                parts = [
                    self._without(resolved, "allOf"),
                    self._merge_schemas(resolved["allOf"]),
                ]

            for part in parts:
                for key, value in part.items():
                    if key == "properties" and isinstance(value, dict):
                        properties.update(value)
                    elif key == "required" and isinstance(value, (list, tuple)):
                        required.extend(
                            name for name in value if name not in required
                        )
                    else:
                        merged[key] = value

        if properties:
            merged["properties"] = properties
        if required:
            merged["required"] = required
        return merged

    def handle_allof(self, schemas: List[Dict[str, Any]]) -> Any:
        """Handle allOf composition by merging schemas.

        Returns:
            A value generated from the merged schema, or an empty dict when
            the merged schema gives no hint about what to generate.

        Raises:
            ValueError: If any schema still contains a ``$ref``.
        """
        merged = self._merge_schemas(schemas)

        undetermined = (
            merged.get("type") is None
            and self._infer_type(merged) is None
            and not any(key in merged for key in self._COMPOSITION_KEYWORDS)
        )
        if undetermined:
            return {}
        return self.generate(merged)

    def handle_anyof(self, schemas: List[Dict[str, Any]]) -> Any:
        """Handle anyOf composition by randomly selecting one branch."""
        chosen = self._resolve_composition_schema(random.choice(schemas))
        return self.generate(chosen)

    def handle_oneof(self, schemas: List[Dict[str, Any]]) -> Any:
        """Handle oneOf composition (same as anyOf for generation)."""
        return self.handle_anyof(schemas)

    def handle_not(self, schema: Dict[str, Any]) -> Any:
        """Handle not composition.

        Produces a value that avoids what *schema* describes: a value of a
        different type when the negated schema has (or implies) a type, or
        a word outside the negated ``enum``/``const`` otherwise.

        Raises:
            ValueError: If the schema still contains a ``$ref``.
        """
        resolved = self._resolve_composition_schema(schema)
        if not isinstance(resolved, dict):
            return self.faker.word()

        negated_type = resolved.get("type")
        if negated_type is None:
            negated_type = self._infer_type(resolved)

        if negated_type is not None:
            excluded = (
                set(negated_type)
                if isinstance(negated_type, list)
                else {negated_type}
            )
            if excluded & {"number", "integer"}:
                # The two overlap, so avoid both.
                excluded |= {"number", "integer"}
            allowed = [
                name
                for name in ("string", "integer", "boolean")
                if name not in excluded
            ]
            if not allowed:
                return None
            return self.generate({"type": random.choice(allowed)})

        forbidden: List[Any] = []
        if "enum" in resolved:
            forbidden.extend(resolved["enum"])
        if "const" in resolved:
            forbidden.append(resolved["const"])

        candidate = self.faker.word()
        for _ in range(self._MAX_ATTEMPTS):
            if candidate not in forbidden:
                break
            candidate = self.faker.word()
        return candidate

    def _resolve_composition_schema(self, schema: Dict[str, Any]) -> Dict[str, Any]:
        """Return *schema* unchanged after checking it has no ``$ref``.

        Raises:
            ValueError: If the schema contains a ``$ref``.
        """
        if isinstance(schema, dict) and "$ref" in schema:
            raise ValueError("Schema must be resolved before generation")
        return schema

    # ------------------------------------------------------------------
    # Parser integration
    # ------------------------------------------------------------------

    def generate_from_parser(self, parser: SchemaParser) -> List[Dict[str, Any]]:
        """Generate mock data from a parsed schema.

        Returns:
            One entry per operation (``path``, ``method``, ``operationId``,
            ``mock_data``) when the parse result is a dict with an
            ``"operations"`` key; otherwise a single-element list holding
            the value generated from the parse result itself.
        """
        parsed = parser.parse()

        if not (isinstance(parsed, dict) and "operations" in parsed):
            return [self.generate(parsed)]

        return [
            {
                "path": op.get("path"),
                "method": op.get("method"),
                "operationId": op.get("operationId"),
                "mock_data": self.generate_operation(op, parser),
            }
            for op in parsed["operations"]
        ]

    def generate_operation(
        self, operation: Dict[str, Any], parser: SchemaParser
    ) -> Dict[str, Any]:
        """Generate mock data for an OpenAPI operation.

        The first non-empty response among ``200``, ``201``, ``default``,
        ``202`` and ``204`` is used.  Its ``schema`` is resolved through
        ``parser.resolve_schema`` before generation.

        Returns:
            The generated value, or a dict with a ``"message"`` key when no
            usable response schema exists.
        """
        responses = operation.get("responses") or {}

        response = None
        for status in ("200", "201", "default", "202", "204"):
            response = responses.get(status)
            if response:
                break

        if response and response.get("schema"):
            schema = parser.resolve_schema(response["schema"])
            return self.generate(schema)

        return {"message": "No schema available for this operation"}