Files
snippet-manager/snip/sync/protocol.py
7000pctAUTO f334a9ba60
Some checks failed
CI / test (push) Has been cancelled
fix: resolve CI test failures - API compatibility fixes
2026-03-22 12:11:47 +00:00

144 lines
5.1 KiB
Python

import http.client
import json
import logging
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any
from urllib.parse import quote, unquote

from ..db import get_database
class SyncProtocol:
def __init__(self, db_path: str | None = None, port: int = 8765):
self.port = port
self.db = get_database(db_path)
self._server: HTTPServer | None = None
self._server_thread: threading.Thread | None = None
self._running = False
def start_server(self):
if self._running:
return
self._running = True
self._server_thread = threading.Thread(target=self._run_server, daemon=True)
self._server_thread.start()
def _run_server(self):
class Handler(SyncRequestHandler):
protocol = self
self._server = HTTPServer(("0.0.0.0", self.port), Handler)
while self._running:
try:
self._server.timeout = 1.0
self._server.handle_request()
except Exception:
pass
def stop_server(self):
self._running = False
if self._server:
try:
self._server.shutdown()
except Exception:
pass
self._server = None
if self._server_thread:
self._server_thread.join(timeout=2.0)
self._server_thread = None
def fetch_snippets(self, peer_address: str, peer_port: int, since: str | None = None) -> list[dict[str, Any]]:
try:
conn = http.client.HTTPConnection(peer_address, peer_port, timeout=30)
path = "/snippets"
if since:
path += f"?since={since}"
conn.request("GET", path)
response = conn.getresponse()
if response.status == 200:
data = json.loads(response.read().decode())
return data.get("snippets", [])
conn.close()
except Exception:
pass
return []
def push_snippets(self, peer_address: str, peer_port: int, snippets: list[dict[str, Any]]) -> bool:
try:
conn = http.client.HTTPConnection(peer_address, peer_port, timeout=30)
headers = {"Content-Type": "application/json"}
body = json.dumps({"snippets": snippets})
conn.request("POST", "/snippets", body, headers)
response = conn.getresponse()
conn.close()
return response.status == 200
except Exception:
return False
def sync_with_peer(self, peer: dict[str, Any]) -> dict[str, Any]:
peer_id = peer["peer_id"]
addresses = peer.get("addresses", [])
port = peer.get("port", self.port)
if not addresses:
return {"status": "error", "message": "No peer address available"}
peer_address = addresses[0]
meta = self.db.get_sync_meta(peer_id)
since = meta["last_sync"] if meta else None
local_snippets = self.db.get_all_snippets_for_sync(since)
remote_snippets = self.fetch_snippets(peer_address, port, since)
merged = 0
for snippet in remote_snippets:
self.db.upsert_snippet(snippet)
merged += 1
if local_snippets:
self.push_snippets(peer_address, port, local_snippets)
self.db.update_sync_meta(peer_id, peer.get("peer_name"), peer_address, port)
return {
"status": "success",
"merged": merged,
"pushed": len(local_snippets),
"peer": peer_id,
}
class SyncRequestHandler(BaseHTTPRequestHandler):
    """HTTP handler for the snippet sync endpoints.

    * ``GET /snippets[?since=<marker>]`` responds with
      ``{"snippets": [...]}`` taken from ``db.get_all_snippets_for_sync``.
    * ``POST /snippets`` with a JSON body ``{"snippets": [...]}`` upserts
      every entry and responds with ``{"status": "ok"}``.

    The database is reached through ``self.protocol.db``; ``protocol`` is
    expected to be set on a subclass (``SyncProtocol._run_server`` does so).
    """

    # Object exposing a ``db`` attribute; injected by a subclass.
    protocol: Any = None

    # socketserver applies this as the socket timeout of each connection, so
    # a client that stalls mid-request cannot block the server indefinitely.
    timeout = 30.0

    def log_message(self, format, *args):
        """Discard the per-request log lines the base class writes to stderr."""

    def _send_json(self, status: int, payload: Any) -> None:
        """Send ``payload`` as a JSON response with the given status code."""
        body = json.dumps(payload).encode()
        self._send_body(status, body)

    def _send_body(self, status: int, body: bytes = b"") -> None:
        """Send a response; a non-empty body is labelled as JSON."""
        self.send_response(status)
        if body:
            self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        if body:
            self.wfile.write(body)

    def do_GET(self):
        """Serve the snippet list, optionally filtered by ``since``."""
        if not self.path.startswith("/snippets"):
            self._send_body(404)
            return

        since = None
        _, _, query = self.path.partition("?")
        for param in query.split("&"):
            if param.startswith("since="):
                # unquote(), not parse_qs(): "+" must stay a literal plus so
                # unencoded timestamps such as "...+00:00" are not altered.
                since = unquote(param.split("=", 1)[1])

        try:
            snippets = self.protocol.db.get_all_snippets_for_sync(since)
            # Serialize before any header is written so a failure can still
            # be reported with a proper status code.
            body = json.dumps({"snippets": snippets}).encode()
        except Exception:
            self._send_body(500)
            return
        self._send_body(200, body)

    def do_POST(self):
        """Upsert the snippets contained in the JSON request body.

        Responds 400 when the request is malformed (bad ``Content-Length``,
        invalid JSON, or ``snippets`` not a list), 500 when storing a
        snippet fails, and 200 with ``{"status": "ok"}`` otherwise.
        """
        if not self.path.startswith("/snippets"):
            self._send_body(404)
            return

        try:
            content_length = int(self.headers.get("Content-Length", 0))
            if content_length < 0:
                # read(-1) would block until the client closes the socket.
                raise ValueError("negative Content-Length")
            data = json.loads(self.rfile.read(content_length).decode())
            snippets = data.get("snippets", []) if isinstance(data, dict) else None
            if not isinstance(snippets, list):
                raise ValueError("'snippets' must be a list")
        except (ValueError, OSError):
            self._send_body(400)
            return

        db = self.protocol.db
        try:
            for snippet in snippets:
                db.upsert_snippet(snippet)
        except Exception:
            self._send_body(500)
            return
        self._send_json(200, {"status": "ok"})