Add source files: generator, validator, fuzzer modules
Some checks failed
CI / test (push) Has been cancelled

This commit is contained in:
2026-02-02 19:55:34 +00:00
parent f111799e4f
commit 320929ebb2

376
src/core/generator.py Normal file
View File

@@ -0,0 +1,376 @@
"""Response Generator for creating mock responses from JSON Schema."""
import random
from datetime import date, datetime
from typing import Any
from faker import Faker
class ResponseGeneratorError(Exception):
"""Base exception for response generator errors."""
pass
class UnsupportedSchemaType(ResponseGeneratorError):
"""Raised when an unsupported schema type is encountered."""
pass
class ResponseGenerator:
"""Generates realistic mock responses based on JSON Schema definitions."""
TYPE_MAPPING = {
"string": "string",
"integer": "integer",
"number": "number",
"boolean": "boolean",
"array": "array",
"object": "object",
"null": "null",
}
FORMAT_MAPPING = {
"date": "date",
"date-time": "date_time",
"time": "time",
"email": "email",
"uri": "uri",
"uuid": "uuid",
"hostname": "hostname",
"ipv4": "ipv4",
"ipv6": "ipv6",
"password": "password",
}
def __init__(self, seed: int | None = None) -> None:
"""Initialize the response generator.
Args:
seed: Optional seed for reproducible random generation.
"""
self.faker = Faker()
if seed is not None:
Faker.seed(seed)
random.seed(seed)
def generate(self, schema: dict[str, Any]) -> Any:
"""Generate a mock value based on the given schema.
Args:
schema: JSON Schema definition.
Returns:
Generated mock value.
Raises:
UnsupportedSchemaType: If the schema type is not supported.
"""
if not schema:
return None
schema_type = schema.get("type")
if schema_type is None and "$ref" in schema:
return self._resolve_ref(schema["$ref"], schema)
if schema_type is None:
return None
type_key = schema_type.lower()
if type_key not in self.TYPE_MAPPING:
raise UnsupportedSchemaType(f"Unsupported schema type: {schema_type}")
generator_method = getattr(
self, f"_generate_{type_key}", self._generate_default
)
return generator_method(schema)
def _generate_string(self, schema: dict[str, Any]) -> str:
"""Generate a string value based on the schema.
Args:
schema: String schema definition.
Returns:
Generated string value.
"""
fmt = schema.get("format", "")
pattern = schema.get("pattern")
if fmt and fmt.lower() in self.FORMAT_MAPPING:
format_method_name = self.FORMAT_MAPPING[fmt.lower()]
format_method = getattr(self.faker, format_method_name, None)
if format_method:
return format_method()
if pattern:
return self._generate_by_pattern(pattern)
min_length = schema.get("minLength", 0)
max_length = schema.get("maxLength", 100)
enum_values = schema.get("enum")
if enum_values:
return random.choice(enum_values)
actual_max = max(min_length, max_length)
length = random.randint(min_length, actual_max) if actual_max > 0 else 10
length = max(length, 5)
return self.faker.text(max_nb_chars=length)
def _generate_by_pattern(self, pattern: str) -> str:
"""Generate a string matching a regex pattern.
Args:
pattern: Regex pattern.
Returns:
Generated string matching the pattern.
"""
if pattern == r"^[a-zA-Z0-9_-]{1,50}$":
return self.faker.slug()
return self.faker.word()
def _generate_integer(self, schema: dict[str, Any]) -> int:
"""Generate an integer value based on the schema.
Args:
schema: Integer schema definition.
Returns:
Generated integer value.
"""
minimum = schema.get("minimum", 0)
maximum = schema.get("maximum", 1000)
multiple_of = schema.get("multipleOf", 1)
value = random.randint(minimum, maximum)
if multiple_of > 1:
value = (value // multiple_of) * multiple_of
return value
def _generate_number(self, schema: dict[str, Any]) -> float:
"""Generate a number value based on the schema.
Args:
schema: Number schema definition.
Returns:
Generated number value.
"""
minimum = schema.get("minimum", 0.0)
maximum = schema.get("maximum", 1000.0)
multiple_of = schema.get("multipleOf", 1.0)
value = random.uniform(minimum, maximum)
if multiple_of > 1:
value = round(value / multiple_of) * multiple_of
return round(value, 2)
def _generate_boolean(self, schema: dict[str, Any]) -> bool:
"""Generate a boolean value based on the schema.
Args:
schema: Boolean schema definition.
Returns:
Generated boolean value.
"""
return random.choice([True, False])
def _generate_array(self, schema: dict[str, Any]) -> list[Any]:
"""Generate an array based on the schema.
Args:
schema: Array schema definition.
Returns:
Generated array of values.
"""
items_schema = schema.get("items", {})
min_items = schema.get("minItems", 0)
max_items = schema.get("maxItems", 10)
enum_values = schema.get("enum")
count = random.randint(min_items, max_items)
if enum_values:
return [random.choice(enum_values) for _ in range(count)]
return [self.generate(items_schema) for _ in range(count)]
def _generate_object(self, schema: dict[str, Any]) -> dict[str, Any]:
"""Generate an object based on the schema.
Args:
schema: Object schema definition.
Returns:
Generated object as a dictionary.
"""
properties = schema.get("properties", {})
required_fields = schema.get("required", [])
additional_properties = schema.get("additionalProperties", True)
result: dict[str, Any] = {}
for prop_name, prop_schema in properties.items():
if prop_name in required_fields or additional_properties:
value = self.generate(prop_schema)
result[prop_name] = self._convert_datetime(value)
return result
def _convert_datetime(self, value: Any) -> Any:
"""Convert datetime objects to ISO format strings.
Args:
value: The value to convert.
Returns:
Converted value.
"""
if isinstance(value, datetime):
return value.isoformat()
elif isinstance(value, date):
return value.isoformat()
return value
def _generate_null(self, schema: dict[str, Any]) -> None:
"""Generate a null value.
Args:
schema: Null schema definition.
Returns:
None.
"""
return None
def _generate_default(self, schema: dict[str, Any]) -> Any:
"""Default generator for unknown types.
Args:
schema: Schema definition.
Returns:
None as default value.
"""
return None
def _resolve_ref(
self, ref: str, schema: dict[str, Any]
) -> Any:
"""Resolve a $ref reference.
Args:
ref: The reference string.
schema: The current schema containing the reference.
Returns:
The resolved value.
"""
if ref.startswith("#/components/schemas/"):
ref_name = ref.split("/")[-1]
return schema.get("resolved_schemas", {}).get(ref_name, {})
return None
def set_resolved_schemas(
self, schemas: dict[str, Any]
) -> None:
"""Set resolved schema references.
Args:
schemas: Dictionary of resolved schema names to schema objects.
"""
self._resolved_schemas = schemas
def generate_response(
self,
schema: dict[str, Any],
status_code: int = 200,
headers: dict[str, str] | None = None,
) -> dict[str, Any]:
"""Generate a complete HTTP response.
Args:
schema: Response schema definition.
status_code: HTTP status code.
headers: Optional response headers.
Returns:
Complete response dictionary.
"""
response_body = self.generate(schema)
response: dict[str, Any] = {
"status_code": status_code,
"body": response_body,
}
if headers:
response["headers"] = headers
return response
def generate_list_response(
self,
item_schema: dict[str, Any],
count: int | None = None,
status_code: int = 200,
) -> dict[str, Any]:
"""Generate a list/array response.
Args:
item_schema: Schema for array items.
count: Number of items to generate (random if not specified).
status_code: HTTP status code.
Returns:
Complete response with array body.
"""
if count is None:
count = random.randint(1, 10)
items = [self.generate(item_schema) for _ in range(count)]
return {
"status_code": status_code,
"body": items,
}
def generate_error_response(
self,
message: str,
status_code: int = 400,
error_type: str = "Bad Request",
) -> dict[str, Any]:
"""Generate an error response.
Args:
message: Error message.
status_code: HTTP status code.
error_type: Type of error.
Returns:
Error response dictionary.
"""
return {
"status_code": status_code,
"body": {
"error": {
"type": error_type,
"message": message,
}
},
}