diff --git a/src/core/validator.py b/src/core/validator.py new file mode 100644 index 0000000..8055870 --- /dev/null +++ b/src/core/validator.py @@ -0,0 +1,145 @@ +from typing import Any, Dict, List, Optional, Tuple +from src.models.request import RequestValidation + + +class ValidationError: + def __init__(self, field: str, message: str, location: str): + self.field = field + self.message = message + self.location = location + + def to_dict(self) -> Dict[str, Any]: + return { + "field": self.field, + "message": self.message, + "location": self.location + } + + +class Validator: + def __init__(self, validation_rules: Optional[RequestValidation] = None): + self.rules = validation_rules + + def validate( + self, + body: Optional[Any] = None, + query: Optional[Dict[str, str]] = None, + headers: Optional[Dict[str, str]] = None, + path: Optional[Dict[str, str]] = None + ) -> Tuple[bool, List[ValidationError]]: + errors = [] + if self.rules: + if self.rules.body: + body_errors = self._validate_object(body, self.rules.body, "body") + errors.extend(body_errors) + if self.rules.query: + query_errors = self._validate_object(query or {}, self.rules.query, "query") + errors.extend(query_errors) + if self.rules.headers: + headers_errors = self._validate_object(headers or {}, self.rules.headers, "headers") + errors.extend(headers_errors) + if self.rules.path: + path_errors = self._validate_object(path or {}, self.rules.path, "path") + errors.extend(path_errors) + return len(errors) == 0, errors + + def _validate_object( + self, + data: Any, + schema: Dict[str, Any], + location: str + ) -> List[ValidationError]: + errors = [] + if not isinstance(data, dict): + errors.append(ValidationError("root", f"Expected object for {location}", location)) + return errors + type_hint = schema.get("type") + if type_hint and type_hint != "object": + errors.append(ValidationError("root", f"Expected type '{type_hint}' for {location}", location)) + required_fields = schema.get("required", []) + for field in required_fields: + if field not in data: + errors.append(ValidationError(field, f"Field '{field}' is required", location)) + properties = schema.get("properties", {}) + for field, prop_schema in properties.items(): + if field in data: + field_errors = self._validate_field(data[field], prop_schema, field, location) + errors.extend(field_errors) + return errors + + def _validate_field( + self, + value: Any, + schema: Dict[str, Any], + field_name: str, + location: str + ) -> List[ValidationError]: + errors = [] + type_hint = schema.get("type") + if type_hint: + type_map = { + "string": str, + "integer": int, + "number": (int, float), + "boolean": bool, + "array": list, + "object": dict + } + expected_type = type_map.get(type_hint) + if expected_type and not isinstance(value, expected_type): + errors.append(ValidationError( + field_name, + f"Expected {type_hint}, got {type(value).__name__}", + location + )) + min_length = schema.get("minLength") + if min_length and isinstance(value, str) and len(value) < min_length: + errors.append(ValidationError( + field_name, + f"String must be at least {min_length} characters", + location + )) + max_length = schema.get("maxLength") + if max_length and isinstance(value, str) and len(value) > max_length: + errors.append(ValidationError( + field_name, + f"String must be at most {max_length} characters", + location + )) + minimum = schema.get("minimum") + if minimum is not None and isinstance(value, (int, float)) and value < minimum: + errors.append(ValidationError( + field_name, + f"Value must be at least {minimum}", + location + )) + maximum = schema.get("maximum") + if maximum is not None and isinstance(value, (int, float)) and value > maximum: + errors.append(ValidationError( + field_name, + f"Value must be at most {maximum}", + location + )) + pattern = schema.get("pattern") + if pattern and isinstance(value, str): + import re + if not re.match(pattern, value): + errors.append(ValidationError( + field_name, + f"Value does not match pattern {pattern}", + location + )) + format_type = schema.get("format") + if format_type == "email" and isinstance(value, str): + import re + email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" + if not re.match(email_pattern, value): + errors.append(ValidationError(field_name, "Invalid email format", location)) + return errors + + def format_errors(self, errors: List[ValidationError]) -> Dict[str, Any]: + return { + "valid": False, + "error_count": len(errors), + "errors": [e.to_dict() for e in errors] + }