fix: resolve CI test failures - API compatibility fixes
This commit is contained in:
@@ -1,5 +1,3 @@
|
||||
"""Click CLI commands for snippet manager."""
|
||||
|
||||
import os
|
||||
|
||||
import click
|
||||
@@ -50,9 +48,7 @@ def add(ctx, title, code, description, language, tags, encrypt):
|
||||
"""Add a new snippet."""
|
||||
db = ctx.obj["db"]
|
||||
crypto = ctx.obj["crypto"]
|
||||
|
||||
tag_list = [t.strip() for t in tags.split(",")] if tags else []
|
||||
|
||||
if encrypt:
|
||||
if not crypto.has_key():
|
||||
password = click.prompt("Set encryption password", hide_input=True, confirmation_prompt=True)
|
||||
@@ -61,7 +57,6 @@ def add(ctx, title, code, description, language, tags, encrypt):
|
||||
is_encrypted = True
|
||||
else:
|
||||
is_encrypted = False
|
||||
|
||||
snippet_id = db.create_snippet(
|
||||
title=title,
|
||||
code=code,
|
||||
@@ -83,12 +78,10 @@ def get(ctx, snippet_id, no_highlight, style, line_numbers):
|
||||
"""Get a snippet by ID."""
|
||||
db = ctx.obj["db"]
|
||||
crypto = ctx.obj["crypto"]
|
||||
|
||||
snippet = db.get_snippet(snippet_id)
|
||||
if not snippet:
|
||||
console.print(f"[red]Snippet {snippet_id} not found[/red]")
|
||||
return
|
||||
|
||||
code = snippet["code"]
|
||||
if snippet.get("is_encrypted"):
|
||||
try:
|
||||
@@ -96,13 +89,10 @@ def get(ctx, snippet_id, no_highlight, style, line_numbers):
|
||||
except Exception:
|
||||
console.print("[red]Failed to decrypt snippet[/red]")
|
||||
return
|
||||
|
||||
console.print(f"\n[bold]{snippet['title']}[/bold]")
|
||||
console.print(f"Language: {snippet['language']} | Tags: {', '.join(snippet.get('tags', []) or 'none')}")
|
||||
|
||||
console.print(f"Language: {snippet['language']} | Tags: {', '.join(snippet.get('tags', [])) or 'none'}")
|
||||
if snippet.get("description"):
|
||||
console.print(f"\n{snippet['description']}\n")
|
||||
|
||||
if not no_highlight:
|
||||
try:
|
||||
get_lexer_by_name(snippet["language"])
|
||||
@@ -111,7 +101,6 @@ def get(ctx, snippet_id, no_highlight, style, line_numbers):
|
||||
guess_lexer(code)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
syntax = Syntax(code, lexer=snippet["language"], theme=style, line_numbers=line_numbers)
|
||||
console.print(syntax)
|
||||
else:
|
||||
@@ -129,7 +118,6 @@ def get(ctx, snippet_id, no_highlight, style, line_numbers):
|
||||
def list(ctx, language, tag, collection, limit, offset, format):
|
||||
"""List snippets."""
|
||||
db = ctx.obj["db"]
|
||||
|
||||
collection_id = None
|
||||
if collection:
|
||||
collections = db.collection_list()
|
||||
@@ -137,7 +125,6 @@ def list(ctx, language, tag, collection, limit, offset, format):
|
||||
if c["name"] == collection:
|
||||
collection_id = c["id"]
|
||||
break
|
||||
|
||||
snippets = db.list_snippets(
|
||||
language=language,
|
||||
tag=tag,
|
||||
@@ -145,11 +132,9 @@ def list(ctx, language, tag, collection, limit, offset, format):
|
||||
limit=limit,
|
||||
offset=offset,
|
||||
)
|
||||
|
||||
if not snippets:
|
||||
console.print("[dim]No snippets found[/dim]")
|
||||
return
|
||||
|
||||
if format == "table":
|
||||
table = Table(show_header=True)
|
||||
table.add_column("ID", style="cyan")
|
||||
@@ -157,14 +142,12 @@ def list(ctx, language, tag, collection, limit, offset, format):
|
||||
table.add_column("Language", style="green")
|
||||
table.add_column("Tags")
|
||||
table.add_column("Updated")
|
||||
|
||||
for s in snippets:
|
||||
tags_str = ", ".join(s.get("tags", [])[:3])
|
||||
if len(s.get("tags", [])) > 3:
|
||||
tags_str += "..."
|
||||
updated = s["updated_at"][:10]
|
||||
table.add_row(str(s["id"]), s["title"], s["language"], tags_str, updated)
|
||||
|
||||
console.print(table)
|
||||
else:
|
||||
for s in snippets:
|
||||
@@ -183,18 +166,14 @@ def list(ctx, language, tag, collection, limit, offset, format):
|
||||
def edit(ctx, snippet_id, title, code, description, language, tags):
|
||||
"""Edit a snippet."""
|
||||
db = ctx.obj["db"]
|
||||
|
||||
snippet = db.get_snippet(snippet_id)
|
||||
if not snippet:
|
||||
console.print(f"[red]Snippet {snippet_id} not found[/red]")
|
||||
return
|
||||
|
||||
tag_list = [t.strip() for t in tags.split(",")] if tags else None
|
||||
|
||||
if code and snippet.get("is_encrypted"):
|
||||
crypto = ctx.obj["crypto"]
|
||||
code = crypto.encrypt(code)
|
||||
|
||||
db.update_snippet(
|
||||
snippet_id,
|
||||
title=title,
|
||||
@@ -213,11 +192,9 @@ def edit(ctx, snippet_id, title, code, description, language, tags):
|
||||
def delete(ctx, snippet_id, force):
|
||||
"""Delete a snippet."""
|
||||
db = ctx.obj["db"]
|
||||
|
||||
if not force:
|
||||
if not click.confirm(f"Delete snippet {snippet_id}?"):
|
||||
return
|
||||
|
||||
if db.delete_snippet(snippet_id):
|
||||
console.print(f"[green]Snippet {snippet_id} deleted[/green]")
|
||||
else:
|
||||
@@ -234,7 +211,6 @@ def delete(ctx, snippet_id, force):
|
||||
def search(ctx, query, language, tag, limit, offset):
|
||||
"""Search snippets using full-text search."""
|
||||
search_engine = ctx.obj["search"]
|
||||
|
||||
results = search_engine.search(
|
||||
query=query,
|
||||
language=language,
|
||||
@@ -242,21 +218,17 @@ def search(ctx, query, language, tag, limit, offset):
|
||||
limit=limit,
|
||||
offset=offset,
|
||||
)
|
||||
|
||||
if not results:
|
||||
console.print("[dim]No results found[/dim]")
|
||||
return
|
||||
|
||||
table = Table(show_header=True)
|
||||
table.add_column("ID", style="cyan")
|
||||
table.add_column("Title")
|
||||
table.add_column("Language", style="green")
|
||||
table.add_column("Match")
|
||||
|
||||
for s in results:
|
||||
match_info = s.get("description", "")[:50] if s.get("description") else ""
|
||||
table.add_row(str(s["id"]), s["title"], s["language"], match_info)
|
||||
|
||||
console.print(table)
|
||||
console.print(f"\n[dim]Found {len(results)} results[/dim]")
|
||||
|
||||
@@ -324,20 +296,16 @@ def collection_list(ctx):
|
||||
"""List all collections."""
|
||||
db = ctx.obj["db"]
|
||||
collections = db.collection_list()
|
||||
|
||||
if not collections:
|
||||
console.print("[dim]No collections found[/dim]")
|
||||
return
|
||||
|
||||
table = Table(show_header=True)
|
||||
table.add_column("ID", style="cyan")
|
||||
table.add_column("Name")
|
||||
table.add_column("Description")
|
||||
table.add_column("Snippets")
|
||||
|
||||
for c in collections:
|
||||
table.add_row(str(c["id"]), c["name"], c.get("description", ""), str(c.get("snippet_count", 0)))
|
||||
|
||||
console.print(table)
|
||||
|
||||
|
||||
@@ -348,11 +316,9 @@ def collection_list(ctx):
|
||||
def collection_delete(ctx, collection_id, force):
|
||||
"""Delete a collection."""
|
||||
db = ctx.obj["db"]
|
||||
|
||||
if not force:
|
||||
if not click.confirm(f"Delete collection {collection_id}?"):
|
||||
return
|
||||
|
||||
if db.collection_delete(collection_id):
|
||||
console.print(f"[green]Collection {collection_id} deleted[/green]")
|
||||
else:
|
||||
@@ -406,18 +372,15 @@ def export_collection(ctx, collection_name, file):
|
||||
"""Export a collection to JSON."""
|
||||
db = ctx.obj["db"]
|
||||
export_handler = ctx.obj["export"]
|
||||
|
||||
collections = db.collection_list()
|
||||
collection_id = None
|
||||
for c in collections:
|
||||
if c["name"] == collection_name:
|
||||
collection_id = c["id"]
|
||||
break
|
||||
|
||||
if not collection_id:
|
||||
console.print(f"[red]Collection '{collection_name}' not found[/red]")
|
||||
return
|
||||
|
||||
data = export_handler.export_collection(collection_id)
|
||||
export_handler.write_export(file, data)
|
||||
console.print(f"[green]Exported collection '{collection_name}' to {file}[/green]")
|
||||
@@ -440,13 +403,7 @@ def export_snippet(ctx, snippet_id, file):
|
||||
|
||||
@cli.command(name='import')
|
||||
@click.option("--file", "-f", required=True, help="Input file path")
|
||||
@click.option(
|
||||
"--strategy",
|
||||
"-s",
|
||||
type=click.Choice(["skip", "replace", "duplicate"]),
|
||||
default="skip",
|
||||
help="Conflict resolution strategy",
|
||||
)
|
||||
@click.option("--strategy", "-s", type=click.Choice(["skip", "replace", "duplicate"]), default="skip", help="Conflict resolution strategy")
|
||||
@click.pass_context
|
||||
def import_cmd(ctx, file, strategy):
|
||||
"""Import snippets from JSON file."""
|
||||
@@ -471,21 +428,17 @@ def discover_list(ctx, timeout):
|
||||
"""List discovered peers on the network."""
|
||||
discovery = DiscoveryService()
|
||||
peers = discovery.discover_peers(timeout)
|
||||
|
||||
if not peers:
|
||||
console.print("[dim]No peers discovered[/dim]")
|
||||
return
|
||||
|
||||
table = Table(show_header=True)
|
||||
table.add_column("Peer ID", style="cyan")
|
||||
table.add_column("Name")
|
||||
table.add_column("Address")
|
||||
table.add_column("Port")
|
||||
|
||||
for p in peers:
|
||||
addr = ", ".join(p.get("addresses", [])) or "unknown"
|
||||
table.add_row(p["peer_id"], p.get("peer_name", ""), addr, str(p.get("port", "?")))
|
||||
|
||||
console.print(table)
|
||||
|
||||
|
||||
@@ -497,16 +450,12 @@ def sync(ctx, peer_id, timeout):
|
||||
"""Sync snippets with a peer."""
|
||||
discovery = DiscoveryService()
|
||||
protocol = SyncProtocol()
|
||||
|
||||
protocol.start_server()
|
||||
|
||||
try:
|
||||
peers = discovery.discover_peers(5.0)
|
||||
|
||||
if not peers:
|
||||
console.print("[yellow]No peers discovered[/yellow]")
|
||||
return
|
||||
|
||||
target_peer = None
|
||||
if peer_id:
|
||||
for p in peers:
|
||||
@@ -529,15 +478,12 @@ def sync(ctx, peer_id, timeout):
|
||||
else:
|
||||
console.print("[red]Invalid selection[/red]")
|
||||
return
|
||||
|
||||
console.print(f"[cyan]Syncing with {target_peer['peer_name']}...[/cyan]")
|
||||
result = protocol.sync_with_peer(target_peer)
|
||||
|
||||
if result["status"] == "success":
|
||||
console.print(f"[green]Sync complete! Merged: {result['merged']}, Pushed: {result['pushed']}[/green]")
|
||||
else:
|
||||
console.print(f"[red]Sync failed: {result.get('message', 'Unknown error')}[/red]")
|
||||
|
||||
finally:
|
||||
protocol.stop_server()
|
||||
|
||||
@@ -548,23 +494,19 @@ def peers(ctx):
|
||||
"""List known sync peers."""
|
||||
db = ctx.obj["db"]
|
||||
peer_list = db.list_sync_peers()
|
||||
|
||||
if not peer_list:
|
||||
console.print("[dim]No known peers[/dim]")
|
||||
return
|
||||
|
||||
table = Table(show_header=True)
|
||||
table.add_column("Peer ID", style="cyan")
|
||||
table.add_column("Name")
|
||||
table.add_column("Last Sync")
|
||||
table.add_column("Address")
|
||||
|
||||
for p in peer_list:
|
||||
last_sync = p.get("last_sync", "never")
|
||||
if last_sync:
|
||||
last_sync = last_sync[:19]
|
||||
table.add_row(p["peer_id"], p.get("peer_name", ""), last_sync, p.get("peer_address", ""))
|
||||
|
||||
console.print(table)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user