"""Click CLI commands for snippet manager.""" import os import click from pygments.lexers import get_lexer_by_name, guess_lexer from rich.console import Console from rich.syntax import Syntax from rich.table import Table from ..crypto import CryptoService from ..db import get_database from ..export import ExportHandler from ..search import SearchEngine from ..sync import DiscoveryService, SyncProtocol console = Console() @click.group() @click.pass_context def cli(ctx): """Snip - Local-First Code Snippet Manager.""" ctx.ensure_object(dict) db_path = os.environ.get("SNIP_DB_PATH") ctx.obj["db"] = get_database(db_path) ctx.obj["search"] = SearchEngine(db_path) ctx.obj["crypto"] = CryptoService() ctx.obj["export"] = ExportHandler(db_path) @cli.command() @click.pass_context def init(ctx): """Initialize the snippet database.""" db = ctx.obj["db"] db.init_schema() console.print("[green]Database initialized successfully![/green]") @cli.command() @click.option("--title", "-t", prompt="Snippet title", help="Snippet title") @click.option("--code", "-c", prompt="Code", help="Code content") @click.option("--description", "-d", default="", help="Snippet description") @click.option("--language", "-l", default="text", help="Programming language") @click.option("--tags", help="Comma-separated tags") @click.option("--encrypt", is_flag=True, help="Encrypt the snippet") @click.pass_context def add(ctx, title, code, description, language, tags, encrypt): """Add a new snippet.""" db = ctx.obj["db"] crypto = ctx.obj["crypto"] tag_list = [t.strip() for t in tags.split(",")] if tags else [] if encrypt: if not crypto.has_key(): password = click.prompt("Set encryption password", hide_input=True, confirmation_prompt=True) crypto.set_password(password) code = crypto.encrypt(code) is_encrypted = True else: is_encrypted = False snippet_id = db.create_snippet( title=title, code=code, description=description, language=language, tags=tag_list, is_encrypted=is_encrypted, ) console.print(f"[green]Snippet created with ID: {snippet_id}[/green]") @cli.command() @click.argument("snippet_id", type=int) @click.option("--no-highlight", is_flag=True, help="Disable syntax highlighting") @click.option("--style", default="monokai", help="Pygments style") @click.option("--line-numbers", is_flag=True, help="Show line numbers") @click.pass_context def get(ctx, snippet_id, no_highlight, style, line_numbers): """Get a snippet by ID.""" db = ctx.obj["db"] crypto = ctx.obj["crypto"] snippet = db.get_snippet(snippet_id) if not snippet: console.print(f"[red]Snippet {snippet_id} not found[/red]") return code = snippet["code"] if snippet.get("is_encrypted"): try: code = crypto.decrypt(code) except Exception: console.print("[red]Failed to decrypt snippet[/red]") return console.print(f"\n[bold]{snippet['title']}[/bold]") console.print(f"Language: {snippet['language']} | Tags: {', '.join(snippet.get('tags', []) or 'none')}") if snippet.get("description"): console.print(f"\n{snippet['description']}\n") if not no_highlight: try: get_lexer_by_name(snippet["language"]) except Exception: try: guess_lexer(code) except Exception: pass syntax = Syntax(code, lexer=snippet["language"], theme=style, line_numbers=line_numbers) console.print(syntax) else: console.print(code) @cli.command() @click.option("--language", "-l", help="Filter by language") @click.option("--tag", help="Filter by tag") @click.option("--collection", "-c", help="Filter by collection name") @click.option("--limit", "-n", default=50, help="Number of results") @click.option("--offset", default=0, help="Offset for pagination") @click.option("--format", "-f", type=click.Choice(["table", "list"]), default="table", help="Output format") @click.pass_context def list(ctx, language, tag, collection, limit, offset, format): """List snippets.""" db = ctx.obj["db"] collection_id = None if collection: collections = db.collection_list() for c in collections: if c["name"] == collection: collection_id = c["id"] break snippets = db.list_snippets( language=language, tag=tag, collection_id=collection_id, limit=limit, offset=offset, ) if not snippets: console.print("[dim]No snippets found[/dim]") return if format == "table": table = Table(show_header=True) table.add_column("ID", style="cyan") table.add_column("Title") table.add_column("Language", style="green") table.add_column("Tags") table.add_column("Updated") for s in snippets: tags_str = ", ".join(s.get("tags", [])[:3]) if len(s.get("tags", [])) > 3: tags_str += "..." updated = s["updated_at"][:10] table.add_row(str(s["id"]), s["title"], s["language"], tags_str, updated) console.print(table) else: for s in snippets: lock = "[red]🔒[/red]" if s.get("is_encrypted") else "" console.print(f"{s['id']}: {s['title']} ({s['language']}) {lock}") @cli.command() @click.argument("snippet_id", type=int) @click.option("--title", "-t", help="New title") @click.option("--code", "-c", help="New code") @click.option("--description", "-d", help="New description") @click.option("--language", "-l", help="New language") @click.option("--tags", help="Comma-separated tags") @click.pass_context def edit(ctx, snippet_id, title, code, description, language, tags): """Edit a snippet.""" db = ctx.obj["db"] snippet = db.get_snippet(snippet_id) if not snippet: console.print(f"[red]Snippet {snippet_id} not found[/red]") return tag_list = [t.strip() for t in tags.split(",")] if tags else None if code and snippet.get("is_encrypted"): crypto = ctx.obj["crypto"] code = crypto.encrypt(code) db.update_snippet( snippet_id, title=title, description=description, code=code, language=language, tags=tag_list, ) console.print(f"[green]Snippet {snippet_id} updated[/green]") @cli.command() @click.argument("snippet_id", type=int) @click.option("--force", is_flag=True, help="Skip confirmation") @click.pass_context def delete(ctx, snippet_id, force): """Delete a snippet.""" db = ctx.obj["db"] if not force: if not click.confirm(f"Delete snippet {snippet_id}?"): return if db.delete_snippet(snippet_id): console.print(f"[green]Snippet {snippet_id} deleted[/green]") else: console.print(f"[red]Snippet {snippet_id} not found[/red]") @cli.command() @click.argument("query") @click.option("--language", "-l", help="Filter by language") @click.option("--tag", help="Filter by tag") @click.option("--limit", "-n", default=50, help="Number of results") @click.option("--offset", default=0, help="Offset for pagination") @click.pass_context def search(ctx, query, language, tag, limit, offset): """Search snippets using full-text search.""" search_engine = ctx.obj["search"] results = search_engine.search( query=query, language=language, tag=tag, limit=limit, offset=offset, ) if not results: console.print("[dim]No results found[/dim]") return table = Table(show_header=True) table.add_column("ID", style="cyan") table.add_column("Title") table.add_column("Language", style="green") table.add_column("Match") for s in results: match_info = s.get("description", "")[:50] if s.get("description") else "" table.add_row(str(s["id"]), s["title"], s["language"], match_info) console.print(table) console.print(f"\n[dim]Found {len(results)} results[/dim]") @cli.group() def tag(): """Tag management commands.""" pass @tag.command(name="add") @click.argument("snippet_id", type=int) @click.argument("tag_name") @click.pass_context def tag_add(ctx, snippet_id, tag_name): """Add a tag to a snippet.""" db = ctx.obj["db"] db.tag_add(snippet_id, tag_name) console.print(f"[green]Tag '{tag_name}' added to snippet {snippet_id}[/green]") @tag.command(name="remove") @click.argument("snippet_id", type=int) @click.argument("tag_name") @click.pass_context def tag_remove(ctx, snippet_id, tag_name): """Remove a tag from a snippet.""" db = ctx.obj["db"] db.tag_remove(snippet_id, tag_name) console.print(f"[green]Tag '{tag_name}' removed from snippet {snippet_id}[/green]") @tag.command(name="list") @click.pass_context def tag_list(ctx): """List all tags.""" db = ctx.obj["db"] tags = db.list_tags() if tags: console.print(", ".join(tags)) else: console.print("[dim]No tags found[/dim]") @cli.group() def collection(): """Collection management commands.""" pass @collection.command(name="create") @click.argument("name") @click.option("--description", "-d", default="", help="Collection description") @click.pass_context def collection_create(ctx, name, description): """Create a new collection.""" db = ctx.obj["db"] collection_id = db.collection_create(name, description) console.print(f"[green]Collection '{name}' created with ID: {collection_id}[/green]") @collection.command(name="list") @click.pass_context def collection_list(ctx): """List all collections.""" db = ctx.obj["db"] collections = db.collection_list() if not collections: console.print("[dim]No collections found[/dim]") return table = Table(show_header=True) table.add_column("ID", style="cyan") table.add_column("Name") table.add_column("Description") table.add_column("Snippets") for c in collections: table.add_row(str(c["id"]), c["name"], c.get("description", ""), str(c.get("snippet_count", 0))) console.print(table) @collection.command(name="delete") @click.argument("collection_id", type=int) @click.option("--force", is_flag=True, help="Skip confirmation") @click.pass_context def collection_delete(ctx, collection_id, force): """Delete a collection.""" db = ctx.obj["db"] if not force: if not click.confirm(f"Delete collection {collection_id}?"): return if db.collection_delete(collection_id): console.print(f"[green]Collection {collection_id} deleted[/green]") else: console.print(f"[red]Collection {collection_id} not found[/red]") @collection.command(name="add") @click.argument("collection_id", type=int) @click.argument("snippet_id", type=int) @click.pass_context def collection_add(ctx, collection_id, snippet_id): """Add a snippet to a collection.""" db = ctx.obj["db"] db.collection_add_snippet(collection_id, snippet_id) console.print(f"[green]Snippet {snippet_id} added to collection {collection_id}[/green]") @collection.command(name="remove") @click.argument("collection_id", type=int) @click.argument("snippet_id", type=int) @click.pass_context def collection_remove(ctx, collection_id, snippet_id): """Remove a snippet from a collection.""" db = ctx.obj["db"] db.collection_remove_snippet(collection_id, snippet_id) console.print(f"[green]Snippet {snippet_id} removed from collection {collection_id}[/green]") @cli.group() def export(): """Export commands.""" pass @export.command(name="all") @click.option("--file", "-f", required=True, help="Output file path") @click.pass_context def export_all(ctx, file): """Export all snippets to JSON.""" export_handler = ctx.obj["export"] data = export_handler.export_all() export_handler.write_export(file, data) console.print(f"[green]Exported to {file}[/green]") @export.command(name="collection") @click.argument("collection_name") @click.option("--file", "-f", required=True, help="Output file path") @click.pass_context def export_collection(ctx, collection_name, file): """Export a collection to JSON.""" db = ctx.obj["db"] export_handler = ctx.obj["export"] collections = db.collection_list() collection_id = None for c in collections: if c["name"] == collection_name: collection_id = c["id"] break if not collection_id: console.print(f"[red]Collection '{collection_name}' not found[/red]") return data = export_handler.export_collection(collection_id) export_handler.write_export(file, data) console.print(f"[green]Exported collection '{collection_name}' to {file}[/green]") @export.command(name="snippet") @click.argument("snippet_id", type=int) @click.option("--file", "-f", required=True, help="Output file path") @click.pass_context def export_snippet(ctx, snippet_id, file): """Export a snippet to JSON.""" export_handler = ctx.obj["export"] data = export_handler.export_snippet(snippet_id) if data: export_handler.write_export(file, data) console.print(f"[green]Exported snippet {snippet_id} to {file}[/green]") else: console.print(f"[red]Snippet {snippet_id} not found[/red]") @cli.command(name='import') @click.option("--file", "-f", required=True, help="Input file path") @click.option( "--strategy", "-s", type=click.Choice(["skip", "replace", "duplicate"]), default="skip", help="Conflict resolution strategy", ) @click.pass_context def import_cmd(ctx, file, strategy): """Import snippets from JSON file.""" export_handler = ctx.obj["export"] try: results = export_handler.import_data(file, strategy) console.print(export_handler.generate_import_summary(results)) except Exception as e: console.print(f"[red]Import failed: {e}[/red]") @cli.group() def discover(): """Peer discovery commands.""" pass @discover.command(name="list") @click.option("--timeout", "-t", default=5.0, help="Discovery timeout in seconds") @click.pass_context def discover_list(ctx, timeout): """List discovered peers on the network.""" discovery = DiscoveryService() peers = discovery.discover_peers(timeout) if not peers: console.print("[dim]No peers discovered[/dim]") return table = Table(show_header=True) table.add_column("Peer ID", style="cyan") table.add_column("Name") table.add_column("Address") table.add_column("Port") for p in peers: addr = ", ".join(p.get("addresses", [])) or "unknown" table.add_row(p["peer_id"], p.get("peer_name", ""), addr, str(p.get("port", "?"))) console.print(table) @cli.command() @click.option("--peer-id", help="Peer ID to sync with") @click.option("--timeout", "-t", default=30.0, help="Sync timeout in seconds") @click.pass_context def sync(ctx, peer_id, timeout): """Sync snippets with a peer.""" discovery = DiscoveryService() protocol = SyncProtocol() protocol.start_server() try: peers = discovery.discover_peers(5.0) if not peers: console.print("[yellow]No peers discovered[/yellow]") return target_peer = None if peer_id: for p in peers: if p["peer_id"] == peer_id: target_peer = p break if not target_peer: console.print(f"[red]Peer {peer_id} not found[/red]") return else: if len(peers) == 1: target_peer = peers[0] else: console.print("Available peers:") for i, p in enumerate(peers): console.print(f" {i + 1}. {p['peer_name']} ({p['peer_id']})") choice = click.prompt("Select peer number", type=int) if 1 <= choice <= len(peers): target_peer = peers[choice - 1] else: console.print("[red]Invalid selection[/red]") return console.print(f"[cyan]Syncing with {target_peer['peer_name']}...[/cyan]") result = protocol.sync_with_peer(target_peer) if result["status"] == "success": console.print(f"[green]Sync complete! Merged: {result['merged']}, Pushed: {result['pushed']}[/green]") else: console.print(f"[red]Sync failed: {result.get('message', 'Unknown error')}[/red]") finally: protocol.stop_server() @cli.command() @click.pass_context def peers(ctx): """List known sync peers.""" db = ctx.obj["db"] peer_list = db.list_sync_peers() if not peer_list: console.print("[dim]No known peers[/dim]") return table = Table(show_header=True) table.add_column("Peer ID", style="cyan") table.add_column("Name") table.add_column("Last Sync") table.add_column("Address") for p in peer_list: last_sync = p.get("last_sync", "never") if last_sync: last_sync = last_sync[:19] table.add_row(p["peer_id"], p.get("peer_name", ""), last_sync, p.get("peer_address", "")) console.print(table) if __name__ == "__main__": cli()