This commit is contained in:
@@ -1,146 +1,245 @@
|
||||
from typing import Any, Dict, List, Optional
|
||||
import random
|
||||
"""Example generation from OpenAPI schemas."""
|
||||
|
||||
import re
|
||||
from typing import Any, Dict, List, Optional, Set
|
||||
|
||||
from src.core.models import SchemaProperty, Schema
|
||||
|
||||
|
||||
FAKE_DATA = {
|
||||
'names': ['John', 'Jane', 'Bob', 'Alice', 'Charlie', 'Diana', 'Eve', 'Frank'],
|
||||
'domains': ['example.com', 'test.org', 'sample.net', 'demo.io'],
|
||||
'cities': ['New York', 'Los Angeles', 'Chicago', 'Houston', 'Phoenix'],
|
||||
'streets': ['Main St', 'Oak Ave', 'Maple Dr', 'Cedar Ln', 'Pine Rd'],
|
||||
'countries': ['USA', 'Canada', 'UK', 'Germany', 'France'],
|
||||
'companies': ['Acme Corp', 'TechStart', 'Global Inc', 'Local LLC', 'Digital Co'],
|
||||
'job_titles': ['Engineer', 'Manager', 'Designer', 'Developer', 'Analyst'],
|
||||
'departments': ['Engineering', 'Marketing', 'Sales', 'HR', 'Finance'],
|
||||
'products': ['Widget', 'Gadget', 'Tool', 'Device', 'Component'],
|
||||
'adjectives': ['Premium', 'Essential', 'Professional', 'Standard', 'Deluxe'],
|
||||
'lorem_words': ['lorem', 'ipsum', 'dolor', 'sit', 'amet', 'consectetur', 'adipiscing', 'elit'],
|
||||
'statuses': ['active', 'pending', 'completed', 'cancelled', 'archived'],
|
||||
'id_prefixes': ['usr_', 'ord_', 'prd_', 'inv_', 'txn_']
|
||||
}
|
||||
class ExampleGenerator:
|
||||
"""Generates realistic examples from OpenAPI schemas."""
|
||||
|
||||
|
||||
def generate_id(prefix: str = None) -> str:
|
||||
prefix = prefix or random.choice(FAKE_DATA['id_prefixes'])
|
||||
return f"{prefix}{random.randint(10000, 99999)}"
|
||||
|
||||
|
||||
def generate_name() -> str:
|
||||
first = random.choice(FAKE_DATA['names'])
|
||||
last = random.choice(FAKE_DATA['names'])
|
||||
return f"{first} {last}"
|
||||
|
||||
|
||||
def generate_email(name: str = None) -> str:
|
||||
name = (name or generate_name()).lower().replace(' ', '.')
|
||||
domain = random.choice(FAKE_DATA['domains'])
|
||||
return f"{name}@{domain}"
|
||||
|
||||
|
||||
def generate_phone() -> str:
|
||||
return f"+1-{random.randint(200, 999)}-{random.randint(100, 999)}-{random.randint(1000, 9999)}"
|
||||
|
||||
|
||||
def generate_address() -> Dict[str, Any]:
|
||||
return {
|
||||
'street': f"{random.randint(100, 9999)} {random.choice(FAKE_DATA['streets'])}",
|
||||
'city': random.choice(FAKE_DATA['cities']),
|
||||
'state': f"{random.choice(['CA', 'NY', 'TX', 'FL', 'IL'])}",
|
||||
'zip': f"{random.randint(10000, 99999)}",
|
||||
'country': random.choice(FAKE_DATA['countries'])
|
||||
TYPE_EXAMPLES = {
|
||||
"integer": 42,
|
||||
"int32": 42,
|
||||
"int64": 9223372036854775807,
|
||||
"number": 3.14,
|
||||
"float": 3.14,
|
||||
"double": 3.14159265359,
|
||||
"string": "example string",
|
||||
"password": "secretpassword123",
|
||||
"email": "user@example.com",
|
||||
"uri": "https://example.com",
|
||||
"uuid": "123e4567-e89b-12d3-a456-426614174000",
|
||||
"date": "2024-01-15",
|
||||
"date-time": "2024-01-15T10:30:00Z",
|
||||
"time": "10:30:00",
|
||||
"byte": "dGVzdA==",
|
||||
"binary": "binary data",
|
||||
"boolean": True,
|
||||
"null": None,
|
||||
}
|
||||
|
||||
|
||||
def generate_company() -> Dict[str, Any]:
|
||||
adj = random.choice(FAKE_DATA['adjectives'])
|
||||
product = random.choice(FAKE_DATA['products'])
|
||||
return {
|
||||
'name': f"{adj} {product} {random.choice(FAKE_DATA['companies'])}",
|
||||
'industry': random.choice(['Technology', 'Healthcare', 'Finance', 'Retail', 'Manufacturing']),
|
||||
'employees': random.randint(10, 10000),
|
||||
'founded': random.randint(1950, 2023)
|
||||
STRING_FORMATS = {
|
||||
"date": lambda: "2024-01-15",
|
||||
"date-time": lambda: "2024-01-15T10:30:00Z",
|
||||
"time": lambda: "10:30:00",
|
||||
"email": lambda: "user@example.com",
|
||||
"uri": lambda: "https://example.com",
|
||||
"hostname": lambda: "example.com",
|
||||
"ipv4": lambda: "192.168.1.1",
|
||||
"ipv6": lambda: "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
|
||||
"uuid": lambda: "123e4567-e89b-12d3-a456-426614174000",
|
||||
}
|
||||
|
||||
COUNTER = 0
|
||||
|
||||
def generate_user() -> Dict[str, Any]:
|
||||
return {
|
||||
'id': generate_id('usr_'),
|
||||
'name': generate_name(),
|
||||
'email': generate_email(),
|
||||
'phone': generate_phone(),
|
||||
'address': generate_address(),
|
||||
'created_at': '2024-01-15T10:30:00Z',
|
||||
'status': random.choice(FAKE_DATA['statuses'])
|
||||
}
|
||||
def reset_counter(self) -> None:
|
||||
"""Reset the counter for generating unique examples."""
|
||||
self.COUNTER = 0
|
||||
|
||||
def _get_next_value(self) -> int:
|
||||
self.COUNTER += 1
|
||||
return self.COUNTER
|
||||
|
||||
def generate_product() -> Dict[str, Any]:
|
||||
adj = random.choice(FAKE_DATA['adjectives'])
|
||||
product = random.choice(FAKE_DATA['products'])
|
||||
return {
|
||||
'id': generate_id('prd_'),
|
||||
'name': f"{adj} {product}",
|
||||
'description': ' '.join(random.choices(FAKE_DATA['lorem_words'], k=10)),
|
||||
'price': round(random.uniform(9.99, 999.99), 2),
|
||||
'sku': f"SKU-{random.randint(10000, 99999)}",
|
||||
'in_stock': random.choice([True, False]),
|
||||
'category': random.choice(['Electronics', 'Clothing', 'Home', 'Sports', 'Books'])
|
||||
}
|
||||
def _extract_ref(self, ref: Optional[str]) -> Optional[str]:
|
||||
"""Extract the schema name from a $ref."""
|
||||
if not ref:
|
||||
return None
|
||||
match = re.search(r"#/components/schemas/(\w+)", ref)
|
||||
return match.group(1) if match else None
|
||||
|
||||
def _get_type_default(self, schema: SchemaProperty) -> Any:
|
||||
"""Get a default example value based on type and format."""
|
||||
if schema.enum:
|
||||
return schema.enum[0]
|
||||
|
||||
def generate_order() -> Dict[str, Any]:
|
||||
return {
|
||||
'id': generate_id('ord_'),
|
||||
'customer_id': generate_id('usr_'),
|
||||
'items': [generate_product() for _ in range(random.randint(1, 5))],
|
||||
'total': round(random.uniform(50, 2000), 2),
|
||||
'status': random.choice(FAKE_DATA['statuses']),
|
||||
'created_at': '2024-01-15T14:30:00Z'
|
||||
}
|
||||
|
||||
|
||||
def generate(schema: Dict[str, Any], depth: int = 0) -> Any:
|
||||
if depth > 3:
|
||||
if schema.type is None:
|
||||
return None
|
||||
|
||||
if not schema:
|
||||
if schema.type == "string" and schema.pattern:
|
||||
return "ABC"
|
||||
|
||||
if schema.type == "string" and schema.format:
|
||||
if schema.format in self.STRING_FORMATS:
|
||||
return self.STRING_FORMATS[schema.format]()
|
||||
|
||||
if schema.type in self.TYPE_EXAMPLES:
|
||||
if schema.type == "string" and schema.format == "uuid":
|
||||
return self.TYPE_EXAMPLES["uuid"].replace("00000000", str(self._get_next_value()).zfill(8))
|
||||
return self.TYPE_EXAMPLES[schema.type]
|
||||
|
||||
return None
|
||||
|
||||
schema_type = schema.get('type', 'object')
|
||||
def generate_from_property(self, schema: SchemaProperty) -> Any:
|
||||
"""Generate an example from a single schema property."""
|
||||
if schema.example is not None:
|
||||
return schema.example
|
||||
|
||||
if schema_type == 'object' and 'properties' in schema:
|
||||
if schema.examples:
|
||||
return schema.examples[0]
|
||||
|
||||
if schema.default is not None:
|
||||
return schema.default
|
||||
|
||||
if schema.all_of:
|
||||
merged = {}
|
||||
for s in schema.all_of:
|
||||
merged.update(self.generate_example_from_dict(s.model_dump(mode="json")))
|
||||
return merged
|
||||
|
||||
if schema.one_of:
|
||||
return self.generate_example_from_dict(schema.one_of[0].model_dump(mode="json"))
|
||||
|
||||
if schema.any_of:
|
||||
return self.generate_example_from_dict(schema.any_of[0].model_dump(mode="json"))
|
||||
|
||||
if schema.type == "object":
|
||||
if schema.properties:
|
||||
return self._generate_object(schema.properties)
|
||||
return {}
|
||||
|
||||
if schema.type == "array":
|
||||
if schema.items:
|
||||
return [self.generate_from_property(schema.items)]
|
||||
return []
|
||||
|
||||
return self._get_type_default(schema)
|
||||
|
||||
def _generate_object(self, properties: Dict[str, SchemaProperty]) -> Dict[str, Any]:
|
||||
"""Generate an example object from property definitions."""
|
||||
result = {}
|
||||
for prop_name, prop_schema in schema['properties'].items():
|
||||
required = schema.get('required', [])
|
||||
if prop_name in required or random.choice([True, False]):
|
||||
result[prop_name] = generate(prop_schema, depth + 1)
|
||||
for prop_name, prop_schema in properties.items():
|
||||
if prop_schema.read_only:
|
||||
continue
|
||||
result[prop_name] = self.generate_from_property(prop_schema)
|
||||
return result
|
||||
|
||||
elif schema_type == 'array':
|
||||
item_schema = schema.get('items', {})
|
||||
return [generate(item_schema, depth + 1) for _ in range(random.randint(1, 3))]
|
||||
def generate_example(self, schema: Schema) -> Dict[str, Any]:
|
||||
"""Generate a complete example for a schema."""
|
||||
if schema.example:
|
||||
return schema.example
|
||||
|
||||
elif schema_type == 'string':
|
||||
string_format = schema.get('format')
|
||||
if string_format == 'date-time':
|
||||
return '2024-01-15T10:30:00Z'
|
||||
elif string_format == 'date':
|
||||
return '2024-01-15'
|
||||
elif string_format == 'email':
|
||||
return generate_email()
|
||||
elif string_format == 'uri':
|
||||
return 'https://example.com/api'
|
||||
elif string_format == 'uuid':
|
||||
return '550e8400-e29b-41d4-a716-446655440000'
|
||||
else:
|
||||
return random.choice(['sample', 'example', 'test', 'demo'])
|
||||
if schema.properties:
|
||||
return self._generate_object(schema.properties)
|
||||
|
||||
elif schema_type == 'integer' or schema_type == 'number':
|
||||
return random.randint(1, 1000)
|
||||
if schema.type == "object":
|
||||
return {}
|
||||
|
||||
elif schema_type == 'boolean':
|
||||
return random.choice([True, False])
|
||||
return self._get_type_default(SchemaProperty(**schema.model_dump(mode="json")))
|
||||
|
||||
elif schema_type == 'null':
|
||||
return None
|
||||
def generate_example_from_dict(self, schema_dict: Dict[str, Any]) -> Any:
|
||||
"""Generate an example from a schema dictionary."""
|
||||
schema = SchemaProperty(**schema_dict)
|
||||
return self.generate_from_property(schema)
|
||||
|
||||
return None
|
||||
def generate_from_content(self, content: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Generate examples from a content dictionary (requestBody or response)."""
|
||||
examples = {}
|
||||
for media_type, content_def in content.items():
|
||||
if isinstance(content_def, dict) and "schema" in content_def:
|
||||
examples[media_type] = self.generate_example_from_dict(content_def["schema"])
|
||||
return examples
|
||||
|
||||
def generate_request_body_example(self, request_body: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Generate an example for a request body."""
|
||||
return self.generate_from_content(request_body.get("content", {}))
|
||||
|
||||
def generate_response_examples(self, responses: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Generate examples for all responses."""
|
||||
examples = {}
|
||||
for status_code, response in responses.items():
|
||||
if isinstance(response, dict):
|
||||
examples[status_code] = self.generate_from_content(response.get("content", {}))
|
||||
return examples
|
||||
|
||||
def generate_all_examples(self, spec_data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Generate examples for the entire spec."""
|
||||
result = {
|
||||
"endpoints": [],
|
||||
"schemas": {},
|
||||
}
|
||||
|
||||
for path, methods in spec_data.get("paths", {}).items():
|
||||
for method, details in methods.items():
|
||||
if method in ["get", "post", "put", "patch", "delete"]:
|
||||
endpoint_example = {
|
||||
"path": path,
|
||||
"method": method,
|
||||
"parameters": [],
|
||||
"requestBody": None,
|
||||
"responses": {},
|
||||
}
|
||||
|
||||
for param in details.get("parameters", []):
|
||||
param_example = {
|
||||
"name": param.get("name"),
|
||||
"in": param.get("in"),
|
||||
"example": self.generate_example_from_dict(param.get("schema", {})),
|
||||
}
|
||||
endpoint_example["parameters"].append(param_example)
|
||||
|
||||
if "requestBody" in details:
|
||||
endpoint_example["requestBody"] = self.generate_request_body_example(
|
||||
details["requestBody"]
|
||||
)
|
||||
|
||||
endpoint_example["responses"] = self.generate_response_examples(
|
||||
details.get("responses", {})
|
||||
)
|
||||
|
||||
result["endpoints"].append(endpoint_example)
|
||||
|
||||
for schema_name, schema_def in spec_data.get("components", {}).get("schemas", {}).items():
|
||||
result["schemas"][schema_name] = self.generate_example_from_dict(schema_def)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
class SchemaWalker:
|
||||
"""Utility class for walking schema hierarchies."""
|
||||
|
||||
def __init__(self):
|
||||
self._seen_refs: Set[str] = set()
|
||||
|
||||
def walk(
|
||||
self,
|
||||
schema: Any,
|
||||
callback: callable,
|
||||
visited: Optional[Set[str]] = None,
|
||||
) -> None:
|
||||
"""Walk through a schema and call callback on each node."""
|
||||
if visited is None:
|
||||
visited = set()
|
||||
|
||||
if schema is None:
|
||||
return
|
||||
|
||||
schema_id = id(schema)
|
||||
if schema_id in visited:
|
||||
return
|
||||
visited.add(schema_id)
|
||||
|
||||
if isinstance(schema, dict):
|
||||
callback(schema)
|
||||
for key, value in schema.items():
|
||||
if key not in ["title", "description", "default"]:
|
||||
self.walk(value, callback, visited)
|
||||
|
||||
elif isinstance(schema, list):
|
||||
for item in schema:
|
||||
self.walk(item, callback, visited)
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset the walker state."""
|
||||
self._seen_refs.clear()
|
||||
|
||||
Reference in New Issue
Block a user