import http.client import json import threading from http.server import BaseHTTPRequestHandler, HTTPServer from typing import Any from ..db import get_database class SyncProtocol: def __init__(self, db_path: str | None = None, port: int = 8765): self.port = port self.db = get_database(db_path) self._server: HTTPServer | None = None self._server_thread: threading.Thread | None = None self._running = False def start_server(self): if self._running: return self._running = True self._server_thread = threading.Thread(target=self._run_server, daemon=True) self._server_thread.start() def _run_server(self): class Handler(SyncRequestHandler): protocol = self self._server = HTTPServer(("0.0.0.0", self.port), Handler) while self._running: try: self._server.timeout = 1.0 self._server.handle_request() except Exception: pass def stop_server(self): self._running = False if self._server: try: self._server.shutdown() except Exception: pass self._server = None if self._server_thread: self._server_thread.join(timeout=2.0) self._server_thread = None def fetch_snippets(self, peer_address: str, peer_port: int, since: str | None = None) -> list[dict[str, Any]]: try: conn = http.client.HTTPConnection(peer_address, peer_port, timeout=30) path = "/snippets" if since: path += f"?since={since}" conn.request("GET", path) response = conn.getresponse() if response.status == 200: data = json.loads(response.read().decode()) return data.get("snippets", []) conn.close() except Exception: pass return [] def push_snippets(self, peer_address: str, peer_port: int, snippets: list[dict[str, Any]]) -> bool: try: conn = http.client.HTTPConnection(peer_address, peer_port, timeout=30) headers = {"Content-Type": "application/json"} body = json.dumps({"snippets": snippets}) conn.request("POST", "/snippets", body, headers) response = conn.getresponse() conn.close() return response.status == 200 except Exception: return False def sync_with_peer(self, peer: dict[str, Any]) -> dict[str, Any]: peer_id = peer["peer_id"] addresses = peer.get("addresses", []) port = peer.get("port", self.port) if not addresses: return {"status": "error", "message": "No peer address available"} peer_address = addresses[0] meta = self.db.get_sync_meta(peer_id) since = meta["last_sync"] if meta else None local_snippets = self.db.get_all_snippets_for_sync(since) remote_snippets = self.fetch_snippets(peer_address, port, since) merged = 0 for snippet in remote_snippets: self.db.upsert_snippet(snippet) merged += 1 if local_snippets: self.push_snippets(peer_address, port, local_snippets) self.db.update_sync_meta(peer_id, peer.get("peer_name"), peer_address, port) return { "status": "success", "merged": merged, "pushed": len(local_snippets), "peer": peer_id, } class SyncRequestHandler(BaseHTTPRequestHandler): def log_message(self, format, *args): pass def do_GET(self): if self.path.startswith("/snippets"): db = self.protocol.db since = None if "?" in self.path: query = self.path.split("?", 1)[1] for param in query.split("&"): if param.startswith("since="): since = param.split("=", 1)[1] snippets = db.get_all_snippets_for_sync(since) self.send_response(200) self.send_header("Content-Type", "application/json") self.end_headers() self.wfile.write(json.dumps({"snippets": snippets}).encode()) else: self.send_response(404) self.end_headers() def do_POST(self): if self.path.startswith("/snippets"): try: content_length = int(self.headers.get("Content-Length", 0)) body = self.rfile.read(content_length).decode() data = json.loads(body) snippets = data.get("snippets", []) db = self.protocol.db for snippet in snippets: db.upsert_snippet(snippet) self.send_response(200) self.send_header("Content-Type", "application/json") self.end_headers() self.wfile.write(json.dumps({"status": "ok"}).encode()) except Exception: self.send_response(400) self.end_headers() else: self.send_response(404) self.end_headers()