diff --git a/app/src/confgen/secrets.py b/app/src/confgen/secrets.py new file mode 100644 index 0000000..cb8b3fb --- /dev/null +++ b/app/src/confgen/secrets.py @@ -0,0 +1,100 @@ +"""Secrets resolver for environment variable interpolation.""" + +import os +import re + + +class SecretsResolver: + """Resolver for secret values from environment variables.""" + + ENV_PREFIX = "env." + VAULT_PREFIX = "vault." + + def __init__(self): + self.env_cache = {} + + def resolve(self, content: str) -> str: + """Resolve all secret placeholders in content.""" + content = self._resolve_env_vars(content) + content = self._resolve_vault_secrets(content) + return content + + def _resolve_env_vars(self, content: str) -> str: + """Resolve environment variable placeholders.""" + pattern = r"\{\{env\.([A-Z0-9_]+)\}\}" + + def replace(match): + var_name = match.group(1) + value = os.environ.get(var_name) + if value is None: + if var_name in self.env_cache: + value = self.env_cache[var_name] + else: + value = os.environ.get(f"CONFGEN_{var_name}") + if value is None: + raise ValueError(f"Environment variable '{var_name}' not found") + return value + + return re.sub(pattern, replace, content) + + def _resolve_vault_secrets(self, content: str) -> str: + """Resolve vault secret placeholders.""" + pattern = r"\{\{vault\.([A-Z0-9_]+)\}\}" + + def replace(match): + secret_path = match.group(1) + return self._get_from_vault(secret_path) + + return re.sub(pattern, replace, content) + + def _get_from_vault(self, secret_path: str) -> str: + """Get a secret from a vault backend.""" + vault_url = os.environ.get("CONFGEN_VAULT_URL", "") + vault_token = os.environ.get("CONFGEN_VAULT_TOKEN", "") + + if not vault_url or not vault_token: + raise ValueError( + "Vault not configured. Set CONFGEN_VAULT_URL and CONFGEN_VAULT_TOKEN." + ) + + try: + import requests + + headers = {"X-Vault-Token": vault_token} + response = requests.get( + f"{vault_url}/v1/{secret_path}", + headers=headers, + timeout=5, + ) + response.raise_for_status() + data = response.json() + return data.get("data", {}).get("value", "") + except ImportError: + raise ValueError("requests library required for vault integration") + except Exception as e: + raise ValueError(f"Failed to get secret from vault: {e}") + + def is_secret_placeholder(self, text: str) -> bool: + """Check if text contains a secret placeholder.""" + return self.ENV_PREFIX in text or self.VAULT_PREFIX in text + + def get_secret_names(self, content: str) -> list[str]: + """Extract secret names from content.""" + env_pattern = r"\{\{env\.([A-Z0-9_]+)\}\}" + vault_pattern = r"\{\{vault\.([A-Z0-9_]+)\}\}" + + env_matches = re.findall(env_pattern, content) + vault_matches = re.findall(vault_pattern, content) + + return env_matches + vault_matches + + def check_secret_availability(self, content: str) -> dict[str, bool]: + """Check which secrets are available in environment.""" + secret_names = self.get_secret_names(content) + availability = {} + + for name in secret_names: + available = bool(os.environ.get(name) or os.environ.get(f"CONFGEN_{name}")) + availability[name] = available + + return availability