136 lines
4.1 KiB
Python
136 lines
4.1 KiB
Python
"""Encryption module for env-pro using AES-256-GCM."""
|
|
|
|
import os
|
|
import base64
|
|
import hashlib
|
|
from typing import Optional
|
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
|
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.backends import default_backend
|
|
import keyring
|
|
|
|
|
|
class EncryptionError(Exception):
|
|
"""Base exception for encryption errors."""
|
|
pass
|
|
|
|
|
|
def generate_salt() -> bytes:
|
|
"""Generate a random salt for key derivation."""
|
|
return os.urandom(16)
|
|
|
|
|
|
def generate_nonce() -> bytes:
|
|
"""Generate a random nonce for AES-GCM."""
|
|
return os.urandom(12)
|
|
|
|
|
|
def derive_key(passphrase: str, salt: bytes, iterations: int = 100000) -> bytes:
|
|
"""Derive an encryption key from a passphrase using PBKDF2."""
|
|
kdf = PBKDF2HMAC(
|
|
algorithm=hashes.SHA256(),
|
|
length=32,
|
|
salt=salt,
|
|
iterations=iterations,
|
|
backend=default_backend()
|
|
)
|
|
return kdf.derive(passphrase.encode())
|
|
|
|
|
|
def generate_key() -> bytes:
|
|
"""Generate a random encryption key."""
|
|
return os.urandom(32)
|
|
|
|
|
|
def verify_key(key: bytes) -> bool:
|
|
"""Verify that a key is valid (32 bytes)."""
|
|
return isinstance(key, bytes) and len(key) == 32
|
|
|
|
|
|
def encrypt_value(value: str, key: bytes) -> str:
|
|
"""Encrypt a single value using AES-256-GCM."""
|
|
if not verify_key(key):
|
|
raise EncryptionError("Invalid encryption key")
|
|
|
|
nonce = generate_nonce()
|
|
aesgcm = AESGCM(key)
|
|
ciphertext = aesgcm.encrypt(nonce, value.encode(), None)
|
|
|
|
result = {
|
|
"nonce": base64.b64encode(nonce).decode(),
|
|
"ciphertext": base64.b64encode(ciphertext).decode()
|
|
}
|
|
return base64.b64encode(json.dumps(result).encode()).decode()
|
|
|
|
|
|
def decrypt_value(encrypted: str, key: bytes) -> str:
|
|
"""Decrypt a single value using AES-256-GCM."""
|
|
if not verify_key(key):
|
|
raise EncryptionError("Invalid encryption key")
|
|
|
|
try:
|
|
data = json.loads(base64.b64decode(encrypted))
|
|
nonce = base64.b64decode(data["nonce"])
|
|
ciphertext = base64.b64decode(data["ciphertext"])
|
|
|
|
aesgcm = AESGCM(key)
|
|
plaintext = aesgcm.decrypt(nonce, ciphertext, None)
|
|
return plaintext.decode()
|
|
except (KeyError, ValueError, base64.binascii.Error) as e:
|
|
raise EncryptionError(f"Failed to decrypt value: {e}")
|
|
|
|
|
|
def encrypt_file(content: str, key: bytes) -> str:
|
|
"""Encrypt an entire file content."""
|
|
salt = generate_salt()
|
|
nonce = generate_nonce()
|
|
|
|
derived_key = derive_key(key.hex(), salt)
|
|
aesgcm = AESGCM(derived_key)
|
|
ciphertext = aesgcm.encrypt(nonce, content.encode(), None)
|
|
|
|
result = {
|
|
"salt": base64.b64encode(salt).decode(),
|
|
"nonce": base64.b64encode(nonce).decode(),
|
|
"ciphertext": base64.b64encode(ciphertext).decode()
|
|
}
|
|
return yaml.dump(result)
|
|
|
|
|
|
def decrypt_file(encrypted_content: str, key: bytes) -> str:
|
|
"""Decrypt an entire file content."""
|
|
try:
|
|
data = yaml.safe_load(encrypted_content)
|
|
salt = base64.b64decode(data["salt"])
|
|
nonce = base64.b64decode(data["nonce"])
|
|
ciphertext = base64.b64decode(data["ciphertext"])
|
|
|
|
derived_key = derive_key(key.hex(), salt)
|
|
aesgcm = AESGCM(derived_key)
|
|
plaintext = aesgcm.decrypt(nonce, ciphertext, None)
|
|
return plaintext.decode()
|
|
except (KeyError, ValueError, base64.binascii.Error) as e:
|
|
raise EncryptionError(f"Failed to decrypt file: {e}")
|
|
|
|
|
|
def store_key_in_keyring(key: bytes, service: str = "env-pro", username: str = "default"):
|
|
"""Store the encryption key in the system keyring."""
|
|
if not verify_key(key):
|
|
raise EncryptionError("Invalid encryption key")
|
|
keyring.set_password(service, username, key.hex())
|
|
|
|
|
|
def get_key_from_keyring(service: str = "env-pro", username: str = "default") -> Optional[bytes]:
|
|
"""Retrieve the encryption key from the system keyring."""
|
|
key_hex = keyring.get_password(service, username)
|
|
if key_hex is None:
|
|
return None
|
|
try:
|
|
return bytes.fromhex(key_hex)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
import json
|