83 lines
3.2 KiB
Python
83 lines
3.2 KiB
Python
"""JSON Schema validation module for DataForge CLI."""
|
|
|
|
import json
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from jsonschema import Draft7Validator, ValidationError, validate
|
|
|
|
from .parsers import load_data
|
|
|
|
|
|
class SchemaValidator:
    """Handle JSON Schema validation for data files.

    An instance holds one schema plus the ``jsonschema`` validator class
    selected from that schema's ``$schema`` declaration. Draft 2019-09
    schemas use ``Draft201909Validator``; everything else (including
    schemas with no ``$schema``) uses ``Draft7Validator``.
    """

    def __init__(self, schema: Optional[Dict[str, Any]] = None, schema_file: Optional[str] = None):
        """Initialize validator with optional schema.

        Args:
            schema: Schema to validate against. Takes precedence over
                ``schema_file`` when both are supplied.
            schema_file: Source handed to ``load_data`` to read the schema;
                only consulted when ``schema`` is None.
        """
        self.schema = None
        self.validator_class = None
        if schema is not None:
            self.set_schema(schema)
        elif schema_file is not None:
            self.load_schema_from_file(schema_file)

    def set_schema(self, schema: Dict[str, Any]) -> None:
        """Set the validation schema and pick the matching validator class.

        Args:
            schema: The JSON Schema document. Its ``$schema`` value, when
                present, decides which draft's validator is used.
        """
        self.schema = schema
        # The dialect is declared under the "$schema" keyword; a missing or
        # non-string value is treated as "not declared" and falls back to
        # draft-07.
        draft = schema.get("$schema", "http://json-schema.org/draft-07/schema#")
        if not isinstance(draft, str):
            draft = ""
        # Official 2019-09 URIs look like ".../draft/2019-09/schema", so match
        # on the version token itself; this also accepts the hyphenated
        # "draft-2019-09" spelling.
        if "2019-09" in draft and "draft-07" not in draft:
            # Imported lazily so the module still loads when the installed
            # jsonschema does not provide this class.
            from jsonschema import Draft201909Validator

            self.validator_class = Draft201909Validator
        else:
            self.validator_class = Draft7Validator

    def load_schema_from_file(self, schema_file: str) -> None:
        """Load schema from a file.

        Args:
            schema_file: Source handed to ``load_data``; the result is
                installed through ``set_schema``.
        """
        schema_data = load_data(schema_file)
        self.set_schema(schema_data)

    def validate(self, data: Any, raise_on_error: bool = False) -> List[ValidationError]:
        """Validate data against the schema.

        Args:
            data: The instance to check.
            raise_on_error: When True and at least one error is found, raise
                instead of returning.

        Returns:
            Every validation error found, in the order the validator yields
            them; an empty list when ``data`` is valid.

        Raises:
            ValueError: If no schema has been set.
            ValidationError: If ``raise_on_error`` is True and validation
                fails. The raised error carries only the first error's
                message.
        """
        if self.schema is None:
            raise ValueError("No schema has been set for validation")
        validator = self.validator_class(self.schema)
        errors = list(validator.iter_errors(data))
        if raise_on_error and errors:
            raise ValidationError(errors[0].message)
        return errors

    def validate_file(self, file_path: str, format: Optional[str] = None) -> List[ValidationError]:
        """Validate a file against the schema.

        Args:
            file_path: Source handed to ``load_data``.
            format: Optional format argument forwarded to ``load_data``.

        Returns:
            The list of validation errors for the loaded data.
        """
        data = load_data(file_path, format)
        return self.validate(data)

    def get_error_messages(self, errors: List[ValidationError]) -> List[str]:
        """Convert validation errors to human-readable messages.

        Each message has the form ``Path '<a -> b -> c>': <message>``; errors
        with an empty path are reported at ``root``.
        """
        messages = []
        for error in errors:
            path = " -> ".join(str(p) for p in error.path) if error.path else "root"
            messages.append(f"Path '{path}': {error.message}")
        return messages
|
|
|
|
|
|
def validate_data(data: Any, schema: Dict[str, Any]) -> tuple[bool, List[str]]:
    """Check ``data`` against ``schema`` in a single call.

    Returns:
        A ``(is_valid, messages)`` pair. ``is_valid`` is True when validation
        produced no errors; ``messages`` holds one formatted string per error.
    """
    checker = SchemaValidator(schema=schema)
    found = checker.validate(data)
    report = checker.get_error_messages(found)
    # An empty error list means the data is valid.
    is_valid = not found
    return is_valid, report
|
|
|
|
|
|
def validate_file(file_path: str, schema_file: str, format: Optional[str] = None) -> tuple[bool, List[str]]:
    """Check the data in ``file_path`` against the schema in ``schema_file``.

    Args:
        file_path: Data source, forwarded to ``SchemaValidator.validate_file``.
        schema_file: Schema source, used to construct the validator.
        format: Optional format argument forwarded along with ``file_path``.

    Returns:
        A ``(is_valid, messages)`` pair. ``is_valid`` is True when validation
        produced no errors; ``messages`` holds one formatted string per error.
    """
    checker = SchemaValidator(schema_file=schema_file)
    found = checker.validate_file(file_path, format)
    report = checker.get_error_messages(found)
    # An empty error list means the file's data is valid.
    is_valid = not found
    return is_valid, report
|
|
|
|
|
|
def load_schema(schema_source: str) -> Dict[str, Any]:
    """Return the schema produced by passing ``schema_source`` to ``load_data``.

    Which kinds of source are accepted is decided entirely by ``load_data``;
    this function adds no handling of its own.
    """
    loaded_schema = load_data(schema_source)
    return loaded_schema
|