Initial upload with full project structure

This commit is contained in:
2026-02-01 20:49:01 +00:00
parent ed5f7c2156
commit cf8c722988

100
app/src/confgen/secrets.py Normal file
View File

@@ -0,0 +1,100 @@
"""Secrets resolver for environment variable interpolation."""
import os
import re
class SecretsResolver:
"""Resolver for secret values from environment variables."""
ENV_PREFIX = "env."
VAULT_PREFIX = "vault."
def __init__(self):
self.env_cache = {}
def resolve(self, content: str) -> str:
"""Resolve all secret placeholders in content."""
content = self._resolve_env_vars(content)
content = self._resolve_vault_secrets(content)
return content
def _resolve_env_vars(self, content: str) -> str:
"""Resolve environment variable placeholders."""
pattern = r"\{\{env\.([A-Z0-9_]+)\}\}"
def replace(match):
var_name = match.group(1)
value = os.environ.get(var_name)
if value is None:
if var_name in self.env_cache:
value = self.env_cache[var_name]
else:
value = os.environ.get(f"CONFGEN_{var_name}")
if value is None:
raise ValueError(f"Environment variable '{var_name}' not found")
return value
return re.sub(pattern, replace, content)
def _resolve_vault_secrets(self, content: str) -> str:
"""Resolve vault secret placeholders."""
pattern = r"\{\{vault\.([A-Z0-9_]+)\}\}"
def replace(match):
secret_path = match.group(1)
return self._get_from_vault(secret_path)
return re.sub(pattern, replace, content)
def _get_from_vault(self, secret_path: str) -> str:
"""Get a secret from a vault backend."""
vault_url = os.environ.get("CONFGEN_VAULT_URL", "")
vault_token = os.environ.get("CONFGEN_VAULT_TOKEN", "")
if not vault_url or not vault_token:
raise ValueError(
"Vault not configured. Set CONFGEN_VAULT_URL and CONFGEN_VAULT_TOKEN."
)
try:
import requests
headers = {"X-Vault-Token": vault_token}
response = requests.get(
f"{vault_url}/v1/{secret_path}",
headers=headers,
timeout=5,
)
response.raise_for_status()
data = response.json()
return data.get("data", {}).get("value", "")
except ImportError:
raise ValueError("requests library required for vault integration")
except Exception as e:
raise ValueError(f"Failed to get secret from vault: {e}")
def is_secret_placeholder(self, text: str) -> bool:
"""Check if text contains a secret placeholder."""
return self.ENV_PREFIX in text or self.VAULT_PREFIX in text
def get_secret_names(self, content: str) -> list[str]:
"""Extract secret names from content."""
env_pattern = r"\{\{env\.([A-Z0-9_]+)\}\}"
vault_pattern = r"\{\{vault\.([A-Z0-9_]+)\}\}"
env_matches = re.findall(env_pattern, content)
vault_matches = re.findall(vault_pattern, content)
return env_matches + vault_matches
def check_secret_availability(self, content: str) -> dict[str, bool]:
"""Check which secrets are available in environment."""
secret_names = self.get_secret_names(content)
availability = {}
for name in secret_names:
available = bool(os.environ.get(name) or os.environ.get(f"CONFGEN_{name}"))
availability[name] = available
return availability