Add utility modules: encryption, file_utils, git_utils, path_utils
This commit is contained in:
169
confsync/utils/encryption.py
Normal file
169
confsync/utils/encryption.py
Normal file
@@ -0,0 +1,169 @@
|
||||
"""Encryption utilities for secure storage and sync operations."""
|
||||
|
||||
import os
|
||||
import base64
|
||||
import getpass
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Optional, Tuple
|
||||
from cryptography.fernet import Fernet
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
||||
|
||||
@dataclass
|
||||
class EncryptedData:
|
||||
"""Represents encrypted data."""
|
||||
|
||||
ciphertext: bytes
|
||||
salt: bytes
|
||||
nonce: Optional[bytes] = None
|
||||
|
||||
|
||||
class EncryptionManager:
|
||||
"""Manages encryption and decryption of configuration files."""
|
||||
|
||||
def __init__(self, key_file: Optional[str] = None, passphrase: Optional[str] = None):
|
||||
self.key_file = key_file
|
||||
self.passphrase = passphrase
|
||||
self._fernet: Optional[Fernet] = None
|
||||
self._key: Optional[bytes] = None
|
||||
|
||||
def _derive_key(self, passphrase: str, salt: bytes) -> bytes:
|
||||
"""Derive encryption key from passphrase using PBKDF2."""
|
||||
kdf = PBKDF2HMAC(
|
||||
algorithm=hashes.SHA256(),
|
||||
length=32,
|
||||
salt=salt,
|
||||
iterations=480000,
|
||||
backend=default_backend()
|
||||
)
|
||||
return base64.urlsafe_b64encode(kdf.derive(passphrase.encode()))
|
||||
|
||||
def _get_fernet(self) -> Fernet:
|
||||
"""Get or create Fernet instance."""
|
||||
if self._fernet is None:
|
||||
salt, passphrase = self._load_or_create_key()
|
||||
self._key = self._derive_key(passphrase, salt)
|
||||
self._fernet = Fernet(self._key)
|
||||
return self._fernet
|
||||
|
||||
def _load_or_create_key(self) -> Tuple[bytes, str]:
|
||||
"""Load existing key or create new one."""
|
||||
passphrase = self.passphrase
|
||||
if passphrase is None:
|
||||
passphrase = self._get_passphrase()
|
||||
|
||||
if self.key_file and Path(self.key_file).exists():
|
||||
with open(self.key_file, 'rb') as f:
|
||||
salt = f.read(16)
|
||||
return salt, passphrase
|
||||
|
||||
salt = os.urandom(16)
|
||||
if self.key_file:
|
||||
Path(self.key_file).parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(self.key_file, 'wb') as f:
|
||||
f.write(salt)
|
||||
|
||||
return salt, passphrase
|
||||
|
||||
def _get_passphrase(self) -> str:
|
||||
"""Get passphrase from user input."""
|
||||
if self.passphrase:
|
||||
return self.passphrase
|
||||
return getpass.getpass("Enter encryption passphrase: ")
|
||||
|
||||
def set_passphrase(self, passphrase: str) -> None:
|
||||
"""Set the encryption passphrase."""
|
||||
self.passphrase = passphrase
|
||||
self._fernet = None
|
||||
self._key = None
|
||||
|
||||
def encrypt(self, data: str) -> EncryptedData:
|
||||
"""Encrypt string data."""
|
||||
salt, passphrase = self._load_or_create_key()
|
||||
key = self._derive_key(passphrase, salt)
|
||||
fernet = Fernet(key)
|
||||
ciphertext = fernet.encrypt(data.encode())
|
||||
return EncryptedData(ciphertext=ciphertext, salt=salt)
|
||||
|
||||
def decrypt(self, encrypted_data: EncryptedData, passphrase: Optional[str] = None) -> str:
|
||||
"""Decrypt encrypted data."""
|
||||
if passphrase is None:
|
||||
passphrase = self._get_passphrase()
|
||||
else:
|
||||
self.passphrase = passphrase
|
||||
|
||||
key = self._derive_key(passphrase, encrypted_data.salt)
|
||||
fernet = Fernet(key)
|
||||
plaintext = fernet.decrypt(encrypted_data.ciphertext)
|
||||
return plaintext.decode()
|
||||
|
||||
def encrypt_file(self, input_path: str, output_path: Optional[str] = None) -> str:
|
||||
"""Encrypt a file and return the path to encrypted file."""
|
||||
with open(input_path, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
|
||||
encrypted = self.encrypt(content)
|
||||
|
||||
output = output_path or f"{input_path}.enc"
|
||||
with open(output, 'wb') as f:
|
||||
f.write(encrypted.salt + encrypted.ciphertext)
|
||||
|
||||
return output
|
||||
|
||||
def decrypt_file(self, input_path: str, output_path: Optional[str] = None) -> str:
|
||||
"""Decrypt a file and return the path to decrypted file."""
|
||||
with open(input_path, 'rb') as f:
|
||||
file_content = f.read()
|
||||
|
||||
salt = file_content[:16]
|
||||
ciphertext = file_content[16:]
|
||||
|
||||
encrypted_data = EncryptedData(ciphertext=ciphertext, salt=salt)
|
||||
decrypted = self.decrypt(encrypted_data)
|
||||
|
||||
output = output_path or input_path.rsplit('.', 1)[0] if input_path.endswith('.enc') else input_path
|
||||
with open(output, 'w', encoding='utf-8') as f:
|
||||
f.write(decrypted)
|
||||
|
||||
return output
|
||||
|
||||
def encrypt_manifest(self, manifest_data: str) -> str:
|
||||
"""Encrypt manifest data."""
|
||||
encrypted = self.encrypt(manifest_data)
|
||||
return base64.b64encode(encrypted.salt + encrypted.ciphertext).decode()
|
||||
|
||||
def decrypt_manifest(self, encrypted_data: str) -> str:
|
||||
"""Decrypt manifest data."""
|
||||
raw_data = base64.b64decode(encrypted_data.encode())
|
||||
salt = raw_data[:16]
|
||||
ciphertext = raw_data[16:]
|
||||
|
||||
encrypted = EncryptedData(ciphertext=ciphertext, salt=salt)
|
||||
return self.decrypt(encrypted)
|
||||
|
||||
def generate_key_file(self, key_path: str, passphrase: Optional[str] = None) -> None:
|
||||
"""Generate a new key file."""
|
||||
salt = os.urandom(16)
|
||||
passwd = passphrase or self._get_passphrase()
|
||||
key = self._derive_key(passwd, salt)
|
||||
|
||||
Path(key_path).parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(key_path, 'wb') as f:
|
||||
f.write(salt + key)
|
||||
|
||||
def verify_key(self, key_path: str, passphrase: str) -> bool:
|
||||
"""Verify if a passphrase matches a key file."""
|
||||
if not Path(key_path).exists():
|
||||
return False
|
||||
|
||||
with open(key_path, 'rb') as f:
|
||||
file_content = f.read()
|
||||
|
||||
salt = file_content[:16]
|
||||
stored_key = file_content[16:]
|
||||
|
||||
computed_key = self._derive_key(passphrase, salt)
|
||||
return computed_key == stored_key
|
||||
Reference in New Issue
Block a user