121 lines
3.3 KiB
Python
121 lines
3.3 KiB
Python
"""Tests for P2P sync functionality."""
|
|
|
|
import os
|
|
import tempfile
|
|
|
|
import pytest
|
|
|
|
from snip.db.database import Database
|
|
from snip.sync.protocol import SyncProtocol
|
|
|
|
|
|
@pytest.fixture
def db():
    """Yield an initialised ``Database`` stored in a throwaway ``.db`` file.

    A named temporary file is created only to reserve a unique path; its
    handle is closed before the path is passed to ``Database``. The file is
    deleted on teardown, and also when ``Database()`` or ``init_db()``
    raises during setup.
    """
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
        db_path = f.name

    try:
        database = Database(db_path)
        database.init_db()
        yield database
    finally:
        # Code after a bare ``yield`` is skipped when setup fails before the
        # yield is reached; ``finally`` removes the file in that case too.
        os.unlink(db_path)
|
|
|
|
|
|
@pytest.fixture
def sync_protocol(db):
    """Provide a ``SyncProtocol`` built on the ``db`` fixture, port 18765."""
    protocol = SyncProtocol(db, port=18765)
    return protocol
|
|
|
|
|
|
def test_sync_protocol_init(sync_protocol):
    """A freshly constructed protocol keeps its port and has no server."""
    configured_port = sync_protocol.port
    assert configured_port == 18765

    current_server = sync_protocol.server
    assert current_server is None
|
|
|
|
|
|
def test_start_stop_server(sync_protocol):
    """Starting sets ``server``; stopping resets it to ``None``.

    ``stop_server()`` is called from a ``finally`` clause so that a failing
    assertion after ``start_server()`` does not leave the server running
    for the tests that follow.
    """
    sync_protocol.start_server()
    try:
        assert sync_protocol.server is not None
    finally:
        sync_protocol.stop_server()

    assert sync_protocol.server is None
|
|
|
|
|
|
def test_sync_with_peer_no_connection(sync_protocol, db):
    """Syncing with a peer expected to be unreachable gives an error status."""
    unreachable_peer = {
        "host": "127.0.0.1",
        "port": 9999,
        "peer_id": "test",
    }

    outcome = sync_protocol.sync_with_peer(unreachable_peer)

    assert outcome["status"] == "error"
|
|
|
|
|
|
def test_get_all_snippets_for_sync(db):
    """``export_all`` returns two entries after two snippets are added."""
    for title, code in (("Test 1", "code1"), ("Test 2", "code2")):
        db.add_snippet(title=title, code=code)

    exported = db.export_all()
    assert len(exported) == 2
|
|
|
|
|
|
def test_get_snippets_since_timestamp(db):
    """Split listed snippets around a cutoff taken between two inserts.

    One snippet is added, a UTC ISO-format cutoff string is captured, then a
    second snippet is added. The listed snippets are partitioned by comparing
    each ``updated_at`` value with the cutoff, and each side must be
    non-empty.
    """
    from datetime import datetime

    db.add_snippet(title="Old", code="old code")

    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12. It is
    # kept because an aware datetime would append "+00:00" to the string and
    # could change the comparisons below -- confirm the stored ``updated_at``
    # format before switching.
    old_time = datetime.utcnow().isoformat()

    db.add_snippet(title="New", code="new code")

    snippets = db.list_snippets(limit=100)
    old_snippets = [s for s in snippets if s["updated_at"] <= old_time]
    new_snippets = [s for s in snippets if s["updated_at"] > old_time]

    assert len(old_snippets) >= 1
    assert len(new_snippets) >= 1
|
|
|
|
|
|
def test_upsert_snippet(db):
    """Importing with ``strategy="replace"`` overwrites the stored code."""
    original_id = db.add_snippet(title="Original", code="original")
    assert original_id > 0

    replacement = {"title": "Original", "code": "updated"}
    outcome = db.import_snippet(replacement, strategy="replace")
    # This test pins -1 as the value returned on the replace path.
    assert outcome == -1

    stored = db.get_snippet(original_id)
    assert stored["code"] == "updated"
|
|
|
|
|
|
def test_update_sync_meta(db):
    """A peer's ``last_sync`` is populated after ``update_sync_meta``."""
    from datetime import datetime

    db.add_peer("peer1", "localhost", 8765)
    sync_stamp = datetime.utcnow().isoformat()
    db.update_sync_meta("peer1", sync_stamp)

    known_peers = db.list_sync_peers()
    assert len(known_peers) == 1

    first_peer = known_peers[0]
    assert first_peer["last_sync"] is not None
|
|
|
|
|
|
def test_discovery_service_init():
    """``DiscoveryService`` exposes the port it was constructed with."""
    from snip.sync.discovery import DiscoveryService

    service = DiscoveryService(port=18765)
    observed_port = service.port
    assert observed_port == 18765
|
|
|
|
|
|
def test_peer_cache_persistence(tmp_path):
    """Peers passed to ``save_peer_cache`` come back from ``load_peer_cache``.

    Args:
        tmp_path: pytest's per-test temporary directory, used to hold the
            cache file.
    """
    from snip.sync.discovery import DiscoveryService

    cache_file = tmp_path / "peers.json"
    discovery = DiscoveryService()
    # Point the service at the per-test file; presumably save/load use
    # ``_peer_cache_file`` as their location -- verify against DiscoveryService.
    discovery._peer_cache_file = cache_file

    peers = [{"peer_id": "test", "host": "localhost", "port": 8765}]
    discovery.save_peer_cache(peers)

    loaded = discovery.load_peer_cache()
    assert len(loaded) == 1
    assert loaded[0]["peer_id"] == "test"