diff --git a/confsync/tests/test_core.py b/confsync/tests/test_core.py new file mode 100644 index 0000000..574008a --- /dev/null +++ b/confsync/tests/test_core.py @@ -0,0 +1,280 @@ +"""Tests for core utilities and encryption.""" + +import pytest +import tempfile +import os +from pathlib import Path + +from confsync.utils.file_utils import ( + calculate_file_hash, + read_file_safe, + write_file_safe, + find_files_matching, + get_file_size, +) +from confsync.utils.path_utils import ( + expand_path, + normalize_path, + get_home_directory, + get_xdg_config_dir, + is_subpath, +) +from confsync.utils.encryption import EncryptionManager, EncryptedData + + +class TestFileUtils: + """Tests for file utilities.""" + + def test_calculate_hash(self): + """Test hash calculation.""" + content1 = "test content" + content2 = "test content" + + hash1 = calculate_file_hash(content1) + hash2 = calculate_file_hash(content2) + + assert hash1 == hash2 + assert len(hash1) == 64 + + def test_calculate_hash_different(self): + """Test hash calculation for different content.""" + hash1 = calculate_file_hash("content 1") + hash2 = calculate_file_hash("content 2") + + assert hash1 != hash2 + + def test_read_file_safe_success(self): + """Test reading file successfully.""" + with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: + f.write("test content") + temp_path = f.name + + try: + content = read_file_safe(temp_path) + assert content == "test content" + finally: + os.unlink(temp_path) + + def test_read_file_safe_nonexistent(self): + """Test reading nonexistent file.""" + content = read_file_safe("/nonexistent/path/file.txt") + assert content is None + + def test_write_file_safe_success(self): + """Test writing file successfully.""" + with tempfile.TemporaryDirectory() as tmpdir: + path = os.path.join(tmpdir, "test.txt") + success = write_file_safe("test content", path) + + assert success + assert os.path.exists(path) + with open(path, 'r') as f: + assert f.read() == "test content" + + def test_find_files_matching(self): + """Test finding files by pattern.""" + with tempfile.TemporaryDirectory() as tmpdir: + Path(os.path.join(tmpdir, "file1.json")).touch() + Path(os.path.join(tmpdir, "file2.json")).touch() + Path(os.path.join(tmpdir, "file3.yaml")).touch() + + json_files = find_files_matching(tmpdir, ["*.json"]) + + assert len(json_files) == 2 + assert all(f.endswith('.json') for f in json_files) + + def test_get_file_size(self): + """Test getting file size.""" + with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: + f.write("1234567890") + temp_path = f.name + + try: + size = get_file_size(temp_path) + assert size == 10 + finally: + os.unlink(temp_path) + + +class TestPathUtils: + """Tests for path utilities.""" + + def test_expand_path_tilde(self): + """Test expanding home directory.""" + expanded = expand_path("~/config") + assert expanded.startswith("/") + assert "config" in expanded + + def test_expand_path_env(self): + """Test expanding environment variables.""" + os.environ["TEST_VAR"] = "test_value" + expanded = expand_path("$TEST_VAR/path") + assert "test_value" in expanded + + def test_normalize_path(self): + """Test normalizing paths.""" + path = normalize_path("~/./config") + assert path.startswith("/") + + def test_get_home_directory(self): + """Test getting home directory.""" + home = get_home_directory() + assert home.startswith("/") + assert os.path.exists(home) + + def test_get_xdg_config_dir(self): + """Test getting XDG config directory.""" + xdg = get_xdg_config_dir() + assert xdg.startswith("/") + + def test_is_subpath(self): + """Test subpath detection.""" + assert is_subpath("/home/user/config", "/home/user") + assert is_subpath("/home/user/.config/file", "/home/user/.config") + + +class TestEncryption: + """Tests for encryption utilities.""" + + def test_encrypt_decrypt(self): + """Test encryption and decryption.""" + enc_manager = EncryptionManager(passphrase="test_passphrase") + + original = "sensitive data" + + encrypted = enc_manager.encrypt(original) + assert isinstance(encrypted, EncryptedData) + assert len(encrypted.salt) == 16 + + decrypted = enc_manager.decrypt(encrypted, passphrase="test_passphrase") + assert decrypted == original + + def test_encrypt_manifest_roundtrip(self): + """Test manifest encryption roundtrip.""" + enc_manager = EncryptionManager(passphrase="test_passphrase") + + manifest_data = '{"version": "1.0.0", "entries": []}' + + encrypted = enc_manager.encrypt_manifest(manifest_data) + assert isinstance(encrypted, str) + + decrypted = enc_manager.decrypt_manifest(encrypted) + assert decrypted == manifest_data + + def test_encrypt_file(self): + """Test file encryption.""" + enc_manager = EncryptionManager(passphrase="test_passphrase") + + with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: + f.write("sensitive content") + input_path = f.name + + try: + output_path = enc_manager.encrypt_file(input_path) + + assert os.path.exists(output_path) + assert output_path != input_path + + with open(output_path, 'rb') as f: + content = f.read() + assert len(content) > 16 + + os.unlink(output_path) + finally: + os.unlink(input_path) + + def test_verify_key(self): + """Test key verification.""" + with tempfile.TemporaryDirectory() as tmpdir: + key_path = os.path.join(tmpdir, "test.key") + + enc_manager = EncryptionManager(key_file=key_path, passphrase="test_passphrase") + + enc_manager.generate_key_file(key_path, "test_passphrase") + + assert os.path.exists(key_path) + assert enc_manager.verify_key(key_path, "test_passphrase") is True + assert enc_manager.verify_key(key_path, "wrong_passphrase") is False + + +class TestSyncMetadata: + """Tests for sync metadata model.""" + + def test_sync_metadata_serialization(self): + """Test SyncMetadata serialization.""" + from confsync.models.config_models import SyncMetadata, SyncOperation + + metadata = SyncMetadata( + operation=SyncOperation.PUSH, + source="/local", + destination="/remote", + ) + + data = metadata.to_dict() + + assert data["operation"] == "push" + assert data["source"] == "/local" + assert data["destination"] == "/remote" + + def test_sync_metadata_from_dict(self): + """Test SyncMetadata deserialization.""" + from confsync.models.config_models import SyncMetadata, SyncOperation + + data = { + "operation": "pull", + "source": "/remote", + "destination": "/local", + "timestamp": "2024-01-01T00:00:00", + "committed_files": [], + "conflicts": [], + "merged_files": [], + "status": "success", + "error_message": None, + } + + metadata = SyncMetadata.from_dict(data) + + assert metadata.operation == SyncOperation.PULL + assert metadata.status == "success" + + +class TestHistoryEntry: + """Tests for history entry model.""" + + def test_history_entry_serialization(self): + """Test HistoryEntry serialization.""" + from confsync.models.config_models import HistoryEntry + + entry = HistoryEntry( + operation="sync", + files_changed=[".vimrc", ".bashrc"], + commit_message="Update configs", + ) + + data = entry.to_dict() + + assert data["operation"] == "sync" + assert len(data["files_changed"]) == 2 + + def test_history_entry_from_dict(self): + """Test HistoryEntry deserialization.""" + from confsync.models.config_models import HistoryEntry + + data = { + "id": "test-id", + "timestamp": "2024-01-01T00:00:00", + "operation": "rollback", + "files_changed": [".vimrc"], + "commit_message": "Rollback to previous", + "diff": None, + "tags": [], + } + + entry = HistoryEntry.from_dict(data) + + assert entry.id == "test-id" + assert entry.operation == "rollback" + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])