Add test files: test_core, test_detect, test_manifest
This commit is contained in:
280
confsync/tests/test_core.py
Normal file
280
confsync/tests/test_core.py
Normal file
@@ -0,0 +1,280 @@
|
||||
"""Tests for core utilities and encryption."""
|
||||
|
||||
import pytest
|
||||
import tempfile
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from confsync.utils.file_utils import (
|
||||
calculate_file_hash,
|
||||
read_file_safe,
|
||||
write_file_safe,
|
||||
find_files_matching,
|
||||
get_file_size,
|
||||
)
|
||||
from confsync.utils.path_utils import (
|
||||
expand_path,
|
||||
normalize_path,
|
||||
get_home_directory,
|
||||
get_xdg_config_dir,
|
||||
is_subpath,
|
||||
)
|
||||
from confsync.utils.encryption import EncryptionManager, EncryptedData
|
||||
|
||||
|
||||
class TestFileUtils:
|
||||
"""Tests for file utilities."""
|
||||
|
||||
def test_calculate_hash(self):
|
||||
"""Test hash calculation."""
|
||||
content1 = "test content"
|
||||
content2 = "test content"
|
||||
|
||||
hash1 = calculate_file_hash(content1)
|
||||
hash2 = calculate_file_hash(content2)
|
||||
|
||||
assert hash1 == hash2
|
||||
assert len(hash1) == 64
|
||||
|
||||
def test_calculate_hash_different(self):
|
||||
"""Test hash calculation for different content."""
|
||||
hash1 = calculate_file_hash("content 1")
|
||||
hash2 = calculate_file_hash("content 2")
|
||||
|
||||
assert hash1 != hash2
|
||||
|
||||
def test_read_file_safe_success(self):
|
||||
"""Test reading file successfully."""
|
||||
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
|
||||
f.write("test content")
|
||||
temp_path = f.name
|
||||
|
||||
try:
|
||||
content = read_file_safe(temp_path)
|
||||
assert content == "test content"
|
||||
finally:
|
||||
os.unlink(temp_path)
|
||||
|
||||
def test_read_file_safe_nonexistent(self):
|
||||
"""Test reading nonexistent file."""
|
||||
content = read_file_safe("/nonexistent/path/file.txt")
|
||||
assert content is None
|
||||
|
||||
def test_write_file_safe_success(self):
|
||||
"""Test writing file successfully."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
path = os.path.join(tmpdir, "test.txt")
|
||||
success = write_file_safe("test content", path)
|
||||
|
||||
assert success
|
||||
assert os.path.exists(path)
|
||||
with open(path, 'r') as f:
|
||||
assert f.read() == "test content"
|
||||
|
||||
def test_find_files_matching(self):
|
||||
"""Test finding files by pattern."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
Path(os.path.join(tmpdir, "file1.json")).touch()
|
||||
Path(os.path.join(tmpdir, "file2.json")).touch()
|
||||
Path(os.path.join(tmpdir, "file3.yaml")).touch()
|
||||
|
||||
json_files = find_files_matching(tmpdir, ["*.json"])
|
||||
|
||||
assert len(json_files) == 2
|
||||
assert all(f.endswith('.json') for f in json_files)
|
||||
|
||||
def test_get_file_size(self):
|
||||
"""Test getting file size."""
|
||||
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
|
||||
f.write("1234567890")
|
||||
temp_path = f.name
|
||||
|
||||
try:
|
||||
size = get_file_size(temp_path)
|
||||
assert size == 10
|
||||
finally:
|
||||
os.unlink(temp_path)
|
||||
|
||||
|
||||
class TestPathUtils:
|
||||
"""Tests for path utilities."""
|
||||
|
||||
def test_expand_path_tilde(self):
|
||||
"""Test expanding home directory."""
|
||||
expanded = expand_path("~/config")
|
||||
assert expanded.startswith("/")
|
||||
assert "config" in expanded
|
||||
|
||||
def test_expand_path_env(self):
|
||||
"""Test expanding environment variables."""
|
||||
os.environ["TEST_VAR"] = "test_value"
|
||||
expanded = expand_path("$TEST_VAR/path")
|
||||
assert "test_value" in expanded
|
||||
|
||||
def test_normalize_path(self):
|
||||
"""Test normalizing paths."""
|
||||
path = normalize_path("~/./config")
|
||||
assert path.startswith("/")
|
||||
|
||||
def test_get_home_directory(self):
|
||||
"""Test getting home directory."""
|
||||
home = get_home_directory()
|
||||
assert home.startswith("/")
|
||||
assert os.path.exists(home)
|
||||
|
||||
def test_get_xdg_config_dir(self):
|
||||
"""Test getting XDG config directory."""
|
||||
xdg = get_xdg_config_dir()
|
||||
assert xdg.startswith("/")
|
||||
|
||||
def test_is_subpath(self):
|
||||
"""Test subpath detection."""
|
||||
assert is_subpath("/home/user/config", "/home/user")
|
||||
assert is_subpath("/home/user/.config/file", "/home/user/.config")
|
||||
|
||||
|
||||
class TestEncryption:
|
||||
"""Tests for encryption utilities."""
|
||||
|
||||
def test_encrypt_decrypt(self):
|
||||
"""Test encryption and decryption."""
|
||||
enc_manager = EncryptionManager(passphrase="test_passphrase")
|
||||
|
||||
original = "sensitive data"
|
||||
|
||||
encrypted = enc_manager.encrypt(original)
|
||||
assert isinstance(encrypted, EncryptedData)
|
||||
assert len(encrypted.salt) == 16
|
||||
|
||||
decrypted = enc_manager.decrypt(encrypted, passphrase="test_passphrase")
|
||||
assert decrypted == original
|
||||
|
||||
def test_encrypt_manifest_roundtrip(self):
|
||||
"""Test manifest encryption roundtrip."""
|
||||
enc_manager = EncryptionManager(passphrase="test_passphrase")
|
||||
|
||||
manifest_data = '{"version": "1.0.0", "entries": []}'
|
||||
|
||||
encrypted = enc_manager.encrypt_manifest(manifest_data)
|
||||
assert isinstance(encrypted, str)
|
||||
|
||||
decrypted = enc_manager.decrypt_manifest(encrypted)
|
||||
assert decrypted == manifest_data
|
||||
|
||||
def test_encrypt_file(self):
|
||||
"""Test file encryption."""
|
||||
enc_manager = EncryptionManager(passphrase="test_passphrase")
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
|
||||
f.write("sensitive content")
|
||||
input_path = f.name
|
||||
|
||||
try:
|
||||
output_path = enc_manager.encrypt_file(input_path)
|
||||
|
||||
assert os.path.exists(output_path)
|
||||
assert output_path != input_path
|
||||
|
||||
with open(output_path, 'rb') as f:
|
||||
content = f.read()
|
||||
assert len(content) > 16
|
||||
|
||||
os.unlink(output_path)
|
||||
finally:
|
||||
os.unlink(input_path)
|
||||
|
||||
def test_verify_key(self):
|
||||
"""Test key verification."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
key_path = os.path.join(tmpdir, "test.key")
|
||||
|
||||
enc_manager = EncryptionManager(key_file=key_path, passphrase="test_passphrase")
|
||||
|
||||
enc_manager.generate_key_file(key_path, "test_passphrase")
|
||||
|
||||
assert os.path.exists(key_path)
|
||||
assert enc_manager.verify_key(key_path, "test_passphrase") is True
|
||||
assert enc_manager.verify_key(key_path, "wrong_passphrase") is False
|
||||
|
||||
|
||||
class TestSyncMetadata:
|
||||
"""Tests for sync metadata model."""
|
||||
|
||||
def test_sync_metadata_serialization(self):
|
||||
"""Test SyncMetadata serialization."""
|
||||
from confsync.models.config_models import SyncMetadata, SyncOperation
|
||||
|
||||
metadata = SyncMetadata(
|
||||
operation=SyncOperation.PUSH,
|
||||
source="/local",
|
||||
destination="/remote",
|
||||
)
|
||||
|
||||
data = metadata.to_dict()
|
||||
|
||||
assert data["operation"] == "push"
|
||||
assert data["source"] == "/local"
|
||||
assert data["destination"] == "/remote"
|
||||
|
||||
def test_sync_metadata_from_dict(self):
|
||||
"""Test SyncMetadata deserialization."""
|
||||
from confsync.models.config_models import SyncMetadata, SyncOperation
|
||||
|
||||
data = {
|
||||
"operation": "pull",
|
||||
"source": "/remote",
|
||||
"destination": "/local",
|
||||
"timestamp": "2024-01-01T00:00:00",
|
||||
"committed_files": [],
|
||||
"conflicts": [],
|
||||
"merged_files": [],
|
||||
"status": "success",
|
||||
"error_message": None,
|
||||
}
|
||||
|
||||
metadata = SyncMetadata.from_dict(data)
|
||||
|
||||
assert metadata.operation == SyncOperation.PULL
|
||||
assert metadata.status == "success"
|
||||
|
||||
|
||||
class TestHistoryEntry:
|
||||
"""Tests for history entry model."""
|
||||
|
||||
def test_history_entry_serialization(self):
|
||||
"""Test HistoryEntry serialization."""
|
||||
from confsync.models.config_models import HistoryEntry
|
||||
|
||||
entry = HistoryEntry(
|
||||
operation="sync",
|
||||
files_changed=[".vimrc", ".bashrc"],
|
||||
commit_message="Update configs",
|
||||
)
|
||||
|
||||
data = entry.to_dict()
|
||||
|
||||
assert data["operation"] == "sync"
|
||||
assert len(data["files_changed"]) == 2
|
||||
|
||||
def test_history_entry_from_dict(self):
|
||||
"""Test HistoryEntry deserialization."""
|
||||
from confsync.models.config_models import HistoryEntry
|
||||
|
||||
data = {
|
||||
"id": "test-id",
|
||||
"timestamp": "2024-01-01T00:00:00",
|
||||
"operation": "rollback",
|
||||
"files_changed": [".vimrc"],
|
||||
"commit_message": "Rollback to previous",
|
||||
"diff": None,
|
||||
"tags": [],
|
||||
}
|
||||
|
||||
entry = HistoryEntry.from_dict(data)
|
||||
|
||||
assert entry.id == "test-id"
|
||||
assert entry.operation == "rollback"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
Reference in New Issue
Block a user