"""Click CLI commands for snippet management.""" import json import os import sys import tempfile from pathlib import Path from typing import Any import click from rich.console import Console from rich.syntax import Syntax from rich.table import Table from snip.crypto.service import CryptoService from snip.db.database import Database from snip.export.handlers import export_snippets, import_snippets from snip.search.engine import SearchEngine console = Console() db = Database() crypto_service = CryptoService() search_engine = SearchEngine(db) @click.group() @click.version_option(version="0.1.0") def cli(): """Snip - Local-First Code Snippet Manager.""" pass @cli.command() def init(): """Initialize the snippet database.""" db.init_db() console.print("[green]Database initialized successfully![/green]") @cli.command() @click.option("--title", prompt="Title", help="Snippet title") @click.option("--code", prompt="Code", help="Snippet code") @click.option("--description", default="", help="Snippet description") @click.option("--language", default="", help="Programming language") @click.option("--tag", multiple=True, help="Tags to add") @click.option("--encrypt", is_flag=True, help="Encrypt the snippet") def add(title: str, code: str, description: str, language: str, tag: tuple, encrypt: bool): """Add a new snippet.""" tags = list(tag) is_encrypted = False if encrypt: password = click.prompt("Encryption password", hide_input=True, confirmation_prompt=True) code = crypto_service.encrypt(code, password) is_encrypted = True snippet_id = db.add_snippet( title=title, code=code, description=description, language=language, tags=tags, is_encrypted=is_encrypted, ) console.print(f"[green]Snippet added with ID {snippet_id}[/green]") @cli.command() @click.argument("snippet_id", type=int) @click.option("--decrypt", help="Decryption password", default=None, hide_input=True) def get(snippet_id: int, decrypt: str | None): """Get a snippet by ID.""" snippet = db.get_snippet(snippet_id) if not snippet: console.print(f"[red]Snippet {snippet_id} not found[/red]") return code = snippet["code"] if snippet["is_encrypted"]: if not decrypt: decrypt = click.prompt("Decryption password", hide_input=True) try: code = crypto_service.decrypt(code, decrypt) except Exception as e: console.print(f"[red]Decryption failed: {e}[/red]") return language = snippet["language"] or "text" syntax = Syntax(code, language, theme="monokai", line_numbers=True) console.print(f"\n[bold]{snippet['title']}[/bold]") if snippet["description"]: console.print(f"[dim]{snippet['description']}[/dim]") console.print(f"[dim]Language: {language} | Tags: {snippet['tags']}[/dim]\n") console.print(syntax) @cli.command() @click.option("--limit", default=50, help="Maximum number of snippets") @click.option("--offset", default=0, help="Offset for pagination") @click.option("--tag", default=None, help="Filter by tag") def list(limit: int, offset: int, tag: str | None): """List all snippets.""" snippets = db.list_snippets(limit=limit, offset=offset, tag=tag) if not snippets: console.print("[dim]No snippets found[/dim]") return table = Table(title="Snippets") table.add_column("ID", style="cyan") table.add_column("Title", style="green") table.add_column("Language", style="magenta") table.add_column("Tags", style="yellow") table.add_column("Updated", style="dim") for s in snippets: tags_str = json.loads(s.get("tags", "[]")) if isinstance(s.get("tags"), str) else s.get("tags", []) table.add_row( str(s["id"]), s["title"], s["language"] or "-", ", ".join(tags_str) if tags_str else "-", s["updated_at"][:10], ) console.print(table) @cli.command() @click.argument("snippet_id", type=int) def edit(snippet_id: int): """Edit a snippet in your default editor.""" snippet = db.get_snippet(snippet_id) if not snippet: console.print(f"[red]Snippet {snippet_id} not found[/red]") return with tempfile.NamedTemporaryFile(mode="w", suffix=f".{snippet['language'] or 'txt'}", delete=False) as f: f.write(f"# Title: {snippet['title']}\n") f.write(f"# Description: {snippet['description']}\n") f.write(f"# Language: {snippet['language']}\n") f.write(f"# Tags: {snippet['tags']}\n") f.write("\n") f.write(snippet["code"]) temp_path = f.name try: click.edit(filename=temp_path) with open(temp_path, "r") as f: lines = f.readlines() title = snippet["title"] description = snippet["description"] language = snippet["language"] tags = json.loads(snippet["tags"]) if isinstance(snippet["tags"], str) else snippet.get("tags", []) code_lines = [] in_code = False for line in lines: if line.startswith("# Title: "): title = line[9:].strip() elif line.startswith("# Description: "): description = line[15:].strip() elif line.startswith("# Language: "): language = line[13:].strip() elif line.startswith("# Tags: "): tags_str = line[8:].strip() if tags_str.startswith("["): tags = json.loads(tags_str) else: tags = [t.strip() for t in tags_str.split(",")] elif line.startswith("#"): continue else: in_code = True code_lines.append(line) db.update_snippet( snippet_id, title=title, description=description, code="".join(code_lines), language=language, tags=tags, ) console.print(f"[green]Snippet {snippet_id} updated[/green]") finally: os.unlink(temp_path) @cli.command() @click.argument("snippet_id", type=int) def delete(snippet_id: int): """Delete a snippet.""" snippet = db.get_snippet(snippet_id) if not snippet: console.print(f"[red]Snippet {snippet_id} not found[/red]") return if click.confirm(f"Delete snippet '{snippet['title']}'?"): db.delete_snippet(snippet_id) console.print(f"[green]Snippet {snippet_id} deleted[/green]") @cli.command() @click.argument("query") @click.option("--limit", default=50, help="Maximum results") @click.option("--language", default=None, help="Filter by language") @click.option("--tag", default=None, help="Filter by tag") def search(query: str, limit: int, language: str | None, tag: str | None): """Search snippets using full-text search.""" results = search_engine.search(query, limit=limit, language=language, tag=tag) if not results: console.print("[dim]No results found[/dim]") return table = Table(title=f"Search Results ({len(results)})") table.add_column("ID", style="cyan") table.add_column("Title", style="green") table.add_column("Language", style="magenta") table.add_column("Match Score", style="yellow") for r in results: table.add_row( str(r["id"]), r["title"], r["language"] or "-", f"{r.get('rank', 0):.2f}", ) console.print(table) @cli.group() def tag(): """Manage tags.""" pass @tag.command(name="add") @click.argument("snippet_id", type=int) @click.argument("tag_name") def tag_add(snippet_id: int, tag_name: str): """Add a tag to a snippet.""" if db.add_tag(snippet_id, tag_name): console.print(f"[green]Tag '{tag_name}' added to snippet {snippet_id}[/green]") else: console.print(f"[red]Snippet {snippet_id} not found[/red]") @tag.command(name="remove") @click.argument("snippet_id", type=int) @click.argument("tag_name") def tag_remove(snippet_id: int, tag_name: str): """Remove a tag from a snippet.""" if db.remove_tag(snippet_id, tag_name): console.print(f"[green]Tag '{tag_name}' removed from snippet {snippet_id}[/green]") else: console.print(f"[red]Snippet {snippet_id} not found[/red]") @tag.command(name="list") def tag_list(): """List all tags.""" tags = db.list_tags() if not tags: console.print("[dim]No tags found[/dim]") return console.print("[bold]Tags:[/bold]") for t in tags: console.print(f" [cyan]{t}[/cyan]") @cli.group() def collection(): """Manage collections.""" pass @collection.command(name="create") @click.argument("name") @click.option("--description", default="", help="Collection description") def collection_create(name: str, description: str): """Create a new collection.""" collection_id = db.create_collection(name, description) console.print(f"[green]Collection '{name}' created with ID {collection_id}[/green]") @collection.command(name="list") def collection_list(): """List all collections.""" collections = db.list_collections() if not collections: console.print("[dim]No collections found[/dim]") return table = Table(title="Collections") table.add_column("ID", style="cyan") table.add_column("Name", style="green") table.add_column("Description", style="dim") table.add_column("Created", style="dim") for c in collections: table.add_row( str(c["id"]), c["name"], c["description"] or "-", c["created_at"][:10], ) console.print(table) @collection.command(name="delete") @click.argument("collection_id", type=int) def collection_delete(collection_id: int): """Delete a collection.""" collection = db.get_collection(collection_id) if not collection: console.print(f"[red]Collection {collection_id} not found[/red]") return if click.confirm(f"Delete collection '{collection['name']}'?"): db.delete_collection(collection_id) console.print(f"[green]Collection {collection_id} deleted[/green]") @collection.command(name="add") @click.argument("collection_id", type=int) @click.argument("snippet_id", type=int) def collection_add(collection_id: int, snippet_id: int): """Add a snippet to a collection.""" if db.add_snippet_to_collection(snippet_id, collection_id): console.print(f"[green]Snippet {snippet_id} added to collection {collection_id}[/green]") else: console.print("[red]Failed to add snippet to collection[/red]") @collection.command(name="remove") @click.argument("collection_id", type=int) @click.argument("snippet_id", type=int) def collection_remove(collection_id: int, snippet_id: int): """Remove a snippet from a collection.""" if db.remove_snippet_from_collection(snippet_id, collection_id): console.print(f"[green]Snippet {snippet_id} removed from collection {collection_id}[/green]") else: console.print("[red]Failed to remove snippet from collection[/red]") @cli.group() def export(): """Export snippets.""" pass @export.command(name="all") @click.option("--file", required=True, help="Output file path") def export_all(file: str): """Export all snippets.""" snippets = db.export_all() export_snippets(snippets, file) console.print(f"[green]Exported {len(snippets)} snippets to {file}[/green]") @export.command(name="collection") @click.argument("collection_name") @click.option("--file", required=True, help="Output file path") def export_collection(collection_name: str, file: str): """Export a collection.""" collections = db.list_collections() collection = next((c for c in collections if c["name"] == collection_name), None) if not collection: console.print(f"[red]Collection '{collection_name}' not found[/red]") return snippets = db.get_collection_snippets(collection["id"]) export_snippets(snippets, file) console.print(f"[green]Exported {len(snippets)} snippets to {file}[/green]") @export.command(name="snippet") @click.argument("snippet_id", type=int) @click.option("--file", required=True, help="Output file path") def export_snippet(snippet_id: int, file: str): """Export a single snippet.""" snippet = db.get_snippet(snippet_id) if not snippet: console.print(f"[red]Snippet {snippet_id} not found[/red]") return export_snippets([snippet], file) console.print(f"[green]Exported snippet {snippet_id} to {file}[/green]") @cli.command() @click.option("--file", required=True, help="Input file path") @click.option("--strategy", default="skip", type=click.Choice(["skip", "replace", "duplicate"]), help="Import strategy") def import_cmd(file: str, strategy: str): """Import snippets from a JSON file.""" try: imported, skipped = import_snippets(db, file, strategy) console.print(f"[green]Imported {imported} snippets, skipped {skipped}[/green]") except Exception as e: console.print(f"[red]Import failed: {e}[/red]") @cli.group() def discover(): """Discover peers on the network.""" pass @discover.command(name="list") def discover_list(): """List discovered peers.""" from snip.sync.discovery import DiscoveryService discovery = DiscoveryService() peers = discovery.discover_peers(timeout=5.0) if not peers: console.print("[dim]No peers discovered[/dim]") return table = Table(title="Discovered Peers") table.add_column("Peer ID", style="cyan") table.add_column("Host", style="green") table.add_column("Port", style="magenta") for peer in peers: table.add_row(peer["peer_id"], peer["host"], str(peer["port"])) console.print(table) @cli.command() @click.option("--peer-id", required=True, help="Peer ID to sync with") def sync(peer_id: str): """Sync snippets with a peer.""" from snip.sync.protocol import SyncProtocol peers = db.list_peers() peer = next((p for p in peers if p["peer_id"] == peer_id), None) if not peer: console.print(f"[red]Peer {peer_id} not found[/red]") return sync_proto = SyncProtocol(db) try: synced = sync_proto.sync_with_peer(peer["host"], peer["port"]) console.print(f"[green]Synced {synced} snippets with peer {peer_id}[/green]") except Exception as e: console.print(f"[red]Sync failed: {e}[/red]") @cli.command() def peers(): """List known sync peers.""" peers = db.list_peers() if not peers: console.print("[dim]No known peers[/dim]") return table = Table(title="Known Peers") table.add_column("Peer ID", style="cyan") table.add_column("Host", style="green") table.add_column("Port", style="magenta") table.add_column("Last Seen", style="dim") for p in peers: table.add_row(p["peer_id"], p["host"], str(p["port"]), p["last_seen"][:10]) console.print(table) if __name__ == "__main__": cli()