diff --git a/src/core/fuzzer.py b/src/core/fuzzer.py new file mode 100644 index 0000000..01c9e4f --- /dev/null +++ b/src/core/fuzzer.py @@ -0,0 +1,361 @@ +"""Fuzzer for generating edge case responses.""" + +import random +from typing import Any + + +class FuzzerError(Exception): + """Base exception for fuzzer errors.""" + + pass + + +class Fuzzer: + """Generates edge case data for resilience testing.""" + + NULL_VALUES = [None, "", [], {}, "null", "NULL", "Null"] + + INVALID_STRINGS = [ + "", + " ", + "\n\t", + "undefined", + "NaN", + "Infinity", + "-Infinity", + "true", + "false", + "123abc", + "", + "' OR '1'='1", + "../../../etc/passwd", + "\x00\x00\x00", + ] + + INVALID_NUMBERS = [ + 0, + -1, + -9999999, + 9999999, + float("inf"), + float("-inf"), + float("nan"), + 1e308, + -1e308, + ] + + BOUNDARY_VALUES = [ + -2147483649, + -2147483648, + 2147483647, + 2147483648, + -32769, + -32768, + 32767, + 32768, + -1, + 0, + 1, + ] + + ARRAY_BOUNDARIES = [ + [], + [None], + [None] * 100, + [{}] * 50, + [{"a": None}] * 25, + ] + + def __init__(self, seed: int | None = None) -> None: + """Initialize the fuzzer. + + Args: + seed: Optional seed for reproducible fuzzing. + """ + if seed is not None: + random.seed(seed) + + def fuzz_string(self, schema: dict[str, Any]) -> Any: + """Fuzz a string field with edge cases. + + Args: + schema: String schema definition. + + Returns: + Fuzzed string value. + """ + pattern = schema.get("pattern") + + if pattern and random.random() < 0.3: + return random.choice(self.INVALID_STRINGS) + + min_length = schema.get("minLength", 0) + max_length = schema.get("maxLength", 100) + + if min_length > 0 and random.random() < 0.2: + return "" + + if random.random() < 0.1: + return random.choice(self.INVALID_STRINGS) + + if random.random() < 0.05: + return "a" * (max_length + 1) + + length = max(0, min_length) + return "x" * length + + def fuzz_integer(self, schema: dict[str, Any]) -> int: + """Fuzz an integer field with boundary values. + + Args: + schema: Integer schema definition. + + Returns: + Fuzzed integer value. + """ + minimum = schema.get("minimum", -2147483648) + maximum = schema.get("maximum", 2147483647) + multiple_of = schema.get("multipleOf", 1) + + if random.random() < 0.2: + if random.random() < 0.5: + return minimum - 1 + else: + return maximum + 1 + + if random.random() < 0.15: + return 0 + + if random.random() < 0.1: + return random.choice(self.BOUNDARY_VALUES) + + value = random.randint(minimum, min(maximum, minimum + 1000)) + + if multiple_of > 1: + value = (value // multiple_of) * multiple_of + + return value + + def fuzz_number(self, schema: dict[str, Any]) -> float: + """Fuzz a number field with special values. + + Args: + schema: Number schema definition. + + Returns: + Fuzzed number value. + """ + if random.random() < 0.2: + return random.choice(self.INVALID_NUMBERS) + + minimum = schema.get("minimum", 0.0) + maximum = schema.get("maximum", 1000.0) + + value = random.uniform(minimum, maximum) + + if random.random() < 0.1: + value = round(value, 10) + + return value + + def fuzz_boolean(self, schema: dict[str, Any]) -> Any: + """Fuzz a boolean field with invalid values. + + Args: + schema: Boolean schema definition. + + Returns: + Fuzzed value (not necessarily boolean). + """ + if random.random() < 0.3: + return random.choice([0, 1, "true", "false", "yes", "no", ""]) + + return random.choice([True, False]) + + def fuzz_array(self, schema: dict[str, Any]) -> Any: + """Fuzz an array field with edge cases. + + Args: + schema: Array schema definition. + + Returns: + Fuzzed array value. + """ + min_items = schema.get("minItems", 0) + max_items = schema.get("maxItems", 10) + + if random.random() < 0.2: + return random.choice(self.ARRAY_BOUNDARIES) + + if min_items > 0 and random.random() < 0.2: + return [] + + count = random.randint(0, max(min_items, max_items + 5)) + return [None] * count + + def fuzz_object(self, schema: dict[str, Any]) -> dict[str, Any] | None: + """Fuzz an object field with edge cases. + + Args: + schema: Object schema definition. + + Returns: + Fuzzed object value. + """ + properties = schema.get("properties", {}) + required_fields = schema.get("required", []) + additional_properties = schema.get("additionalProperties", True) + + result: dict[str, Any] = {} + + for prop_name, prop_schema in properties.items(): + if prop_name in required_fields: + result[prop_name] = self.fuzz_schema(prop_schema) + elif additional_properties and random.random() < 0.5: + result[prop_name] = self.fuzz_schema(prop_schema) + + if random.random() < 0.2: + result["__fuzz__"] = "injected" + + if random.random() < 0.1: + return None + + return result + + def fuzz_null(self, schema: dict[str, Any]) -> Any: + """Fuzz a null field. + + Args: + schema: Null schema definition. + + Returns: + Null value or injected null. + """ + if random.random() < 0.7: + return None + return random.choice(self.NULL_VALUES) + + def fuzz_schema(self, schema: dict[str, Any]) -> Any: + """Fuzz a complete schema with edge cases. + + Args: + schema: JSON Schema definition. + + Returns: + Fuzzed value matching the schema type. + """ + if not schema: + return None + + if "$ref" in schema: + return self.fuzz_schema(schema.get("resolved", {})) + + schema_type = schema.get("type", "string").lower() + + if random.random() < 0.05: + return None + + if schema_type == "string": + return self.fuzz_string(schema) + elif schema_type == "integer": + return self.fuzz_integer(schema) + elif schema_type == "number": + return self.fuzz_number(schema) + elif schema_type == "boolean": + return self.fuzz_boolean(schema) + elif schema_type == "array": + return self.fuzz_array(schema) + elif schema_type == "object": + return self.fuzz_object(schema) + elif schema_type == "null": + return self.fuzz_null(schema) + else: + return None + + def fuzz_request_body(self, schema: dict[str, Any] | None) -> Any: + """Fuzz a request body schema. + + Args: + schema: Request body schema. + + Returns: + Fuzzed request body. + """ + if schema is None: + return None + + return self.fuzz_schema(schema) + + def fuzz_response( + self, schema: dict[str, Any] | None, status_code: int = 200 + ) -> dict[str, Any]: + """Generate a complete fuzzed response. + + Args: + schema: Response schema. + status_code: Response status code. + + Returns: + Fuzzed response dictionary. + """ + body = self.fuzz_schema(schema) if schema else None + + if random.random() < 0.1: + body = None + + return { + "status_code": status_code, + "body": body, + } + + def inject_null_values(self, data: Any, depth: int = 0, max_depth: int = 3) -> Any: + """Inject null values into data structure. + + Args: + data: Data structure to inject nulls into. + depth: Current recursion depth. + max_depth: Maximum recursion depth. + + Returns: + Data with injected nulls. + """ + if depth >= max_depth: + return data + + if isinstance(data, dict): + result = {} + for key, value in data.items(): + if random.random() < 0.1: + result[key] = None + else: + result[key] = self.inject_null_values(value, depth + 1, max_depth) + return result + + elif isinstance(data, list): + return [ + self.inject_null_values(item, depth + 1, max_depth) + for item in data + ] + + return data + + def create_edge_case_response( + self, schema: dict[str, Any] | None = None + ) -> dict[str, Any]: + """Create an edge case response for testing. + + Args: + schema: Optional schema to base the response on. + + Returns: + Edge case response dictionary. + """ + if schema: + body = self.fuzz_schema(schema) + else: + body = {"message": "edge case test", "data": None} + + return { + "status_code": random.choice([200, 400, 404, 500]), + "body": body, + }