Initial upload with full project structure
This commit is contained in:
200
app/src/confgen/vault.py
Normal file
200
app/src/confgen/vault.py
Normal file
@@ -0,0 +1,200 @@
|
||||
"""Vault backend integration for secret management."""
|
||||
|
||||
import json
|
||||
import requests
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any, Optional
|
||||
|
||||
|
||||
class VaultBackend(ABC):
|
||||
"""Abstract base class for vault backends."""
|
||||
|
||||
@abstractmethod
|
||||
def get(self, key: str) -> Optional[str]:
|
||||
"""Get a secret value by key."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def set(self, key: str, value: str) -> None:
|
||||
"""Set a secret value."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def delete(self, key: str) -> None:
|
||||
"""Delete a secret."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def list(self) -> list[str]:
|
||||
"""List all secret keys."""
|
||||
pass
|
||||
|
||||
|
||||
class EnvironmentVault(VaultBackend):
|
||||
"""Vault backend that uses environment variables."""
|
||||
|
||||
def __init__(self, prefix: str = "CONFGEN_"):
|
||||
self.prefix = prefix
|
||||
|
||||
def get(self, key: str) -> Optional[str]:
|
||||
"""Get a secret from environment variables."""
|
||||
import os
|
||||
|
||||
return os.environ.get(f"{self.prefix}{key}") or os.environ.get(key)
|
||||
|
||||
def set(self, key: str, value: str) -> None:
|
||||
"""Set an environment variable."""
|
||||
import os
|
||||
|
||||
os.environ[f"{self.prefix}{key}"] = value
|
||||
|
||||
def delete(self, key: str) -> None:
|
||||
"""Delete an environment variable."""
|
||||
import os
|
||||
|
||||
key_with_prefix = f"{self.prefix}{key}"
|
||||
if key_with_prefix in os.environ:
|
||||
del os.environ[key_with_prefix]
|
||||
if key in os.environ:
|
||||
del os.environ[key]
|
||||
|
||||
def list(self) -> list[str]:
|
||||
"""List all environment variable secrets."""
|
||||
import os
|
||||
|
||||
return [k for k in os.environ.keys() if k.startswith(self.prefix)]
|
||||
|
||||
|
||||
class FileVault(VaultBackend):
|
||||
"""Vault backend that uses a JSON file for secrets."""
|
||||
|
||||
def __init__(self, path: str = "~/.confgen/vault.json"):
|
||||
import os
|
||||
|
||||
self.path = os.path.expanduser(path)
|
||||
self._ensure_file()
|
||||
|
||||
def _ensure_file(self) -> None:
|
||||
"""Ensure the vault file exists."""
|
||||
import os
|
||||
|
||||
os.makedirs(os.path.dirname(self.path), exist_ok=True)
|
||||
if not os.path.exists(self.path):
|
||||
with open(self.path, "w") as f:
|
||||
json.dump({}, f)
|
||||
|
||||
def _load(self) -> dict[str, str]:
|
||||
"""Load secrets from file."""
|
||||
with open(self.path) as f:
|
||||
return json.load(f)
|
||||
|
||||
def _save(self, data: dict[str, str]) -> None:
|
||||
"""Save secrets to file."""
|
||||
with open(self.path, "w") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
|
||||
def get(self, key: str) -> Optional[str]:
|
||||
"""Get a secret from file."""
|
||||
data = self._load()
|
||||
return data.get(key)
|
||||
|
||||
def set(self, key: str, value: str) -> None:
|
||||
"""Set a secret in file."""
|
||||
data = self._load()
|
||||
data[key] = value
|
||||
self._save(data)
|
||||
|
||||
def delete(self, key: str) -> None:
|
||||
"""Delete a secret from file."""
|
||||
data = self._load()
|
||||
if key in data:
|
||||
del data[key]
|
||||
self._save(data)
|
||||
|
||||
def list(self) -> list[str]:
|
||||
"""List all secrets in file."""
|
||||
data = self._load()
|
||||
return list(data.keys())
|
||||
|
||||
|
||||
class HTTPSVault(VaultBackend):
|
||||
"""Vault backend that uses an HTTP API."""
|
||||
|
||||
def __init__(self, base_url: str, token: str, timeout: int = 5):
|
||||
self.base_url = base_url.rstrip("/")
|
||||
self.token = token
|
||||
self.timeout = timeout
|
||||
self.session = requests.Session()
|
||||
self.session.headers.update({"X-Vault-Token": token})
|
||||
|
||||
def _request(
|
||||
self,
|
||||
method: str,
|
||||
path: str,
|
||||
data: Optional[dict] = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Make an HTTP request to the vault."""
|
||||
url = f"{self.base_url}/v1/{path}"
|
||||
response = self.session.request(method, url, json=data, timeout=self.timeout)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def get(self, key: str) -> Optional[str]:
|
||||
"""Get a secret from the vault."""
|
||||
try:
|
||||
data = self._request("GET", key)
|
||||
return data.get("data", {}).get("value")
|
||||
except requests.HTTPError:
|
||||
return None
|
||||
|
||||
def set(self, key: str, value: str) -> None:
|
||||
"""Set a secret in the vault."""
|
||||
self._request("POST", key, {"value": value})
|
||||
|
||||
def delete(self, key: str) -> None:
|
||||
"""Delete a secret from the vault."""
|
||||
self._request("DELETE", key)
|
||||
|
||||
def list(self) -> list[str]:
|
||||
"""List all secrets in the vault."""
|
||||
try:
|
||||
data = self._request("LIST", "")
|
||||
return data.get("data", {}).get("keys", [])
|
||||
except requests.HTTPError:
|
||||
return []
|
||||
|
||||
|
||||
class VaultManager:
|
||||
"""Manager for multiple vault backends."""
|
||||
|
||||
def __init__(self):
|
||||
self.backends: list[VaultBackend] = []
|
||||
|
||||
def add_backend(self, backend: VaultBackend) -> None:
|
||||
"""Add a vault backend."""
|
||||
self.backends.append(backend)
|
||||
|
||||
def get(self, key: str) -> Optional[str]:
|
||||
"""Get a secret from the first backend that has it."""
|
||||
for backend in self.backends:
|
||||
value = backend.get(key)
|
||||
if value is not None:
|
||||
return value
|
||||
return None
|
||||
|
||||
def set(self, key: str, value: str) -> None:
|
||||
"""Set a secret in all backends."""
|
||||
for backend in self.backends:
|
||||
backend.set(key, value)
|
||||
|
||||
def delete(self, key: str) -> None:
|
||||
"""Delete a secret from all backends."""
|
||||
for backend in self.backends:
|
||||
backend.delete(key)
|
||||
|
||||
def list(self) -> list[str]:
|
||||
"""List all secrets from all backends."""
|
||||
secrets = set()
|
||||
for backend in self.backends:
|
||||
secrets.update(backend.list())
|
||||
return sorted(list(secrets))
|
||||
Reference in New Issue
Block a user