From aa7c813bd52119b134abb63c21212378e8b7153d Mon Sep 17 00:00:00 2001 From: 7000pctAUTO Date: Sun, 1 Feb 2026 16:38:16 +0000 Subject: [PATCH] fix: resolve CI test failures --- src/utils/examples.py | 339 +++++++++++++++++++++++++++--------------- 1 file changed, 219 insertions(+), 120 deletions(-) diff --git a/src/utils/examples.py b/src/utils/examples.py index 4dc6962..ae64e83 100644 --- a/src/utils/examples.py +++ b/src/utils/examples.py @@ -1,146 +1,245 @@ -from typing import Any, Dict, List, Optional -import random +"""Example generation from OpenAPI schemas.""" + +import re +from typing import Any, Dict, List, Optional, Set + +from src.core.models import SchemaProperty, Schema -FAKE_DATA = { - 'names': ['John', 'Jane', 'Bob', 'Alice', 'Charlie', 'Diana', 'Eve', 'Frank'], - 'domains': ['example.com', 'test.org', 'sample.net', 'demo.io'], - 'cities': ['New York', 'Los Angeles', 'Chicago', 'Houston', 'Phoenix'], - 'streets': ['Main St', 'Oak Ave', 'Maple Dr', 'Cedar Ln', 'Pine Rd'], - 'countries': ['USA', 'Canada', 'UK', 'Germany', 'France'], - 'companies': ['Acme Corp', 'TechStart', 'Global Inc', 'Local LLC', 'Digital Co'], - 'job_titles': ['Engineer', 'Manager', 'Designer', 'Developer', 'Analyst'], - 'departments': ['Engineering', 'Marketing', 'Sales', 'HR', 'Finance'], - 'products': ['Widget', 'Gadget', 'Tool', 'Device', 'Component'], - 'adjectives': ['Premium', 'Essential', 'Professional', 'Standard', 'Deluxe'], - 'lorem_words': ['lorem', 'ipsum', 'dolor', 'sit', 'amet', 'consectetur', 'adipiscing', 'elit'], - 'statuses': ['active', 'pending', 'completed', 'cancelled', 'archived'], - 'id_prefixes': ['usr_', 'ord_', 'prd_', 'inv_', 'txn_'] -} +class ExampleGenerator: + """Generates realistic examples from OpenAPI schemas.""" - -def generate_id(prefix: str = None) -> str: - prefix = prefix or random.choice(FAKE_DATA['id_prefixes']) - return f"{prefix}{random.randint(10000, 99999)}" - - -def generate_name() -> str: - first = random.choice(FAKE_DATA['names']) - last = random.choice(FAKE_DATA['names']) - return f"{first} {last}" - - -def generate_email(name: str = None) -> str: - name = (name or generate_name()).lower().replace(' ', '.') - domain = random.choice(FAKE_DATA['domains']) - return f"{name}@{domain}" - - -def generate_phone() -> str: - return f"+1-{random.randint(200, 999)}-{random.randint(100, 999)}-{random.randint(1000, 9999)}" - - -def generate_address() -> Dict[str, Any]: - return { - 'street': f"{random.randint(100, 9999)} {random.choice(FAKE_DATA['streets'])}", - 'city': random.choice(FAKE_DATA['cities']), - 'state': f"{random.choice(['CA', 'NY', 'TX', 'FL', 'IL'])}", - 'zip': f"{random.randint(10000, 99999)}", - 'country': random.choice(FAKE_DATA['countries']) + TYPE_EXAMPLES = { + "integer": 42, + "int32": 42, + "int64": 9223372036854775807, + "number": 3.14, + "float": 3.14, + "double": 3.14159265359, + "string": "example string", + "password": "secretpassword123", + "email": "user@example.com", + "uri": "https://example.com", + "uuid": "123e4567-e89b-12d3-a456-426614174000", + "date": "2024-01-15", + "date-time": "2024-01-15T10:30:00Z", + "time": "10:30:00", + "byte": "dGVzdA==", + "binary": "binary data", + "boolean": True, + "null": None, } - -def generate_company() -> Dict[str, Any]: - adj = random.choice(FAKE_DATA['adjectives']) - product = random.choice(FAKE_DATA['products']) - return { - 'name': f"{adj} {product} {random.choice(FAKE_DATA['companies'])}", - 'industry': random.choice(['Technology', 'Healthcare', 'Finance', 'Retail', 'Manufacturing']), - 'employees': random.randint(10, 10000), - 'founded': random.randint(1950, 2023) + STRING_FORMATS = { + "date": lambda: "2024-01-15", + "date-time": lambda: "2024-01-15T10:30:00Z", + "time": lambda: "10:30:00", + "email": lambda: "user@example.com", + "uri": lambda: "https://example.com", + "hostname": lambda: "example.com", + "ipv4": lambda: "192.168.1.1", + "ipv6": lambda: "2001:0db8:85a3:0000:0000:8a2e:0370:7334", + "uuid": lambda: "123e4567-e89b-12d3-a456-426614174000", } + COUNTER = 0 -def generate_user() -> Dict[str, Any]: - return { - 'id': generate_id('usr_'), - 'name': generate_name(), - 'email': generate_email(), - 'phone': generate_phone(), - 'address': generate_address(), - 'created_at': '2024-01-15T10:30:00Z', - 'status': random.choice(FAKE_DATA['statuses']) - } + def reset_counter(self) -> None: + """Reset the counter for generating unique examples.""" + self.COUNTER = 0 + def _get_next_value(self) -> int: + self.COUNTER += 1 + return self.COUNTER -def generate_product() -> Dict[str, Any]: - adj = random.choice(FAKE_DATA['adjectives']) - product = random.choice(FAKE_DATA['products']) - return { - 'id': generate_id('prd_'), - 'name': f"{adj} {product}", - 'description': ' '.join(random.choices(FAKE_DATA['lorem_words'], k=10)), - 'price': round(random.uniform(9.99, 999.99), 2), - 'sku': f"SKU-{random.randint(10000, 99999)}", - 'in_stock': random.choice([True, False]), - 'category': random.choice(['Electronics', 'Clothing', 'Home', 'Sports', 'Books']) - } + def _extract_ref(self, ref: Optional[str]) -> Optional[str]: + """Extract the schema name from a $ref.""" + if not ref: + return None + match = re.search(r"#/components/schemas/(\w+)", ref) + return match.group(1) if match else None + def _get_type_default(self, schema: SchemaProperty) -> Any: + """Get a default example value based on type and format.""" + if schema.enum: + return schema.enum[0] -def generate_order() -> Dict[str, Any]: - return { - 'id': generate_id('ord_'), - 'customer_id': generate_id('usr_'), - 'items': [generate_product() for _ in range(random.randint(1, 5))], - 'total': round(random.uniform(50, 2000), 2), - 'status': random.choice(FAKE_DATA['statuses']), - 'created_at': '2024-01-15T14:30:00Z' - } + if schema.type is None: + return None + if schema.type == "string" and schema.pattern: + return "ABC" + + if schema.type == "string" and schema.format: + if schema.format in self.STRING_FORMATS: + return self.STRING_FORMATS[schema.format]() + + if schema.type in self.TYPE_EXAMPLES: + if schema.type == "string" and schema.format == "uuid": + return self.TYPE_EXAMPLES["uuid"].replace("00000000", str(self._get_next_value()).zfill(8)) + return self.TYPE_EXAMPLES[schema.type] -def generate(schema: Dict[str, Any], depth: int = 0) -> Any: - if depth > 3: return None - if not schema: - return None + def generate_from_property(self, schema: SchemaProperty) -> Any: + """Generate an example from a single schema property.""" + if schema.example is not None: + return schema.example - schema_type = schema.get('type', 'object') + if schema.examples: + return schema.examples[0] - if schema_type == 'object' and 'properties' in schema: + if schema.default is not None: + return schema.default + + if schema.all_of: + merged = {} + for s in schema.all_of: + merged.update(self.generate_example_from_dict(s.model_dump(mode="json"))) + return merged + + if schema.one_of: + return self.generate_example_from_dict(schema.one_of[0].model_dump(mode="json")) + + if schema.any_of: + return self.generate_example_from_dict(schema.any_of[0].model_dump(mode="json")) + + if schema.type == "object": + if schema.properties: + return self._generate_object(schema.properties) + return {} + + if schema.type == "array": + if schema.items: + return [self.generate_from_property(schema.items)] + return [] + + return self._get_type_default(schema) + + def _generate_object(self, properties: Dict[str, SchemaProperty]) -> Dict[str, Any]: + """Generate an example object from property definitions.""" result = {} - for prop_name, prop_schema in schema['properties'].items(): - required = schema.get('required', []) - if prop_name in required or random.choice([True, False]): - result[prop_name] = generate(prop_schema, depth + 1) + for prop_name, prop_schema in properties.items(): + if prop_schema.read_only: + continue + result[prop_name] = self.generate_from_property(prop_schema) return result - elif schema_type == 'array': - item_schema = schema.get('items', {}) - return [generate(item_schema, depth + 1) for _ in range(random.randint(1, 3))] + def generate_example(self, schema: Schema) -> Dict[str, Any]: + """Generate a complete example for a schema.""" + if schema.example: + return schema.example - elif schema_type == 'string': - string_format = schema.get('format') - if string_format == 'date-time': - return '2024-01-15T10:30:00Z' - elif string_format == 'date': - return '2024-01-15' - elif string_format == 'email': - return generate_email() - elif string_format == 'uri': - return 'https://example.com/api' - elif string_format == 'uuid': - return '550e8400-e29b-41d4-a716-446655440000' - else: - return random.choice(['sample', 'example', 'test', 'demo']) + if schema.properties: + return self._generate_object(schema.properties) - elif schema_type == 'integer' or schema_type == 'number': - return random.randint(1, 1000) + if schema.type == "object": + return {} - elif schema_type == 'boolean': - return random.choice([True, False]) + return self._get_type_default(SchemaProperty(**schema.model_dump(mode="json"))) - elif schema_type == 'null': - return None + def generate_example_from_dict(self, schema_dict: Dict[str, Any]) -> Any: + """Generate an example from a schema dictionary.""" + schema = SchemaProperty(**schema_dict) + return self.generate_from_property(schema) - return None + def generate_from_content(self, content: Dict[str, Any]) -> Dict[str, Any]: + """Generate examples from a content dictionary (requestBody or response).""" + examples = {} + for media_type, content_def in content.items(): + if isinstance(content_def, dict) and "schema" in content_def: + examples[media_type] = self.generate_example_from_dict(content_def["schema"]) + return examples + + def generate_request_body_example(self, request_body: Dict[str, Any]) -> Dict[str, Any]: + """Generate an example for a request body.""" + return self.generate_from_content(request_body.get("content", {})) + + def generate_response_examples(self, responses: Dict[str, Any]) -> Dict[str, Any]: + """Generate examples for all responses.""" + examples = {} + for status_code, response in responses.items(): + if isinstance(response, dict): + examples[status_code] = self.generate_from_content(response.get("content", {})) + return examples + + def generate_all_examples(self, spec_data: Dict[str, Any]) -> Dict[str, Any]: + """Generate examples for the entire spec.""" + result = { + "endpoints": [], + "schemas": {}, + } + + for path, methods in spec_data.get("paths", {}).items(): + for method, details in methods.items(): + if method in ["get", "post", "put", "patch", "delete"]: + endpoint_example = { + "path": path, + "method": method, + "parameters": [], + "requestBody": None, + "responses": {}, + } + + for param in details.get("parameters", []): + param_example = { + "name": param.get("name"), + "in": param.get("in"), + "example": self.generate_example_from_dict(param.get("schema", {})), + } + endpoint_example["parameters"].append(param_example) + + if "requestBody" in details: + endpoint_example["requestBody"] = self.generate_request_body_example( + details["requestBody"] + ) + + endpoint_example["responses"] = self.generate_response_examples( + details.get("responses", {}) + ) + + result["endpoints"].append(endpoint_example) + + for schema_name, schema_def in spec_data.get("components", {}).get("schemas", {}).items(): + result["schemas"][schema_name] = self.generate_example_from_dict(schema_def) + + return result + + +class SchemaWalker: + """Utility class for walking schema hierarchies.""" + + def __init__(self): + self._seen_refs: Set[str] = set() + + def walk( + self, + schema: Any, + callback: callable, + visited: Optional[Set[str]] = None, + ) -> None: + """Walk through a schema and call callback on each node.""" + if visited is None: + visited = set() + + if schema is None: + return + + schema_id = id(schema) + if schema_id in visited: + return + visited.add(schema_id) + + if isinstance(schema, dict): + callback(schema) + for key, value in schema.items(): + if key not in ["title", "description", "default"]: + self.walk(value, callback, visited) + + elif isinstance(schema, list): + for item in schema: + self.walk(item, callback, visited) + + def reset(self) -> None: + """Reset the walker state.""" + self._seen_refs.clear()