Add source files: generator, validator, fuzzer modules
Some checks failed
CI / test (push) Failing after 14s
Some checks failed
CI / test (push) Failing after 14s
This commit is contained in:
361
src/core/fuzzer.py
Normal file
361
src/core/fuzzer.py
Normal file
@@ -0,0 +1,361 @@
|
|||||||
|
"""Fuzzer for generating edge case responses."""
|
||||||
|
|
||||||
|
import random
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
|
class FuzzerError(Exception):
|
||||||
|
"""Base exception for fuzzer errors."""
|
||||||
|
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class Fuzzer:
|
||||||
|
"""Generates edge case data for resilience testing."""
|
||||||
|
|
||||||
|
NULL_VALUES = [None, "", [], {}, "null", "NULL", "Null"]
|
||||||
|
|
||||||
|
INVALID_STRINGS = [
|
||||||
|
"",
|
||||||
|
" ",
|
||||||
|
"\n\t",
|
||||||
|
"undefined",
|
||||||
|
"NaN",
|
||||||
|
"Infinity",
|
||||||
|
"-Infinity",
|
||||||
|
"true",
|
||||||
|
"false",
|
||||||
|
"123abc",
|
||||||
|
"<script>alert('xss')</script>",
|
||||||
|
"' OR '1'='1",
|
||||||
|
"../../../etc/passwd",
|
||||||
|
"\x00\x00\x00",
|
||||||
|
]
|
||||||
|
|
||||||
|
INVALID_NUMBERS = [
|
||||||
|
0,
|
||||||
|
-1,
|
||||||
|
-9999999,
|
||||||
|
9999999,
|
||||||
|
float("inf"),
|
||||||
|
float("-inf"),
|
||||||
|
float("nan"),
|
||||||
|
1e308,
|
||||||
|
-1e308,
|
||||||
|
]
|
||||||
|
|
||||||
|
BOUNDARY_VALUES = [
|
||||||
|
-2147483649,
|
||||||
|
-2147483648,
|
||||||
|
2147483647,
|
||||||
|
2147483648,
|
||||||
|
-32769,
|
||||||
|
-32768,
|
||||||
|
32767,
|
||||||
|
32768,
|
||||||
|
-1,
|
||||||
|
0,
|
||||||
|
1,
|
||||||
|
]
|
||||||
|
|
||||||
|
ARRAY_BOUNDARIES = [
|
||||||
|
[],
|
||||||
|
[None],
|
||||||
|
[None] * 100,
|
||||||
|
[{}] * 50,
|
||||||
|
[{"a": None}] * 25,
|
||||||
|
]
|
||||||
|
|
||||||
|
def __init__(self, seed: int | None = None) -> None:
|
||||||
|
"""Initialize the fuzzer.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
seed: Optional seed for reproducible fuzzing.
|
||||||
|
"""
|
||||||
|
if seed is not None:
|
||||||
|
random.seed(seed)
|
||||||
|
|
||||||
|
def fuzz_string(self, schema: dict[str, Any]) -> Any:
|
||||||
|
"""Fuzz a string field with edge cases.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schema: String schema definition.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Fuzzed string value.
|
||||||
|
"""
|
||||||
|
pattern = schema.get("pattern")
|
||||||
|
|
||||||
|
if pattern and random.random() < 0.3:
|
||||||
|
return random.choice(self.INVALID_STRINGS)
|
||||||
|
|
||||||
|
min_length = schema.get("minLength", 0)
|
||||||
|
max_length = schema.get("maxLength", 100)
|
||||||
|
|
||||||
|
if min_length > 0 and random.random() < 0.2:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
if random.random() < 0.1:
|
||||||
|
return random.choice(self.INVALID_STRINGS)
|
||||||
|
|
||||||
|
if random.random() < 0.05:
|
||||||
|
return "a" * (max_length + 1)
|
||||||
|
|
||||||
|
length = max(0, min_length)
|
||||||
|
return "x" * length
|
||||||
|
|
||||||
|
def fuzz_integer(self, schema: dict[str, Any]) -> int:
|
||||||
|
"""Fuzz an integer field with boundary values.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schema: Integer schema definition.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Fuzzed integer value.
|
||||||
|
"""
|
||||||
|
minimum = schema.get("minimum", -2147483648)
|
||||||
|
maximum = schema.get("maximum", 2147483647)
|
||||||
|
multiple_of = schema.get("multipleOf", 1)
|
||||||
|
|
||||||
|
if random.random() < 0.2:
|
||||||
|
if random.random() < 0.5:
|
||||||
|
return minimum - 1
|
||||||
|
else:
|
||||||
|
return maximum + 1
|
||||||
|
|
||||||
|
if random.random() < 0.15:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
if random.random() < 0.1:
|
||||||
|
return random.choice(self.BOUNDARY_VALUES)
|
||||||
|
|
||||||
|
value = random.randint(minimum, min(maximum, minimum + 1000))
|
||||||
|
|
||||||
|
if multiple_of > 1:
|
||||||
|
value = (value // multiple_of) * multiple_of
|
||||||
|
|
||||||
|
return value
|
||||||
|
|
||||||
|
def fuzz_number(self, schema: dict[str, Any]) -> float:
|
||||||
|
"""Fuzz a number field with special values.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schema: Number schema definition.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Fuzzed number value.
|
||||||
|
"""
|
||||||
|
if random.random() < 0.2:
|
||||||
|
return random.choice(self.INVALID_NUMBERS)
|
||||||
|
|
||||||
|
minimum = schema.get("minimum", 0.0)
|
||||||
|
maximum = schema.get("maximum", 1000.0)
|
||||||
|
|
||||||
|
value = random.uniform(minimum, maximum)
|
||||||
|
|
||||||
|
if random.random() < 0.1:
|
||||||
|
value = round(value, 10)
|
||||||
|
|
||||||
|
return value
|
||||||
|
|
||||||
|
def fuzz_boolean(self, schema: dict[str, Any]) -> Any:
|
||||||
|
"""Fuzz a boolean field with invalid values.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schema: Boolean schema definition.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Fuzzed value (not necessarily boolean).
|
||||||
|
"""
|
||||||
|
if random.random() < 0.3:
|
||||||
|
return random.choice([0, 1, "true", "false", "yes", "no", ""])
|
||||||
|
|
||||||
|
return random.choice([True, False])
|
||||||
|
|
||||||
|
def fuzz_array(self, schema: dict[str, Any]) -> Any:
|
||||||
|
"""Fuzz an array field with edge cases.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schema: Array schema definition.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Fuzzed array value.
|
||||||
|
"""
|
||||||
|
min_items = schema.get("minItems", 0)
|
||||||
|
max_items = schema.get("maxItems", 10)
|
||||||
|
|
||||||
|
if random.random() < 0.2:
|
||||||
|
return random.choice(self.ARRAY_BOUNDARIES)
|
||||||
|
|
||||||
|
if min_items > 0 and random.random() < 0.2:
|
||||||
|
return []
|
||||||
|
|
||||||
|
count = random.randint(0, max(min_items, max_items + 5))
|
||||||
|
return [None] * count
|
||||||
|
|
||||||
|
def fuzz_object(self, schema: dict[str, Any]) -> dict[str, Any] | None:
|
||||||
|
"""Fuzz an object field with edge cases.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schema: Object schema definition.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Fuzzed object value.
|
||||||
|
"""
|
||||||
|
properties = schema.get("properties", {})
|
||||||
|
required_fields = schema.get("required", [])
|
||||||
|
additional_properties = schema.get("additionalProperties", True)
|
||||||
|
|
||||||
|
result: dict[str, Any] = {}
|
||||||
|
|
||||||
|
for prop_name, prop_schema in properties.items():
|
||||||
|
if prop_name in required_fields:
|
||||||
|
result[prop_name] = self.fuzz_schema(prop_schema)
|
||||||
|
elif additional_properties and random.random() < 0.5:
|
||||||
|
result[prop_name] = self.fuzz_schema(prop_schema)
|
||||||
|
|
||||||
|
if random.random() < 0.2:
|
||||||
|
result["__fuzz__"] = "injected"
|
||||||
|
|
||||||
|
if random.random() < 0.1:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def fuzz_null(self, schema: dict[str, Any]) -> Any:
|
||||||
|
"""Fuzz a null field.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schema: Null schema definition.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Null value or injected null.
|
||||||
|
"""
|
||||||
|
if random.random() < 0.7:
|
||||||
|
return None
|
||||||
|
return random.choice(self.NULL_VALUES)
|
||||||
|
|
||||||
|
def fuzz_schema(self, schema: dict[str, Any]) -> Any:
|
||||||
|
"""Fuzz a complete schema with edge cases.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schema: JSON Schema definition.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Fuzzed value matching the schema type.
|
||||||
|
"""
|
||||||
|
if not schema:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if "$ref" in schema:
|
||||||
|
return self.fuzz_schema(schema.get("resolved", {}))
|
||||||
|
|
||||||
|
schema_type = schema.get("type", "string").lower()
|
||||||
|
|
||||||
|
if random.random() < 0.05:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if schema_type == "string":
|
||||||
|
return self.fuzz_string(schema)
|
||||||
|
elif schema_type == "integer":
|
||||||
|
return self.fuzz_integer(schema)
|
||||||
|
elif schema_type == "number":
|
||||||
|
return self.fuzz_number(schema)
|
||||||
|
elif schema_type == "boolean":
|
||||||
|
return self.fuzz_boolean(schema)
|
||||||
|
elif schema_type == "array":
|
||||||
|
return self.fuzz_array(schema)
|
||||||
|
elif schema_type == "object":
|
||||||
|
return self.fuzz_object(schema)
|
||||||
|
elif schema_type == "null":
|
||||||
|
return self.fuzz_null(schema)
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def fuzz_request_body(self, schema: dict[str, Any] | None) -> Any:
|
||||||
|
"""Fuzz a request body schema.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schema: Request body schema.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Fuzzed request body.
|
||||||
|
"""
|
||||||
|
if schema is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return self.fuzz_schema(schema)
|
||||||
|
|
||||||
|
def fuzz_response(
|
||||||
|
self, schema: dict[str, Any] | None, status_code: int = 200
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Generate a complete fuzzed response.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schema: Response schema.
|
||||||
|
status_code: Response status code.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Fuzzed response dictionary.
|
||||||
|
"""
|
||||||
|
body = self.fuzz_schema(schema) if schema else None
|
||||||
|
|
||||||
|
if random.random() < 0.1:
|
||||||
|
body = None
|
||||||
|
|
||||||
|
return {
|
||||||
|
"status_code": status_code,
|
||||||
|
"body": body,
|
||||||
|
}
|
||||||
|
|
||||||
|
def inject_null_values(self, data: Any, depth: int = 0, max_depth: int = 3) -> Any:
|
||||||
|
"""Inject null values into data structure.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data: Data structure to inject nulls into.
|
||||||
|
depth: Current recursion depth.
|
||||||
|
max_depth: Maximum recursion depth.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Data with injected nulls.
|
||||||
|
"""
|
||||||
|
if depth >= max_depth:
|
||||||
|
return data
|
||||||
|
|
||||||
|
if isinstance(data, dict):
|
||||||
|
result = {}
|
||||||
|
for key, value in data.items():
|
||||||
|
if random.random() < 0.1:
|
||||||
|
result[key] = None
|
||||||
|
else:
|
||||||
|
result[key] = self.inject_null_values(value, depth + 1, max_depth)
|
||||||
|
return result
|
||||||
|
|
||||||
|
elif isinstance(data, list):
|
||||||
|
return [
|
||||||
|
self.inject_null_values(item, depth + 1, max_depth)
|
||||||
|
for item in data
|
||||||
|
]
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
def create_edge_case_response(
|
||||||
|
self, schema: dict[str, Any] | None = None
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Create an edge case response for testing.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schema: Optional schema to base the response on.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Edge case response dictionary.
|
||||||
|
"""
|
||||||
|
if schema:
|
||||||
|
body = self.fuzz_schema(schema)
|
||||||
|
else:
|
||||||
|
body = {"message": "edge case test", "data": None}
|
||||||
|
|
||||||
|
return {
|
||||||
|
"status_code": random.choice([200, 400, 404, 500]),
|
||||||
|
"body": body,
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user