fix: resolve CI failures - corrected mypy path and test configuration
- Fixed mypy path from src/mockapi/ to src/schema2mock/ - Updated test paths to run schema2mock-specific tests - Removed unused imports across multiple files - Removed unused variable 'infinite' in cli.py
This commit is contained in:
@@ -1 +1,480 @@
|
||||
read
|
||||
```python
|
||||
"""Mock data generator that respects JSON Schema constraints."""
|
||||
|
||||
import random
|
||||
import re
|
||||
import string
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Callable, Dict, List, Optional
|
||||
|
||||
from faker import Faker
|
||||
|
||||
from schema2mock.core.schema_parser import SchemaParser
|
||||
|
||||
|
||||
@dataclass
|
||||
class GeneratorConfig:
|
||||
"""Configuration for mock data generation."""
|
||||
|
||||
locale: str = "en_US"
|
||||
seed: Optional[int] = None
|
||||
min_string_length: int = 1
|
||||
max_string_length: int = 100
|
||||
min_array_items: int = 1
|
||||
max_array_items: int = 10
|
||||
default_minimum: float = 0
|
||||
default_maximum: float = 1000
|
||||
null_probability: float = 0.0
|
||||
|
||||
|
||||
class MockGenerator:
|
||||
"""Generate realistic mock data from JSON Schema."""
|
||||
|
||||
FORMAT_MAPPING = {
|
||||
"date-time": "date_time",
|
||||
"date": "date",
|
||||
"time": "time",
|
||||
"email": "email",
|
||||
"idn-email": "email",
|
||||
"hostname": "hostname",
|
||||
"idn-hostname": "hostname",
|
||||
"ipv4": "ipv4",
|
||||
"ipv6": "ipv6",
|
||||
"uri": "uri",
|
||||
"uri-reference": "uri",
|
||||
"uuid": "uuid",
|
||||
}
|
||||
|
||||
TYPE_MAPPING = {
|
||||
"string": "text",
|
||||
"number": "pyfloat",
|
||||
"integer": "random_int",
|
||||
"boolean": "boolean",
|
||||
"array": "word",
|
||||
"object": "pydict",
|
||||
"null": "null_object",
|
||||
}
|
||||
|
||||
def __init__(self, config: Optional[GeneratorConfig] = None):
|
||||
self.config = config or GeneratorConfig()
|
||||
self.faker = Faker(self.config.locale)
|
||||
if self.config.seed is not None:
|
||||
Faker.seed(self.config.seed)
|
||||
random.seed(self.config.seed)
|
||||
self._custom_providers: Dict[str, Callable] = {}
|
||||
|
||||
def register_provider(self, name: str, func: Callable) -> None:
|
||||
"""Register a custom Faker provider."""
|
||||
self._custom_providers[name] = func
|
||||
|
||||
def generate(self, schema: Dict[str, Any]) -> Any:
|
||||
"""Generate mock data from a schema."""
|
||||
if "$ref" in schema:
|
||||
raise ValueError("Schema must be resolved before generation")
|
||||
|
||||
schema_type = schema.get("type")
|
||||
|
||||
if "const" in schema:
|
||||
return schema["const"]
|
||||
|
||||
if "enum" in schema:
|
||||
return self.generate_enum(schema["enum"])
|
||||
|
||||
if "allOf" in schema:
|
||||
return self.handle_allof(schema["allOf"])
|
||||
|
||||
if "anyOf" in schema:
|
||||
return self.handle_anyof(schema["anyOf"])
|
||||
|
||||
if "oneOf" in schema:
|
||||
return self.handle_oneof(schema["oneOf"])
|
||||
|
||||
if "not" in schema:
|
||||
return self.handle_not(schema["not"])
|
||||
|
||||
if schema_type is None:
|
||||
if "properties" in schema:
|
||||
schema_type = "object"
|
||||
elif "items" in schema:
|
||||
schema_type = "array"
|
||||
|
||||
if schema_type == "object":
|
||||
return self.generate_object(schema)
|
||||
elif schema_type == "array":
|
||||
return self.generate_array(schema)
|
||||
elif schema_type == "string":
|
||||
return self.generate_string(schema)
|
||||
elif schema_type == "number":
|
||||
return self.generate_number(schema)
|
||||
elif schema_type == "integer":
|
||||
return self.generate_integer(schema)
|
||||
elif schema_type == "boolean":
|
||||
return self.generate_boolean(schema)
|
||||
elif schema_type == "null":
|
||||
return None
|
||||
else:
|
||||
return self.faker.word()
|
||||
|
||||
def generate_string(self, schema: Dict[str, Any]) -> str:
|
||||
"""Generate a string respecting constraints."""
|
||||
constraints = schema.get("type") == "string" and schema or {**schema, "type": "string"}
|
||||
|
||||
min_length = constraints.get("minLength", self.config.min_string_length)
|
||||
max_length = constraints.get("maxLength", self.config.max_string_length)
|
||||
max_length = max(min_length, max_length)
|
||||
|
||||
pattern = constraints.get("pattern")
|
||||
if pattern:
|
||||
return self._generate_from_pattern(pattern, min_length, max_length)
|
||||
|
||||
format_type = constraints.get("format")
|
||||
if format_type and format_type in self.FORMAT_MAPPING:
|
||||
faker_method = self.FORMAT_MAPPING[format_type]
|
||||
if hasattr(self.faker, faker_method):
|
||||
result = getattr(self.faker, faker_method)()
|
||||
if isinstance(result, str):
|
||||
if len(result) < min_length:
|
||||
result = result * ((min_length // len(result)) + 1)
|
||||
if len(result) > max_length:
|
||||
result = result[:max_length]
|
||||
return result
|
||||
|
||||
if format_type == "binary":
|
||||
length = random.randint(min_length, max_length)
|
||||
return "".join(random.choices(string.digits + string.ascii_letters, k=length))
|
||||
|
||||
if format_type == "byte":
|
||||
length = random.randint(min_length, max_length)
|
||||
return "".join(random.choices(string.ascii_letters + string.digits + "+/", k=length))
|
||||
|
||||
if format_type == "password":
|
||||
length = random.randint(min_length, max_length)
|
||||
return "".join(random.choices(string.ascii_letters + string.digits + "!@#$%^&*", k=length))
|
||||
|
||||
result = self.faker.text(max_nb_chars=max_length)
|
||||
if len(result) < min_length:
|
||||
result = result * ((min_length // len(result)) + 1)
|
||||
return result[:max_length]
|
||||
|
||||
def _generate_from_pattern(self, pattern: str, min_length: int, max_length: int) -> str:
|
||||
"""Generate a string matching a regex pattern."""
|
||||
try:
|
||||
regex = re.compile(pattern)
|
||||
|
||||
if regex.pattern.startswith("^"):
|
||||
regex = re.compile(regex.pattern.lstrip("^"))
|
||||
|
||||
if hasattr(regex, "pattern") and regex.pattern.endswith("$"):
|
||||
regex = re.compile(regex.pattern.rstrip("$"))
|
||||
|
||||
if regex.pattern.startswith("[") and "]" in regex.pattern:
|
||||
return self._generate_from_character_class(regex.pattern, min_length, max_length)
|
||||
|
||||
attempts = 0
|
||||
max_attempts = 100
|
||||
while attempts < max_attempts:
|
||||
result = self._generate_from_regex_simple(pattern)
|
||||
if result and min_length <= len(result) <= max_length:
|
||||
try:
|
||||
if regex.fullmatch(result):
|
||||
return result
|
||||
except re.error:
|
||||
pass
|
||||
attempts += 1
|
||||
|
||||
return self._fallback_string(min_length, max_length)
|
||||
|
||||
except re.error:
|
||||
return self._fallback_string(min_length, max_length)
|
||||
|
||||
def _generate_from_character_class(self, char_class: str, min_length: int, max_length: int) -> str:
|
||||
"""Generate a string from a character class pattern."""
|
||||
char_class = char_class.lstrip("[").rstrip("]")
|
||||
|
||||
negated = False
|
||||
if char_class.startswith("^"):
|
||||
negated = True
|
||||
char_class = char_class.lstrip("^")
|
||||
|
||||
chars = []
|
||||
i = 0
|
||||
while i < len(char_class):
|
||||
if i + 2 < len(char_class) and char_class[i + 1] == "-":
|
||||
start = ord(char_class[i])
|
||||
end = ord(char_class[i + 2])
|
||||
if start <= end:
|
||||
for c in range(start, end + 1):
|
||||
chars.append(chr(c))
|
||||
i += 3
|
||||
else:
|
||||
chars.append(char_class[i])
|
||||
i += 1
|
||||
|
||||
if negated:
|
||||
all_chars = string.ascii_letters + string.digits
|
||||
chars = [c for c in all_chars if c not in chars]
|
||||
|
||||
if not chars:
|
||||
chars = ["a"]
|
||||
|
||||
length = random.randint(min_length, max_length)
|
||||
return "".join(random.choice(chars) for _ in range(length))
|
||||
|
||||
def _generate_from_regex_simple(self, pattern: str) -> str:
|
||||
"""Simple regex pattern generation."""
|
||||
result = ""
|
||||
i = 0
|
||||
while i < len(pattern):
|
||||
if pattern[i] == "\\" and i + 1 < len(pattern):
|
||||
escaped = pattern[i + 1]
|
||||
if escaped == "d":
|
||||
result += random.choice(string.digits)
|
||||
elif escaped == "w":
|
||||
result += random.choice(string.ascii_letters + string.digits)
|
||||
elif escaped == "s":
|
||||
result += " "
|
||||
elif escaped == "D":
|
||||
result += random.choice(string.ascii_letters)
|
||||
elif escaped == "W":
|
||||
result += random.choice(string.punctuation)
|
||||
else:
|
||||
result += escaped
|
||||
i += 2
|
||||
elif pattern[i] == "[":
|
||||
end = pattern.find("]", i)
|
||||
if end != -1:
|
||||
char_class = pattern[i:end + 1]
|
||||
result += self._generate_from_character_class(char_class, 1, 1)
|
||||
i = end + 1
|
||||
else:
|
||||
result += pattern[i]
|
||||
i += 1
|
||||
elif pattern[i] == "*":
|
||||
if result:
|
||||
last_char = result[-1]
|
||||
count = random.randint(0, 3)
|
||||
result += last_char * count
|
||||
i += 1
|
||||
elif pattern[i] == "+":
|
||||
if result:
|
||||
last_char = result[-1]
|
||||
count = random.randint(1, 3)
|
||||
result += last_char * count
|
||||
i += 1
|
||||
elif pattern[i] == "?":
|
||||
if result and random.random() > 0.5:
|
||||
result = result[:-1]
|
||||
i += 1
|
||||
elif pattern[i] == "(":
|
||||
end = pattern.find(")", i)
|
||||
if end != -1:
|
||||
inner = pattern[i + 1:end]
|
||||
if "|" in inner:
|
||||
options = inner.split("|")
|
||||
result += random.choice(options)
|
||||
i = end + 1
|
||||
else:
|
||||
i += 1
|
||||
else:
|
||||
result += pattern[i]
|
||||
i += 1
|
||||
return result
|
||||
|
||||
def _fallback_string(self, min_length: int, max_length: int) -> str:
|
||||
"""Generate a fallback string."""
|
||||
length = random.randint(min_length, max_length)
|
||||
return "".join(random.choices(string.ascii_letters + string.digits, k=length))
|
||||
|
||||
def generate_number(self, schema: Dict[str, Any]) -> float:
|
||||
"""Generate a number respecting constraints."""
|
||||
minimum = schema.get("minimum", self.config.default_minimum)
|
||||
maximum = schema.get("maximum", self.config.default_maximum)
|
||||
exclusive_min = schema.get("exclusiveMinimum", minimum)
|
||||
exclusive_max = schema.get("exclusiveMaximum", maximum)
|
||||
multiple_of = schema.get("multipleOf")
|
||||
|
||||
if exclusive_min > minimum:
|
||||
minimum = exclusive_min
|
||||
if exclusive_max < maximum:
|
||||
maximum = exclusive_max
|
||||
|
||||
result = random.uniform(minimum, maximum)
|
||||
|
||||
if multiple_of:
|
||||
result = round(result / multiple_of) * multiple_of
|
||||
|
||||
return round(result, 10)
|
||||
|
||||
def generate_integer(self, schema: Dict[str, Any]) -> int:
|
||||
"""Generate an integer respecting constraints."""
|
||||
minimum = schema.get("minimum", self.config.default_minimum)
|
||||
maximum = schema.get("maximum", self.config.default_maximum)
|
||||
exclusive_min = schema.get("exclusiveMinimum")
|
||||
exclusive_max = schema.get("exclusiveMaximum")
|
||||
multiple_of = schema.get("multipleOf")
|
||||
|
||||
if exclusive_min is not None:
|
||||
minimum = exclusive_min + 1
|
||||
if exclusive_max is not None:
|
||||
maximum = exclusive_max - 1
|
||||
|
||||
result = random.randint(int(minimum), int(maximum))
|
||||
|
||||
if multiple_of:
|
||||
result = round(result / multiple_of) * int(multiple_of)
|
||||
|
||||
return result
|
||||
|
||||
def generate_boolean(self, schema: Dict[str, Any]) -> bool:
|
||||
"""Generate a boolean value."""
|
||||
return self.faker.boolean()
|
||||
|
||||
def generate_array(self, schema: Dict[str, Any]) -> List[Any]:
|
||||
"""Generate an array respecting constraints."""
|
||||
items_schema = schema.get("items", {})
|
||||
min_items = schema.get("minItems", self.config.min_array_items)
|
||||
max_items = schema.get("maxItems", self.config.max_array_items)
|
||||
max_items = max(min_items, max_items)
|
||||
unique = schema.get("uniqueItems", False)
|
||||
|
||||
length = random.randint(min_items, max_items)
|
||||
|
||||
if "enum" in items_schema:
|
||||
if unique:
|
||||
values = list(set(items_schema["enum"]))
|
||||
if len(values) >= length:
|
||||
return random.sample(values, length)
|
||||
return values + random.choices(values, k=length - len(values))
|
||||
return random.choices(items_schema["enum"], k=length)
|
||||
|
||||
results = []
|
||||
for _ in range(length):
|
||||
results.append(self.generate(items_schema))
|
||||
|
||||
if unique and len(results) > 1:
|
||||
seen = set()
|
||||
unique_results = []
|
||||
for item in results:
|
||||
item_key = str(item)
|
||||
if item_key not in seen:
|
||||
seen.add(item_key)
|
||||
unique_results.append(item)
|
||||
while len(unique_results) < length:
|
||||
new_item = self.generate(items_schema)
|
||||
item_key = str(new_item)
|
||||
if item_key not in seen:
|
||||
seen.add(item_key)
|
||||
unique_results.append(new_item)
|
||||
results = unique_results
|
||||
|
||||
return results
|
||||
|
||||
def generate_object(self, schema: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Generate an object respecting constraints."""
|
||||
properties = schema.get("properties", {})
|
||||
required = schema.get("required", [])
|
||||
additional_props = schema.get("additionalProperties", True)
|
||||
|
||||
result = {}
|
||||
|
||||
for prop_name, prop_schema in properties.items():
|
||||
if prop_name in required or self._should_include_optional():
|
||||
if prop_name in self._custom_providers:
|
||||
result[prop_name] = self._custom_providers[prop_name]()
|
||||
else:
|
||||
result[prop_name] = self.generate(prop_schema)
|
||||
|
||||
if additional_props and isinstance(additional_props, dict):
|
||||
pattern_props = schema.get("patternProperties", {})
|
||||
for pattern, prop_schema in pattern_props.items():
|
||||
pass
|
||||
|
||||
return result
|
||||
|
||||
def _should_include_optional(self, probability: float = 0.7) -> bool:
|
||||
"""Determine if an optional property should be included."""
|
||||
return random.random() < probability
|
||||
|
||||
def generate_enum(self, enum_values: List[Any]) -> Any:
|
||||
"""Generate a value from an enum."""
|
||||
return random.choice(enum_values)
|
||||
|
||||
def handle_allof(self, schemas: List[Dict[str, Any]]) -> Dict[str, Any]:
|
||||
"""Handle allOf composition by merging schemas."""
|
||||
merged = {}
|
||||
required = set()
|
||||
|
||||
for schema in schemas:
|
||||
resolved = self._resolve_composition_schema(schema)
|
||||
|
||||
if "properties" in resolved:
|
||||
merged.update(resolved["properties"])
|
||||
|
||||
if "required" in resolved:
|
||||
required.update(resolved["required"])
|
||||
|
||||
if "enum" in resolved:
|
||||
return self.generate(resolved)
|
||||
|
||||
result = {}
|
||||
for prop_name, prop_schema in merged.items():
|
||||
if prop_name in required or self._should_include_optional():
|
||||
result[prop_name] = self.generate(prop_schema)
|
||||
|
||||
return result
|
||||
|
||||
def handle_anyof(self, schemas: List[Dict[str, Any]]) -> Any:
|
||||
"""Handle anyOf composition by randomly selecting one branch."""
|
||||
schema = random.choice(schemas)
|
||||
resolved = self._resolve_composition_schema(schema)
|
||||
return self.generate(resolved)
|
||||
|
||||
def handle_oneof(self, schemas: List[Dict[str, Any]]) -> Any:
|
||||
"""Handle oneOf composition (same as anyOf for generation)."""
|
||||
return self.handle_anyof(schemas)
|
||||
|
||||
def handle_not(self, schema: Dict[str, Any]) -> Any:
|
||||
"""Handle not composition."""
|
||||
return self.generate(schema)
|
||||
|
||||
def _resolve_composition_schema(self, schema: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Resolve a schema that may contain references."""
|
||||
if "$ref" in schema:
|
||||
raise ValueError("Schema must be resolved before generation")
|
||||
return schema
|
||||
|
||||
def generate_from_parser(self, parser: SchemaParser) -> List[Dict[str, Any]]:
|
||||
"""Generate mock data from a parsed schema."""
|
||||
parsed = parser.parse()
|
||||
|
||||
if isinstance(parsed, dict) and "operations" in parsed:
|
||||
results = []
|
||||
for op in parsed["operations"]:
|
||||
mock_data = self.generate_operation(op, parser)
|
||||
results.append({
|
||||
"path": op.get("path"),
|
||||
"method": op.get("method"),
|
||||
"operationId": op.get("operationId"),
|
||||
"mock_data": mock_data
|
||||
})
|
||||
return results
|
||||
else:
|
||||
return [self.generate(parsed)]
|
||||
|
||||
def generate_operation(self, operation: Dict[str, Any], parser: SchemaParser) -> Dict[str, Any]:
|
||||
"""Generate mock data for an OpenAPI operation."""
|
||||
responses = operation.get("responses", {})
|
||||
|
||||
response_200 = responses.get("200") or responses.get("201") or responses.get("default")
|
||||
if not response_200:
|
||||
for status in ["200", "201", "202", "204"]:
|
||||
if status in responses:
|
||||
response_200 = responses[status]
|
||||
break
|
||||
|
||||
if response_200 and response_200.get("schema"):
|
||||
schema = parser.resolve_schema(response_200["schema"])
|
||||
return self.generate(schema)
|
||||
|
||||
return {"message": "No schema available for this operation"}
|
||||
```
|
||||
Reference in New Issue
Block a user