diff --git a/app/src/confgen/vault.py b/app/src/confgen/vault.py new file mode 100644 index 0000000..36b3dbe --- /dev/null +++ b/app/src/confgen/vault.py @@ -0,0 +1,200 @@ +"""Vault backend integration for secret management.""" + +import json +import requests +from abc import ABC, abstractmethod +from typing import Any, Optional + + +class VaultBackend(ABC): + """Abstract base class for vault backends.""" + + @abstractmethod + def get(self, key: str) -> Optional[str]: + """Get a secret value by key.""" + pass + + @abstractmethod + def set(self, key: str, value: str) -> None: + """Set a secret value.""" + pass + + @abstractmethod + def delete(self, key: str) -> None: + """Delete a secret.""" + pass + + @abstractmethod + def list(self) -> list[str]: + """List all secret keys.""" + pass + + +class EnvironmentVault(VaultBackend): + """Vault backend that uses environment variables.""" + + def __init__(self, prefix: str = "CONFGEN_"): + self.prefix = prefix + + def get(self, key: str) -> Optional[str]: + """Get a secret from environment variables.""" + import os + + return os.environ.get(f"{self.prefix}{key}") or os.environ.get(key) + + def set(self, key: str, value: str) -> None: + """Set an environment variable.""" + import os + + os.environ[f"{self.prefix}{key}"] = value + + def delete(self, key: str) -> None: + """Delete an environment variable.""" + import os + + key_with_prefix = f"{self.prefix}{key}" + if key_with_prefix in os.environ: + del os.environ[key_with_prefix] + if key in os.environ: + del os.environ[key] + + def list(self) -> list[str]: + """List all environment variable secrets.""" + import os + + return [k for k in os.environ.keys() if k.startswith(self.prefix)] + + +class FileVault(VaultBackend): + """Vault backend that uses a JSON file for secrets.""" + + def __init__(self, path: str = "~/.confgen/vault.json"): + import os + + self.path = os.path.expanduser(path) + self._ensure_file() + + def _ensure_file(self) -> None: + """Ensure the vault file exists.""" + import os + + os.makedirs(os.path.dirname(self.path), exist_ok=True) + if not os.path.exists(self.path): + with open(self.path, "w") as f: + json.dump({}, f) + + def _load(self) -> dict[str, str]: + """Load secrets from file.""" + with open(self.path) as f: + return json.load(f) + + def _save(self, data: dict[str, str]) -> None: + """Save secrets to file.""" + with open(self.path, "w") as f: + json.dump(data, f, indent=2) + + def get(self, key: str) -> Optional[str]: + """Get a secret from file.""" + data = self._load() + return data.get(key) + + def set(self, key: str, value: str) -> None: + """Set a secret in file.""" + data = self._load() + data[key] = value + self._save(data) + + def delete(self, key: str) -> None: + """Delete a secret from file.""" + data = self._load() + if key in data: + del data[key] + self._save(data) + + def list(self) -> list[str]: + """List all secrets in file.""" + data = self._load() + return list(data.keys()) + + +class HTTPSVault(VaultBackend): + """Vault backend that uses an HTTP API.""" + + def __init__(self, base_url: str, token: str, timeout: int = 5): + self.base_url = base_url.rstrip("/") + self.token = token + self.timeout = timeout + self.session = requests.Session() + self.session.headers.update({"X-Vault-Token": token}) + + def _request( + self, + method: str, + path: str, + data: Optional[dict] = None, + ) -> dict[str, Any]: + """Make an HTTP request to the vault.""" + url = f"{self.base_url}/v1/{path}" + response = self.session.request(method, url, json=data, timeout=self.timeout) + response.raise_for_status() + return response.json() + + def get(self, key: str) -> Optional[str]: + """Get a secret from the vault.""" + try: + data = self._request("GET", key) + return data.get("data", {}).get("value") + except requests.HTTPError: + return None + + def set(self, key: str, value: str) -> None: + """Set a secret in the vault.""" + self._request("POST", key, {"value": value}) + + def delete(self, key: str) -> None: + """Delete a secret from the vault.""" + self._request("DELETE", key) + + def list(self) -> list[str]: + """List all secrets in the vault.""" + try: + data = self._request("LIST", "") + return data.get("data", {}).get("keys", []) + except requests.HTTPError: + return [] + + +class VaultManager: + """Manager for multiple vault backends.""" + + def __init__(self): + self.backends: list[VaultBackend] = [] + + def add_backend(self, backend: VaultBackend) -> None: + """Add a vault backend.""" + self.backends.append(backend) + + def get(self, key: str) -> Optional[str]: + """Get a secret from the first backend that has it.""" + for backend in self.backends: + value = backend.get(key) + if value is not None: + return value + return None + + def set(self, key: str, value: str) -> None: + """Set a secret in all backends.""" + for backend in self.backends: + backend.set(key, value) + + def delete(self, key: str) -> None: + """Delete a secret from all backends.""" + for backend in self.backends: + backend.delete(key) + + def list(self) -> list[str]: + """List all secrets from all backends.""" + secrets = set() + for backend in self.backends: + secrets.update(backend.list()) + return sorted(list(secrets))