diff --git a/app/env_pro/core/encryption.py b/app/env_pro/core/encryption.py new file mode 100644 index 0000000..3ad68de --- /dev/null +++ b/app/env_pro/core/encryption.py @@ -0,0 +1,135 @@ +"""Encryption module for env-pro using AES-256-GCM.""" + +import os +import base64 +import hashlib +from typing import Optional +from cryptography.hazmat.primitives.ciphers.aead import AESGCM +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.backends import default_backend +import keyring + + +class EncryptionError(Exception): + """Base exception for encryption errors.""" + pass + + +def generate_salt() -> bytes: + """Generate a random salt for key derivation.""" + return os.urandom(16) + + +def generate_nonce() -> bytes: + """Generate a random nonce for AES-GCM.""" + return os.urandom(12) + + +def derive_key(passphrase: str, salt: bytes, iterations: int = 100000) -> bytes: + """Derive an encryption key from a passphrase using PBKDF2.""" + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, + salt=salt, + iterations=iterations, + backend=default_backend() + ) + return kdf.derive(passphrase.encode()) + + +def generate_key() -> bytes: + """Generate a random encryption key.""" + return os.urandom(32) + + +def verify_key(key: bytes) -> bool: + """Verify that a key is valid (32 bytes).""" + return isinstance(key, bytes) and len(key) == 32 + + +def encrypt_value(value: str, key: bytes) -> str: + """Encrypt a single value using AES-256-GCM.""" + if not verify_key(key): + raise EncryptionError("Invalid encryption key") + + nonce = generate_nonce() + aesgcm = AESGCM(key) + ciphertext = aesgcm.encrypt(nonce, value.encode(), None) + + result = { + "nonce": base64.b64encode(nonce).decode(), + "ciphertext": base64.b64encode(ciphertext).decode() + } + return base64.b64encode(json.dumps(result).encode()).decode() + + +def decrypt_value(encrypted: str, key: bytes) -> str: + """Decrypt a single value using AES-256-GCM.""" + if not verify_key(key): + raise EncryptionError("Invalid encryption key") + + try: + data = json.loads(base64.b64decode(encrypted)) + nonce = base64.b64decode(data["nonce"]) + ciphertext = base64.b64decode(data["ciphertext"]) + + aesgcm = AESGCM(key) + plaintext = aesgcm.decrypt(nonce, ciphertext, None) + return plaintext.decode() + except (KeyError, ValueError, base64.binascii.Error) as e: + raise EncryptionError(f"Failed to decrypt value: {e}") + + +def encrypt_file(content: str, key: bytes) -> str: + """Encrypt an entire file content.""" + salt = generate_salt() + nonce = generate_nonce() + + derived_key = derive_key(key.hex(), salt) + aesgcm = AESGCM(derived_key) + ciphertext = aesgcm.encrypt(nonce, content.encode(), None) + + result = { + "salt": base64.b64encode(salt).decode(), + "nonce": base64.b64encode(nonce).decode(), + "ciphertext": base64.b64encode(ciphertext).decode() + } + return yaml.dump(result) + + +def decrypt_file(encrypted_content: str, key: bytes) -> str: + """Decrypt an entire file content.""" + try: + data = yaml.safe_load(encrypted_content) + salt = base64.b64decode(data["salt"]) + nonce = base64.b64decode(data["nonce"]) + ciphertext = base64.b64decode(data["ciphertext"]) + + derived_key = derive_key(key.hex(), salt) + aesgcm = AESGCM(derived_key) + plaintext = aesgcm.decrypt(nonce, ciphertext, None) + return plaintext.decode() + except (KeyError, ValueError, base64.binascii.Error) as e: + raise EncryptionError(f"Failed to decrypt file: {e}") + + +def store_key_in_keyring(key: bytes, service: str = "env-pro", username: str = "default"): + """Store the encryption key in the system keyring.""" + if not verify_key(key): + raise EncryptionError("Invalid encryption key") + keyring.set_password(service, username, key.hex()) + + +def get_key_from_keyring(service: str = "env-pro", username: str = "default") -> Optional[bytes]: + """Retrieve the encryption key from the system keyring.""" + key_hex = keyring.get_password(service, username) + if key_hex is None: + return None + try: + return bytes.fromhex(key_hex) + except ValueError: + return None + + +import json