This commit is contained in:
@@ -1,245 +1,183 @@
|
||||
"""Example generation from OpenAPI schemas."""
|
||||
from typing import Any
|
||||
|
||||
import re
|
||||
from typing import Any, Dict, List, Optional, Set
|
||||
|
||||
from src.core.models import SchemaProperty, Schema
|
||||
from src.core.models import Schema
|
||||
|
||||
|
||||
class ExampleGenerator:
|
||||
"""Generates realistic examples from OpenAPI schemas."""
|
||||
def __init__(self, components_schemas: dict[str, Schema] | None = None):
|
||||
self.components_schemas = components_schemas or {}
|
||||
|
||||
TYPE_EXAMPLES = {
|
||||
"integer": 42,
|
||||
"int32": 42,
|
||||
"int64": 9223372036854775807,
|
||||
"number": 3.14,
|
||||
"float": 3.14,
|
||||
"double": 3.14159265359,
|
||||
"string": "example string",
|
||||
"password": "secretpassword123",
|
||||
"email": "user@example.com",
|
||||
"uri": "https://example.com",
|
||||
"uuid": "123e4567-e89b-12d3-a456-426614174000",
|
||||
"date": "2024-01-15",
|
||||
"date-time": "2024-01-15T10:30:00Z",
|
||||
"time": "10:30:00",
|
||||
"byte": "dGVzdA==",
|
||||
"binary": "binary data",
|
||||
"boolean": True,
|
||||
"null": None,
|
||||
}
|
||||
|
||||
STRING_FORMATS = {
|
||||
"date": lambda: "2024-01-15",
|
||||
"date-time": lambda: "2024-01-15T10:30:00Z",
|
||||
"time": lambda: "10:30:00",
|
||||
"email": lambda: "user@example.com",
|
||||
"uri": lambda: "https://example.com",
|
||||
"hostname": lambda: "example.com",
|
||||
"ipv4": lambda: "192.168.1.1",
|
||||
"ipv6": lambda: "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
|
||||
"uuid": lambda: "123e4567-e89b-12d3-a456-426614174000",
|
||||
}
|
||||
|
||||
COUNTER = 0
|
||||
|
||||
def reset_counter(self) -> None:
|
||||
"""Reset the counter for generating unique examples."""
|
||||
self.COUNTER = 0
|
||||
|
||||
def _get_next_value(self) -> int:
|
||||
self.COUNTER += 1
|
||||
return self.COUNTER
|
||||
|
||||
def _extract_ref(self, ref: Optional[str]) -> Optional[str]:
|
||||
"""Extract the schema name from a $ref."""
|
||||
if not ref:
|
||||
def generate(self, schema: Schema | dict) -> Any:
|
||||
if schema is None:
|
||||
return None
|
||||
match = re.search(r"#/components/schemas/(\w+)", ref)
|
||||
return match.group(1) if match else None
|
||||
if isinstance(schema, dict):
|
||||
schema = Schema(**schema)
|
||||
schema_dict = schema.model_dump(exclude_none=True)
|
||||
return self._generate_from_schema(schema, schema_dict)
|
||||
|
||||
def _get_type_default(self, schema: SchemaProperty) -> Any:
|
||||
"""Get a default example value based on type and format."""
|
||||
if schema.enum:
|
||||
return schema.enum[0]
|
||||
|
||||
if schema.type is None:
|
||||
def _generate_from_schema(self, schema: Schema, schema_dict: dict) -> Any:
|
||||
schema_type = schema.type
|
||||
if schema_type == "string":
|
||||
return self._generate_string(schema)
|
||||
elif schema_type == "integer":
|
||||
return self._generate_integer(schema)
|
||||
elif schema_type == "number":
|
||||
return self._generate_number(schema)
|
||||
elif schema_type == "boolean":
|
||||
return self._generate_boolean(schema)
|
||||
elif schema_type == "array":
|
||||
return self._generate_array(schema, schema_dict)
|
||||
elif schema_type == "object":
|
||||
return self._generate_object(schema, schema_dict)
|
||||
elif schema.all_of:
|
||||
return self._generate_all_of(schema.all_of, schema_dict)
|
||||
elif schema.any_of:
|
||||
return self._generate_any_of(schema.any_of, schema_dict)
|
||||
elif schema.one_of:
|
||||
return self._generate_one_of(schema.one_of, schema_dict)
|
||||
elif schema.not_:
|
||||
return None
|
||||
|
||||
if schema.type == "string" and schema.pattern:
|
||||
return "ABC"
|
||||
|
||||
if schema.type == "string" and schema.format:
|
||||
if schema.format in self.STRING_FORMATS:
|
||||
return self.STRING_FORMATS[schema.format]()
|
||||
|
||||
if schema.type in self.TYPE_EXAMPLES:
|
||||
if schema.type == "string" and schema.format == "uuid":
|
||||
return self.TYPE_EXAMPLES["uuid"].replace("00000000", str(self._get_next_value()).zfill(8))
|
||||
return self.TYPE_EXAMPLES[schema.type]
|
||||
|
||||
return None
|
||||
|
||||
def generate_from_property(self, schema: SchemaProperty) -> Any:
|
||||
"""Generate an example from a single schema property."""
|
||||
if schema.example is not None:
|
||||
return schema.example
|
||||
|
||||
if schema.examples:
|
||||
return schema.examples[0]
|
||||
|
||||
elif schema.ref:
|
||||
return self._resolve_ref(schema.ref)
|
||||
elif schema.enum:
|
||||
return schema.enum[0] if schema.enum else None
|
||||
if schema.default is not None:
|
||||
return schema.default
|
||||
return None
|
||||
|
||||
if schema.all_of:
|
||||
merged = {}
|
||||
for s in schema.all_of:
|
||||
merged.update(self.generate_example_from_dict(s.model_dump(mode="json")))
|
||||
return merged
|
||||
def _generate_string(self, schema: Schema) -> str:
|
||||
if schema.example is not None:
|
||||
return str(schema.example)
|
||||
if schema.enum:
|
||||
return str(schema.enum[0])
|
||||
format_str = schema.format
|
||||
if format_str == "date-time":
|
||||
return "2024-01-01T00:00:00Z"
|
||||
elif format_str == "date":
|
||||
return "2024-01-01"
|
||||
elif format_str == "email":
|
||||
return "user@example.com"
|
||||
elif format_str == "uri":
|
||||
return "https://example.com"
|
||||
elif format_str == "uuid":
|
||||
return "550e8400-e29b-41d4-a716-446655440000"
|
||||
elif format_str == "hostname":
|
||||
return "example.com"
|
||||
elif format_str == "ipv4":
|
||||
return "192.168.1.1"
|
||||
elif format_str == "ipv6":
|
||||
return "::1"
|
||||
return "string"
|
||||
|
||||
if schema.one_of:
|
||||
return self.generate_example_from_dict(schema.one_of[0].model_dump(mode="json"))
|
||||
def _generate_integer(self, schema: Schema) -> int:
|
||||
if schema.example is not None:
|
||||
try:
|
||||
return int(schema.example)
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
if schema.default is not None:
|
||||
try:
|
||||
return int(schema.default)
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
return 0
|
||||
|
||||
if schema.any_of:
|
||||
return self.generate_example_from_dict(schema.any_of[0].model_dump(mode="json"))
|
||||
def _generate_number(self, schema: Schema) -> float:
|
||||
if schema.example is not None:
|
||||
try:
|
||||
return float(schema.example)
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
if schema.default is not None:
|
||||
try:
|
||||
return float(schema.default)
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
return 0.0
|
||||
|
||||
if schema.type == "object":
|
||||
if schema.properties:
|
||||
return self._generate_object(schema.properties)
|
||||
return {}
|
||||
def _generate_boolean(self, schema: Schema) -> bool:
|
||||
if schema.example is not None:
|
||||
return bool(schema.example)
|
||||
if schema.default is not None:
|
||||
return bool(schema.default)
|
||||
return False
|
||||
|
||||
if schema.type == "array":
|
||||
if schema.items:
|
||||
return [self.generate_from_property(schema.items)]
|
||||
def _generate_array(self, schema: Schema, schema_dict: dict) -> list:
|
||||
items = schema.items
|
||||
if items is None and "items" in schema_dict:
|
||||
items = schema_dict["items"]
|
||||
if items is None:
|
||||
return []
|
||||
if isinstance(items, dict):
|
||||
items = Schema(**items)
|
||||
has_dump = hasattr(items, "model_dump")
|
||||
example = self._generate_from_schema(
|
||||
items, items.model_dump() if has_dump else items
|
||||
)
|
||||
return [example]
|
||||
|
||||
return self._get_type_default(schema)
|
||||
|
||||
def _generate_object(self, properties: Dict[str, SchemaProperty]) -> Dict[str, Any]:
|
||||
"""Generate an example object from property definitions."""
|
||||
def _generate_object(self, schema: Schema, schema_dict: dict) -> dict:
|
||||
result = {}
|
||||
properties = schema.properties or schema_dict.get("properties", {})
|
||||
for prop_name, prop_schema in properties.items():
|
||||
if prop_schema.read_only:
|
||||
if prop_schema is None:
|
||||
continue
|
||||
result[prop_name] = self.generate_from_property(prop_schema)
|
||||
if isinstance(prop_schema, dict):
|
||||
prop_schema = Schema(**prop_schema)
|
||||
has_dump = hasattr(prop_schema, "model_dump")
|
||||
result[prop_name] = self._generate_from_schema(
|
||||
prop_schema, prop_schema.model_dump() if has_dump else prop_schema
|
||||
)
|
||||
return result
|
||||
|
||||
def generate_example(self, schema: Schema) -> Dict[str, Any]:
|
||||
"""Generate a complete example for a schema."""
|
||||
if schema.example:
|
||||
return schema.example
|
||||
|
||||
if schema.properties:
|
||||
return self._generate_object(schema.properties)
|
||||
|
||||
if schema.type == "object":
|
||||
return {}
|
||||
|
||||
return self._get_type_default(SchemaProperty(**schema.model_dump(mode="json")))
|
||||
|
||||
def generate_example_from_dict(self, schema_dict: Dict[str, Any]) -> Any:
|
||||
"""Generate an example from a schema dictionary."""
|
||||
schema = SchemaProperty(**schema_dict)
|
||||
return self.generate_from_property(schema)
|
||||
|
||||
def generate_from_content(self, content: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Generate examples from a content dictionary (requestBody or response)."""
|
||||
examples = {}
|
||||
for media_type, content_def in content.items():
|
||||
if isinstance(content_def, dict) and "schema" in content_def:
|
||||
examples[media_type] = self.generate_example_from_dict(content_def["schema"])
|
||||
return examples
|
||||
|
||||
def generate_request_body_example(self, request_body: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Generate an example for a request body."""
|
||||
return self.generate_from_content(request_body.get("content", {}))
|
||||
|
||||
def generate_response_examples(self, responses: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Generate examples for all responses."""
|
||||
examples = {}
|
||||
for status_code, response in responses.items():
|
||||
if isinstance(response, dict):
|
||||
examples[status_code] = self.generate_from_content(response.get("content", {}))
|
||||
return examples
|
||||
|
||||
def generate_all_examples(self, spec_data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Generate examples for the entire spec."""
|
||||
result = {
|
||||
"endpoints": [],
|
||||
"schemas": {},
|
||||
}
|
||||
|
||||
for path, methods in spec_data.get("paths", {}).items():
|
||||
for method, details in methods.items():
|
||||
if method in ["get", "post", "put", "patch", "delete"]:
|
||||
endpoint_example = {
|
||||
"path": path,
|
||||
"method": method,
|
||||
"parameters": [],
|
||||
"requestBody": None,
|
||||
"responses": {},
|
||||
}
|
||||
|
||||
for param in details.get("parameters", []):
|
||||
param_example = {
|
||||
"name": param.get("name"),
|
||||
"in": param.get("in"),
|
||||
"example": self.generate_example_from_dict(param.get("schema", {})),
|
||||
}
|
||||
endpoint_example["parameters"].append(param_example)
|
||||
|
||||
if "requestBody" in details:
|
||||
endpoint_example["requestBody"] = self.generate_request_body_example(
|
||||
details["requestBody"]
|
||||
)
|
||||
|
||||
endpoint_example["responses"] = self.generate_response_examples(
|
||||
details.get("responses", {})
|
||||
)
|
||||
|
||||
result["endpoints"].append(endpoint_example)
|
||||
|
||||
for schema_name, schema_def in spec_data.get("components", {}).get("schemas", {}).items():
|
||||
result["schemas"][schema_name] = self.generate_example_from_dict(schema_def)
|
||||
|
||||
def _generate_all_of(self, schemas: list, schema_dict: dict) -> dict:
|
||||
result = {}
|
||||
for s in schemas:
|
||||
if s is None:
|
||||
continue
|
||||
if isinstance(s, dict):
|
||||
s = Schema(**s)
|
||||
has_dump = hasattr(s, "model_dump")
|
||||
partial = self._generate_from_schema(s, s.model_dump() if has_dump else s)
|
||||
if isinstance(partial, dict):
|
||||
result.update(partial)
|
||||
return result
|
||||
|
||||
def _generate_any_of(self, schemas: list, schema_dict: dict) -> Any:
|
||||
for s in schemas:
|
||||
if s is None:
|
||||
continue
|
||||
if isinstance(s, dict):
|
||||
s = Schema(**s)
|
||||
has_dump = hasattr(s, "model_dump")
|
||||
result = self._generate_from_schema(s, s.model_dump() if has_dump else s)
|
||||
if result is not None:
|
||||
return result
|
||||
return None
|
||||
|
||||
class SchemaWalker:
|
||||
"""Utility class for walking schema hierarchies."""
|
||||
def _generate_one_of(self, schemas: list, schema_dict: dict) -> Any:
|
||||
for s in schemas:
|
||||
if s is None:
|
||||
continue
|
||||
if isinstance(s, dict):
|
||||
s = Schema(**s)
|
||||
has_dump = hasattr(s, "model_dump")
|
||||
result = self._generate_from_schema(s, s.model_dump() if has_dump else s)
|
||||
if result is not None:
|
||||
return result
|
||||
return None
|
||||
|
||||
def __init__(self):
|
||||
self._seen_refs: Set[str] = set()
|
||||
def _resolve_ref(self, ref: str) -> Any:
|
||||
if ref.startswith("#/components/schemas/"):
|
||||
schema_name = ref.split("/")[-1]
|
||||
if schema_name in self.components_schemas:
|
||||
return self.generate(self.components_schemas[schema_name])
|
||||
return None
|
||||
|
||||
def walk(
|
||||
self,
|
||||
schema: Any,
|
||||
callback: callable,
|
||||
visited: Optional[Set[str]] = None,
|
||||
) -> None:
|
||||
"""Walk through a schema and call callback on each node."""
|
||||
if visited is None:
|
||||
visited = set()
|
||||
|
||||
if schema is None:
|
||||
return
|
||||
|
||||
schema_id = id(schema)
|
||||
if schema_id in visited:
|
||||
return
|
||||
visited.add(schema_id)
|
||||
|
||||
if isinstance(schema, dict):
|
||||
callback(schema)
|
||||
for key, value in schema.items():
|
||||
if key not in ["title", "description", "default"]:
|
||||
self.walk(value, callback, visited)
|
||||
|
||||
elif isinstance(schema, list):
|
||||
for item in schema:
|
||||
self.walk(item, callback, visited)
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset the walker state."""
|
||||
self._seen_refs.clear()
|
||||
def generate_examples_from_schema(
|
||||
schema: Schema | dict,
|
||||
components_schemas: dict[str, Schema] | None = None,
|
||||
) -> Any:
|
||||
generator = ExampleGenerator(components_schemas)
|
||||
return generator.generate(schema)
|
||||
|
||||
Reference in New Issue
Block a user