480 lines
15 KiB
Python
480 lines
15 KiB
Python
"""Click CLI commands for snippet management."""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import click
|
|
from rich.console import Console
|
|
from rich.syntax import Syntax
|
|
from rich.table import Table
|
|
|
|
from snip.crypto.service import CryptoService
|
|
from snip.db.database import Database
|
|
from snip.export.handlers import export_snippets, import_snippets
|
|
from snip.search.engine import SearchEngine
|
|
|
|
# Module-level collaborators shared by every command in this file.
console = Console()  # Rich console used for all user-facing output
db = Database()  # storage object the commands read from and write to
crypto_service = CryptoService()  # used by the commands that encrypt/decrypt snippet code
search_engine = SearchEngine(db)  # search front-end built on the same Database instance
|
|
|
|
|
|
# Root command group of the CLI; the commands and sub-groups in this module
# register themselves on it. The docstring doubles as the top-level help text.
@click.group()
@click.version_option(version="0.1.0")
def cli() -> None:
    """Snip - Local-First Code Snippet Manager."""
|
|
|
|
|
|
@cli.command()
def init() -> None:
    """Initialize the snippet database."""
    # Delegate the setup to Database.init_db, then report success.
    db.init_db()
    console.print("[green]Database initialized successfully![/green]")
|
|
|
|
|
|
@cli.command()
@click.option("--title", prompt="Title", help="Snippet title")
@click.option("--code", prompt="Code", help="Snippet code")
@click.option("--description", default="", help="Snippet description")
@click.option("--language", default="", help="Programming language")
@click.option("--tag", multiple=True, help="Tags to add")
@click.option("--encrypt", is_flag=True, help="Encrypt the snippet")
def add(title: str, code: str, description: str, language: str, tag: tuple, encrypt: bool):
    """Add a new snippet."""
    # ``--tag`` is a repeatable option, so Click hands over a tuple. Copy it
    # with unpacking rather than ``list(tag)``: this module defines a command
    # function named ``list``, which rebinds that name at module scope, so
    # ``list(...)`` here would not reach the builtin.
    tags = [*tag]
    is_encrypted = False

    if encrypt:
        # Ask twice so a typo cannot lock the snippet behind an unknown password.
        password = click.prompt("Encryption password", hide_input=True, confirmation_prompt=True)
        # From here on ``code`` holds whatever the crypto service returned,
        # and that is what gets stored.
        code = crypto_service.encrypt(code, password)
        is_encrypted = True

    snippet_id = db.add_snippet(
        title=title,
        code=code,
        description=description,
        language=language,
        tags=tags,
        is_encrypted=is_encrypted,
    )
    console.print(f"[green]Snippet added with ID {snippet_id}[/green]")
|
|
|
|
|
|
@cli.command()
@click.argument("snippet_id", type=int)
@click.option("--decrypt", help="Decryption password", default=None, hide_input=True)
def get(snippet_id: int, decrypt: str | None):
    """Get a snippet by ID."""
    # Function-scope import, like the other lazy imports in this module.
    from rich.markup import escape

    snippet = db.get_snippet(snippet_id)
    if not snippet:
        console.print(f"[red]Snippet {snippet_id} not found[/red]")
        return

    code = snippet["code"]
    if snippet["is_encrypted"]:
        # Fall back to an interactive prompt when no password was passed.
        if not decrypt:
            decrypt = click.prompt("Decryption password", hide_input=True)
        try:
            code = crypto_service.decrypt(code, decrypt)
        except Exception as e:
            # The exception text is arbitrary; escape it so square brackets in
            # it are printed literally instead of being parsed as markup.
            console.print(f"[red]Decryption failed: {escape(str(e))}[/red]")
            return

    language = snippet["language"] or "text"
    syntax = Syntax(code, language, theme="monokai", line_numbers=True)

    # Title, description and tags are user-supplied text. Without escaping, a
    # title such as ``arr[i]`` is interpreted as a style tag and an unmatched
    # ``[/x]`` makes Rich raise a markup error.
    title = escape(str(snippet["title"]))
    console.print(f"\n[bold]{title}[/bold]")
    if snippet["description"]:
        description = escape(str(snippet["description"]))
        console.print(f"[dim]{description}[/dim]")
    shown_language = escape(str(language))
    shown_tags = escape(str(snippet["tags"]))
    console.print(f"[dim]Language: {shown_language} | Tags: {shown_tags}[/dim]\n")
    console.print(syntax)
|
|
|
|
|
|
# NOTE: Click derives the command name from this function's name, but defining
# it also rebinds ``list`` at module scope. Code in this module must not rely
# on ``list(...)`` meaning the builtin.
@cli.command()
@click.option("--limit", default=50, help="Maximum number of snippets")
@click.option("--offset", default=0, help="Offset for pagination")
@click.option("--tag", default=None, help="Filter by tag")
def list(limit: int, offset: int, tag: str | None):
    """List all snippets."""
    # Function-scope import, like the other lazy imports in this module.
    from rich.markup import escape

    snippets = db.list_snippets(limit=limit, offset=offset, tag=tag)

    if not snippets:
        console.print("[dim]No snippets found[/dim]")
        return

    table = Table(title="Snippets")
    table.add_column("ID", style="cyan")
    table.add_column("Title", style="green")
    table.add_column("Language", style="magenta")
    table.add_column("Tags", style="yellow")
    table.add_column("Updated", style="dim")

    for s in snippets:
        # Tags may arrive as a JSON-encoded string or as an already-decoded
        # sequence; a missing/empty value is shown as "-".
        raw_tags = s.get("tags")
        tag_names = json.loads(raw_tags) if isinstance(raw_tags, str) else raw_tags
        # String cells are parsed as Rich markup, so user-supplied text is
        # escaped to keep square brackets literal.
        table.add_row(
            str(s["id"]),
            escape(s["title"] or ""),
            escape(s["language"] or "-"),
            escape(", ".join(tag_names)) if tag_names else "-",
            s["updated_at"][:10],  # first 10 characters of the stored timestamp
        )

    console.print(table)
|
|
|
|
|
|
def _parse_tags_header(value):
    """Turn the text after ``# Tags:`` into a list of tags.

    A value starting with ``[`` is read as a JSON array; anything else is
    treated as a comma-separated list, with blank entries dropped.

    Raises:
        click.ClickException: If a bracketed value is not valid JSON.
    """
    if value.startswith("["):
        try:
            return json.loads(value)
        except json.JSONDecodeError as exc:
            raise click.ClickException(f"Invalid tags list: {exc}") from exc
    return [name.strip() for name in value.split(",") if name.strip()]


def _parse_edited_snippet(lines, current):
    """Split an edited temp file into metadata fields and snippet code.

    The expected layout is a leading run of ``# Field: value`` lines, one blank
    separator line, then the code. Only that leading run of ``#`` lines is
    treated as header, so ``#`` lines inside the code itself (comments,
    shebangs, preprocessor directives) are preserved verbatim.

    Args:
        lines: Lines of the edited file, line endings included.
        current: Dict with ``title``, ``description``, ``language`` and
            ``tags``; used for any header line the user removed.

    Returns:
        ``(fields, code)`` where ``fields`` is a dict with the same four keys
        and ``code`` is the text following the header.

    Raises:
        click.ClickException: If the tags header holds malformed JSON.
    """
    prefixes = (
        ("title", "# Title:"),
        ("description", "# Description:"),
        ("language", "# Language:"),
        ("tags", "# Tags:"),
    )
    fields = dict(current)
    code_start = len(lines)  # a file made only of header lines has no code

    for index, line in enumerate(lines):
        if not line.startswith("#"):
            # First non-header line. If it is the blank separator, skip it so
            # the code does not gain a leading newline on every edit.
            code_start = index if line.strip() else index + 1
            break
        for key, prefix in prefixes:
            if not line.startswith(prefix):
                continue
            # Slice by the prefix length instead of a hand-counted offset.
            value = line[len(prefix):].strip()
            if key == "tags":
                fields[key] = _parse_tags_header(value)
            elif key == "title":
                # An emptied title keeps the previous one.
                fields[key] = value or current["title"]
            else:
                fields[key] = value
            break

    return fields, "".join(lines[code_start:])


@cli.command()
@click.argument("snippet_id", type=int)
def edit(snippet_id: int):
    """Edit a snippet in your default editor."""
    snippet = db.get_snippet(snippet_id)
    if not snippet:
        console.print(f"[red]Snippet {snippet_id} not found[/red]")
        return

    # NOTE(review): nothing here checks snippet["is_encrypted"], so for an
    # encrypted snippet the stored (encrypted) code is what lands in the
    # editor. Confirm whether that case should be decrypted or rejected.

    stored_tags = snippet["tags"]
    current = {
        "title": snippet["title"],
        # Missing values become "" so the header never contains "None".
        "description": snippet["description"] or "",
        "language": snippet["language"] or "",
        "tags": (json.loads(stored_tags) if isinstance(stored_tags, str) else stored_tags) or [],
    }

    def one_line(text):
        # Header values must stay on one line, otherwise the overflow would be
        # read back as the start of the code.
        return " ".join(str(text).splitlines())

    header = (
        f"# Title: {one_line(current['title'])}\n"
        f"# Description: {one_line(current['description'])}\n"
        f"# Language: {one_line(current['language'])}\n"
        # JSON, so the line can be parsed back regardless of how tags are stored.
        f"# Tags: {json.dumps(current['tags'], ensure_ascii=False)}\n"
        "\n"
    )

    # delete=False: the file must outlive this handle so the editor can open it.
    tmp = tempfile.NamedTemporaryFile(
        mode="w",
        suffix=f".{snippet['language'] or 'txt'}",
        delete=False,
        encoding="utf-8",
    )
    temp_path = tmp.name
    try:
        with tmp:
            tmp.write(header)
            tmp.write(snippet["code"])

        click.edit(filename=temp_path)

        with open(temp_path, "r", encoding="utf-8") as edited:
            lines = edited.readlines()

        fields, code = _parse_edited_snippet(lines, current)
        db.update_snippet(
            snippet_id,
            title=fields["title"],
            description=fields["description"],
            code=code,
            language=fields["language"],
            tags=fields["tags"],
        )
        console.print(f"[green]Snippet {snippet_id} updated[/green]")
    finally:
        # Always remove the temp file, including when writing or parsing fails.
        os.unlink(temp_path)
|
|
|
|
|
|
@cli.command()
@click.argument("snippet_id", type=int)
def delete(snippet_id: int) -> None:
    """Delete a snippet."""
    target = db.get_snippet(snippet_id)
    if not target:
        console.print(f"[red]Snippet {snippet_id} not found[/red]")
        return

    # Quote the title in the confirmation so the user sees what is removed.
    confirmed = click.confirm(f"Delete snippet '{target['title']}'?")
    if not confirmed:
        return

    db.delete_snippet(snippet_id)
    console.print(f"[green]Snippet {snippet_id} deleted[/green]")
|
|
|
|
|
|
@cli.command()
@click.argument("query")
@click.option("--limit", default=50, help="Maximum results")
@click.option("--language", default=None, help="Filter by language")
@click.option("--tag", default=None, help="Filter by tag")
def search(query: str, limit: int, language: str | None, tag: str | None):
    """Search snippets using full-text search."""
    # Function-scope import, like the other lazy imports in this module.
    from rich.markup import escape

    results = search_engine.search(query, limit=limit, language=language, tag=tag)

    if not results:
        console.print("[dim]No results found[/dim]")
        return

    table = Table(title=f"Search Results ({len(results)})")
    table.add_column("ID", style="cyan")
    table.add_column("Title", style="green")
    table.add_column("Language", style="magenta")
    table.add_column("Match Score", style="yellow")

    for r in results:
        # String cells are parsed as Rich markup; escape user-supplied text so
        # a title like ``arr[i]`` is shown literally rather than styled.
        table.add_row(
            str(r["id"]),
            escape(r["title"] or ""),
            escape(r["language"] or "-"),
            f"{r.get('rank', 0):.2f}",  # a result without a rank is shown as 0.00
        )

    console.print(table)
|
|
|
|
|
|
# Sub-group that hosts the ``tag add`` / ``tag remove`` / ``tag list`` commands.
@cli.group()
def tag() -> None:
    """Manage tags."""
|
|
|
|
|
|
@tag.command(name="add")
@click.argument("snippet_id", type=int)
@click.argument("tag_name")
def tag_add(snippet_id: int, tag_name: str) -> None:
    """Add a tag to a snippet."""
    # A falsy result from the database is reported as a missing snippet.
    if not db.add_tag(snippet_id, tag_name):
        console.print(f"[red]Snippet {snippet_id} not found[/red]")
        return
    console.print(f"[green]Tag '{tag_name}' added to snippet {snippet_id}[/green]")
|
|
|
|
|
|
@tag.command(name="remove")
@click.argument("snippet_id", type=int)
@click.argument("tag_name")
def tag_remove(snippet_id: int, tag_name: str) -> None:
    """Remove a tag from a snippet."""
    removed = db.remove_tag(snippet_id, tag_name)
    # A falsy result from the database is reported as a missing snippet.
    message = (
        f"[green]Tag '{tag_name}' removed from snippet {snippet_id}[/green]"
        if removed
        else f"[red]Snippet {snippet_id} not found[/red]"
    )
    console.print(message)
|
|
|
|
|
|
@tag.command(name="list")
def tag_list() -> None:
    """List all tags."""
    names = db.list_tags()
    if names:
        # Heading first, then one indented line per tag.
        console.print("[bold]Tags:[/bold]")
        for name in names:
            console.print(f"  [cyan]{name}[/cyan]")
    else:
        console.print("[dim]No tags found[/dim]")
|
|
|
|
|
|
# Sub-group that hosts the ``collection ...`` commands defined below.
@cli.group()
def collection() -> None:
    """Manage collections."""
|
|
|
|
|
|
@collection.command(name="create")
@click.argument("name")
@click.option("--description", default="", help="Collection description")
def collection_create(name: str, description: str) -> None:
    """Create a new collection."""
    # The database returns the new collection's ID, which is echoed back.
    new_id = db.create_collection(name, description)
    console.print(f"[green]Collection '{name}' created with ID {new_id}[/green]")
|
|
|
|
|
|
@collection.command(name="list")
def collection_list():
    """List all collections."""
    # Function-scope import, like the other lazy imports in this module.
    from rich.markup import escape

    collections = db.list_collections()
    if not collections:
        console.print("[dim]No collections found[/dim]")
        return

    table = Table(title="Collections")
    table.add_column("ID", style="cyan")
    table.add_column("Name", style="green")
    table.add_column("Description", style="dim")
    table.add_column("Created", style="dim")

    for c in collections:
        # String cells are parsed as Rich markup; escape the user-supplied name
        # and description so square brackets are displayed literally.
        table.add_row(
            str(c["id"]),
            escape(c["name"] or ""),
            escape(c["description"] or "-"),
            c["created_at"][:10],  # first 10 characters of the stored timestamp
        )

    console.print(table)
|
|
|
|
|
|
@collection.command(name="delete")
@click.argument("collection_id", type=int)
def collection_delete(collection_id: int) -> None:
    """Delete a collection."""
    record = db.get_collection(collection_id)
    if not record:
        console.print(f"[red]Collection {collection_id} not found[/red]")
        return

    # Nothing happens unless the user confirms the prompt.
    confirmed = click.confirm(f"Delete collection '{record['name']}'?")
    if not confirmed:
        return

    db.delete_collection(collection_id)
    console.print(f"[green]Collection {collection_id} deleted[/green]")
|
|
|
|
|
|
@collection.command(name="add")
@click.argument("collection_id", type=int)
@click.argument("snippet_id", type=int)
def collection_add(collection_id: int, snippet_id: int) -> None:
    """Add a snippet to a collection."""
    # Note the argument order: the database call takes the snippet ID first.
    linked = db.add_snippet_to_collection(snippet_id, collection_id)
    if not linked:
        console.print("[red]Failed to add snippet to collection[/red]")
        return
    console.print(f"[green]Snippet {snippet_id} added to collection {collection_id}[/green]")
|
|
|
|
|
|
@collection.command(name="remove")
@click.argument("collection_id", type=int)
@click.argument("snippet_id", type=int)
def collection_remove(collection_id: int, snippet_id: int) -> None:
    """Remove a snippet from a collection."""
    # Note the argument order: the database call takes the snippet ID first.
    unlinked = db.remove_snippet_from_collection(snippet_id, collection_id)
    message = (
        f"[green]Snippet {snippet_id} removed from collection {collection_id}[/green]"
        if unlinked
        else "[red]Failed to remove snippet from collection[/red]"
    )
    console.print(message)
|
|
|
|
|
|
# Sub-group that hosts the ``export all`` / ``export collection`` /
# ``export snippet`` commands.
@cli.group()
def export() -> None:
    """Export snippets."""
|
|
|
|
|
|
@export.command(name="all")
@click.option("--file", required=True, help="Output file path")
def export_all(file: str) -> None:
    """Export all snippets."""
    everything = db.export_all()
    # Hand the records to the export handler, then report how many were passed.
    export_snippets(everything, file)
    total = len(everything)
    console.print(f"[green]Exported {total} snippets to {file}[/green]")
|
|
|
|
|
|
@export.command(name="collection")
@click.argument("collection_name")
@click.option("--file", required=True, help="Output file path")
def export_collection(collection_name: str, file: str) -> None:
    """Export a collection."""
    # Collections are looked up by exact name; the first match wins.
    match = None
    for candidate in db.list_collections():
        if candidate["name"] == collection_name:
            match = candidate
            break

    if not match:
        console.print(f"[red]Collection '{collection_name}' not found[/red]")
        return

    members = db.get_collection_snippets(match["id"])
    export_snippets(members, file)
    console.print(f"[green]Exported {len(members)} snippets to {file}[/green]")
|
|
|
|
|
|
@export.command(name="snippet")
@click.argument("snippet_id", type=int)
@click.option("--file", required=True, help="Output file path")
def export_snippet(snippet_id: int, file: str) -> None:
    """Export a single snippet."""
    record = db.get_snippet(snippet_id)
    if not record:
        console.print(f"[red]Snippet {snippet_id} not found[/red]")
        return

    # The export handler takes a list, so wrap the single record.
    single = [record]
    export_snippets(single, file)
    console.print(f"[green]Exported snippet {snippet_id} to {file}[/green]")
|
|
|
|
|
|
@cli.command()
@click.option("--file", required=True, help="Input file path")
@click.option(
    "--strategy",
    default="skip",
    type=click.Choice(["skip", "replace", "duplicate"]),
    help="Import strategy",
)
def import_cmd(file: str, strategy: str) -> None:
    """Import snippets from a JSON file."""
    # Any failure from the import handler is reported as one red line
    # instead of a traceback.
    try:
        counts = import_snippets(db, file, strategy)
        imported, skipped = counts
        console.print(f"[green]Imported {imported} snippets, skipped {skipped}[/green]")
    except Exception as e:
        console.print(f"[red]Import failed: {e}[/red]")
|
|
|
|
|
|
# Sub-group that hosts the ``discover list`` command.
@cli.group()
def discover() -> None:
    """Discover peers on the network."""
|
|
|
|
|
|
@discover.command(name="list")
def discover_list() -> None:
    """List discovered peers."""
    # Imported lazily, only when this command actually runs.
    from snip.sync.discovery import DiscoveryService

    service = DiscoveryService()
    found = service.discover_peers(timeout=5.0)

    if not found:
        console.print("[dim]No peers discovered[/dim]")
        return

    table = Table(title="Discovered Peers")
    for heading, colour in (("Peer ID", "cyan"), ("Host", "green"), ("Port", "magenta")):
        table.add_column(heading, style=colour)

    for entry in found:
        table.add_row(entry["peer_id"], entry["host"], str(entry["port"]))

    console.print(table)
|
|
|
|
|
|
@cli.command()
@click.option("--peer-id", required=True, help="Peer ID to sync with")
def sync(peer_id: str) -> None:
    """Sync snippets with a peer."""
    # Imported lazily, only when this command actually runs.
    from snip.sync.protocol import SyncProtocol

    # Pick the first stored peer whose ID matches the requested one.
    target = None
    for known in db.list_peers():
        if known["peer_id"] == peer_id:
            target = known
            break

    if not target:
        console.print(f"[red]Peer {peer_id} not found[/red]")
        return

    protocol = SyncProtocol(db)
    # Any failure during the exchange is reported as one red line.
    try:
        count = protocol.sync_with_peer(target["host"], target["port"])
        console.print(f"[green]Synced {count} snippets with peer {peer_id}[/green]")
    except Exception as e:
        console.print(f"[red]Sync failed: {e}[/red]")
|
|
|
|
|
|
@cli.command()
def peers() -> None:
    """List known sync peers."""
    known = db.list_peers()
    if not known:
        console.print("[dim]No known peers[/dim]")
        return

    table = Table(title="Known Peers")
    columns = (
        ("Peer ID", "cyan"),
        ("Host", "green"),
        ("Port", "magenta"),
        ("Last Seen", "dim"),
    )
    for heading, colour in columns:
        table.add_column(heading, style=colour)

    for entry in known:
        # Only the first 10 characters of the stored last-seen value are shown.
        last_seen = entry["last_seen"][:10]
        table.add_row(entry["peer_id"], entry["host"], str(entry["port"]), last_seen)

    console.print(table)
|
|
|
|
|
|
# Run the root Click group when the module is executed as a script.
if __name__ == "__main__":
    cli()
|