Files
config-converter-cli/config_converter/validators/schema.py
7000pctAUTO f7d9fe859a
Some checks failed
CI / test (push) Has been cancelled
Add validators, generators, and utils modules
2026-02-04 21:55:17 +00:00

275 lines
9.1 KiB
Python

"""Schema inference and validation module."""
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple, Union
from pydantic import BaseModel, ValidationError, field_validator
class SchemaType:
    """Namespace of the type-name strings a schema can carry.

    Each attribute is a plain ``str``, so the values can be compared
    directly against type names stored on schema objects.
    """

    # Scalar kinds.
    NULL = "null"
    BOOLEAN = "boolean"
    NUMBER = "number"
    INTEGER = "integer"
    STRING = "string"

    # Container kinds.
    ARRAY = "array"
    OBJECT = "object"
@dataclass
class SchemaProperty:
    """One named entry of a schema, optionally carrying nested structure."""

    # Property name and its type-name string.
    name: str
    type: str
    # Whether the property must be present; defaults to required.
    required: bool = True
    description: str = ""
    # Child properties (for object-like entries) and element description
    # (for array-like entries); both are absent by default.
    properties: Optional[List["SchemaProperty"]] = None
    items: Optional["SchemaProperty"] = None
    # Allowed values, when the property is restricted to a fixed set.
    enum_values: Optional[List[Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialise this property, and any nested ones, to a plain dict.

        ``name``, ``type`` and ``required`` are always emitted.
        ``description``, ``properties`` and ``items`` are emitted only when
        truthy, while ``enum_values`` is emitted whenever it is not ``None``
        (so an empty list is kept).
        """
        payload: Dict[str, Any] = dict(
            name=self.name,
            type=self.type,
            required=self.required,
        )

        if self.description:
            payload["description"] = self.description

        if self.properties:
            payload["properties"] = [child.to_dict() for child in self.properties]

        if self.items:
            payload["items"] = self.items.to_dict()

        if self.enum_values is not None:
            payload["enum_values"] = self.enum_values

        return payload
@dataclass
class InferredSchema:
    """Schema description for a whole value: its root type plus structure."""

    # Type-name string of the value this schema describes.
    root_type: str
    # Properties of the value; a fresh empty list per instance by default.
    properties: List["SchemaProperty"] = field(default_factory=list)
    description: str = ""
    # Element description, when one has been recorded for the value.
    items: Optional["SchemaProperty"] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialise the schema to a plain dictionary.

        Returns:
            A dict with ``root_type``, ``properties`` (always a list) and
            ``description``; ``items`` is added only when it is set.
        """
        result: Dict[str, Any] = {
            "root_type": self.root_type,
            # Callers may pass ``properties=None`` explicitly; treat that as
            # "no properties" instead of failing on iteration.
            "properties": [p.to_dict() for p in self.properties or []],
            "description": self.description,
        }
        if self.items is not None:
            result["items"] = self.items.to_dict()
        return result
class SchemaInferrer:
    """Infers a schema from configuration data held as Python values."""

    def infer(self, data: Any) -> InferredSchema:
        """Infer a schema for ``data``.

        Args:
            data: The value to describe (``None``, bool, int, float, str,
                list, dict, or anything else).

        Returns:
            An ``InferredSchema`` whose ``root_type`` names the kind of
            ``data``. Lists and dicts are described recursively; values of
            any other Python type are reported as ``string``.
        """
        if data is None:
            return InferredSchema(root_type=SchemaType.NULL)
        # bool must be tested before int because bool is a subclass of int.
        if isinstance(data, bool):
            return InferredSchema(root_type=SchemaType.BOOLEAN)
        if isinstance(data, int):
            return InferredSchema(root_type=SchemaType.INTEGER)
        if isinstance(data, float):
            return InferredSchema(root_type=SchemaType.NUMBER)
        if isinstance(data, str):
            return InferredSchema(root_type=SchemaType.STRING)
        if isinstance(data, list):
            return self._infer_array_schema(data)
        if isinstance(data, dict):
            return self._infer_object_schema(data)
        return InferredSchema(root_type=SchemaType.STRING)

    def _infer_array_schema(self, data: List[Any]) -> InferredSchema:
        """Infer a schema for a list.

        An empty list yields a bare ``array`` schema with no ``items``.
        Otherwise every element is inferred and reduced to one common type;
        when that type is ``object`` the elements' properties are merged and
        exposed both on the array schema and on its ``items``.
        """
        if not data:
            return InferredSchema(root_type=SchemaType.ARRAY)

        element_schemas = [self.infer(item) for item in data]
        common_type = self._find_common_type(element_schemas)

        if common_type == SchemaType.OBJECT:
            merged_props = self._merge_object_properties(element_schemas)
            return InferredSchema(
                root_type=SchemaType.ARRAY,
                properties=merged_props,
                items=SchemaProperty(
                    name="item", type="object", properties=merged_props
                ),
            )

        return InferredSchema(
            root_type=SchemaType.ARRAY,
            items=SchemaProperty(name="item", type=common_type),
        )

    def _infer_object_schema(self, data: Dict[str, Any]) -> InferredSchema:
        """Infer a schema for a dict, one property per key in dict order."""
        properties = []
        for key, value in data.items():
            inferred = self.infer(value)
            properties.append(
                SchemaProperty(
                    name=key,
                    type=inferred.root_type,
                    # An empty property list is stored as None.
                    properties=inferred.properties if inferred.properties else None,
                    items=inferred.items,
                )
            )
        return InferredSchema(root_type=SchemaType.OBJECT, properties=properties)

    def _find_common_type(self, schemas: List[InferredSchema]) -> str:
        """Reduce the root types of ``schemas`` to a single type name.

        A unanimous type is returned as-is. For mixed types the precedence
        is: ``string`` if any element is a string, else ``number`` if any is
        numeric, else ``object`` if any is an object, else ``string``.
        """
        types = {schema.root_type for schema in schemas}
        if len(types) == 1:
            return types.pop()
        if SchemaType.STRING in types:
            return SchemaType.STRING
        if SchemaType.NUMBER in types or SchemaType.INTEGER in types:
            return SchemaType.NUMBER
        if SchemaType.OBJECT in types:
            return SchemaType.OBJECT
        return SchemaType.STRING

    def _merge_object_properties(
        self, schemas: List[InferredSchema]
    ) -> List[SchemaProperty]:
        """Merge the properties of several schemas into one property list.

        Properties are keyed by name and kept in first-seen order. The input
        schemas are left untouched: merged entries are copies.

        * A property missing from at least one ``object`` schema is marked
          ``required=False``, so the merged schema accepts every object it
          was derived from.
        * ``integer`` combined with ``number`` widens to ``number``; any
          other type conflict falls back to ``string`` and drops nested
          ``properties``/``items``, which no longer describe the value.
        * When types agree, nested structure is taken from the first
          occurrence.
        """
        numeric_types = {SchemaType.NUMBER, SchemaType.INTEGER}
        merged: Dict[str, SchemaProperty] = {}
        # Number of object schemas in which each property name appears.
        occurrences: Dict[str, int] = {}
        object_count = 0

        for schema in schemas:
            is_object = schema.root_type == SchemaType.OBJECT
            if is_object:
                object_count += 1

            names_here = set()
            for prop in schema.properties or []:
                names_here.add(prop.name)
                existing = merged.get(prop.name)
                if existing is None:
                    # Copy so later adjustments never leak into the input.
                    merged[prop.name] = SchemaProperty(
                        name=prop.name,
                        type=prop.type,
                        required=prop.required,
                        description=prop.description,
                        properties=prop.properties,
                        items=prop.items,
                        enum_values=prop.enum_values,
                    )
                    continue

                if not prop.required:
                    existing.required = False
                if prop.type == existing.type:
                    continue
                if {prop.type, existing.type} <= numeric_types:
                    existing.type = SchemaType.NUMBER
                else:
                    existing.type = SchemaType.STRING
                    existing.properties = None
                    existing.items = None

            if is_object:
                for name in names_here:
                    occurrences[name] = occurrences.get(name, 0) + 1

        for name, prop in merged.items():
            if occurrences.get(name, 0) < object_count:
                prop.required = False

        return list(merged.values())
class SchemaModel(BaseModel):
    """Pydantic model describing a single schema entry."""

    # Entry name and its type-name string (checked by ``validate_type``).
    name: str
    type: str
    # Optional associated value; unset by default.
    value: Optional[Any] = None
    required: bool = True

    @field_validator("type")
    @classmethod
    def validate_type(cls, v: str) -> str:
        """Accept ``v`` only if it names a supported type.

        Returns:
            ``v`` unchanged when it is supported.

        Raises:
            ValueError: If ``v`` is not one of the supported type names.
        """
        valid_types = [
            "null",
            "boolean",
            "number",
            "integer",
            "string",
            "array",
            "object",
        ]
        if v in valid_types:
            return v
        raise ValueError(f"Type must be one of {valid_types}")
class SchemaValidator:
    """Validates data against an ``InferredSchema``."""

    def __init__(self, schema: InferredSchema):
        """Store the schema that ``validate`` checks data against."""
        self.schema = schema

    def validate(self, data: Any) -> Tuple[bool, List[str]]:
        """Validate ``data`` against the stored schema.

        Returns:
            ``(is_valid, errors)`` where ``errors`` holds one message per
            problem, each prefixed with the path of the offending value.
        """
        errors: List[str] = []
        is_valid = self._validate_value(data, self.schema, "", errors)
        return is_valid, errors

    def _validate_value(
        self, value: Any, schema: InferredSchema, path: str, errors: List[str]
    ) -> bool:
        """Validate ``value`` against ``schema``, appending to ``errors``.

        Args:
            value: The value to check.
            schema: The schema ``value`` must satisfy.
            path: Location of ``value`` within the document, used as the
                message prefix (empty for the root).
            errors: Accumulator that receives error messages.

        Returns:
            True if this call, including its recursive calls, added no
            errors.
        """
        # Count only errors added from here on, so messages from sibling
        # paths that happen to share a prefix are never attributed to us.
        errors_before = len(errors)
        expected = schema.root_type

        if value is None:
            if expected != SchemaType.NULL:
                errors.append(f"{path}: expected {schema.root_type}, got null")
                return False
            return True

        actual_type = self._get_type(value)

        if expected == SchemaType.OBJECT:
            if actual_type != "object":
                errors.append(f"{path}: expected object, got {actual_type}")
                return False
            for prop in schema.properties or []:
                prop_path = f"{path}.{prop.name}" if path else prop.name
                if prop.name in value:
                    # Forward ``items`` as well, so array-typed properties
                    # have their elements validated.
                    prop_schema = InferredSchema(
                        root_type=prop.type,
                        properties=prop.properties or [],
                        items=prop.items,
                    )
                    self._validate_value(
                        value[prop.name], prop_schema, prop_path, errors
                    )
                elif prop.required:
                    errors.append(f"{prop_path}: required property missing")
        elif expected == SchemaType.ARRAY:
            if actual_type != "array":
                errors.append(f"{path}: expected array, got {actual_type}")
                return False
            if schema.items:
                # Carry nested ``items`` so arrays of arrays are checked too.
                item_schema = InferredSchema(
                    root_type=schema.items.type,
                    properties=schema.items.properties or [],
                    items=schema.items.items,
                )
                for i, item in enumerate(value):
                    self._validate_value(item, item_schema, f"{path}[{i}]", errors)
        elif expected == SchemaType.NUMBER:
            # A "number" schema accepts both integers and floats.
            if actual_type not in ("number", "integer"):
                errors.append(f"{path}: expected number, got {actual_type}")
        elif expected != actual_type:
            errors.append(f"{path}: expected {schema.root_type}, got {actual_type}")

        return len(errors) == errors_before

    def _get_type(self, value: Any) -> str:
        """Return the type-name string for ``value``.

        Values of any unrecognised Python type are reported as ``string``.
        """
        if value is None:
            return SchemaType.NULL
        # bool must be tested before int because bool is a subclass of int.
        if isinstance(value, bool):
            return SchemaType.BOOLEAN
        if isinstance(value, int):
            return SchemaType.INTEGER
        if isinstance(value, float):
            return SchemaType.NUMBER
        if isinstance(value, str):
            return SchemaType.STRING
        if isinstance(value, list):
            return SchemaType.ARRAY
        if isinstance(value, dict):
            return SchemaType.OBJECT
        return SchemaType.STRING