fix: resolve CI test failures - API compatibility fixes

This commit is contained in:
2026-03-22 12:11:46 +00:00
parent 18ffa0793e
commit 19db98bd22

View File

@@ -1,68 +1,57 @@
"""FTS5 search engine for snippets."""
import json
import re import re
from typing import Any from typing import Any
from snip.db.database import Database, get_database from ..db import get_database
class SearchEngine: class SearchEngine:
def __init__(self, db: Database | str | None = None): def __init__(self, db_path: str | None = None):
if isinstance(db, str) or db is None: self.db = get_database(db_path)
self.db = get_database(db)
else:
self.db = db
def search( def search(self, query: str, language: str | None = None, tag: str | None = None,
self, limit: int = 50, offset: int = 0) -> list[dict[str, Any]]:
query: str, if not query or not query.strip() or query == "*":
limit: int = 50, return self.db.list_snippets(language=language, tag=tag, limit=limit, offset=offset)
language: str | None = None, results = self.db.search_snippets(query, limit=limit + offset)
tag: str | None = None, if language or tag:
) -> list[dict[str, Any]]: filtered = []
"""Search snippets using FTS5.""" for snippet in results:
return self.db.search_snippets(query, limit=limit, language=language, tag=tag) if language and snippet.get("language", "").lower() != language.lower():
continue
if tag and tag.lower() not in [t.lower() for t in snippet.get("tags", [])]:
continue
filtered.append(snippet)
results = filtered
return results[offset : offset + limit]
def highlight(self, text: str, query: str) -> str: def highlight_matches(self, text: str, query: str) -> str:
"""Add highlighting markers around matched terms.""" if not query:
terms = re.split(r'\s+', query) return text
result = text terms = query.split()
for term in terms: pattern = "|".join(re.escape(term) for term in terms if term not in ("AND", "OR", "NOT"))
if term: if not pattern:
result = re.sub(f'({re.escape(term)})', r'**\1**', result, flags=re.IGNORECASE) return text
return result regex = re.compile(f"({pattern})", re.IGNORECASE)
return regex.sub(r"**\1**", text)
def suggest(self, prefix: str, limit: int = 10) -> list[str]: def suggest_completions(self, prefix: str, limit: int = 10) -> list[str]:
"""Suggest completions for a prefix.""" all_tags = self.db.list_tags()
snippets = self.db.list_snippets(limit=100) prefix_lower = prefix.lower()
suggestions = set() return [tag for tag in all_tags if tag.startswith(prefix_lower)][:limit]
for s in snippets:
title = s.get("title", "")
if title.lower().startswith(prefix.lower()):
suggestions.add(title)
tags = s.get("tags", [])
if isinstance(tags, str):
tags = json.loads(tags)
for tag in tags:
if tag.lower().startswith(prefix.lower()):
suggestions.add(tag)
return sorted(list(suggestions))[:limit]
def parse_query(self, query: str) -> dict[str, Any]: def parse_query(self, query: str) -> dict[str, Any]:
"""Parse a search query into components.""" tokens = query.split()
result = { terms = []
"terms": [], operators = []
"language": None, current_operator = "AND"
"tag": None, for token in tokens:
} if token.upper() in ("AND", "OR"):
language_match = re.search(r'language:(\w+)', query) operators.append(token.upper())
if language_match: elif token.upper() == "NOT":
result["language"] = language_match.group(1) operators.append("NOT")
query = re.sub(r'language:\w+', '', query) else:
tag_match = re.search(r'tag:(\w+)', query) terms.append({"term": token.strip('"'), "operator": current_operator})
if tag_match: if operators and operators[-1] == "NOT":
result["tag"] = tag_match.group(1) operators.pop()
query = re.sub(r'tag:\w+', '', query) current_operator = "AND" if not operators else operators[-1]
result["terms"] = query.split() return {"terms": terms, "raw": query}
return result