276 lines
9.3 KiB
Python
276 lines
9.3 KiB
Python
"""Schema inference and validation module."""
|
|
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
from pydantic import BaseModel, field_validator
|
|
|
|
|
|
class SchemaType:
    """Namespace of the type-name strings used throughout schema inference.

    The attributes are plain ``str`` constants, so they compare equal to the
    literal names (``SchemaType.NULL == "null"``).
    """

    # Scalar types.
    NULL = "null"
    BOOLEAN = "boolean"
    NUMBER = "number"
    INTEGER = "integer"
    STRING = "string"

    # Container types.
    ARRAY = "array"
    OBJECT = "object"
|
|
|
|
|
|
@dataclass
class SchemaProperty:
    """A single named entry of a schema.

    A property may describe nested structure through ``properties`` and
    ``items``, both of which hold further ``SchemaProperty`` instances.
    """

    name: str
    type: str
    required: bool = True
    description: str = ""
    properties: Optional[List["SchemaProperty"]] = None
    items: Optional["SchemaProperty"] = None
    enum_values: Optional[List[Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize this property, and anything nested in it, to a dict.

        Returns:
            A dict that always contains ``name``, ``type`` and ``required``.
            ``description``, ``properties`` and ``items`` are added only when
            they are truthy; ``enum_values`` is added whenever it is not
            ``None`` (so an empty list is kept).
        """
        payload: Dict[str, Any] = dict(
            name=self.name,
            type=self.type,
            required=self.required,
        )

        if self.description:
            payload["description"] = self.description

        if self.properties:
            payload["properties"] = [child.to_dict() for child in self.properties]

        if self.items:
            payload["items"] = self.items.to_dict()

        # Unlike the fields above, an empty enum list is still emitted.
        if self.enum_values is not None:
            payload["enum_values"] = self.enum_values

        return payload
|
|
|
|
|
|
@dataclass
class InferredSchema:
    """Schema inferred for a value: its root type plus nested structure."""

    root_type: str
    properties: List[SchemaProperty] = field(default_factory=list)
    description: str = ""
    items: Optional[SchemaProperty] = None

    def __post_init__(self) -> None:
        """Replace an explicitly passed ``properties=None`` with an empty list."""
        if self.properties is not None:
            return
        self.properties = []

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the schema to a dict.

        Returns:
            A dict with ``root_type``, ``properties`` (each property
            serialized through its own ``to_dict``) and ``description``.
            ``items`` is included only when it is set.
        """
        serialized_props = [prop.to_dict() for prop in self.properties]
        payload: Dict[str, Any] = {
            "root_type": self.root_type,
            "properties": serialized_props,
            "description": self.description,
        }

        if self.items:
            payload["items"] = self.items.to_dict()

        return payload
|
|
|
|
|
|
class SchemaInferrer:
    """Infers an ``InferredSchema`` from configuration data.

    Values are classified as null, boolean, integer, number, string, array
    or object; a value of any other Python type is reported as a string.
    """

    def infer(self, data: Any) -> InferredSchema:
        """Infer the schema describing ``data``.

        Args:
            data: The value to inspect.

        Returns:
            An ``InferredSchema`` whose ``root_type`` is one of the
            ``SchemaType`` constants. Lists and dicts are inspected
            recursively.
        """
        if data is None:
            return InferredSchema(root_type=SchemaType.NULL)
        # bool is a subclass of int, so it has to be ruled out before int.
        if isinstance(data, bool):
            return InferredSchema(root_type=SchemaType.BOOLEAN)
        if isinstance(data, int):
            return InferredSchema(root_type=SchemaType.INTEGER)
        if isinstance(data, float):
            return InferredSchema(root_type=SchemaType.NUMBER)
        if isinstance(data, str):
            return InferredSchema(root_type=SchemaType.STRING)
        if isinstance(data, list):
            return self._infer_array_schema(data)
        if isinstance(data, dict):
            return self._infer_object_schema(data)
        # Anything unrecognised is reported as a string.
        return InferredSchema(root_type=SchemaType.STRING)

    def _infer_array_schema(self, data: List[Any]) -> InferredSchema:
        """Infer the schema of a list from the schemas of its elements.

        An empty list yields an array schema without ``items``. Otherwise the
        element schemas are reduced to one common type; when that type is
        ``object`` the elements' properties are merged and attached both to
        the array schema and to its ``items`` entry.
        """
        if not data:
            return InferredSchema(root_type=SchemaType.ARRAY)

        element_schemas = [self.infer(item) for item in data]
        common_type = self._find_common_type(element_schemas)

        if common_type != SchemaType.OBJECT:
            return InferredSchema(
                root_type=SchemaType.ARRAY,
                items=SchemaProperty(name="item", type=common_type),
            )

        merged_props = self._merge_object_properties(element_schemas)
        return InferredSchema(
            root_type=SchemaType.ARRAY,
            properties=merged_props,
            items=SchemaProperty(name="item", type="object", properties=merged_props),
        )

    def _infer_object_schema(self, data: Dict[str, Any]) -> InferredSchema:
        """Infer the schema of a dict, one property per key."""
        properties = []
        for key, value in data.items():
            inferred = self.infer(value)
            properties.append(
                SchemaProperty(
                    name=key,
                    type=inferred.root_type,
                    # An empty property list is stored as None.
                    properties=inferred.properties if inferred.properties else None,
                    items=inferred.items,
                )
            )

        return InferredSchema(root_type=SchemaType.OBJECT, properties=properties)

    def _find_common_type(self, schemas: List[InferredSchema]) -> str:
        """Reduce the root types of ``schemas`` to a single type name.

        A unanimous type is returned unchanged. For mixed types the first
        match wins, in this order: string, number (also used for integer),
        object; any other mix falls back to string.
        """
        types = {schema.root_type for schema in schemas}
        if len(types) == 1:
            return types.pop()
        if SchemaType.STRING in types:
            return SchemaType.STRING
        if SchemaType.NUMBER in types or SchemaType.INTEGER in types:
            return SchemaType.NUMBER
        if SchemaType.OBJECT in types:
            return SchemaType.OBJECT
        return SchemaType.STRING

    def _merge_object_properties(
        self, schemas: List[InferredSchema]
    ) -> List[SchemaProperty]:
        """Merge the properties of several object schemas into one list.

        Only schemas whose root type is ``object`` take part: the
        ``properties`` of an array schema describe its elements, not an
        object of its own. A property that is missing from at least one of
        the participating objects is marked ``required=False``, so the merged
        schema accepts every object it was built from.

        Args:
            schemas: Element schemas, typically of one array.

        Returns:
            New ``SchemaProperty`` instances in first-seen order; the input
            schemas are not modified.
        """
        object_schemas = [s for s in schemas if s.root_type == SchemaType.OBJECT]
        return self._merge_property_lists([s.properties or [] for s in object_schemas])

    def _merge_property_lists(
        self, prop_lists: List[List[SchemaProperty]]
    ) -> List[SchemaProperty]:
        """Merge per-object property lists, one list per observed object.

        Same-named properties are combined as follows:

        * equal types are kept; for ``object`` the nested properties are
          merged recursively with the same rules,
        * integer mixed with number widens to number,
        * any other type conflict falls back to string.

        A property stays required only if every list contains it and every
        occurrence was itself required.
        """
        numeric_types = {SchemaType.INTEGER, SchemaType.NUMBER}
        merged: Dict[str, SchemaProperty] = {}
        occurrences: Dict[str, int] = {}

        for props in prop_lists:
            for prop in props:
                occurrences[prop.name] = occurrences.get(prop.name, 0) + 1
                existing = merged.get(prop.name)

                if existing is None:
                    # Work on a copy so later adjustments never touch the input.
                    merged[prop.name] = SchemaProperty(
                        name=prop.name,
                        type=prop.type,
                        required=prop.required,
                        description=prop.description,
                        properties=prop.properties,
                        items=prop.items,
                        enum_values=prop.enum_values,
                    )
                    continue

                existing.required = existing.required and prop.required

                if prop.type == existing.type:
                    if prop.type == SchemaType.OBJECT:
                        nested = self._merge_property_lists(
                            [existing.properties or [], prop.properties or []]
                        )
                        # Rebind rather than mutate: the old list may belong
                        # to one of the input schemas.
                        existing.properties = nested or None
                elif {prop.type, existing.type} <= numeric_types:
                    existing.type = SchemaType.NUMBER
                else:
                    existing.type = SchemaType.STRING

        total = len(prop_lists)
        for name, prop in merged.items():
            if occurrences[name] < total:
                prop.required = False

        return list(merged.values())
|
|
|
|
|
|
class SchemaModel(BaseModel):
    """Pydantic model for one named, typed schema entry."""

    name: str
    type: str
    value: Optional[Any] = None
    required: bool = True

    @field_validator("type")
    @classmethod
    def validate_type(cls, v: str) -> str:
        """Accept ``v`` only if it is one of the supported type names.

        Raises:
            ValueError: If ``v`` is not a supported type name.
        """
        # Kept as a list: its repr is embedded verbatim in the error message.
        supported = ["null", "boolean", "number", "integer", "string", "array", "object"]
        if v in supported:
            return v
        raise ValueError(f"Type must be one of {supported}")
|
|
|
|
|
|
class SchemaValidator:
    """Validates data against an ``InferredSchema``."""

    def __init__(self, schema: InferredSchema):
        """Store the schema that ``validate`` checks data against."""
        self.schema = schema

    def validate(self, data: Any) -> Tuple[bool, List[str]]:
        """Validate ``data`` against the stored schema.

        Returns:
            A ``(is_valid, errors)`` tuple. ``errors`` holds one message per
            problem, each prefixed with the path of the offending value
            (empty for the root value).
        """
        errors: List[str] = []
        is_valid = self._validate_value(data, self.schema, "", errors)
        return is_valid, errors

    def _validate_value(
        self, value: Any, schema: InferredSchema, path: str, errors: List[str]
    ) -> bool:
        """Validate one value, appending any problems to ``errors``.

        Args:
            value: The value to check.
            schema: The schema the value has to satisfy.
            path: Location of ``value`` inside the validated document, used
                as the prefix of error messages.
            errors: Accumulator that receives the error messages.

        Returns:
            True if this call, including its recursive calls, added no error.
        """
        # Count errors instead of matching message prefixes, so unrelated
        # errors at a path sharing this prefix cannot affect the result.
        errors_before = len(errors)
        expected = schema.root_type

        if value is None:
            if expected != SchemaType.NULL:
                errors.append(f"{path}: expected {expected}, got null")
            return len(errors) == errors_before

        actual_type = self._get_type(value)

        if expected == SchemaType.OBJECT:
            if actual_type != SchemaType.OBJECT:
                errors.append(f"{path}: expected object, got {actual_type}")
            else:
                self._validate_object(value, schema, path, errors)
        elif expected == SchemaType.ARRAY:
            if actual_type != SchemaType.ARRAY:
                errors.append(f"{path}: expected array, got {actual_type}")
            elif schema.items:
                item_schema = self._schema_for(schema.items)
                for i, item in enumerate(value):
                    self._validate_value(item, item_schema, f"{path}[{i}]", errors)
        elif expected == SchemaType.NUMBER:
            # An integer is acceptable wherever a number is expected.
            if actual_type not in (SchemaType.NUMBER, SchemaType.INTEGER):
                errors.append(f"{path}: expected number, got {actual_type}")
        elif expected != actual_type:
            errors.append(f"{path}: expected {expected}, got {actual_type}")

        return len(errors) == errors_before

    def _validate_object(
        self, value: Dict[str, Any], schema: InferredSchema, path: str, errors: List[str]
    ) -> None:
        """Check each declared property of ``schema`` against the dict ``value``.

        Keys of ``value`` that the schema does not declare are ignored.
        """
        for prop in schema.properties or []:
            prop_path = f"{path}.{prop.name}" if path else prop.name
            if prop.name in value:
                self._validate_value(
                    value[prop.name], self._schema_for(prop), prop_path, errors
                )
            elif prop.required:
                errors.append(f"{prop_path}: required property missing")

    def _schema_for(self, prop: SchemaProperty) -> InferredSchema:
        """Build the schema a property's value must satisfy.

        ``items`` is carried over along with ``properties`` so that the
        elements of an array nested in an object or another array are
        validated as well.
        """
        return InferredSchema(
            root_type=prop.type,
            properties=prop.properties if prop.properties else [],
            items=prop.items,
        )

    def _get_type(self, value: Any) -> str:
        """Return the ``SchemaType`` name for ``value``.

        Values of an unrecognised Python type are reported as string.
        """
        if value is None:
            return SchemaType.NULL
        # bool is a subclass of int, so it has to be checked before int.
        if isinstance(value, bool):
            return SchemaType.BOOLEAN
        if isinstance(value, int):
            return SchemaType.INTEGER
        if isinstance(value, float):
            return SchemaType.NUMBER
        if isinstance(value, str):
            return SchemaType.STRING
        if isinstance(value, list):
            return SchemaType.ARRAY
        if isinstance(value, dict):
            return SchemaType.OBJECT
        return SchemaType.STRING
|