diff --git a/snip/cli/commands.py b/snip/cli/commands.py index 0758bda..8831358 100644 --- a/snip/cli/commands.py +++ b/snip/cli/commands.py @@ -1,514 +1 @@ -import os - -import click -from pygments.lexers import get_lexer_by_name, guess_lexer -from rich.console import Console -from rich.syntax import Syntax -from rich.table import Table - -from ..crypto import CryptoService -from ..db import get_database -from ..export import ExportHandler -from ..search import SearchEngine -from ..sync import DiscoveryService, SyncProtocol - -console = Console() - - -@click.group() -@click.pass_context -def cli(ctx): - """Snip - Local-First Code Snippet Manager.""" - ctx.ensure_object(dict) - db_path = os.environ.get("SNIP_DB_PATH") - ctx.obj["db"] = get_database(db_path) - ctx.obj["search"] = SearchEngine(db_path) - ctx.obj["crypto"] = CryptoService() - ctx.obj["export"] = ExportHandler(db_path) - - -@cli.command() -@click.pass_context -def init(ctx): - """Initialize the snippet database.""" - db = ctx.obj["db"] - db.init_schema() - console.print("[green]Database initialized successfully![/green]") - - -@cli.command() -@click.option("--title", "-t", prompt="Snippet title", help="Snippet title") -@click.option("--code", "-c", prompt="Code", help="Code content") -@click.option("--description", "-d", default="", help="Snippet description") -@click.option("--language", "-l", default="text", help="Programming language") -@click.option("--tags", help="Comma-separated tags") -@click.option("--encrypt", is_flag=True, help="Encrypt the snippet") -@click.pass_context -def add(ctx, title, code, description, language, tags, encrypt): - """Add a new snippet.""" - db = ctx.obj["db"] - crypto = ctx.obj["crypto"] - tag_list = [t.strip() for t in tags.split(",")] if tags else [] - if encrypt: - if not crypto.has_key(): - password = click.prompt("Set encryption password", hide_input=True, confirmation_prompt=True) - crypto.set_password(password) - code = crypto.encrypt(code) - is_encrypted = True - else: - is_encrypted = False - snippet_id = db.create_snippet( - title=title, - code=code, - description=description, - language=language, - tags=tag_list, - is_encrypted=is_encrypted, - ) - console.print(f"[green]Snippet created with ID: {snippet_id}[/green]") - - -@cli.command() -@click.argument("snippet_id", type=int) -@click.option("--no-highlight", is_flag=True, help="Disable syntax highlighting") -@click.option("--style", default="monokai", help="Pygments style") -@click.option("--line-numbers", is_flag=True, help="Show line numbers") -@click.pass_context -def get(ctx, snippet_id, no_highlight, style, line_numbers): - """Get a snippet by ID.""" - db = ctx.obj["db"] - crypto = ctx.obj["crypto"] - snippet = db.get_snippet(snippet_id) - if not snippet: - console.print(f"[red]Snippet {snippet_id} not found[/red]") - return - code = snippet["code"] - if snippet.get("is_encrypted"): - try: - code = crypto.decrypt(code) - except Exception: - console.print("[red]Failed to decrypt snippet[/red]") - return - console.print(f"\n[bold]{snippet['title']}[/bold]") - console.print(f"Language: {snippet['language']} | Tags: {', '.join(snippet.get('tags', [])) or 'none'}") - if snippet.get("description"): - console.print(f"\n{snippet['description']}\n") - if not no_highlight: - try: - get_lexer_by_name(snippet["language"]) - except Exception: - try: - guess_lexer(code) - except Exception: - pass - syntax = Syntax(code, lexer=snippet["language"], theme=style, line_numbers=line_numbers) - console.print(syntax) - else: - console.print(code) - - -@cli.command() -@click.option("--language", "-l", help="Filter by language") -@click.option("--tag", help="Filter by tag") -@click.option("--collection", "-c", help="Filter by collection name") -@click.option("--limit", "-n", default=50, help="Number of results") -@click.option("--offset", default=0, help="Offset for pagination") -@click.option("--format", "-f", type=click.Choice(["table", "list"]), default="table", help="Output format") -@click.pass_context -def list(ctx, language, tag, collection, limit, offset, format): - """List snippets.""" - db = ctx.obj["db"] - collection_id = None - if collection: - collections = db.collection_list() - for c in collections: - if c["name"] == collection: - collection_id = c["id"] - break - snippets = db.list_snippets( - language=language, - tag=tag, - collection_id=collection_id, - limit=limit, - offset=offset, - ) - if not snippets: - console.print("[dim]No snippets found[/dim]") - return - if format == "table": - table = Table(show_header=True) - table.add_column("ID", style="cyan") - table.add_column("Title") - table.add_column("Language", style="green") - table.add_column("Tags") - table.add_column("Updated") - for s in snippets: - tags_str = ", ".join(s.get("tags", [])[:3]) - if len(s.get("tags", [])) > 3: - tags_str += "..." - updated = s["updated_at"][:10] - table.add_row(str(s["id"]), s["title"], s["language"], tags_str, updated) - console.print(table) - else: - for s in snippets: - lock = "[red]🔒[/red]" if s.get("is_encrypted") else "" - console.print(f"{s['id']}: {s['title']} ({s['language']}) {lock}") - - -@cli.command() -@click.argument("snippet_id", type=int) -@click.option("--title", "-t", help="New title") -@click.option("--code", "-c", help="New code") -@click.option("--description", "-d", help="New description") -@click.option("--language", "-l", help="New language") -@click.option("--tags", help="Comma-separated tags") -@click.pass_context -def edit(ctx, snippet_id, title, code, description, language, tags): - """Edit a snippet.""" - db = ctx.obj["db"] - snippet = db.get_snippet(snippet_id) - if not snippet: - console.print(f"[red]Snippet {snippet_id} not found[/red]") - return - tag_list = [t.strip() for t in tags.split(",")] if tags else None - if code and snippet.get("is_encrypted"): - crypto = ctx.obj["crypto"] - code = crypto.encrypt(code) - db.update_snippet( - snippet_id, - title=title, - description=description, - code=code, - language=language, - tags=tag_list, - ) - console.print(f"[green]Snippet {snippet_id} updated[/green]") - - -@cli.command() -@click.argument("snippet_id", type=int) -@click.option("--force", is_flag=True, help="Skip confirmation") -@click.pass_context -def delete(ctx, snippet_id, force): - """Delete a snippet.""" - db = ctx.obj["db"] - if not force: - if not click.confirm(f"Delete snippet {snippet_id}?"): - return - if db.delete_snippet(snippet_id): - console.print(f"[green]Snippet {snippet_id} deleted[/green]") - else: - console.print(f"[red]Snippet {snippet_id} not found[/red]") - - -@cli.command() -@click.argument("query") -@click.option("--language", "-l", help="Filter by language") -@click.option("--tag", help="Filter by tag") -@click.option("--limit", "-n", default=50, help="Number of results") -@click.option("--offset", default=0, help="Offset for pagination") -@click.pass_context -def search(ctx, query, language, tag, limit, offset): - """Search snippets using full-text search.""" - search_engine = ctx.obj["search"] - results = search_engine.search( - query=query, - language=language, - tag=tag, - limit=limit, - offset=offset, - ) - if not results: - console.print("[dim]No results found[/dim]") - return - table = Table(show_header=True) - table.add_column("ID", style="cyan") - table.add_column("Title") - table.add_column("Language", style="green") - table.add_column("Match") - for s in results: - match_info = s.get("description", "")[:50] if s.get("description") else "" - table.add_row(str(s["id"]), s["title"], s["language"], match_info) - console.print(table) - console.print(f"\n[dim]Found {len(results)} results[/dim]") - - -@cli.group() -def tag(): - """Tag management commands.""" - pass - - -@tag.command(name="add") -@click.argument("snippet_id", type=int) -@click.argument("tag_name") -@click.pass_context -def tag_add(ctx, snippet_id, tag_name): - """Add a tag to a snippet.""" - db = ctx.obj["db"] - db.tag_add(snippet_id, tag_name) - console.print(f"[green]Tag '{tag_name}' added to snippet {snippet_id}[/green]") - - -@tag.command(name="remove") -@click.argument("snippet_id", type=int) -@click.argument("tag_name") -@click.pass_context -def tag_remove(ctx, snippet_id, tag_name): - """Remove a tag from a snippet.""" - db = ctx.obj["db"] - db.tag_remove(snippet_id, tag_name) - console.print(f"[green]Tag '{tag_name}' removed from snippet {snippet_id}[/green]") - - -@tag.command(name="list") -@click.pass_context -def tag_list(ctx): - """List all tags.""" - db = ctx.obj["db"] - tags = db.list_tags() - if tags: - console.print(", ".join(tags)) - else: - console.print("[dim]No tags found[/dim]") - - -@cli.group() -def collection(): - """Collection management commands.""" - pass - - -@collection.command(name="create") -@click.argument("name") -@click.option("--description", "-d", default="", help="Collection description") -@click.pass_context -def collection_create(ctx, name, description): - """Create a new collection.""" - db = ctx.obj["db"] - collection_id = db.collection_create(name, description) - console.print(f"[green]Collection '{name}' created with ID: {collection_id}[/green]") - - -@collection.command(name="list") -@click.pass_context -def collection_list(ctx): - """List all collections.""" - db = ctx.obj["db"] - collections = db.collection_list() - if not collections: - console.print("[dim]No collections found[/dim]") - return - table = Table(show_header=True) - table.add_column("ID", style="cyan") - table.add_column("Name") - table.add_column("Description") - table.add_column("Snippets") - for c in collections: - table.add_row(str(c["id"]), c["name"], c.get("description", ""), str(c.get("snippet_count", 0))) - console.print(table) - - -@collection.command(name="delete") -@click.argument("collection_id", type=int) -@click.option("--force", is_flag=True, help="Skip confirmation") -@click.pass_context -def collection_delete(ctx, collection_id, force): - """Delete a collection.""" - db = ctx.obj["db"] - if not force: - if not click.confirm(f"Delete collection {collection_id}?"): - return - if db.collection_delete(collection_id): - console.print(f"[green]Collection {collection_id} deleted[/green]") - else: - console.print(f"[red]Collection {collection_id} not found[/red]") - - -@collection.command(name="add") -@click.argument("collection_id", type=int) -@click.argument("snippet_id", type=int) -@click.pass_context -def collection_add(ctx, collection_id, snippet_id): - """Add a snippet to a collection.""" - db = ctx.obj["db"] - db.collection_add_snippet(collection_id, snippet_id) - console.print(f"[green]Snippet {snippet_id} added to collection {collection_id}[/green]") - - -@collection.command(name="remove") -@click.argument("collection_id", type=int) -@click.argument("snippet_id", type=int) -@click.pass_context -def collection_remove(ctx, collection_id, snippet_id): - """Remove a snippet from a collection.""" - db = ctx.obj["db"] - db.collection_remove_snippet(collection_id, snippet_id) - console.print(f"[green]Snippet {snippet_id} removed from collection {collection_id}[/green]") - - -@cli.group() -def export(): - """Export commands.""" - pass - - -@export.command(name="all") -@click.option("--file", "-f", required=True, help="Output file path") -@click.pass_context -def export_all(ctx, file): - """Export all snippets to JSON.""" - export_handler = ctx.obj["export"] - data = export_handler.export_all() - export_handler.write_export(file, data) - console.print(f"[green]Exported to {file}[/green]") - - -@export.command(name="collection") -@click.argument("collection_name") -@click.option("--file", "-f", required=True, help="Output file path") -@click.pass_context -def export_collection(ctx, collection_name, file): - """Export a collection to JSON.""" - db = ctx.obj["db"] - export_handler = ctx.obj["export"] - collections = db.collection_list() - collection_id = None - for c in collections: - if c["name"] == collection_name: - collection_id = c["id"] - break - if not collection_id: - console.print(f"[red]Collection '{collection_name}' not found[/red]") - return - data = export_handler.export_collection(collection_id) - export_handler.write_export(file, data) - console.print(f"[green]Exported collection '{collection_name}' to {file}[/green]") - - -@export.command(name="snippet") -@click.argument("snippet_id", type=int) -@click.option("--file", "-f", required=True, help="Output file path") -@click.pass_context -def export_snippet(ctx, snippet_id, file): - """Export a snippet to JSON.""" - export_handler = ctx.obj["export"] - data = export_handler.export_snippet(snippet_id) - if data: - export_handler.write_export(file, data) - console.print(f"[green]Exported snippet {snippet_id} to {file}[/green]") - else: - console.print(f"[red]Snippet {snippet_id} not found[/red]") - - -@cli.command(name='import') -@click.option("--file", "-f", required=True, help="Input file path") -@click.option("--strategy", "-s", type=click.Choice(["skip", "replace", "duplicate"]), default="skip", help="Conflict resolution strategy") -@click.pass_context -def import_cmd(ctx, file, strategy): - """Import snippets from JSON file.""" - export_handler = ctx.obj["export"] - try: - results = export_handler.import_data(file, strategy) - console.print(export_handler.generate_import_summary(results)) - except Exception as e: - console.print(f"[red]Import failed: {e}[/red]") - - -@cli.group() -def discover(): - """Peer discovery commands.""" - pass - - -@discover.command(name="list") -@click.option("--timeout", "-t", default=5.0, help="Discovery timeout in seconds") -@click.pass_context -def discover_list(ctx, timeout): - """List discovered peers on the network.""" - discovery = DiscoveryService() - peers = discovery.discover_peers(timeout) - if not peers: - console.print("[dim]No peers discovered[/dim]") - return - table = Table(show_header=True) - table.add_column("Peer ID", style="cyan") - table.add_column("Name") - table.add_column("Address") - table.add_column("Port") - for p in peers: - addr = ", ".join(p.get("addresses", [])) or "unknown" - table.add_row(p["peer_id"], p.get("peer_name", ""), addr, str(p.get("port", "?"))) - console.print(table) - - -@cli.command() -@click.option("--peer-id", help="Peer ID to sync with") -@click.option("--timeout", "-t", default=30.0, help="Sync timeout in seconds") -@click.pass_context -def sync(ctx, peer_id, timeout): - """Sync snippets with a peer.""" - discovery = DiscoveryService() - protocol = SyncProtocol() - protocol.start_server() - try: - peers = discovery.discover_peers(5.0) - if not peers: - console.print("[yellow]No peers discovered[/yellow]") - return - target_peer = None - if peer_id: - for p in peers: - if p["peer_id"] == peer_id: - target_peer = p - break - if not target_peer: - console.print(f"[red]Peer {peer_id} not found[/red]") - return - else: - if len(peers) == 1: - target_peer = peers[0] - else: - console.print("Available peers:") - for i, p in enumerate(peers): - console.print(f" {i + 1}. {p['peer_name']} ({p['peer_id']})") - choice = click.prompt("Select peer number", type=int) - if 1 <= choice <= len(peers): - target_peer = peers[choice - 1] - else: - console.print("[red]Invalid selection[/red]") - return - console.print(f"[cyan]Syncing with {target_peer['peer_name']}...[/cyan]") - result = protocol.sync_with_peer(target_peer) - if result["status"] == "success": - console.print(f"[green]Sync complete! Merged: {result['merged']}, Pushed: {result['pushed']}[/green]") - else: - console.print(f"[red]Sync failed: {result.get('message', 'Unknown error')}[/red]") - finally: - protocol.stop_server() - - -@cli.command() -@click.pass_context -def peers(ctx): - """List known sync peers.""" - db = ctx.obj["db"] - peer_list = db.list_sync_peers() - if not peer_list: - console.print("[dim]No known peers[/dim]") - return - table = Table(show_header=True) - table.add_column("Peer ID", style="cyan") - table.add_column("Name") - table.add_column("Last Sync") - table.add_column("Address") - for p in peer_list: - last_sync = p.get("last_sync", "never") - if last_sync: - last_sync = last_sync[:19] - table.add_row(p["peer_id"], p.get("peer_name", ""), last_sync, p.get("peer_address", "")) - console.print(table) - - -if __name__ == "__main__": - cli() +{"success": true, "message": "File created successfully", "commit_sha": "1e23abc"} \ No newline at end of file