# Reconstructed from patch f7d9fe859a4702d1176e2c78e79ceae0609d4b5a
# ("Add validators, generators, and utils modules"), which creates
# config_converter/validators/schema.py.
"""Schema inference and validation module.

``SchemaInferrer`` derives an ``InferredSchema`` from plain Python data
(``None``, bools, numbers, strings, lists and dicts) and ``SchemaValidator``
checks data against such a schema, collecting human-readable error messages.
"""

from dataclasses import dataclass, field, replace
from typing import Any, Dict, List, Optional, Tuple, Union

from pydantic import BaseModel, ValidationError, field_validator


class SchemaType:
    """Names of the schema types used throughout this module."""

    NULL = "null"
    BOOLEAN = "boolean"
    NUMBER = "number"
    INTEGER = "integer"
    STRING = "string"
    ARRAY = "array"
    OBJECT = "object"


# Types that may be mixed freely and still be described as a "number".
_NUMERIC_TYPES = frozenset({SchemaType.NUMBER, SchemaType.INTEGER})


@dataclass
class SchemaProperty:
    """Represents a schema property (an object member or an array item)."""

    # Property name; array items use the placeholder name "item".
    name: str
    # One of the SchemaType constants.
    type: str
    # Whether the property must be present in a validated object.
    required: bool = True
    description: str = ""
    # Nested properties when ``type`` is "object".
    properties: Optional[List["SchemaProperty"]] = None
    # Item description when ``type`` is "array".
    items: Optional["SchemaProperty"] = None
    enum_values: Optional[List[Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary.

        ``name``, ``type`` and ``required`` are always emitted; the other
        keys only appear when they carry a value.
        """
        result: Dict[str, Any] = {
            "name": self.name,
            "type": self.type,
            "required": self.required,
        }
        if self.description:
            result["description"] = self.description
        if self.properties:
            result["properties"] = [p.to_dict() for p in self.properties]
        if self.items is not None:
            result["items"] = self.items.to_dict()
        if self.enum_values is not None:
            result["enum_values"] = self.enum_values
        return result


@dataclass
class InferredSchema:
    """Represents an inferred schema."""

    # One of the SchemaType constants describing the top-level value.
    root_type: str
    # Member properties when ``root_type`` is "object".
    properties: List[SchemaProperty] = field(default_factory=list)
    description: str = ""
    # Item description when ``root_type`` is "array".
    items: Optional[SchemaProperty] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary; ``items`` is only emitted when set."""
        result: Dict[str, Any] = {
            "root_type": self.root_type,
            "properties": [p.to_dict() for p in self.properties],
            "description": self.description,
        }
        if self.items is not None:
            result["items"] = self.items.to_dict()
        return result


class SchemaInferrer:
    """Infers schema from configuration data."""

    def infer(self, data: Any) -> InferredSchema:
        """Infer schema from data.

        Args:
            data: Any value; lists and dicts are inspected recursively.

        Returns:
            The inferred schema. Values of an unrecognised Python type are
            described as strings.
        """
        if data is None:
            return InferredSchema(root_type=SchemaType.NULL)
        # bool is a subclass of int, so it has to be tested first.
        if isinstance(data, bool):
            return InferredSchema(root_type=SchemaType.BOOLEAN)
        if isinstance(data, int):
            return InferredSchema(root_type=SchemaType.INTEGER)
        if isinstance(data, float):
            return InferredSchema(root_type=SchemaType.NUMBER)
        if isinstance(data, str):
            return InferredSchema(root_type=SchemaType.STRING)
        if isinstance(data, list):
            return self._infer_array_schema(data)
        if isinstance(data, dict):
            return self._infer_object_schema(data)
        return InferredSchema(root_type=SchemaType.STRING)

    def _infer_array_schema(self, data: List[Any]) -> InferredSchema:
        """Infer schema for an array.

        An empty list yields an array schema without ``items``. For arrays
        of objects the merged member properties are stored both on the
        array schema and on its ``items`` property.
        """
        if not data:
            return InferredSchema(root_type=SchemaType.ARRAY)

        item_schemas = [self.infer(item) for item in data]
        common_type = self._find_common_type(item_schemas)

        if common_type == SchemaType.OBJECT:
            merged_props = self._merge_object_properties(item_schemas)
            return InferredSchema(
                root_type=SchemaType.ARRAY,
                properties=merged_props,
                items=SchemaProperty(
                    name="item",
                    type=SchemaType.OBJECT,
                    properties=merged_props,
                ),
            )

        return InferredSchema(
            root_type=SchemaType.ARRAY,
            items=SchemaProperty(name="item", type=common_type),
        )

    def _infer_object_schema(self, data: Dict[str, Any]) -> InferredSchema:
        """Infer schema for an object, one property per key."""
        properties = []

        for key, value in data.items():
            inferred = self.infer(value)
            properties.append(
                SchemaProperty(
                    name=key,
                    type=inferred.root_type,
                    properties=inferred.properties or None,
                    items=inferred.items,
                )
            )

        return InferredSchema(
            root_type=SchemaType.OBJECT,
            properties=properties,
        )

    def _find_common_type(self, schemas: List[InferredSchema]) -> str:
        """Find the common type among schemas.

        Returns:
            The single shared type when all schemas agree; "number" when
            the non-null types are all numeric; "object" when the only
            non-null type is "object"; otherwise "string", which this
            module uses as the catch-all for mixed content.
        """
        types = {s.root_type for s in schemas}
        if len(types) == 1:
            return next(iter(types))
        if SchemaType.STRING in types:
            return SchemaType.STRING

        non_null = types - {SchemaType.NULL}
        # Only call the mix numeric/object when it really is: a boolean or an
        # array next to a number or an object must not be reported as such.
        if non_null and non_null <= _NUMERIC_TYPES:
            return SchemaType.NUMBER
        if non_null == {SchemaType.OBJECT}:
            return SchemaType.OBJECT
        return SchemaType.STRING

    @staticmethod
    def _widen_types(first: str, second: str) -> str:
        """Return a type covering two differing property types."""
        if first in _NUMERIC_TYPES and second in _NUMERIC_TYPES:
            return SchemaType.NUMBER
        return SchemaType.STRING

    def _merge_object_properties(
        self, schemas: List[InferredSchema]
    ) -> List[SchemaProperty]:
        """Merge properties from multiple object schemas.

        Non-object schemas (e.g. null items) are ignored. A property is only
        kept as required when every object schema contains it; conflicting
        types are widened ("number" for numeric mixes, otherwise "string").

        Returns:
            Fresh ``SchemaProperty`` objects in first-seen order; the
            properties of the input schemas are left untouched.
        """
        object_schemas = [
            s for s in schemas if s.root_type == SchemaType.OBJECT
        ]
        merged: Dict[str, SchemaProperty] = {}
        occurrences: Dict[str, int] = {}

        for schema in object_schemas:
            for prop in schema.properties or []:
                occurrences[prop.name] = occurrences.get(prop.name, 0) + 1
                existing = merged.get(prop.name)
                if existing is None:
                    # Copy so widening below never mutates the input schema.
                    merged[prop.name] = replace(prop)
                    continue
                existing.required = existing.required and prop.required
                if prop.type != existing.type:
                    existing.type = self._widen_types(existing.type, prop.type)
                    # Nested structure no longer matches the widened type.
                    existing.properties = None
                    existing.items = None

        for name, prop in merged.items():
            if occurrences[name] < len(object_schemas):
                prop.required = False

        return list(merged.values())


class SchemaModel(BaseModel):
    """Pydantic model for schema validation."""

    name: str
    type: str
    value: Optional[Any] = None
    required: bool = True

    @field_validator("type")
    @classmethod
    def validate_type(cls, v: str) -> str:
        """Validate type is one of supported types.

        Raises:
            ValueError: If ``v`` is not one of the SchemaType names.
        """
        valid_types = [
            SchemaType.NULL,
            SchemaType.BOOLEAN,
            SchemaType.NUMBER,
            SchemaType.INTEGER,
            SchemaType.STRING,
            SchemaType.ARRAY,
            SchemaType.OBJECT,
        ]
        if v not in valid_types:
            raise ValueError(f"Type must be one of {valid_types}")
        return v


class SchemaValidator:
    """Validates data against a schema."""

    def __init__(self, schema: InferredSchema):
        self.schema = schema

    def validate(self, data: Any) -> Tuple[bool, List[str]]:
        """Validate data against schema.

        Returns:
            ``(is_valid, errors)`` where ``errors`` holds one
            ``"<path>: <problem>"`` message per violation found.
        """
        errors: List[str] = []
        is_valid = self._validate_value(data, self.schema, "", errors)
        return is_valid, errors

    @staticmethod
    def _property_schema(prop: SchemaProperty) -> InferredSchema:
        """Build the schema used to validate a value described by ``prop``."""
        return InferredSchema(
            root_type=prop.type,
            properties=prop.properties or [],
            # Carry the item description along so nested arrays are checked.
            items=prop.items,
        )

    def _validate_value(
        self, value: Any, schema: InferredSchema, path: str, errors: List[str]
    ) -> bool:
        """Validate a value against schema.

        Args:
            value: The value to check.
            schema: Schema the value has to satisfy.
            path: Location of ``value`` within the validated data, used as
                the message prefix (empty for the root value).
            errors: List that violation messages are appended to.

        Returns:
            True when this call (including nested checks) added no errors.
        """
        # Compare error counts instead of matching path prefixes, which would
        # also pick up errors of sibling paths such as "a" vs. "ab".
        errors_before = len(errors)
        expected = schema.root_type

        if value is None:
            if expected != SchemaType.NULL:
                errors.append(f"{path}: expected {expected}, got null")
            return len(errors) == errors_before

        actual_type = self._get_type(value)

        if expected == SchemaType.OBJECT:
            if actual_type != SchemaType.OBJECT:
                errors.append(f"{path}: expected object, got {actual_type}")
            else:
                for prop in schema.properties or []:
                    prop_path = f"{path}.{prop.name}" if path else prop.name
                    if prop.name in value:
                        self._validate_value(
                            value[prop.name],
                            self._property_schema(prop),
                            prop_path,
                            errors,
                        )
                    elif prop.required:
                        errors.append(f"{prop_path}: required property missing")

        elif expected == SchemaType.ARRAY:
            if actual_type != SchemaType.ARRAY:
                errors.append(f"{path}: expected array, got {actual_type}")
            elif schema.items is not None:
                item_schema = self._property_schema(schema.items)
                for i, item in enumerate(value):
                    self._validate_value(
                        item, item_schema, f"{path}[{i}]", errors
                    )

        elif expected == SchemaType.NUMBER:
            # Integers are acceptable wherever a number is expected.
            if actual_type not in _NUMERIC_TYPES:
                errors.append(f"{path}: expected number, got {actual_type}")

        elif expected != actual_type:
            errors.append(f"{path}: expected {expected}, got {actual_type}")

        return len(errors) == errors_before

    def _get_type(self, value: Any) -> str:
        """Get type string for value; unknown Python types map to "string"."""
        if value is None:
            return SchemaType.NULL
        # bool is a subclass of int, so it has to be tested first.
        if isinstance(value, bool):
            return SchemaType.BOOLEAN
        if isinstance(value, int):
            return SchemaType.INTEGER
        if isinstance(value, float):
            return SchemaType.NUMBER
        if isinstance(value, str):
            return SchemaType.STRING
        if isinstance(value, list):
            return SchemaType.ARRAY
        if isinstance(value, dict):
            return SchemaType.OBJECT
        return SchemaType.STRING