diff --git a/src/core/generator.py b/src/core/generator.py new file mode 100644 index 0000000..bad0351 --- /dev/null +++ b/src/core/generator.py @@ -0,0 +1,376 @@ +"""Response Generator for creating mock responses from JSON Schema.""" + +import random +from datetime import date, datetime +from typing import Any + +from faker import Faker + + +class ResponseGeneratorError(Exception): + """Base exception for response generator errors.""" + + pass + + +class UnsupportedSchemaType(ResponseGeneratorError): + """Raised when an unsupported schema type is encountered.""" + + pass + + +class ResponseGenerator: + """Generates realistic mock responses based on JSON Schema definitions.""" + + TYPE_MAPPING = { + "string": "string", + "integer": "integer", + "number": "number", + "boolean": "boolean", + "array": "array", + "object": "object", + "null": "null", + } + + FORMAT_MAPPING = { + "date": "date", + "date-time": "date_time", + "time": "time", + "email": "email", + "uri": "uri", + "uuid": "uuid", + "hostname": "hostname", + "ipv4": "ipv4", + "ipv6": "ipv6", + "password": "password", + } + + def __init__(self, seed: int | None = None) -> None: + """Initialize the response generator. + + Args: + seed: Optional seed for reproducible random generation. + """ + self.faker = Faker() + if seed is not None: + Faker.seed(seed) + random.seed(seed) + + def generate(self, schema: dict[str, Any]) -> Any: + """Generate a mock value based on the given schema. + + Args: + schema: JSON Schema definition. + + Returns: + Generated mock value. + + Raises: + UnsupportedSchemaType: If the schema type is not supported. + """ + if not schema: + return None + + schema_type = schema.get("type") + + if schema_type is None and "$ref" in schema: + return self._resolve_ref(schema["$ref"], schema) + + if schema_type is None: + return None + + type_key = schema_type.lower() + if type_key not in self.TYPE_MAPPING: + raise UnsupportedSchemaType(f"Unsupported schema type: {schema_type}") + + generator_method = getattr( + self, f"_generate_{type_key}", self._generate_default + ) + return generator_method(schema) + + def _generate_string(self, schema: dict[str, Any]) -> str: + """Generate a string value based on the schema. + + Args: + schema: String schema definition. + + Returns: + Generated string value. + """ + fmt = schema.get("format", "") + pattern = schema.get("pattern") + + if fmt and fmt.lower() in self.FORMAT_MAPPING: + format_method_name = self.FORMAT_MAPPING[fmt.lower()] + format_method = getattr(self.faker, format_method_name, None) + if format_method: + return format_method() + + if pattern: + return self._generate_by_pattern(pattern) + + min_length = schema.get("minLength", 0) + max_length = schema.get("maxLength", 100) + enum_values = schema.get("enum") + + if enum_values: + return random.choice(enum_values) + + actual_max = max(min_length, max_length) + length = random.randint(min_length, actual_max) if actual_max > 0 else 10 + length = max(length, 5) + + return self.faker.text(max_nb_chars=length) + + def _generate_by_pattern(self, pattern: str) -> str: + """Generate a string matching a regex pattern. + + Args: + pattern: Regex pattern. + + Returns: + Generated string matching the pattern. + """ + if pattern == r"^[a-zA-Z0-9_-]{1,50}$": + return self.faker.slug() + + return self.faker.word() + + def _generate_integer(self, schema: dict[str, Any]) -> int: + """Generate an integer value based on the schema. + + Args: + schema: Integer schema definition. + + Returns: + Generated integer value. + """ + minimum = schema.get("minimum", 0) + maximum = schema.get("maximum", 1000) + multiple_of = schema.get("multipleOf", 1) + + value = random.randint(minimum, maximum) + + if multiple_of > 1: + value = (value // multiple_of) * multiple_of + + return value + + def _generate_number(self, schema: dict[str, Any]) -> float: + """Generate a number value based on the schema. + + Args: + schema: Number schema definition. + + Returns: + Generated number value. + """ + minimum = schema.get("minimum", 0.0) + maximum = schema.get("maximum", 1000.0) + multiple_of = schema.get("multipleOf", 1.0) + + value = random.uniform(minimum, maximum) + + if multiple_of > 1: + value = round(value / multiple_of) * multiple_of + + return round(value, 2) + + def _generate_boolean(self, schema: dict[str, Any]) -> bool: + """Generate a boolean value based on the schema. + + Args: + schema: Boolean schema definition. + + Returns: + Generated boolean value. + """ + return random.choice([True, False]) + + def _generate_array(self, schema: dict[str, Any]) -> list[Any]: + """Generate an array based on the schema. + + Args: + schema: Array schema definition. + + Returns: + Generated array of values. + """ + items_schema = schema.get("items", {}) + min_items = schema.get("minItems", 0) + max_items = schema.get("maxItems", 10) + enum_values = schema.get("enum") + + count = random.randint(min_items, max_items) + + if enum_values: + return [random.choice(enum_values) for _ in range(count)] + + return [self.generate(items_schema) for _ in range(count)] + + def _generate_object(self, schema: dict[str, Any]) -> dict[str, Any]: + """Generate an object based on the schema. + + Args: + schema: Object schema definition. + + Returns: + Generated object as a dictionary. + """ + properties = schema.get("properties", {}) + required_fields = schema.get("required", []) + additional_properties = schema.get("additionalProperties", True) + + result: dict[str, Any] = {} + + for prop_name, prop_schema in properties.items(): + if prop_name in required_fields or additional_properties: + value = self.generate(prop_schema) + result[prop_name] = self._convert_datetime(value) + + return result + + def _convert_datetime(self, value: Any) -> Any: + """Convert datetime objects to ISO format strings. + + Args: + value: The value to convert. + + Returns: + Converted value. + """ + if isinstance(value, datetime): + return value.isoformat() + elif isinstance(value, date): + return value.isoformat() + return value + + def _generate_null(self, schema: dict[str, Any]) -> None: + """Generate a null value. + + Args: + schema: Null schema definition. + + Returns: + None. + """ + return None + + def _generate_default(self, schema: dict[str, Any]) -> Any: + """Default generator for unknown types. + + Args: + schema: Schema definition. + + Returns: + None as default value. + """ + return None + + def _resolve_ref( + self, ref: str, schema: dict[str, Any] + ) -> Any: + """Resolve a $ref reference. + + Args: + ref: The reference string. + schema: The current schema containing the reference. + + Returns: + The resolved value. + """ + if ref.startswith("#/components/schemas/"): + ref_name = ref.split("/")[-1] + return schema.get("resolved_schemas", {}).get(ref_name, {}) + + return None + + def set_resolved_schemas( + self, schemas: dict[str, Any] + ) -> None: + """Set resolved schema references. + + Args: + schemas: Dictionary of resolved schema names to schema objects. + """ + self._resolved_schemas = schemas + + def generate_response( + self, + schema: dict[str, Any], + status_code: int = 200, + headers: dict[str, str] | None = None, + ) -> dict[str, Any]: + """Generate a complete HTTP response. + + Args: + schema: Response schema definition. + status_code: HTTP status code. + headers: Optional response headers. + + Returns: + Complete response dictionary. + """ + response_body = self.generate(schema) + + response: dict[str, Any] = { + "status_code": status_code, + "body": response_body, + } + + if headers: + response["headers"] = headers + + return response + + def generate_list_response( + self, + item_schema: dict[str, Any], + count: int | None = None, + status_code: int = 200, + ) -> dict[str, Any]: + """Generate a list/array response. + + Args: + item_schema: Schema for array items. + count: Number of items to generate (random if not specified). + status_code: HTTP status code. + + Returns: + Complete response with array body. + """ + if count is None: + count = random.randint(1, 10) + + items = [self.generate(item_schema) for _ in range(count)] + + return { + "status_code": status_code, + "body": items, + } + + def generate_error_response( + self, + message: str, + status_code: int = 400, + error_type: str = "Bad Request", + ) -> dict[str, Any]: + """Generate an error response. + + Args: + message: Error message. + status_code: HTTP status code. + error_type: Type of error. + + Returns: + Error response dictionary. + """ + return { + "status_code": status_code, + "body": { + "error": { + "type": error_type, + "message": message, + } + }, + }