# Listing metadata: 117 lines, 3.7 KiB, Python
"""Schema validation for environment variables."""
|
|
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Optional, List, Dict, Any
|
|
from pydantic import BaseModel, Field, validator
|
|
from pydantic.types import AnyUrl
|
|
|
|
|
|
class VarSchema(BaseModel):
    """Validation rules for one environment variable.

    Every field carries a default, so an entry may specify only the rules
    it cares about.
    """

    # Name of the expected value type; Validator.check_types only acts on
    # the names it has a converter for ("bool", "int", "float", "url").
    type: str = "string"

    # When true, Validator.validate reports an absent or empty value.
    required: bool = False

    # Free-form text about the variable; not read by the validator code
    # visible in this module.
    description: Optional[str] = None

    # Declared default value; not read by the validator code visible in
    # this module.
    default: Optional[str] = None

    # Regular expression applied with re.match, i.e. anchored at the start
    # of the value only.
    pattern: Optional[str] = None

    # Lower and upper bounds on len(value); None means "no bound".
    min_length: Optional[int] = None
    max_length: Optional[int] = None
|
|
|
|
|
|
class Schema(BaseModel):
    """Complete validation schema: variable name -> rules for that variable."""

    class Config:
        # Model configuration: permit field types pydantic has no built-in
        # validator for.
        arbitrary_types_allowed = True

    # Keys are environment variable names; values are the per-variable rules.
    variables: Dict[str, VarSchema] = {}
|
|
|
|
|
|
class ValidationError:
    """A single problem found while validating one variable.

    Attributes:
        key: Name of the variable the problem refers to.
        message: Human-readable description of the problem.
        severity: Severity label; defaults to ``"error"``.
    """

    def __init__(self, key: str, message: str, severity: str = "error"):
        """Store the variable name, the message and the severity label."""
        self.key = key
        self.message = message
        self.severity = severity

    def __str__(self) -> str:
        """Return ``"<key>: <message>"`` (severity is not included)."""
        return f"{self.key}: {self.message}"

    def __repr__(self) -> str:
        """Return an unambiguous representation showing all three fields.

        Without this, a list of errors prints as opaque
        ``<ValidationError object at 0x...>`` entries.
        """
        return (
            f"{type(self).__name__}(key={self.key!r}, "
            f"message={self.message!r}, severity={self.severity!r})"
        )
|
|
|
|
|
|
class Validator:
    """Validate environment variables against an optional ``Schema``.

    When no schema is attached (``self.schema is None``) both ``validate``
    and ``check_types`` report no errors.
    """

    # Case-insensitive spellings accepted for ``type: bool`` variables.
    # The truthy set is the one the original converter recognised; the falsy
    # set is its natural complement. Anything else is reported as a type
    # error instead of being silently treated as False.
    _TRUE_LITERALS = ("true", "1", "yes")
    _FALSE_LITERALS = ("false", "0", "no")

    def __init__(self, schema: Optional[Schema] = None):
        """Initialize the validator.

        Args:
            schema: Schema to validate against, or ``None`` for a validator
                that accepts everything.
        """
        self.schema = schema

    @classmethod
    def from_file(cls, schema_path: Optional[Path] = None) -> "Validator":
        """Build a validator from a YAML schema file.

        Args:
            schema_path: Location of the schema file. Defaults to
                ``.env.schema.yaml`` in the current working directory.

        Returns:
            A validator bound to the loaded schema. A schema-less validator
            is returned when the file does not exist or is not valid YAML;
            an empty YAML document yields an empty schema.

        Raises:
            TypeError: If the top level of the YAML document is not a mapping.

        Errors raised while constructing ``Schema`` from the loaded mapping
        are not caught here and propagate to the caller.
        """
        # Imported lazily so the module can be imported without PyYAML.
        import yaml

        if schema_path is None:
            schema_path = Path.cwd() / ".env.schema.yaml"
        else:
            # Tolerate plain string paths as well as Path objects.
            schema_path = Path(schema_path)

        if not schema_path.exists():
            return cls()

        try:
            with open(schema_path, "r", encoding="utf-8") as f:
                data = yaml.safe_load(f)
        except yaml.YAMLError:
            # Best effort: an unparsable schema behaves like no schema.
            return cls()

        # safe_load returns None for an empty document; treat it as an empty
        # schema instead of failing on ``Schema(**None)``.
        if data is None:
            data = {}
        if not isinstance(data, dict):
            raise TypeError(
                f"schema file {schema_path} must contain a mapping at the "
                f"top level, got {type(data).__name__}"
            )

        return cls(schema=Schema(**data))

    def validate(self, variables: Dict[str, str]) -> List[ValidationError]:
        """Check presence, pattern and length rules for every schema entry.

        Args:
            variables: Mapping of variable name to its string value.

        Returns:
            One ``ValidationError`` per violated rule, in schema order.
            Variables that are not listed in the schema are ignored.
        """
        errors: List[ValidationError] = []

        if self.schema is None:
            return errors

        for key, var_schema in self.schema.variables.items():
            value = variables.get(key)

            # An empty string counts as missing for required variables.
            if var_schema.required and (value is None or value == ""):
                errors.append(ValidationError(key, "required variable is missing"))
                continue

            if value is None:
                continue

            # re.match anchors at the start of the value only.
            if var_schema.pattern and not re.match(var_schema.pattern, value):
                errors.append(
                    ValidationError(
                        key, f"value does not match pattern {var_schema.pattern}"
                    )
                )

            length = len(value)

            # Compare against None explicitly: a bound of 0 is falsy but is
            # still a real constraint (max_length=0 means "must be empty").
            if var_schema.min_length is not None and length < var_schema.min_length:
                errors.append(
                    ValidationError(
                        key,
                        f"value length {length} is less than minimum {var_schema.min_length}",
                    )
                )

            if var_schema.max_length is not None and length > var_schema.max_length:
                errors.append(
                    ValidationError(
                        key,
                        f"value length {length} exceeds maximum {var_schema.max_length}",
                    )
                )

        return errors

    @classmethod
    def _parse_bool(cls, value: str) -> bool:
        """Convert a boolean literal to ``bool``; raise ValueError otherwise."""
        lowered = value.lower()
        if lowered in cls._TRUE_LITERALS:
            return True
        if lowered in cls._FALSE_LITERALS:
            return False
        raise ValueError(f"not a boolean literal: {value!r}")

    def check_types(self, variables: Dict[str, str]) -> List[ValidationError]:
        """Check that each present value converts to its declared type.

        Only the types ``bool``, ``int``, ``float`` and ``url`` are checked;
        any other declared type (including the default ``"string"``) is
        accepted as is. Absent variables are skipped.

        Args:
            variables: Mapping of variable name to its string value.

        Returns:
            One ``ValidationError`` per value that failed conversion.
        """
        errors: List[ValidationError] = []

        if self.schema is None:
            return errors

        type_converters = {
            "bool": self._parse_bool,
            "int": int,
            "float": float,
            # NOTE(review): constructing AnyUrl directly may not validate the
            # same way across pydantic major versions - confirm against the
            # version this project pins.
            "url": lambda v: AnyUrl(v),
        }

        for key, var_schema in self.schema.variables.items():
            value = variables.get(key)
            if value is None:
                continue

            converter = type_converters.get(var_schema.type)
            if converter is None:
                continue

            try:
                converter(value)
            except Exception:
                # Converters fail with different exception types (ValueError
                # for int/float/bool, library-specific errors for url), and
                # any failure means the value is not of the declared type.
                errors.append(
                    ValidationError(key, f"cannot convert to {var_schema.type}")
                )

        return errors
|