"""JSON Schema validation module for DataForge CLI."""

import json
from typing import Any, Dict, List, Optional

from jsonschema import Draft7Validator, ValidationError, validate

from .parsers import load_data

# Substrings that identify a 2019-09 ``$schema`` URI.  The official URI is
# ``https://json-schema.org/draft/2019-09/schema`` (slash form); the dash
# form is kept so schemas that matched before this fix still match.
_DRAFT_2019_09_MARKERS = ("draft/2019-09", "draft-2019-09")


class SchemaValidator:
    """Handle JSON Schema validation for data files.

    A validator holds one schema plus the ``jsonschema`` validator class
    selected from that schema's ``$schema`` declaration.
    """

    def __init__(self, schema: Optional[Dict[str, Any]] = None, schema_file: Optional[str] = None):
        """Initialize the validator with an optional schema.

        Args:
            schema: Schema mapping to use. Takes precedence over
                ``schema_file`` when both are given.
            schema_file: Path of a schema file, loaded via ``load_data``.
        """
        self.schema = None
        self.validator_class = None
        if schema is not None:
            self.set_schema(schema)
        elif schema_file is not None:
            self.load_schema_from_file(schema_file)

    def set_schema(self, schema: Dict[str, Any]) -> None:
        """Set the validation schema and pick the matching validator class.

        The class is chosen from the schema's ``$schema`` value: 2019-09
        schemas use ``Draft201909Validator``; everything else (including a
        missing or unrecognised ``$schema``) uses ``Draft7Validator``.

        Args:
            schema: Schema mapping to validate against.

        Raises:
            ImportError: If a 2019-09 schema is given but the installed
                ``jsonschema`` does not provide ``Draft201909Validator``.
        """
        draft = schema.get("$schema", "http://json-schema.org/draft-07/schema#")
        if not isinstance(draft, str):
            # A malformed ``$schema`` should select the default validator
            # rather than crash the substring checks below.
            draft = ""

        if any(marker in draft for marker in _DRAFT_2019_09_MARKERS):
            # Imported lazily: only needed for 2019-09 schemas.
            from jsonschema import Draft201909Validator

            validator_class = Draft201909Validator
        else:
            # Draft 7 is both the explicit draft-07 choice and the fallback.
            validator_class = Draft7Validator

        # Assign together, after resolution succeeded, so a failed import
        # cannot leave a schema paired with ``validator_class = None``.
        self.schema = schema
        self.validator_class = validator_class

    def load_schema_from_file(self, schema_file: str) -> None:
        """Load a schema via ``load_data`` and install it with ``set_schema``.

        Args:
            schema_file: Path of the schema file.
        """
        self.set_schema(load_data(schema_file))

    def validate(self, data: Any, raise_on_error: bool = False) -> List[ValidationError]:
        """Validate data against the current schema.

        Args:
            data: The instance to validate.
            raise_on_error: When true, raise the first error instead of
                returning the error list.

        Returns:
            All validation errors found; empty when the data is valid.

        Raises:
            ValueError: If no schema has been set.
            ValidationError: If ``raise_on_error`` is true and the data is
                invalid.
        """
        if self.schema is None:
            raise ValueError("No schema has been set for validation")

        validator = self.validator_class(self.schema)
        errors = list(validator.iter_errors(data))
        if raise_on_error and errors:
            # Re-raise the original error object so its path, validator and
            # schema context are preserved (same type and message as before).
            raise errors[0]
        return errors

    def validate_file(self, file_path: str, format: Optional[str] = None) -> List[ValidationError]:
        """Load a file via ``load_data`` and validate its content.

        Args:
            file_path: Path of the data file.
            format: Passed through to ``load_data`` unchanged.

        Returns:
            All validation errors found; empty when the data is valid.
        """
        return self.validate(load_data(file_path, format))

    def get_error_messages(self, errors: List[ValidationError]) -> List[str]:
        """Convert validation errors to human-readable messages.

        Each message has the form ``Path '<a -> b -> c>': <message>``; an
        error with an empty path is reported at ``root``.

        Args:
            errors: Errors as returned by ``validate``.

        Returns:
            One formatted message per error, in the same order.
        """
        messages = []
        for error in errors:
            location = " -> ".join(str(part) for part in error.path) if error.path else "root"
            messages.append(f"Path '{location}': {error.message}")
        return messages


def validate_data(data: Any, schema: Dict[str, Any]) -> tuple[bool, List[str]]:
    """Validate data against a schema.

    Args:
        data: The instance to validate.
        schema: Schema mapping to validate against.

    Returns:
        ``(is_valid, messages)`` where ``messages`` holds one formatted
        message per validation error.
    """
    validator = SchemaValidator(schema=schema)
    errors = validator.validate(data)
    return not errors, validator.get_error_messages(errors)


def validate_file(file_path: str, schema_file: str, format: Optional[str] = None) -> tuple[bool, List[str]]:
    """Validate a data file against a schema file.

    Args:
        file_path: Path of the data file.
        schema_file: Path of the schema file.
        format: Passed through to ``load_data`` for the data file.

    Returns:
        ``(is_valid, messages)`` where ``messages`` holds one formatted
        message per validation error.
    """
    validator = SchemaValidator(schema_file=schema_file)
    errors = validator.validate_file(file_path, format)
    return not errors, validator.get_error_messages(errors)


def load_schema(schema_source: str) -> Dict[str, Any]:
    """Load a schema by delegating to ``load_data``.

    Args:
        schema_source: Passed to ``load_data`` unchanged.
            NOTE(review): which sources are accepted (path vs. raw string)
            depends on ``load_data`` — confirm against ``parsers``.

    Returns:
        Whatever ``load_data`` returns for ``schema_source``.
    """
    return load_data(schema_source)