"""Schema inference and validation module.""" from dataclasses import dataclass, field from typing import Any, Dict, List, Optional, Tuple from pydantic import BaseModel, field_validator class SchemaType: """Represents an inferred schema type.""" NULL = "null" BOOLEAN = "boolean" NUMBER = "number" INTEGER = "integer" STRING = "string" ARRAY = "array" OBJECT = "object" @dataclass class SchemaProperty: """Represents a schema property.""" name: str type: str required: bool = True description: str = "" properties: Optional[List["SchemaProperty"]] = None items: Optional["SchemaProperty"] = None enum_values: Optional[List[Any]] = None def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" result: Dict[str, Any] = { "name": self.name, "type": self.type, "required": self.required, } if self.description: result["description"] = self.description if self.properties: result["properties"] = [p.to_dict() for p in self.properties] if self.items: result["items"] = self.items.to_dict() if self.enum_values is not None: result["enum_values"] = self.enum_values return result @dataclass class InferredSchema: """Represents an inferred schema.""" root_type: str properties: List[SchemaProperty] = field(default_factory=list) description: str = "" items: Optional[SchemaProperty] = None def __post_init__(self) -> None: if self.properties is None: self.properties = [] def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" result: Dict[str, Any] = { "root_type": self.root_type, "properties": [p.to_dict() for p in self.properties], "description": self.description, } if self.items: result["items"] = self.items.to_dict() return result class SchemaInferrer: """Infers schema from configuration data.""" def infer(self, data: Any) -> InferredSchema: """Infer schema from data.""" if data is None: return InferredSchema(root_type=SchemaType.NULL) elif isinstance(data, bool): return InferredSchema(root_type=SchemaType.BOOLEAN) elif isinstance(data, (int, float)): schema = InferredSchema(root_type=SchemaType.NUMBER) if isinstance(data, int): schema.root_type = SchemaType.INTEGER return schema elif isinstance(data, str): return InferredSchema(root_type=SchemaType.STRING) elif isinstance(data, list): return self._infer_array_schema(data) elif isinstance(data, dict): return self._infer_object_schema(data) else: return InferredSchema(root_type=SchemaType.STRING) def _infer_array_schema(self, data: List[Any]) -> InferredSchema: """Infer schema for an array.""" if not data: return InferredSchema(root_type=SchemaType.ARRAY) inferred_types = [self.infer(item) for item in data] common_type = self._find_common_type(inferred_types) if common_type == SchemaType.OBJECT: merged_props = self._merge_object_properties(inferred_types) return InferredSchema( root_type=SchemaType.ARRAY, properties=merged_props, items=SchemaProperty(name="item", type="object", properties=merged_props), ) return InferredSchema( root_type=SchemaType.ARRAY, items=SchemaProperty(name="item", type=common_type), ) def _infer_object_schema(self, data: Dict[str, Any]) -> InferredSchema: """Infer schema for an object.""" properties = [] for key, value in data.items(): inferred = self.infer(value) prop = SchemaProperty( name=key, type=inferred.root_type, properties=inferred.properties if inferred.properties else None, items=inferred.items, ) properties.append(prop) return InferredSchema( root_type=SchemaType.OBJECT, properties=properties, ) def _find_common_type(self, schemas: List[InferredSchema]) -> str: """Find the common type among schemas.""" types = set(s.root_type for s in schemas) if len(types) == 1: return types.pop() if SchemaType.STRING in types: return SchemaType.STRING if SchemaType.NUMBER in types or SchemaType.INTEGER in types: return SchemaType.NUMBER if SchemaType.OBJECT in types: return SchemaType.OBJECT return SchemaType.STRING def _merge_object_properties( self, schemas: List[InferredSchema] ) -> List[SchemaProperty]: """Merge properties from multiple object schemas.""" prop_dict: Dict[str, SchemaProperty] = {} for schema in schemas: if schema.properties: for prop in schema.properties: if prop.name not in prop_dict: prop_dict[prop.name] = prop else: existing = prop_dict[prop.name] if prop.type != existing.type: existing.type = SchemaType.STRING return list(prop_dict.values()) class SchemaModel(BaseModel): """Pydantic model for schema validation.""" name: str type: str value: Optional[Any] = None required: bool = True @field_validator("type") @classmethod def validate_type(cls, v: str) -> str: """Validate type is one of supported types.""" valid_types = [ "null", "boolean", "number", "integer", "string", "array", "object", ] if v not in valid_types: raise ValueError(f"Type must be one of {valid_types}") return v class SchemaValidator: """Validates data against a schema.""" def __init__(self, schema: InferredSchema): self.schema = schema def validate(self, data: Any) -> Tuple[bool, List[str]]: """Validate data against schema.""" errors: List[str] = [] is_valid = self._validate_value(data, self.schema, "", errors) return is_valid, errors def _validate_value( self, value: Any, schema: InferredSchema, path: str, errors: List[str] ) -> bool: """Validate a value against schema.""" if value is None: if schema.root_type != SchemaType.NULL: errors.append(f"{path}: expected {schema.root_type}, got null") return False return True actual_type = self._get_type(value) if schema.root_type == SchemaType.OBJECT: if actual_type != "object": errors.append(f"{path}: expected object, got {actual_type}") return False if not isinstance(value, dict): return True for prop in schema.properties or []: prop_path = f"{path}.{prop.name}" if path else prop.name if prop.name in value: prop_value = value[prop.name] prop_props: List[SchemaProperty] = prop.properties if prop.properties else [] prop_schema = InferredSchema(root_type=prop.type, properties=prop_props) self._validate_value(prop_value, prop_schema, prop_path, errors) elif prop.required: errors.append(f"{prop_path}: required property missing") elif schema.root_type == SchemaType.ARRAY: if actual_type != "array": errors.append(f"{path}: expected array, got {actual_type}") return False if isinstance(value, list) and schema.items: item_props: List[SchemaProperty] = schema.items.properties if schema.items.properties else [] item_schema = InferredSchema(root_type=schema.items.type, properties=item_props) for i, item in enumerate(value): self._validate_value(item, item_schema, f"{path}[{i}]", errors) elif schema.root_type == SchemaType.NUMBER: if actual_type not in ("number", "integer"): errors.append(f"{path}: expected number, got {actual_type}") elif schema.root_type != actual_type: errors.append(f"{path}: expected {schema.root_type}, got {actual_type}") return len([e for e in errors if e.startswith(path)]) == 0 def _get_type(self, value: Any) -> str: """Get type string for value.""" if value is None: return SchemaType.NULL elif isinstance(value, bool): return SchemaType.BOOLEAN elif isinstance(value, int): return SchemaType.INTEGER elif isinstance(value, float): return SchemaType.NUMBER elif isinstance(value, str): return SchemaType.STRING elif isinstance(value, list): return SchemaType.ARRAY elif isinstance(value, dict): return SchemaType.OBJECT else: return SchemaType.STRING