Add env_pro core modules
This commit is contained in:
135
app/env_pro/core/encryption.py
Normal file
135
app/env_pro/core/encryption.py
Normal file
@@ -0,0 +1,135 @@
|
||||
"""Encryption module for env-pro using AES-256-GCM."""
|
||||
|
||||
import os
|
||||
import base64
|
||||
import hashlib
|
||||
from typing import Optional
|
||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
import keyring
|
||||
|
||||
|
||||
class EncryptionError(Exception):
|
||||
"""Base exception for encryption errors."""
|
||||
pass
|
||||
|
||||
|
||||
def generate_salt() -> bytes:
|
||||
"""Generate a random salt for key derivation."""
|
||||
return os.urandom(16)
|
||||
|
||||
|
||||
def generate_nonce() -> bytes:
|
||||
"""Generate a random nonce for AES-GCM."""
|
||||
return os.urandom(12)
|
||||
|
||||
|
||||
def derive_key(passphrase: str, salt: bytes, iterations: int = 100000) -> bytes:
|
||||
"""Derive an encryption key from a passphrase using PBKDF2."""
|
||||
kdf = PBKDF2HMAC(
|
||||
algorithm=hashes.SHA256(),
|
||||
length=32,
|
||||
salt=salt,
|
||||
iterations=iterations,
|
||||
backend=default_backend()
|
||||
)
|
||||
return kdf.derive(passphrase.encode())
|
||||
|
||||
|
||||
def generate_key() -> bytes:
|
||||
"""Generate a random encryption key."""
|
||||
return os.urandom(32)
|
||||
|
||||
|
||||
def verify_key(key: bytes) -> bool:
|
||||
"""Verify that a key is valid (32 bytes)."""
|
||||
return isinstance(key, bytes) and len(key) == 32
|
||||
|
||||
|
||||
def encrypt_value(value: str, key: bytes) -> str:
|
||||
"""Encrypt a single value using AES-256-GCM."""
|
||||
if not verify_key(key):
|
||||
raise EncryptionError("Invalid encryption key")
|
||||
|
||||
nonce = generate_nonce()
|
||||
aesgcm = AESGCM(key)
|
||||
ciphertext = aesgcm.encrypt(nonce, value.encode(), None)
|
||||
|
||||
result = {
|
||||
"nonce": base64.b64encode(nonce).decode(),
|
||||
"ciphertext": base64.b64encode(ciphertext).decode()
|
||||
}
|
||||
return base64.b64encode(json.dumps(result).encode()).decode()
|
||||
|
||||
|
||||
def decrypt_value(encrypted: str, key: bytes) -> str:
|
||||
"""Decrypt a single value using AES-256-GCM."""
|
||||
if not verify_key(key):
|
||||
raise EncryptionError("Invalid encryption key")
|
||||
|
||||
try:
|
||||
data = json.loads(base64.b64decode(encrypted))
|
||||
nonce = base64.b64decode(data["nonce"])
|
||||
ciphertext = base64.b64decode(data["ciphertext"])
|
||||
|
||||
aesgcm = AESGCM(key)
|
||||
plaintext = aesgcm.decrypt(nonce, ciphertext, None)
|
||||
return plaintext.decode()
|
||||
except (KeyError, ValueError, base64.binascii.Error) as e:
|
||||
raise EncryptionError(f"Failed to decrypt value: {e}")
|
||||
|
||||
|
||||
def encrypt_file(content: str, key: bytes) -> str:
|
||||
"""Encrypt an entire file content."""
|
||||
salt = generate_salt()
|
||||
nonce = generate_nonce()
|
||||
|
||||
derived_key = derive_key(key.hex(), salt)
|
||||
aesgcm = AESGCM(derived_key)
|
||||
ciphertext = aesgcm.encrypt(nonce, content.encode(), None)
|
||||
|
||||
result = {
|
||||
"salt": base64.b64encode(salt).decode(),
|
||||
"nonce": base64.b64encode(nonce).decode(),
|
||||
"ciphertext": base64.b64encode(ciphertext).decode()
|
||||
}
|
||||
return yaml.dump(result)
|
||||
|
||||
|
||||
def decrypt_file(encrypted_content: str, key: bytes) -> str:
|
||||
"""Decrypt an entire file content."""
|
||||
try:
|
||||
data = yaml.safe_load(encrypted_content)
|
||||
salt = base64.b64decode(data["salt"])
|
||||
nonce = base64.b64decode(data["nonce"])
|
||||
ciphertext = base64.b64decode(data["ciphertext"])
|
||||
|
||||
derived_key = derive_key(key.hex(), salt)
|
||||
aesgcm = AESGCM(derived_key)
|
||||
plaintext = aesgcm.decrypt(nonce, ciphertext, None)
|
||||
return plaintext.decode()
|
||||
except (KeyError, ValueError, base64.binascii.Error) as e:
|
||||
raise EncryptionError(f"Failed to decrypt file: {e}")
|
||||
|
||||
|
||||
def store_key_in_keyring(key: bytes, service: str = "env-pro", username: str = "default"):
|
||||
"""Store the encryption key in the system keyring."""
|
||||
if not verify_key(key):
|
||||
raise EncryptionError("Invalid encryption key")
|
||||
keyring.set_password(service, username, key.hex())
|
||||
|
||||
|
||||
def get_key_from_keyring(service: str = "env-pro", username: str = "default") -> Optional[bytes]:
|
||||
"""Retrieve the encryption key from the system keyring."""
|
||||
key_hex = keyring.get_password(service, username)
|
||||
if key_hex is None:
|
||||
return None
|
||||
try:
|
||||
return bytes.fromhex(key_hex)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
import json
|
||||
Reference in New Issue
Block a user