Initial upload: snippet-manager with CI/CD workflow
Some checks failed
CI / test (push) Has been cancelled
CI / lint (push) Has been cancelled

This commit is contained in:
2026-03-22 11:22:26 +00:00
parent 0a56b95b73
commit a156cb3bbc

479
snip/cli/commands.py Normal file
View File

@@ -0,0 +1,479 @@
"""Click CLI commands for snippet management."""
import json
import os
import sys
import tempfile
from pathlib import Path
from typing import Any
import click
from rich.console import Console
from rich.syntax import Syntax
from rich.table import Table
from snip.crypto.service import CryptoService
from snip.db.database import Database
from snip.export.handlers import export_snippets, import_snippets
from snip.search.engine import SearchEngine
console = Console()
db = Database()
crypto_service = CryptoService()
search_engine = SearchEngine(db)
@click.group()
@click.version_option(version="0.1.0")
def cli():
"""Snip - Local-First Code Snippet Manager."""
pass
@cli.command()
def init():
"""Initialize the snippet database."""
db.init_db()
console.print("[green]Database initialized successfully![/green]")
@cli.command()
@click.option("--title", prompt="Title", help="Snippet title")
@click.option("--code", prompt="Code", help="Snippet code")
@click.option("--description", default="", help="Snippet description")
@click.option("--language", default="", help="Programming language")
@click.option("--tag", multiple=True, help="Tags to add")
@click.option("--encrypt", is_flag=True, help="Encrypt the snippet")
def add(title: str, code: str, description: str, language: str, tag: tuple, encrypt: bool):
"""Add a new snippet."""
tags = list(tag)
is_encrypted = False
if encrypt:
password = click.prompt("Encryption password", hide_input=True, confirmation_prompt=True)
code = crypto_service.encrypt(code, password)
is_encrypted = True
snippet_id = db.add_snippet(
title=title,
code=code,
description=description,
language=language,
tags=tags,
is_encrypted=is_encrypted,
)
console.print(f"[green]Snippet added with ID {snippet_id}[/green]")
@cli.command()
@click.argument("snippet_id", type=int)
@click.option("--decrypt", help="Decryption password", default=None, hide_input=True)
def get(snippet_id: int, decrypt: str | None):
"""Get a snippet by ID."""
snippet = db.get_snippet(snippet_id)
if not snippet:
console.print(f"[red]Snippet {snippet_id} not found[/red]")
return
code = snippet["code"]
if snippet["is_encrypted"]:
if not decrypt:
decrypt = click.prompt("Decryption password", hide_input=True)
try:
code = crypto_service.decrypt(code, decrypt)
except Exception as e:
console.print(f"[red]Decryption failed: {e}[/red]")
return
language = snippet["language"] or "text"
syntax = Syntax(code, language, theme="monokai", line_numbers=True)
console.print(f"\n[bold]{snippet['title']}[/bold]")
if snippet["description"]:
console.print(f"[dim]{snippet['description']}[/dim]")
console.print(f"[dim]Language: {language} | Tags: {snippet['tags']}[/dim]\n")
console.print(syntax)
@cli.command()
@click.option("--limit", default=50, help="Maximum number of snippets")
@click.option("--offset", default=0, help="Offset for pagination")
@click.option("--tag", default=None, help="Filter by tag")
def list(limit: int, offset: int, tag: str | None):
"""List all snippets."""
snippets = db.list_snippets(limit=limit, offset=offset, tag=tag)
if not snippets:
console.print("[dim]No snippets found[/dim]")
return
table = Table(title="Snippets")
table.add_column("ID", style="cyan")
table.add_column("Title", style="green")
table.add_column("Language", style="magenta")
table.add_column("Tags", style="yellow")
table.add_column("Updated", style="dim")
for s in snippets:
tags_str = json.loads(s.get("tags", "[]")) if isinstance(s.get("tags"), str) else s.get("tags", [])
table.add_row(
str(s["id"]),
s["title"],
s["language"] or "-",
", ".join(tags_str) if tags_str else "-",
s["updated_at"][:10],
)
console.print(table)
@cli.command()
@click.argument("snippet_id", type=int)
def edit(snippet_id: int):
"""Edit a snippet in your default editor."""
snippet = db.get_snippet(snippet_id)
if not snippet:
console.print(f"[red]Snippet {snippet_id} not found[/red]")
return
with tempfile.NamedTemporaryFile(mode="w", suffix=f".{snippet['language'] or 'txt'}", delete=False) as f:
f.write(f"# Title: {snippet['title']}\n")
f.write(f"# Description: {snippet['description']}\n")
f.write(f"# Language: {snippet['language']}\n")
f.write(f"# Tags: {snippet['tags']}\n")
f.write("\n")
f.write(snippet["code"])
temp_path = f.name
try:
click.edit(filename=temp_path)
with open(temp_path, "r") as f:
lines = f.readlines()
title = snippet["title"]
description = snippet["description"]
language = snippet["language"]
tags = json.loads(snippet["tags"]) if isinstance(snippet["tags"], str) else snippet.get("tags", [])
code_lines = []
in_code = False
for line in lines:
if line.startswith("# Title: "):
title = line[9:].strip()
elif line.startswith("# Description: "):
description = line[15:].strip()
elif line.startswith("# Language: "):
language = line[13:].strip()
elif line.startswith("# Tags: "):
tags_str = line[8:].strip()
if tags_str.startswith("["):
tags = json.loads(tags_str)
else:
tags = [t.strip() for t in tags_str.split(",")]
elif line.startswith("#"):
continue
else:
in_code = True
code_lines.append(line)
db.update_snippet(
snippet_id,
title=title,
description=description,
code="".join(code_lines),
language=language,
tags=tags,
)
console.print(f"[green]Snippet {snippet_id} updated[/green]")
finally:
os.unlink(temp_path)
@cli.command()
@click.argument("snippet_id", type=int)
def delete(snippet_id: int):
"""Delete a snippet."""
snippet = db.get_snippet(snippet_id)
if not snippet:
console.print(f"[red]Snippet {snippet_id} not found[/red]")
return
if click.confirm(f"Delete snippet '{snippet['title']}'?"):
db.delete_snippet(snippet_id)
console.print(f"[green]Snippet {snippet_id} deleted[/green]")
@cli.command()
@click.argument("query")
@click.option("--limit", default=50, help="Maximum results")
@click.option("--language", default=None, help="Filter by language")
@click.option("--tag", default=None, help="Filter by tag")
def search(query: str, limit: int, language: str | None, tag: str | None):
"""Search snippets using full-text search."""
results = search_engine.search(query, limit=limit, language=language, tag=tag)
if not results:
console.print("[dim]No results found[/dim]")
return
table = Table(title=f"Search Results ({len(results)})")
table.add_column("ID", style="cyan")
table.add_column("Title", style="green")
table.add_column("Language", style="magenta")
table.add_column("Match Score", style="yellow")
for r in results:
table.add_row(
str(r["id"]),
r["title"],
r["language"] or "-",
f"{r.get('rank', 0):.2f}",
)
console.print(table)
@cli.group()
def tag():
"""Manage tags."""
pass
@tag.command(name="add")
@click.argument("snippet_id", type=int)
@click.argument("tag_name")
def tag_add(snippet_id: int, tag_name: str):
"""Add a tag to a snippet."""
if db.add_tag(snippet_id, tag_name):
console.print(f"[green]Tag '{tag_name}' added to snippet {snippet_id}[/green]")
else:
console.print(f"[red]Snippet {snippet_id} not found[/red]")
@tag.command(name="remove")
@click.argument("snippet_id", type=int)
@click.argument("tag_name")
def tag_remove(snippet_id: int, tag_name: str):
"""Remove a tag from a snippet."""
if db.remove_tag(snippet_id, tag_name):
console.print(f"[green]Tag '{tag_name}' removed from snippet {snippet_id}[/green]")
else:
console.print(f"[red]Snippet {snippet_id} not found[/red]")
@tag.command(name="list")
def tag_list():
"""List all tags."""
tags = db.list_tags()
if not tags:
console.print("[dim]No tags found[/dim]")
return
console.print("[bold]Tags:[/bold]")
for t in tags:
console.print(f" [cyan]{t}[/cyan]")
@cli.group()
def collection():
"""Manage collections."""
pass
@collection.command(name="create")
@click.argument("name")
@click.option("--description", default="", help="Collection description")
def collection_create(name: str, description: str):
"""Create a new collection."""
collection_id = db.create_collection(name, description)
console.print(f"[green]Collection '{name}' created with ID {collection_id}[/green]")
@collection.command(name="list")
def collection_list():
"""List all collections."""
collections = db.list_collections()
if not collections:
console.print("[dim]No collections found[/dim]")
return
table = Table(title="Collections")
table.add_column("ID", style="cyan")
table.add_column("Name", style="green")
table.add_column("Description", style="dim")
table.add_column("Created", style="dim")
for c in collections:
table.add_row(
str(c["id"]),
c["name"],
c["description"] or "-",
c["created_at"][:10],
)
console.print(table)
@collection.command(name="delete")
@click.argument("collection_id", type=int)
def collection_delete(collection_id: int):
"""Delete a collection."""
collection = db.get_collection(collection_id)
if not collection:
console.print(f"[red]Collection {collection_id} not found[/red]")
return
if click.confirm(f"Delete collection '{collection['name']}'?"):
db.delete_collection(collection_id)
console.print(f"[green]Collection {collection_id} deleted[/green]")
@collection.command(name="add")
@click.argument("collection_id", type=int)
@click.argument("snippet_id", type=int)
def collection_add(collection_id: int, snippet_id: int):
"""Add a snippet to a collection."""
if db.add_snippet_to_collection(snippet_id, collection_id):
console.print(f"[green]Snippet {snippet_id} added to collection {collection_id}[/green]")
else:
console.print("[red]Failed to add snippet to collection[/red]")
@collection.command(name="remove")
@click.argument("collection_id", type=int)
@click.argument("snippet_id", type=int)
def collection_remove(collection_id: int, snippet_id: int):
"""Remove a snippet from a collection."""
if db.remove_snippet_from_collection(snippet_id, collection_id):
console.print(f"[green]Snippet {snippet_id} removed from collection {collection_id}[/green]")
else:
console.print("[red]Failed to remove snippet from collection[/red]")
@cli.group()
def export():
"""Export snippets."""
pass
@export.command(name="all")
@click.option("--file", required=True, help="Output file path")
def export_all(file: str):
"""Export all snippets."""
snippets = db.export_all()
export_snippets(snippets, file)
console.print(f"[green]Exported {len(snippets)} snippets to {file}[/green]")
@export.command(name="collection")
@click.argument("collection_name")
@click.option("--file", required=True, help="Output file path")
def export_collection(collection_name: str, file: str):
"""Export a collection."""
collections = db.list_collections()
collection = next((c for c in collections if c["name"] == collection_name), None)
if not collection:
console.print(f"[red]Collection '{collection_name}' not found[/red]")
return
snippets = db.get_collection_snippets(collection["id"])
export_snippets(snippets, file)
console.print(f"[green]Exported {len(snippets)} snippets to {file}[/green]")
@export.command(name="snippet")
@click.argument("snippet_id", type=int)
@click.option("--file", required=True, help="Output file path")
def export_snippet(snippet_id: int, file: str):
"""Export a single snippet."""
snippet = db.get_snippet(snippet_id)
if not snippet:
console.print(f"[red]Snippet {snippet_id} not found[/red]")
return
export_snippets([snippet], file)
console.print(f"[green]Exported snippet {snippet_id} to {file}[/green]")
@cli.command()
@click.option("--file", required=True, help="Input file path")
@click.option("--strategy", default="skip", type=click.Choice(["skip", "replace", "duplicate"]), help="Import strategy")
def import_cmd(file: str, strategy: str):
"""Import snippets from a JSON file."""
try:
imported, skipped = import_snippets(db, file, strategy)
console.print(f"[green]Imported {imported} snippets, skipped {skipped}[/green]")
except Exception as e:
console.print(f"[red]Import failed: {e}[/red]")
@cli.group()
def discover():
"""Discover peers on the network."""
pass
@discover.command(name="list")
def discover_list():
"""List discovered peers."""
from snip.sync.discovery import DiscoveryService
discovery = DiscoveryService()
peers = discovery.discover_peers(timeout=5.0)
if not peers:
console.print("[dim]No peers discovered[/dim]")
return
table = Table(title="Discovered Peers")
table.add_column("Peer ID", style="cyan")
table.add_column("Host", style="green")
table.add_column("Port", style="magenta")
for peer in peers:
table.add_row(peer["peer_id"], peer["host"], str(peer["port"]))
console.print(table)
@cli.command()
@click.option("--peer-id", required=True, help="Peer ID to sync with")
def sync(peer_id: str):
"""Sync snippets with a peer."""
from snip.sync.protocol import SyncProtocol
peers = db.list_peers()
peer = next((p for p in peers if p["peer_id"] == peer_id), None)
if not peer:
console.print(f"[red]Peer {peer_id} not found[/red]")
return
sync_proto = SyncProtocol(db)
try:
synced = sync_proto.sync_with_peer(peer["host"], peer["port"])
console.print(f"[green]Synced {synced} snippets with peer {peer_id}[/green]")
except Exception as e:
console.print(f"[red]Sync failed: {e}[/red]")
@cli.command()
def peers():
"""List known sync peers."""
peers = db.list_peers()
if not peers:
console.print("[dim]No known peers[/dim]")
return
table = Table(title="Known Peers")
table.add_column("Peer ID", style="cyan")
table.add_column("Host", style="green")
table.add_column("Port", style="magenta")
table.add_column("Last Seen", style="dim")
for p in peers:
table.add_row(p["peer_id"], p["host"], str(p["port"]), p["last_seen"][:10])
console.print(table)
if __name__ == "__main__":
cli()