Initial upload: DataForge CLI with full documentation and tests
This commit is contained in:
127
dataforge/type_check.py
Normal file
127
dataforge/type_check.py
Normal file
@@ -0,0 +1,127 @@
|
|||||||
|
"""Type checking and validation module for DataForge CLI."""
|
||||||
|
|
||||||
|
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||||
|
|
||||||
|
|
||||||
|
# A type specification: either a bare type name (for example "string") or a
# dict whose "type" key names the type and which may carry nested
# "properties" / "required" / "items" entries.
TypeSpec = Union[str, Dict[str, Any]]
|
||||||
|
|
||||||
|
|
||||||
|
def infer_type(value: Any) -> str:
    """Return the schema-style type name describing ``value``.

    The result is one of "null", "boolean", "integer", "number", "string",
    "array" or "object"; any other kind of value yields "unknown".
    """
    if value is None:
        return "null"

    # Order matters: bool is a subclass of int, so it must be tested first.
    type_names = (
        (bool, "boolean"),
        (int, "integer"),
        (float, "number"),
        (str, "string"),
        (list, "array"),
        (dict, "object"),
    )
    for python_type, name in type_names:
        if isinstance(value, python_type):
            return name
    return "unknown"
|
||||||
|
|
||||||
|
|
||||||
|
def parse_type_spec(type_spec: TypeSpec) -> Tuple[str, Optional[Dict[str, Any]]]:
    """Split a type specification into ``(type_name, details)``.

    * A dict yields its "type" entry (defaulting to "object" when the key is
      absent) together with the dict itself.
    * A string is taken as the type name, with no details.
    * Anything else yields ``("unknown", None)``.
    """
    if isinstance(type_spec, dict):
        return type_spec.get("type", "object"), type_spec
    if isinstance(type_spec, str):
        return type_spec, None
    return "unknown", None
|
||||||
|
|
||||||
|
|
||||||
|
def check_type(value: Any, expected_type: TypeSpec) -> Tuple[bool, Optional[str]]:
    """Check whether ``value`` matches ``expected_type``.

    Args:
        value: The value to check.
        expected_type: A type name, or a dict spec whose "type" key names the
            type and which may hold "properties" / "required" (for objects)
            or "items" (for arrays).

    Returns:
        ``(True, None)`` when the value matches, otherwise ``(False, message)``
        where ``message`` describes the first mismatch found.
    """
    actual_type = infer_type(value)
    type_name, type_info = parse_type_spec(expected_type)

    if type_name == "any":
        return True, None

    # In JSON Schema every integer is also a valid "number", so an integer
    # value must not be rejected when a number is expected.
    type_matches = actual_type == type_name or (
        type_name == "number" and actual_type == "integer"
    )
    if not type_matches:
        return False, f"Expected {type_name}, got {actual_type}"

    # A bare type name carries no nested constraints.
    if not isinstance(type_info, dict):
        return True, None

    if type_name == "object":
        properties = type_info.get("properties", {})
        # Ignore a malformed (non-dict) "properties" entry instead of crashing,
        # mirroring how a non-list "required" entry is ignored below.
        if isinstance(properties, dict):
            for prop_name, prop_type in properties.items():
                if prop_name in value:
                    prop_valid, error = check_type(value[prop_name], prop_type)
                    if not prop_valid:
                        return False, f"Property '{prop_name}': {error}"

        required = type_info.get("required", [])
        if isinstance(required, list):
            for req_prop in required:
                if req_prop not in value:
                    return False, f"Missing required property: '{req_prop}'"

    elif type_name == "array":
        items = type_info.get("items")
        if items is not None:
            for i, item in enumerate(value):
                item_valid, error = check_type(item, items)
                if not item_valid:
                    return False, f"Array item {i}: {error}"

    return True, None
|
||||||
|
|
||||||
|
|
||||||
|
def validate_types(data: Any, type_spec: TypeSpec, path: str = "root") -> List[str]:
    """Validate ``data`` against ``type_spec`` and return every error found.

    Only the top-level type is checked at each node; nested properties and
    array items are validated by recursion so that each problem is reported
    separately, under the path of the value it concerns.

    Args:
        data: The value to validate.
        type_spec: A type name or a dict spec ("type", "properties",
            "required", "items").
        path: Location of ``data`` inside the document, used as the prefix of
            error messages (``root.key`` for properties, ``root[0]`` for items).

    Returns:
        A list of error messages; empty when ``data`` is valid.
    """
    # Strip the nested keywords so check_type only compares the top-level
    # type here; otherwise it would stop at the first nested failure and the
    # remaining errors would never be collected.
    nested_keywords = ("properties", "required", "items")
    if isinstance(type_spec, dict):
        shallow_spec: Any = {
            key: val for key, val in type_spec.items() if key not in nested_keywords
        }
    else:
        shallow_spec = type_spec

    valid, error = check_type(data, shallow_spec)
    if not valid:
        # The value has the wrong kind altogether, so there is nothing
        # meaningful to descend into.
        return [f"Path '{path}': {error}"]

    errors: List[str] = []
    if not isinstance(type_spec, dict):
        return errors

    data_type = infer_type(data)
    if data_type == "object":
        properties = type_spec.get("properties", {})
        if isinstance(properties, dict):
            for key, value in data.items():
                if key in properties:
                    errors.extend(
                        validate_types(value, properties[key], f"{path}.{key}")
                    )
        required = type_spec.get("required", [])
        if isinstance(required, list):
            for req_prop in required:
                if req_prop not in data:
                    errors.append(
                        f"Path '{path}': Missing required property: '{req_prop}'"
                    )
    elif data_type == "array":
        items = type_spec.get("items")
        if items is not None:
            for i, item in enumerate(data):
                errors.extend(validate_types(item, items, f"{path}[{i}]"))

    return errors
|
||||||
|
|
||||||
|
|
||||||
|
def infer_schema_from_data(data: Any) -> Dict[str, Any]:
    """Infer a JSON-Schema-like description of ``data``.

    Every node gets a "type" entry. Objects additionally get "properties"
    (one inferred schema per key) and, when they have at least one key, a
    "required" list. Non-empty arrays get an "items" schema inferred from
    their first element only.

    Args:
        data: The sample value to describe.

    Returns:
        The inferred schema as a dict.
    """

    def build_schema(value: Any) -> Dict[str, Any]:
        """Return the schema node describing ``value``."""
        type_name = infer_type(value)
        node: Dict[str, Any] = {"type": type_name}

        if type_name == "object" and isinstance(value, dict):
            node["properties"] = {
                key: build_schema(val) for key, val in value.items()
            }
            # NOTE(review): every key present in the sample is marked
            # required, including keys whose value is None or an empty dict;
            # confirm that this is the intended rule.
            if value:
                node["required"] = list(value)
        elif type_name == "array" and isinstance(value, list) and value:
            # Only the first element is sampled for the item schema.
            node["items"] = build_schema(value[0])

        return node

    return build_schema(data)
|
||||||
Reference in New Issue
Block a user