Initial upload: API Mock CLI v0.1.0
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / type-check (push) Has been cancelled
CI / build (push) Has been cancelled
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.9) (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / type-check (push) Has been cancelled
CI / build (push) Has been cancelled
This commit is contained in:
145
src/core/validator.py
Normal file
145
src/core/validator.py
Normal file
@@ -0,0 +1,145 @@
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
from src.models.request import RequestValidation
|
||||
|
||||
|
||||
class ValidationError:
|
||||
def __init__(self, field: str, message: str, location: str):
|
||||
self.field = field
|
||||
self.message = message
|
||||
self.location = location
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"field": self.field,
|
||||
"message": self.message,
|
||||
"location": self.location
|
||||
}
|
||||
|
||||
|
||||
class Validator:
|
||||
def __init__(self, validation_rules: Optional[RequestValidation] = None):
|
||||
self.rules = validation_rules
|
||||
|
||||
def validate(
|
||||
self,
|
||||
body: Optional[Any] = None,
|
||||
query: Optional[Dict[str, str]] = None,
|
||||
headers: Optional[Dict[str, str]] = None,
|
||||
path: Optional[Dict[str, str]] = None
|
||||
) -> Tuple[bool, List[ValidationError]]:
|
||||
errors = []
|
||||
if self.rules:
|
||||
if self.rules.body:
|
||||
body_errors = self._validate_object(body, self.rules.body, "body")
|
||||
errors.extend(body_errors)
|
||||
if self.rules.query:
|
||||
query_errors = self._validate_object(query or {}, self.rules.query, "query")
|
||||
errors.extend(query_errors)
|
||||
if self.rules.headers:
|
||||
headers_errors = self._validate_object(headers or {}, self.rules.headers, "headers")
|
||||
errors.extend(headers_errors)
|
||||
if self.rules.path:
|
||||
path_errors = self._validate_object(path or {}, self.rules.path, "path")
|
||||
errors.extend(path_errors)
|
||||
return len(errors) == 0, errors
|
||||
|
||||
def _validate_object(
|
||||
self,
|
||||
data: Any,
|
||||
schema: Dict[str, Any],
|
||||
location: str
|
||||
) -> List[ValidationError]:
|
||||
errors = []
|
||||
if not isinstance(data, dict):
|
||||
errors.append(ValidationError("root", f"Expected object for {location}", location))
|
||||
return errors
|
||||
type_hint = schema.get("type")
|
||||
if type_hint and type_hint != "object":
|
||||
errors.append(ValidationError("root", f"Expected type '{type_hint}' for {location}", location))
|
||||
required_fields = schema.get("required", [])
|
||||
for field in required_fields:
|
||||
if field not in data:
|
||||
errors.append(ValidationError(field, f"Field '{field}' is required", location))
|
||||
properties = schema.get("properties", {})
|
||||
for field, prop_schema in properties.items():
|
||||
if field in data:
|
||||
field_errors = self._validate_field(data[field], prop_schema, field, location)
|
||||
errors.extend(field_errors)
|
||||
return errors
|
||||
|
||||
def _validate_field(
|
||||
self,
|
||||
value: Any,
|
||||
schema: Dict[str, Any],
|
||||
field_name: str,
|
||||
location: str
|
||||
) -> List[ValidationError]:
|
||||
errors = []
|
||||
type_hint = schema.get("type")
|
||||
if type_hint:
|
||||
type_map = {
|
||||
"string": str,
|
||||
"integer": int,
|
||||
"number": (int, float),
|
||||
"boolean": bool,
|
||||
"array": list,
|
||||
"object": dict
|
||||
}
|
||||
expected_type = type_map.get(type_hint)
|
||||
if expected_type and not isinstance(value, expected_type):
|
||||
errors.append(ValidationError(
|
||||
field_name,
|
||||
f"Expected {type_hint}, got {type(value).__name__}",
|
||||
location
|
||||
))
|
||||
min_length = schema.get("minLength")
|
||||
if min_length and isinstance(value, str) and len(value) < min_length:
|
||||
errors.append(ValidationError(
|
||||
field_name,
|
||||
f"String must be at least {min_length} characters",
|
||||
location
|
||||
))
|
||||
max_length = schema.get("maxLength")
|
||||
if max_length and isinstance(value, str) and len(value) > max_length:
|
||||
errors.append(ValidationError(
|
||||
field_name,
|
||||
f"String must be at most {max_length} characters",
|
||||
location
|
||||
))
|
||||
minimum = schema.get("minimum")
|
||||
if minimum is not None and isinstance(value, (int, float)) and value < minimum:
|
||||
errors.append(ValidationError(
|
||||
field_name,
|
||||
f"Value must be at least {minimum}",
|
||||
location
|
||||
))
|
||||
maximum = schema.get("maximum")
|
||||
if maximum is not None and isinstance(value, (int, float)) and value > maximum:
|
||||
errors.append(ValidationError(
|
||||
field_name,
|
||||
f"Value must be at most {maximum}",
|
||||
location
|
||||
))
|
||||
pattern = schema.get("pattern")
|
||||
if pattern and isinstance(value, str):
|
||||
import re
|
||||
if not re.match(pattern, value):
|
||||
errors.append(ValidationError(
|
||||
field_name,
|
||||
f"Value does not match pattern {pattern}",
|
||||
location
|
||||
))
|
||||
format_type = schema.get("format")
|
||||
if format_type == "email" and isinstance(value, str):
|
||||
import re
|
||||
email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
|
||||
if not re.match(email_pattern, value):
|
||||
errors.append(ValidationError(field_name, "Invalid email format", location))
|
||||
return errors
|
||||
|
||||
def format_errors(self, errors: List[ValidationError]) -> Dict[str, Any]:
|
||||
return {
|
||||
"valid": False,
|
||||
"error_count": len(errors),
|
||||
"errors": [e.to_dict() for e in errors]
|
||||
}
|
||||
Reference in New Issue
Block a user