Compare commits
66 Commits
| SHA1 |
|---|
| f2e4c149ef | |||
| 404c3b0214 | |||
| 945fb8787e | |||
| ad2497908f | |||
| dcbff05122 | |||
| 6375da7861 | |||
| 4c9c795764 | |||
| d27d8fffa9 | |||
| 4ea77b830b | |||
| ffc1486eb1 | |||
| 57b4da86c5 | |||
| 8750e7574b | |||
| 9773c9e46c | |||
| d4c9af263c | |||
| 78b06a3faf | |||
| f1ae4ef3b4 | |||
| 2716c44094 | |||
| e2d94f5f6f | |||
| 7ef29718a3 | |||
| 620f2f412c | |||
| 29057090f1 | |||
| 2b3b4a7f6d | |||
| 08ecc4f0a9 | |||
| 0e847cc3c3 | |||
| 3d5936f4b7 | |||
| ae0b21144a | |||
| adffd16e31 | |||
| 8928aa35d8 | |||
| 1bab49cf06 | |||
| 371a6799df | |||
| e90a87e0fc | |||
| b5d8ad4e40 | |||
| 6cfac02dd6 | |||
| a277d9deab | |||
| d5b5fd791c | |||
| a1484e13a3 | |||
| b150ca4a87 | |||
| 09113398fb | |||
| aab93fe2c6 | |||
| ed0d1a141a | |||
| 6d0ce7a241 | |||
| 550195ab15 | |||
| df142ac4a4 | |||
| 2f5aba9a8d | |||
| 8ce15fc05a | |||
| 21f443b4e0 | |||
| 0a81e35b9c | |||
| 63473152f4 | |||
| 0456d58c77 | |||
| 74aab52e04 | |||
| 918044ac35 | |||
| a2201e16ec | |||
| e3037ad625 | |||
| db3dc362c3 | |||
| 51c6c79397 | |||
| 37d910ffb2 | |||
| 5fed323562 | |||
| e572e1b3b2 | |||
| 65bd67337d | |||
| db463a4243 | |||
| 3f91820e35 | |||
| 649403eded | |||
| e6e2e8d9f0 | |||
| d5bf1e6042 | |||
| 857219a98d | |||
| 3f4283188c |
@@ -2,51 +2,71 @@ name: CI

on:
  push:
    branches: [main]
    branches: [main, master]
  pull_request:
    branches: [main]
    branches: [main, master]

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.10", "3.11", "3.12"]

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          python-version: ${{ matrix.python-version }}
          cache: 'pip'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -e ".[dev]"

      - name: Run tests
        run: pytest tests/ -v --tb=short
      - name: Install type stubs
        run: |
          pip install types-PyYAML types-Markdown

      - name: Run linting
        run: ruff check .
      - name: Lint with ruff
        run: ruff check src/ tests/

      - name: Type check with mypy
        run: python -m mypy src/ --python-version 3.10 --ignore-missing-imports --no-error-summary 2>&1 || true

      - name: Run tests
        run: python -m pytest tests/ -v --cov=src --cov-report=xml

      - name: Upload coverage
        if: matrix.python-version == '3.11'
        uses: codecov/codecov-action@v4
        with:
          files: ./coverage.xml
          fail_ci_if_error: false

  build:
    runs-on: ubuntu-latest
    needs: test

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          python-version: "3.11"
          cache: 'pip'

      - name: Install build dependencies
      - name: Install build
        run: pip install build

      - name: Build package
        run: python -m build

      - name: Upload artifact
        uses: actions/upload-artifact@v4
        with:
          name: dist
          path: dist/
      - name: Verify build
        run: |
          pip install dist/*.whl
          api-docs --help
@@ -34,7 +34,12 @@ dependencies = [
]

[project.scripts]
api-docs = "src.main:main"
api-docs = "local_api_docs_search.main:main"

[tool.setuptools.packages.find]
where = ["src"]
include = ["local_api_docs_search*"]
namespaces = false

[project.optional-dependencies]
dev = [
@@ -62,4 +67,4 @@ target-version = "py310"
index-path = "./docs"
model-name = "all-MiniLM-L6-v2"
embedding-device = "cpu"
chroma-persist-dir = ".api-docs/chroma"
chroma-persist-dir = "./.api-docs/chroma"
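The console-script target moves from `src.main:main` to `local_api_docs_search.main:main`. The `main` module itself is not shown in this diff; as a rough, hypothetical sketch only, such an entry point might simply delegate to the `cli` group that this PR adds in `src/local_api_docs_search/cli/commands.py`:

# Hypothetical local_api_docs_search/main.py -- assumed, not part of this diff.
from local_api_docs_search.cli.commands import cli


def main() -> None:
    """Assumed console-script entry point that hands off to the click group."""
    cli(obj={})


if __name__ == "__main__":
    main()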
@@ -1,7 +1,6 @@
"""CLI command definitions."""

from pathlib import Path
from typing import Optional

import click
from rich.console import Console
@@ -16,9 +15,7 @@ from src.utils.formatters import (
    format_index_summary,
    format_search_results,
    format_success,
    format_help_header,
)
from src.utils.config import reset_config

console = Console()

@@ -55,8 +52,6 @@ def index_command(ctx, path, type, recursive, batch_size):

    PATH is the path to a file or directory to index.
    """
    verbose = ctx.obj.get("verbose", False)

    with console.status(f"Indexing {type} documentation from {path}..."):
        searcher = Searcher()
        count = searcher.index(path, doc_type=type, recursive=recursive, batch_size=batch_size)
@@ -97,10 +92,6 @@ def search_command(ctx, query, limit, type, json, hybrid):
    if limit is None:
        limit = config.default_limit

    source_filter = None
    if type:
        source_filter = SourceType(type)

    searcher = Searcher()

    with console.status("Searching..."):
@@ -135,10 +126,6 @@ def search_command(ctx, query, limit, type, json, hybrid):
@click.pass_context
def list_command(ctx, type, json):
    """List indexed documents."""
    source_filter = None
    if type:
        source_filter = SourceType(type)

    searcher = Searcher()
    stats = searcher.get_stats()


@@ -1,20 +1,16 @@
"""Interactive search mode with Rich-powered UI."""

import os
from pathlib import Path
from typing import List, Optional

from rich.console import Console
from rich.prompt import Prompt
from rich.text import Text
from rich.panel import Panel
from rich.table import Table
from rich import box

from src.models.document import SourceType, Document, SearchResult
from src.models.document import SearchResult
from src.search.searcher import Searcher
from src.utils.config import get_config
from src.utils.formatters import format_search_results, get_source_style
from src.utils.formatters import get_source_style

console = Console()


@@ -1,10 +1,9 @@
"""Code comment indexer for Python, JavaScript, and TypeScript files."""

import ast
import hashlib
import re
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Tuple
from typing import Any, Dict, List, Optional

from src.indexer.base import BaseIndexer
from src.models.document import Document, SourceType

@@ -6,7 +6,6 @@ from pathlib import Path
from typing import Any, Dict, List, Optional

from openapi_spec_validator import validate
from openapi_spec_validator.versions import consts as validator_versions
from yaml import safe_load

from src.indexer.base import BaseIndexer

@@ -1,11 +1,8 @@
"""README/Markdown file indexer."""

import hashlib
from pathlib import Path
from typing import Generator, List, Tuple
from typing import List, Tuple

import yaml
from markdown import markdown

from src.indexer.base import BaseIndexer
from src.models.document import Document, SourceType
src/local_api_docs_search/__init__.py (new file, 3 lines)
@@ -0,0 +1,3 @@
"""Local API Docs Search - Index and search local API documentation."""

__version__ = "0.1.0"
src/local_api_docs_search/cli/__init__.py (new file, 1 line)
@@ -0,0 +1 @@
"""CLI commands package."""
src/local_api_docs_search/cli/commands.py (new file, 235 lines)
@@ -0,0 +1,235 @@
"""CLI command definitions."""

from pathlib import Path

import click
from rich.console import Console
from rich.panel import Panel
from rich.text import Text

from local_api_docs_search.models.document import SourceType
from local_api_docs_search.search.searcher import Searcher
from local_api_docs_search.utils.config import get_config
from local_api_docs_search.utils.formatters import (
    format_error,
    format_index_summary,
    format_search_results,
    format_success,
)

console = Console()


@click.group()
@click.option("--verbose", "-v", is_flag=True, help="Enable verbose output")
@click.pass_context
def cli(ctx, verbose):
    """Local API Docs Search - Index and search your API documentation."""
    ctx.ensure_object(dict)
    ctx.obj["verbose"] = verbose


@cli.command(name="index")
@click.argument(
    "path", type=click.Path(exists=True, file_okay=True, dir_okay=True, path_type=Path)
)
@click.option(
    "--type",
    "-t",
    type=click.Choice(["openapi", "readme", "code", "all"]),
    default="all",
    help="Type of documentation to index",
)
@click.option(
    "--recursive", "-r", is_flag=True, default=False, help="Recursively search directories"
)
@click.option(
    "--batch-size", "-b", type=int, default=32, help="Documents per batch"
)
@click.pass_context
def index_command(ctx, path, type, recursive, batch_size):
    """Index documentation from a path.

    PATH is the path to a file or directory to index.
    """
    with console.status(f"Indexing {type} documentation from {path}..."):
        searcher = Searcher()
        count = searcher.index(path, doc_type=type, recursive=recursive, batch_size=batch_size)

    if count > 0:
        console.print(format_success(f"Successfully indexed {count} documents"))
    else:
        console.print(format_error("No documents found to index"))
        if type == "all":
            console.print("Try specifying a type: --type openapi|readme|code")


@cli.command(name="search")
@click.argument("query", type=str)
@click.option(
    "--limit", "-l", type=int, default=None, help="Maximum number of results"
)
@click.option(
    "--type",
    "-t",
    type=click.Choice(["openapi", "readme", "code"]),
    help="Filter by source type",
)
@click.option("--json", is_flag=True, help="Output as JSON")
@click.option(
    "--hybrid/--semantic",
    default=True,
    help="Use hybrid (default) or semantic-only search",
)
@click.pass_context
def search_command(ctx, query, limit, type, json, hybrid):
    """Search indexed documentation.

    QUERY is the search query in natural language.
    """
    config = get_config()

    if limit is None:
        limit = config.default_limit

    searcher = Searcher()

    with console.status("Searching..."):
        if hybrid:
            results = searcher.hybrid_search(query, limit=limit)
        else:
            results = searcher.search(query, limit=limit)

    if not results:
        console.print(format_info("No results found for your query"))
        return

    if json:
        import json as json_lib
        output = [r.to_dict() for r in results]
        console.print(json_lib.dumps(output, indent=2))
    else:
        table = format_search_results(results)
        console.print(table)

        console.print(f"\nFound {len(results)} result(s)")


@cli.command(name="list")
@click.option(
    "--type",
    "-t",
    type=click.Choice(["openapi", "readme", "code"]),
    help="Filter by source type",
)
@click.option("--json", is_flag=True, help="Output as JSON")
@click.pass_context
def list_command(ctx, type, json):
    """List indexed documents."""
    searcher = Searcher()
    stats = searcher.get_stats()

    if json:
        import json
        output = stats.to_dict()
        console.print(json.dumps(output, indent=2))
    else:
        table = format_index_summary(
            stats.total_documents,
            stats.openapi_count,
            stats.readme_count,
            stats.code_count,
        )
        console.print(table)


@cli.command(name="stats")
@click.pass_context
def stats_command(ctx):
    """Show index statistics."""
    searcher = Searcher()
    stats = searcher.get_stats()

    table = format_index_summary(
        stats.total_documents,
        stats.openapi_count,
        stats.readme_count,
        stats.code_count,
    )
    console.print(table)


@cli.command(name="clear")
@click.option("--type", "-t", type=click.Choice(["openapi", "readme", "code"]))
@click.option("--force", "-f", is_flag=True, help="Skip confirmation prompt")
@click.pass_context
def clear_command(ctx, type, force):
    """Clear the index or filtered by type."""
    if not force:
        if type:
            confirm = click.confirm(f"Delete all {type} documents from the index?")
        else:
            confirm = click.confirm("Delete all documents from the index?")
    else:
        confirm = True

    if not confirm:
        console.print("Cancelled")
        return

    searcher = Searcher()

    if type:
        source_type = SourceType(type)
        count = searcher._vector_store.delete_by_source_type(source_type)
    else:
        count = searcher._vector_store.count()
        searcher.clear_index()

    console.print(format_success(f"Deleted {count} document(s)"))


@cli.command(name="config")
@click.option("--show", is_flag=True, help="Show current configuration")
@click.option("--reset", is_flag=True, help="Reset configuration to defaults")
@click.pass_context
def config_command(ctx, show, reset):
    """Manage configuration."""
    config = get_config()

    if reset:
        config.reset()
        console.print(format_success("Configuration reset to defaults"))
        return

    if show or not (reset):
        config_dict = config.to_dict()

        if show:
            import json
            console.print(json.dumps(config_dict, indent=2))
        else:
            lines = ["Current Configuration:", ""]
            for key, value in config_dict.items():
                lines.append(f" {key}: {value}")

            panel = Panel(
                "\n".join(lines),
                title="Configuration",
                expand=False,
            )
            console.print(panel)


@cli.command(name="interactive")
@click.pass_context
def interactive_command(ctx):
    """Enter interactive search mode."""
    from local_api_docs_search.cli.interactive import run_interactive

    run_interactive()


def format_info(message: str) -> Text:
    """Format an info message."""
    return Text(message, style="cyan")
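The `cli` group above can be exercised without installing the console script; a minimal sketch using Click's built-in test runner (the query text is illustrative, and it assumes documents have already been indexed):

# Illustrative only: drive the click group defined in commands.py.
from click.testing import CliRunner

from local_api_docs_search.cli.commands import cli

runner = CliRunner()
result = runner.invoke(cli, ["--verbose", "search", "how do I authenticate", "--limit", "3"])
print(result.output)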
src/local_api_docs_search/cli/interactive.py (new file, 212 lines)
@@ -0,0 +1,212 @@
"""Interactive search mode with Rich-powered UI."""

from typing import List, Optional

from rich.console import Console
from rich.prompt import Prompt
from rich.text import Text
from rich.panel import Panel
from rich import box

from local_api_docs_search.models.document import SearchResult
from local_api_docs_search.search.searcher import Searcher
from local_api_docs_search.utils.formatters import get_source_style

console = Console()


class InteractiveSession:
    """Interactive search session with history and navigation."""

    def __init__(self):
        """Initialize the interactive session."""
        self._searcher = Searcher()
        self._history: List[str] = []
        self._history_index: int = -1
        self._results: List[SearchResult] = []
        self._result_index: int = 0
        self._current_query: str = ""

    def run(self):
        """Run the interactive session."""
        self._print_welcome()

        while True:
            try:
                query = self._get_input()

                if query is None:
                    break

                if not query.strip():
                    continue

                self._history.append(query)
                self._history_index = len(self._history)

                self._execute_search(query)

            except KeyboardInterrupt:
                console.print("\n[italic]Use 'exit' or 'quit' to leave[/]")
            except EOFError:
                break

        console.print("\n[italic]Goodbye![/]")

    def _print_welcome(self):
        """Print welcome message."""
        welcome_text = Text.assemble(
            ("Local API Docs Search\n", "bold cyan"),
            ("-" * 40, "dim\n"),
            ("Type your query and press Enter to search.\n", "white"),
            ("Commands:\n", "bold yellow"),
            (" :q, quit, exit - Leave interactive mode\n", "dim"),
            (" :h, help - Show this help\n", "dim"),
            (" :c, clear - Clear search results\n", "dim"),
            (" :n, next - Next result\n", "dim"),
            (" :p, prev - Previous result\n", "dim"),
            (" ↑/↓ - History navigation\n", "dim"),
        )

        panel = Panel(welcome_text, title="Welcome", expand=False)
        console.print(panel)

    def _get_input(self) -> Optional[str]:
        """Get user input with history navigation."""
        prompt = Prompt.ask(
            "[bold cyan]Search[/]",
            default="",
            show_default=False,
            accept_default=False,
        )

        if prompt in (":q", ":quit", "quit", "exit", "exit()"):
            return None

        if prompt in (":h", ":help", "help"):
            self._print_welcome()
            return ""

        if prompt in (":c", ":clear", "clear"):
            self._results = []
            console.print("[italic]Results cleared[/]")
            return ""

        if prompt in (":n", ":next", "next"):
            self._navigate_results(1)
            return ""

        if prompt in (":p", ":prev", "previous"):
            self._navigate_results(-1)
            return ""

        return prompt

    def _execute_search(self, query: str):
        """Execute search and display results."""
        self._current_query = query
        self._result_index = 0

        with console.status("Searching..."):
            self._results = self._searcher.hybrid_search(query, limit=10)

        if not self._results:
            console.print("[italic]No results found[/]\n")
            return

        console.print(f"\n[bold]Found {len(self._results)} result(s)[/]\n")
        self._display_current_result()

    def _display_current_result(self):
        """Display the current result."""
        if not self._results:
            return

        result = self._results[self._result_index]

        source_style = get_source_style(result.document.source_type)

        content = Text()
        content.append(f"Result {self._result_index + 1}/{len(self._results)}\n", "bold yellow")
        content.append(f"Title: {result.document.title}\n", "bold")
        content.append(f"Type: {result.document.source_type.value}\n", source_style)
        content.append(f"Score: {result.score:.4f}\n\n", "dim")

        preview = result.document.content[:500]
        if len(result.document.content) > 500:
            preview += "..."
        content.append(preview)

        if result.document.file_path:
            content.append(f"\n\n[dim]File: {result.document.file_path}[/]")

        panel = Panel(
            content,
            title=f"Result {self._result_index + 1}",
            expand=False,
            box=box.ROUNDED,
        )

        console.print(panel)

        if result.highlights:
            console.print("\n[bold]Highlights:[/]")
            for highlight in result.highlights[:3]:
                console.print(f" [dim]{highlight}[/]")

        console.print()

    def _navigate_results(self, direction: int):
        """Navigate through search results."""
        if not self._results:
            console.print("[italic]No results to navigate[/]")
            return

        new_index = self._result_index + direction

        if new_index < 0:
            new_index = 0
        elif new_index >= len(self._results):
            new_index = len(self._results) - 1

        self._result_index = new_index
        self._display_current_result()


def run_interactive():
    """Run the interactive search mode."""
    session = InteractiveSession()
    session.run()


class InteractiveSearch:
    """Legacy interactive search class for compatibility."""

    def __init__(self):
        """Initialize the interactive search."""
        self._searcher = Searcher()
        self._history: List[str] = []

    def search(self, query: str) -> List[SearchResult]:
        """Execute search.

        Args:
            query: Search query

        Returns:
            List of search results
        """
        self._history.append(query)
        return self._searcher.hybrid_search(query)

    def get_history(self) -> List[str]:
        """Get search history.

        Returns:
            List of past queries
        """
        return self._history

    def clear_history(self):
        """Clear search history."""
        self._history = []
src/local_api_docs_search/indexer/__init__.py (new file, 1 line)
@@ -0,0 +1 @@
"""Indexer package for parsing different documentation formats."""
src/local_api_docs_search/indexer/base.py (new file, 81 lines)
@@ -0,0 +1,81 @@
"""Base indexer interface for documentation parsing."""

from abc import ABC, abstractmethod
from pathlib import Path
from typing import Generator, List

from local_api_docs_search.models.document import Document, SourceType


class BaseIndexer(ABC):
    """Abstract base class for document indexers."""

    source_type: SourceType

    @abstractmethod
    def index(self, path: Path, recursive: bool = False) -> List[Document]:
        """Index documents from the given path.

        Args:
            path: Path to file or directory to index
            recursive: Whether to search directories recursively

        Returns:
            List of indexed Document objects
        """
        pass

    @abstractmethod
    def get_documents(self) -> List[Document]:
        """Get all indexed documents.

        Returns:
            List of Document objects
        """
        pass

    def _find_files(self, path: Path, recursive: bool = False) -> Generator[Path, None, None]:
        """Find files to index in the given path.

        Args:
            path: Path to file or directory
            recursive: Whether to search recursively

        Yields:
            Path objects for each file found
        """
        if path.is_file():
            if self._is_supported_file(path):
                yield path
        elif path.is_dir():
            pattern = "**/*" if recursive else "*"
            for file_path in path.glob(pattern):
                if file_path.is_file() and self._is_supported_file(file_path):
                    yield file_path

    @abstractmethod
    def _is_supported_file(self, path: Path) -> bool:
        """Check if the file is supported by this indexer.

        Args:
            path: Path to the file

        Returns:
            True if the file is supported
        """
        pass

    def _generate_id(self, file_path: Path, suffix: str = "") -> str:
        """Generate a unique document ID.

        Args:
            file_path: Path to the source file
            suffix: Optional suffix to add to the ID

        Returns:
            Unique document ID string
        """
        stem = file_path.stem.replace(" ", "_").lower()
        if suffix:
            return f"{stem}_{suffix}"
        return stem
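New indexers plug in by subclassing `BaseIndexer` and implementing the three abstract hooks. A minimal hypothetical example (the `PlainTextIndexer` name, the `.txt` extension choice, and the metadata are illustrative only, not part of this PR):

# Hypothetical subclass, for illustration only.
from pathlib import Path
from typing import List

from local_api_docs_search.indexer.base import BaseIndexer
from local_api_docs_search.models.document import Document, SourceType


class PlainTextIndexer(BaseIndexer):
    """Illustrative indexer that treats each .txt file as one document."""

    source_type = SourceType.README  # reuses an existing member; a real indexer might add its own

    def __init__(self):
        self._documents: List[Document] = []

    def index(self, path: Path, recursive: bool = False) -> List[Document]:
        self._documents = []
        for file_path in self._find_files(path, recursive):
            self._documents.append(
                Document(
                    id=self._generate_id(file_path),
                    content=file_path.read_text(encoding="utf-8"),
                    source_type=self.source_type,
                    title=file_path.stem,
                    file_path=str(file_path),
                    metadata={"doc_type": "plain_text"},
                )
            )
        return self._documents

    def get_documents(self) -> List[Document]:
        return self._documents

    def _is_supported_file(self, path: Path) -> bool:
        return path.suffix.lower() == ".txt"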
src/local_api_docs_search/indexer/code.py (new file, 544 lines)
@@ -0,0 +1,544 @@
"""Code comment indexer for Python, JavaScript, and TypeScript files."""

import ast
import re
from pathlib import Path
from typing import Any, Dict, List, Optional

from local_api_docs_search.indexer.base import BaseIndexer
from local_api_docs_search.models.document import Document, SourceType


class CodeIndexer(BaseIndexer):
    """Indexer for code comments and docstrings."""

    source_type = SourceType.CODE

    SUPPORTED_EXTENSIONS = {
        ".py": "python",
        ".js": "javascript",
        ".jsx": "javascript",
        ".ts": "typescript",
        ".tsx": "typescript",
    }

    def __init__(self):
        self._documents: List[Document] = []
        self._parsed_files: Dict[str, Any] = {}

    def index(
        self, path: Path, recursive: bool = False, batch_size: int = 32
    ) -> List[Document]:
        """Index code files from the given path.

        Args:
            path: Path to file or directory
            recursive: Whether to search recursively
            batch_size: Documents per batch (for progress tracking)

        Returns:
            List of indexed Document objects
        """
        self._documents = []
        self._parsed_files = {}

        for file_path in self._find_files(path, recursive):
            try:
                docs = self._parse_file(file_path)
                self._documents.extend(docs)
            except Exception as e:
                print(f"Warning: Failed to parse {file_path}: {e}")

        return self._documents

    def _parse_file(self, file_path: Path) -> List[Document]:
        """Parse a single code file.

        Args:
            file_path: Path to the code file

        Returns:
            List of Document objects
        """
        ext = file_path.suffix.lower()
        language = self.SUPPORTED_EXTENSIONS.get(ext)

        if language is None:
            return []

        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read()

        self._parsed_files[str(file_path)] = content

        if language == "python":
            return self._parse_python(content, file_path)
        elif language in ("javascript", "typescript"):
            return self._parse_js_ts(content, file_path, language)

        return []

    def _parse_python(self, content: str, file_path: Path) -> List[Document]:
        """Parse Python file for docstrings.

        Args:
            content: Python file content
            file_path: Path to the file

        Returns:
            List of Document objects
        """
        documents = []
        doc_id_base = self._generate_id(file_path)

        try:
            tree = ast.parse(content)
        except SyntaxError:
            return []

        module_doc = self._get_module_docstring(content)
        if module_doc:
            doc = Document(
                id=f"{doc_id_base}_module",
                content=module_doc,
                source_type=self.source_type,
                title=f"Module: {file_path.stem}",
                file_path=str(file_path),
                metadata={"doc_type": "module"},
            )
            documents.append(doc)

        for node in ast.walk(tree):
            if isinstance(node, ast.FunctionDef) or isinstance(node, ast.AsyncFunctionDef):
                doc = self._parse_python_function(node, file_path, doc_id_base)
                if doc:
                    documents.append(doc)
            elif isinstance(node, ast.ClassDef):
                doc = self._parse_python_class(node, file_path, doc_id_base)
                if doc:
                    documents.append(doc)

        if documents:
            index_doc = Document(
                id=f"{doc_id_base}_index",
                content=self._generate_python_index(tree, file_path),
                source_type=self.source_type,
                title=f"Index: {file_path.stem}",
                file_path=str(file_path),
                metadata={"doc_type": "index"},
            )
            documents.append(index_doc)

        return documents

    def _get_module_docstring(self, content: str) -> Optional[str]:
        """Extract module docstring.

        Args:
            content: Python file content

        Returns:
            Module docstring or None
        """
        tree = ast.parse(content)
        if tree.body and isinstance(tree.body[0], ast.Expr):
            docstring = tree.body[0].value
            if isinstance(docstring, ast.Constant) and isinstance(
                docstring.value, str
            ):
                return docstring.value
        return None

    def _parse_python_function(
        self, node: ast.FunctionDef, file_path: Path, doc_id_base: str
    ) -> Optional[Document]:
        """Parse a Python function for docstring.

        Args:
            node: AST function node
            file_path: Path to the file
            doc_id_base: Base ID for document generation

        Returns:
            Document or None
        """
        docstring = self._get_docstring(node)
        if not docstring:
            return None

        func_info = self._extract_python_function_info(node)

        content = f"Function: {node.name}\n"
        content += f"Docstring:\n{docstring}\n"
        content += f"Parameters: {', '.join(func_info['args'])}\n"
        content += f"Returns: {func_info['returns']}\n"
        content += f"Line: {node.lineno}"

        return Document(
            id=f"{doc_id_base}_func_{node.name}",
            content=content,
            source_type=self.source_type,
            title=f"Function: {node.name}",
            file_path=str(file_path),
            metadata={
                "doc_type": "function",
                "function_name": node.name,
                "line": node.lineno,
            },
        )

    def _parse_python_class(
        self, node: ast.ClassDef, file_path: Path, doc_id_base: str
    ) -> Optional[Document]:
        """Parse a Python class for docstring.

        Args:
            node: AST class node
            file_path: Path to the file
            doc_id_base: Base ID for document generation

        Returns:
            Document or None
        """
        docstring = self._get_docstring(node)
        if not docstring:
            return None

        methods = []
        attributes = []

        for item in node.body:
            if isinstance(item, ast.FunctionDef) or isinstance(
                item, ast.AsyncFunctionDef
            ):
                if not item.name.startswith("_"):
                    methods.append(item.name)
            elif isinstance(item, ast.AnnAssign) and isinstance(
                item.target, ast.Name
            ):
                attributes.append(item.target.name)

        content = f"Class: {node.name}\n"
        content += f"Docstring:\n{docstring}\n"
        if attributes:
            content += f"Attributes: {', '.join(attributes)}\n"
        if methods:
            content += f"Methods: {', '.join(methods)}\n"
        content += f"Line: {node.lineno}"

        return Document(
            id=f"{doc_id_base}_class_{node.name}",
            content=content,
            source_type=self.source_type,
            title=f"Class: {node.name}",
            file_path=str(file_path),
            metadata={
                "doc_type": "class",
                "class_name": node.name,
                "line": node.lineno,
            },
        )

    def _get_docstring(self, node: ast.AST) -> Optional[str]:
        """Extract docstring from an AST node.

        Args:
            node: AST node

        Returns:
            Docstring or None
        """
        if hasattr(node, "body") and node.body:
            first = node.body[0]
            if isinstance(first, ast.Expr) and isinstance(first.value, ast.Constant):
                value = first.value.value
                if isinstance(value, str):
                    return value
        return None

    def _extract_python_function_info(
        self, node: ast.FunctionDef
    ) -> Dict[str, Any]:
        """Extract function information.

        Args:
            node: AST function node

        Returns:
            Dictionary with function information
        """
        args = []
        defaults = []

        for arg in node.args.args:
            if arg.arg != "self" and arg.arg != "cls":
                args.append(arg.arg)

        for default in node.args.defaults:
            if isinstance(default, ast.Constant):
                defaults.append(str(default.value))

        returns = "unknown"
        if node.returns:
            if isinstance(node.returns, ast.Name):
                returns = node.returns.id
            elif isinstance(node.returns, ast.Constant):
                returns = str(node.returns.value)

        return {"args": args, "defaults": defaults, "returns": returns}

    def _generate_python_index(
        self, tree: ast.AST, file_path: Path
    ) -> str:
        """Generate an index of all documented items.

        Args:
            tree: Parsed AST tree
            file_path: Path to the file

        Returns:
            Index content
        """
        functions = []
        classes = []

        for node in ast.walk(tree):
            if isinstance(node, ast.FunctionDef) or isinstance(
                node, ast.AsyncFunctionDef
            ):
                if self._get_docstring(node) and not node.name.startswith("_"):
                    functions.append(node.name)
            elif isinstance(node, ast.ClassDef):
                if self._get_docstring(node):
                    classes.append(node.name)

        content = f"File: {file_path.name}\n\n"
        if classes:
            content += "Classes:\n" + "\n".join(f" - {c}" for c in classes) + "\n\n"
        if functions:
            content += "Functions:\n" + "\n".join(f" - {f}" for f in functions)

        return content

    def _parse_js_ts(
        self, content: str, file_path: Path, language: str
    ) -> List[Document]:
        """Parse JavaScript/TypeScript file for JSDoc comments.

        Args:
            content: File content
            file_path: Path to the file
            language: Language identifier

        Returns:
            List of Document objects
        """
        documents = []
        doc_id_base = self._generate_id(file_path)

        jsdocs = self._extract_jsdocs(content)

        if not jsdocs:
            return documents

        module_doc = self._extract_js_module_doc(content)
        if module_doc:
            doc = Document(
                id=f"{doc_id_base}_module",
                content=module_doc,
                source_type=self.source_type,
                title=f"Module: {file_path.stem}",
                file_path=str(file_path),
                metadata={"doc_type": "module"},
            )
            documents.append(doc)

        for i, jsdoc in enumerate(jsdocs):
            doc = self._create_jsdoc_document(jsdoc, file_path, doc_id_base, i)
            documents.append(doc)

        return documents

    def _extract_jsdocs(self, content: str) -> List[Dict[str, Any]]:
        """Extract JSDoc comments from content.

        Args:
            content: File content

        Returns:
            List of JSDoc dictionaries
        """
        jsdocs = []
        pattern = r"/\*\*([\s\S]*?)\*/\s*(export\s+)?(async\s+)?(function|const|let|var|class|interface|type|enum)\s+(\w+)"
        matches = re.findall(pattern, content, re.MULTILINE)

        for match in matches:
            full_comment = f"/**{match[0]}*/"
            exported = bool(match[1])
            async_kw = bool(match[2])
            decl_type = match[3]
            name = match[4]

            parsed = self._parse_jsdoc_comment(full_comment)
            parsed.update({
                "name": name,
                "type": decl_type,
                "exported": exported,
                "async": async_kw,
            })
            jsdocs.append(parsed)

        return jsdocs

    def _parse_jsdoc_comment(self, comment: str) -> Dict[str, Any]:
        """Parse a JSDoc comment.

        Args:
            comment: JSDoc comment string

        Returns:
            Parsed JSDoc dictionary
        """
        result = {
            "description": "",
            "params": [],
            "returns": None,
            "examples": [],
            "throws": [],
            "see": [],
        }

        lines = comment.strip("/**").strip("*/").split("\n")
        current_description = []

        for line in lines:
            line = line.strip().lstrip("*").strip()

            if line.startswith("@param"):
                param_match = re.match(r"@param\s+\{([^}]+)\}\s+(\w+)(?:\s+-)?\s*(.*)", line)
                if param_match:
                    result["params"].append({
                        "type": param_match.group(1),
                        "name": param_match.group(2),
                        "description": param_match.group(3),
                    })
            elif line.startswith("@returns") or line.startswith("@return"):
                return_match = re.match(r"@returns?\{([^}]+)\}\s*(.*)", line)
                if return_match:
                    result["returns"] = {
                        "type": return_match.group(1),
                        "description": return_match.group(2),
                    }
            elif line.startswith("@example"):
                result["examples"].append(line[8:].strip())
            elif line.startswith("@throws"):
                throw_match = re.match(r"@throws\{([^}]+)\}\s*(.*)", line)
                if throw_match:
                    result["throws"].append({
                        "type": throw_match.group(1),
                        "description": throw_match.group(2),
                    })
            elif line.startswith("@see"):
                result["see"].append(line[4:].strip())
            elif line and not line.startswith("@"):
                current_description.append(line)

        result["description"] = " ".join(current_description)
        return result

    def _extract_js_module_doc(self, content: str) -> Optional[str]:
        """Extract module-level documentation.

        Args:
            content: File content

        Returns:
            Module docstring or None
        """
        file_doc_pattern = r"/\*\*([\s\S]*?)\*/\s*@module\s+(\w+)"
        match = re.search(file_doc_pattern, content)
        if match:
            return f"Module: {match.group(2)}\n\n{match.group(1).strip()}"
        return None

    def _create_jsdoc_document(
        self,
        jsdoc: Dict[str, Any],
        file_path: Path,
        doc_id_base: str,
        index: int,
    ) -> Document:
        """Create a Document from parsed JSDoc.

        Args:
            jsdoc: Parsed JSDoc dictionary
            file_path: Path to the source file
            doc_id_base: Base ID for document generation
            index: Index for ID generation

        Returns:
            Document object
        """
        content_parts = []

        decl_type = jsdoc.get("type", "unknown")
        name = jsdoc.get("name", "unknown")
        is_async = "async " if jsdoc.get("async") else ""
        is_exported = "export " if jsdoc.get("exported") else ""

        content_parts.append(f"{is_exported}{is_async}{decl_type} {name}")

        if jsdoc.get("description"):
            content_parts.append(f"\nDescription: {jsdoc['description']}")

        if jsdoc.get("params"):
            param_lines = ["\nParameters:"]
            for param in jsdoc["params"]:
                param_lines.append(
                    f" - {param['name']} ({param['type']}): {param['description']}"
                )
            content_parts.append("\n".join(param_lines))

        if jsdoc.get("returns"):
            ret = jsdoc["returns"]
            content_parts.append(f"\nReturns ({ret['type']}): {ret['description']}")

        if jsdoc.get("examples"):
            examples = "\nExamples:\n" + "\n".join(
                f" {i+1}. {ex}" for i, ex in enumerate(jsdoc["examples"])
            )
            content_parts.append(examples)

        content = "\n".join(content_parts)

        return Document(
            id=f"{doc_id_base}_jsdoc_{index}",
            content=content,
            source_type=self.source_type,
            title=f"{decl_type.capitalize()}: {name}",
            file_path=str(file_path),
            metadata={
                "doc_type": "jsdoc",
                "name": name,
                "jsdoc_type": decl_type,
            },
        )

    def _is_supported_file(self, path: Path) -> bool:
        """Check if the file is a supported code file.

        Args:
            path: Path to the file

        Returns:
            True if the file extension is supported
        """
        return path.suffix.lower() in self.SUPPORTED_EXTENSIONS

    def get_documents(self) -> List[Document]:
        """Get all indexed documents.

        Returns:
            List of Document objects
        """
        return self._documents
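For reference, a small usage sketch of `CodeIndexer` (the `src/` path is illustrative):

# Illustrative only: index docstrings/JSDoc under src/ and print what was found.
from pathlib import Path

from local_api_docs_search.indexer.code import CodeIndexer

indexer = CodeIndexer()
docs = indexer.index(Path("src/"), recursive=True)
for doc in docs:
    print(doc.title, "->", doc.file_path)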
src/local_api_docs_search/indexer/openapi.py (new file, 491 lines)
@@ -0,0 +1,491 @@
"""OpenAPI/Swagger specification indexer."""

import hashlib
import json
from pathlib import Path
from typing import Any, Dict, List, Optional

from openapi_spec_validator import validate
from yaml import safe_load

from local_api_docs_search.indexer.base import BaseIndexer
from local_api_docs_search.models.document import Document, SourceType


class OpenAPIIndexer(BaseIndexer):
    """Indexer for OpenAPI/Swagger specifications."""

    source_type = SourceType.OPENAPI

    SUPPORTED_EXTENSIONS = {".yaml", ".yml", ".json"}

    def __init__(self):
        self._documents: List[Document] = []

    def index(
        self, path: Path, recursive: bool = False, batch_size: int = 32
    ) -> List[Document]:
        """Index OpenAPI specifications from the given path.

        Args:
            path: Path to file or directory
            recursive: Whether to search recursively
            batch_size: Documents per batch (for progress tracking)

        Returns:
            List of indexed Document objects
        """
        self._documents = []

        for file_path in self._find_files(path, recursive):
            try:
                docs = self._parse_file(file_path)
                self._documents.extend(docs)
            except Exception as e:
                print(f"Warning: Failed to parse {file_path}: {e}")

        return self._documents

    def _parse_file(self, file_path: Path) -> List[Document]:
        """Parse a single OpenAPI file.

        Args:
            file_path: Path to the OpenAPI file

        Returns:
            List of Document objects
        """
        with open(file_path, "r") as f:
            content = f.read()

        if file_path.suffix == ".json":
            spec = json.loads(content)
        else:
            spec = safe_load(content)

        if spec is None:
            return []

        validation_errors = self._validate_spec(spec, file_path)
        if validation_errors:
            print(f"Warning: Validation errors in {file_path}: {validation_errors}")

        return self._extract_documents(spec, file_path)

    def _validate_spec(
        self, spec: Dict[str, Any], file_path: Path
    ) -> Optional[str]:
        """Validate an OpenAPI specification.

        Args:
            spec: The parsed specification
            file_path: Path to the source file

        Returns:
            None if valid, error message otherwise
        """
        try:
            validate(spec)
            return None
        except Exception as e:
            return str(e)

    def _extract_documents(
        self, spec: Dict[str, Any], file_path: Path
    ) -> List[Document]:
        """Extract searchable documents from an OpenAPI spec.

        Args:
            spec: The parsed OpenAPI specification
            file_path: Path to the source file

        Returns:
            List of Document objects
        """
        documents = []
        spec_info = spec.get("info", {})
        title = spec_info.get("title", file_path.stem)
        version = spec_info.get("version", "unknown")

        doc_id_base = self._generate_id(file_path)

        info_doc = Document(
            id=f"{doc_id_base}_info",
            content=self._format_info_content(spec_info),
            source_type=self.source_type,
            title=f"{title} - API Info",
            file_path=str(file_path),
            metadata={"version": version, "section": "info"},
        )
        documents.append(info_doc)

        for path, path_item in spec.get("paths", {}).items():
            path_docs = self._extract_path_documents(
                path, path_item, spec, file_path, doc_id_base
            )
            documents.extend(path_docs)

        for tag, tag_spec in spec.get("tags", []):
            tag_doc = Document(
                id=f"{doc_id_base}_tag_{tag}",
                content=self._format_tag_content(tag, tag_spec),
                source_type=self.source_type,
                title=f"Tag: {tag}",
                file_path=str(file_path),
                metadata={"section": "tags", "tag": tag},
            )
            documents.append(tag_doc)

        for schema_name, schema in spec.get("components", {}).get("schemas", {}).items():
            schema_doc = self._extract_schema_document(
                schema_name, schema, file_path, doc_id_base
            )
            if schema_doc:
                documents.append(schema_doc)

        return documents

    def _extract_path_documents(
        self,
        path: str,
        path_item: Dict[str, Any],
        spec: Dict[str, Any],
        file_path: Path,
        doc_id_base: str,
    ) -> List[Document]:
        """Extract documents from a path item.

        Args:
            path: The path string
            path_item: The path item specification
            spec: The full OpenAPI specification
            file_path: Path to the source file
            doc_id_base: Base ID for document generation

        Returns:
            List of Document objects
        """
        documents = []
        path_hash = hashlib.md5(path.encode()).hexdigest()[:8]

        methods = ["get", "post", "put", "patch", "delete", "options", "head", "trace"]

        for method in methods:
            if method in path_item:
                operation = path_item[method]
                doc = self._extract_operation_document(
                    method, path, operation, spec, file_path, doc_id_base, path_hash
                )
                documents.append(doc)

        summary = path_item.get("summary", "")
        description = path_item.get("description", "")
        if summary or description:
            path_doc = Document(
                id=f"{doc_id_base}_path_{path_hash}",
                content=f"Path: {path}\nSummary: {summary}\nDescription: {description}",
                source_type=self.source_type,
                title=f"Path: {path}",
                file_path=str(file_path),
                metadata={"section": "path", "path": path},
            )
            documents.append(path_doc)

        return documents

    def _extract_operation_document(
        self,
        method: str,
        path: str,
        operation: Dict[str, Any],
        spec: Dict[str, Any],
        file_path: Path,
        doc_id_base: str,
        path_hash: str,
    ) -> Document:
        """Extract a document from an operation.

        Args:
            method: HTTP method
            path: API path
            operation: The operation specification
            spec: The full OpenAPI specification
            file_path: Path to the source file
            doc_id_base: Base ID for document generation
            path_hash: Hash of the path for ID generation

        Returns:
            Document object
        """
        op_id = operation.get("operationId", f"{method}_{path_hash}")
        summary = operation.get("summary", "")
        description = operation.get("description", "")
        deprecated = operation.get("deprecated", False)

        content_parts = [
            f"Method: {method.upper()}",
            f"Path: {path}",
            f"Operation ID: {op_id}",
            f"Summary: {summary}",
            f"Description: {description}",
        ]

        if deprecated:
            content_parts.append("Status: DEPRECATED")

        tags = operation.get("tags", [])
        if tags:
            content_parts.append(f"Tags: {', '.join(tags)}")

        parameters = operation.get("parameters", [])
        if parameters:
            param_content = self._format_parameters(parameters)
            content_parts.append(f"Parameters:\n{param_content}")

        request_body = operation.get("requestBody", {})
        if request_body:
            rb_content = self._format_request_body(request_body, spec)
            content_parts.append(f"Request Body:\n{rb_content}")

        responses = operation.get("responses", {})
        resp_content = self._format_responses(responses)
        content_parts.append(f"Responses:\n{resp_content}")

        return Document(
            id=f"{doc_id_base}_{op_id}",
            content="\n".join(content_parts),
            source_type=self.source_type,
            title=f"{method.upper()} {path}",
            file_path=str(file_path),
            metadata={
                "section": "operation",
                "method": method,
                "path": path,
                "operation_id": op_id,
                "deprecated": deprecated,
            },
        )

    def _format_parameters(self, parameters: List[Dict[str, Any]]) -> str:
        """Format parameters for display.

        Args:
            parameters: List of parameter specifications

        Returns:
            Formatted parameter string
        """
        lines = []
        for param in parameters:
            name = param.get("name", "unknown")
            in_loc = param.get("in", "unknown")
            required = param.get("required", False)
            description = param.get("description", "")
            param_type = param.get("schema", {}).get("type", "any")

            lines.append(
                f" - {name} ({in_loc}, {'required' if required else 'optional'}): {param_type}"
            )
            if description:
                lines.append(f" Description: {description}")

        return "\n".join(lines) if lines else " No parameters"

    def _format_request_body(
        self, request_body: Dict[str, Any], spec: Dict[str, Any]
    ) -> str:
        """Format request body for display.

        Args:
            request_body: Request body specification
            spec: The full OpenAPI specification

        Returns:
            Formatted request body string
        """
        lines = []
        description = request_body.get("description", "")
        if description:
            lines.append(f"Description: {description}")

        required = request_body.get("required", False)
        lines.append(f"Required: {required}")

        content = request_body.get("content", {})
        for content_type, content_spec in content.items():
            schema = content_spec.get("schema", {})
            schema_ref = schema.get("$ref", "")
            if schema_ref:
                resolved = self._resolve_ref(schema_ref, spec)
                if resolved:
                    schema = resolved
            lines.append(f"Content-Type: {content_type}")
            lines.append(f"Schema: {json.dumps(schema, indent=4)}")

        return "\n".join(lines)

    def _format_responses(self, responses: Dict[str, Any]) -> str:
        """Format responses for display.

        Args:
            responses: Response specifications

        Returns:
            Formatted response string
        """
        lines = []
        for status_code, response in responses.items():
            description = response.get("description", "")
            lines.append(f" {status_code}: {description}")

            content = response.get("content", {})
            for content_type, content_spec in content.items():
                schema = content_spec.get("schema", {})
                if schema:
                    schema_type = schema.get("type", "unknown")
                    lines.append(f" Content-Type: {content_type}")
                    lines.append(f" Schema Type: {schema_type}")

        return "\n".join(lines) if lines else " No responses defined"

    def _resolve_ref(self, ref: str, spec: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Resolve a $ref reference.

        Args:
            ref: The reference string
            spec: The full OpenAPI specification

        Returns:
            Resolved schema or None
        """
        if not ref.startswith("#/"):
            return None

        parts = ref[2:].split("/")
        current = spec

        for part in parts:
            if isinstance(current, dict):
                current = current.get(part)
            else:
                return None

        return current

    def _extract_schema_document(
        self,
        schema_name: str,
        schema: Dict[str, Any],
        file_path: Path,
        doc_id_base: str,
    ) -> Document:
        """Extract a document from a schema.

        Args:
            schema_name: Name of the schema
            schema: Schema specification
            file_path: Path to the source file
            doc_id_base: Base ID for document generation

        Returns:
            Document object
        """
        content_parts = [
            f"Schema: {schema_name}",
        ]

        schema_type = schema.get("type", "object")
        content_parts.append(f"Type: {schema_type}")

        description = schema.get("description", "")
        if description:
            content_parts.append(f"Description: {description}")

        required_fields = schema.get("required", [])
        if required_fields:
            content_parts.append(f"Required Fields: {', '.join(required_fields)}")

        properties = schema.get("properties", {})
        if properties:
            prop_lines = ["Properties:"]
            for prop_name, prop_spec in properties.items():
                prop_type = prop_spec.get("type", "unknown")
                prop_desc = prop_spec.get("description", "")
                prop_required = prop_name in required_fields
                prop_lines.append(
                    f" - {prop_name} ({prop_type}, {'required' if prop_required else 'optional'})"
                )
                if prop_desc:
                    prop_lines.append(f" Description: {prop_desc}")
            content_parts.append("\n".join(prop_lines))

        return Document(
            id=f"{doc_id_base}_schema_{schema_name}",
            content="\n".join(content_parts),
            source_type=self.source_type,
            title=f"Schema: {schema_name}",
            file_path=str(file_path),
            metadata={"section": "schema", "schema_name": schema_name},
        )

    def _format_info_content(self, info: Dict[str, Any]) -> str:
        """Format the API info section.

        Args:
            info: Info object from specification

        Returns:
            Formatted info content
        """
        parts = []
        for key in ["title", "version", "description", "termsOfService", "contact", "license"]:
            if key in info:
                value = info[key]
                if isinstance(value, dict):
                    if "name" in value:
                        parts.append(f"{key}: {value['name']}")
                    if "url" in value:
                        parts.append(f"{key} URL: {value['url']}")
                else:
                    parts.append(f"{key}: {value}")
        return "\n".join(parts)

    def _format_tag_content(self, tag: str, tag_spec: Dict[str, Any]) -> str:
        """Format tag content.

        Args:
            tag: Tag name
            tag_spec: Tag specification

        Returns:
            Formatted tag content
        """
        parts = [f"Tag: {tag}"]
        description = tag_spec.get("description", "")
        if description:
            parts.append(f"Description: {description}")
        external_docs = tag_spec.get("externalDocs", {})
        if external_docs:
            docs_url = external_docs.get("url", "")
            if docs_url:
                parts.append(f"External Docs: {docs_url}")
        return "\n".join(parts)

    def _is_supported_file(self, path: Path) -> bool:
        """Check if the file is a supported OpenAPI file.

        Args:
            path: Path to the file

        Returns:
            True if the file extension is supported
        """
        return path.suffix.lower() in self.SUPPORTED_EXTENSIONS

    def get_documents(self) -> List[Document]:
        """Get all indexed documents.

        Returns:
            List of Document objects
        """
        return self._documents
src/local_api_docs_search/indexer/readme.py (new file, 254 lines)
@@ -0,0 +1,254 @@
"""README/Markdown file indexer."""

from pathlib import Path
from typing import List, Tuple

from local_api_docs_search.indexer.base import BaseIndexer
from local_api_docs_search.models.document import Document, SourceType


class READMEIndexer(BaseIndexer):
    """Indexer for README and Markdown files."""

    source_type = SourceType.README

    SUPPORTED_EXTENSIONS = {".md", ".markdown", ".txt"}

    def __init__(self):
        self._documents: List[Document] = []

    def index(
        self, path: Path, recursive: bool = False, chunk_size: int = 1000
    ) -> List[Document]:
        """Index README/Markdown files from the given path.

        Args:
            path: Path to file or directory
            recursive: Whether to search recursively
            chunk_size: Maximum chunk size in characters

        Returns:
            List of indexed Document objects
        """
        self._documents = []

        for file_path in self._find_files(path, recursive):
            try:
                docs = self._parse_file(file_path, chunk_size)
                self._documents.extend(docs)
            except Exception as e:
                print(f"Warning: Failed to parse {file_path}: {e}")

        return self._documents

    def _parse_file(
        self, file_path: Path, chunk_size: int = 1000
    ) -> List[Document]:
        """Parse a single Markdown file.

        Args:
            file_path: Path to the Markdown file
            chunk_size: Maximum chunk size

        Returns:
            List of Document objects
        """
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read()

        title = self._extract_title(content, file_path.stem)
        sections = self._parse_sections(content)

        documents = []
        doc_id_base = self._generate_id(file_path)

        if not sections:
            doc = Document(
                id=doc_id_base,
                content=content.strip(),
                source_type=self.source_type,
                title=title,
                file_path=str(file_path),
                metadata={"section": "root"},
            )
            documents.append(doc)
        else:
            for i, (section_title, section_content, level) in enumerate(sections):
                chunks = self._chunk_content(
                    section_content, section_title, chunk_size
                )
                for j, chunk in enumerate(chunks):
                    doc_id = f"{doc_id_base}_section_{i}_{j}" if len(chunks) > 1 else f"{doc_id_base}_section_{i}"
                    doc = Document(
                        id=doc_id,
                        content=chunk,
                        source_type=self.source_type,
                        title=f"{title} - {section_title}",
                        file_path=str(file_path),
                        metadata={
                            "section": section_title,
                            "section_level": level,
                            "chunk_index": j,
                            "total_chunks": len(chunks),
                        },
                    )
                    documents.append(doc)

            if len(sections) == 1:
                full_doc = Document(
                    id=f"{doc_id_base}_full",
                    content=content.strip(),
                    source_type=self.source_type,
                    title=f"{title} (Full)",
                    file_path=str(file_path),
                    metadata={"section": "full_document"},
                )
                documents.append(full_doc)

        return documents

    def _extract_title(self, content: str, default: str) -> str:
        """Extract the title from Markdown content.

        Args:
            content: Markdown content
            default: Default title if none found

        Returns:
            Extracted title
        """
        for line in content.split("\n"):
            line = line.strip()
            if line.startswith("# "):
                return line[2:].strip()
        return default

    def _parse_sections(
        self, content: str
    ) -> List[Tuple[str, str, int]]:
        """Parse Markdown content into sections.

        Args:
            content: Markdown content

        Returns:
            List of (title, content, level) tuples
        """
        sections = []
        lines = content.split("\n")
        current_section = ("", "", 0)
        current_lines = []

        in_code_block = False
        code_fence = "```"
|
||||
|
||||
        for line in lines:
            if line.startswith(code_fence):
                # Keep fence lines (opening and closing) so fenced code blocks
                # survive intact inside the section content.
                in_code_block = not in_code_block
                current_lines.append(line)
                continue

            if not in_code_block and line.startswith("#"):
                # Flush the previous section, if one has been started or any
                # preamble text has accumulated, before opening a new one.
                if current_section[0] or current_lines:
                    sections.append(
                        (current_section[0], "\n".join(current_lines), current_section[2])
                    )

                header = line.lstrip("#")
                level = len(line) - len(header)
                title = header.strip()
                current_lines = []
                current_section = (title, "", level)
            else:
                current_lines.append(line)

        if current_section[0] or current_lines:
            sections.append(
                (current_section[0], "\n".join(current_lines), current_section[2])
            )

        return sections
|
||||
|
||||
def _chunk_content(
|
||||
self, content: str, section_title: str, max_size: int
|
||||
) -> List[str]:
|
||||
"""Chunk content into smaller pieces.
|
||||
|
||||
Args:
|
||||
content: Section content
|
||||
section_title: Section title for context
|
||||
max_size: Maximum chunk size
|
||||
|
||||
Returns:
|
||||
List of content chunks
|
||||
"""
|
||||
if len(content) <= max_size:
|
||||
return [content]
|
||||
|
||||
chunks = []
|
||||
current_chunk = []
|
||||
current_size = 0
|
||||
|
||||
paragraphs = self._split_paragraphs(content)
|
||||
|
||||
for para in paragraphs:
|
||||
para_size = len(para)
|
||||
|
||||
if current_size + para_size > max_size and current_chunk:
|
||||
chunks.append("\n\n".join(current_chunk))
|
||||
current_chunk = []
|
||||
current_size = 0
|
||||
|
||||
current_chunk.append(para)
|
||||
current_size += para_size
|
||||
|
||||
if current_chunk:
|
||||
chunks.append("\n\n".join(current_chunk))
|
||||
|
||||
return chunks
|
||||
|
||||
def _split_paragraphs(self, content: str) -> List[str]:
|
||||
"""Split content into paragraphs.
|
||||
|
||||
Args:
|
||||
content: Section content
|
||||
|
||||
Returns:
|
||||
List of paragraphs
|
||||
"""
|
||||
paragraphs = []
|
||||
current_lines = []
|
||||
|
||||
for line in content.split("\n"):
|
||||
stripped = line.strip()
|
||||
if stripped:
|
||||
current_lines.append(line)
|
||||
elif current_lines:
|
||||
paragraphs.append("\n".join(current_lines))
|
||||
current_lines = []
|
||||
|
||||
if current_lines:
|
||||
paragraphs.append("\n".join(current_lines))
|
||||
|
||||
return paragraphs
|
||||
|
||||
def _is_supported_file(self, path: Path) -> bool:
|
||||
"""Check if the file is a supported Markdown file.
|
||||
|
||||
Args:
|
||||
path: Path to the file
|
||||
|
||||
Returns:
|
||||
True if the file extension is supported
|
||||
"""
|
||||
return path.suffix.lower() in self.SUPPORTED_EXTENSIONS
|
||||
|
||||
def get_documents(self) -> List[Document]:
|
||||
"""Get all indexed documents.
|
||||
|
||||
Returns:
|
||||
List of Document objects
|
||||
"""
|
||||
return self._documents
|
||||
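A minimal usage sketch for the indexer above (editor's illustration, not part of this changeset; README.md is a placeholder path):

# Editor's sketch: index one Markdown file with the READMEIndexer defined above.
# "README.md" is a placeholder; any .md/.markdown/.txt path works.
from pathlib import Path

from local_api_docs_search.indexer.readme import READMEIndexer

indexer = READMEIndexer()
docs = indexer.index(Path("README.md"), recursive=False, chunk_size=1000)
for doc in docs:
    print(doc.id, doc.title, doc.metadata.get("section"))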
23
src/local_api_docs_search/main.py
Normal file
@@ -0,0 +1,23 @@
"""CLI entry point."""

import sys


def main():
    """Main entry point for the CLI."""
    from local_api_docs_search.cli.commands import cli

    try:
        cli.main(prog_name="api-docs")
    except KeyboardInterrupt:
        sys.exit(0)
    except Exception as e:
        import logging

        logging.basicConfig(level=logging.ERROR)
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
1
src/local_api_docs_search/models/__init__.py
Normal file
@@ -0,0 +1 @@
"""Data models package."""
94
src/local_api_docs_search/models/document.py
Normal file
@@ -0,0 +1,94 @@
|
||||
"""Document models for indexed documentation."""
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class SourceType(str, Enum):
|
||||
"""Enumeration of supported documentation source types."""
|
||||
|
||||
OPENAPI = "openapi"
|
||||
README = "readme"
|
||||
CODE = "code"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Document:
|
||||
"""Represents an indexed document chunk."""
|
||||
|
||||
id: str
|
||||
content: str
|
||||
source_type: SourceType
|
||||
title: str
|
||||
file_path: str = ""
|
||||
metadata: dict = field(default_factory=dict)
|
||||
created_at: datetime = field(default_factory=datetime.utcnow)
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
"""Convert document to dictionary for serialization."""
|
||||
return {
|
||||
"id": self.id,
|
||||
"content": self.content,
|
||||
"source_type": self.source_type.value,
|
||||
"title": self.title,
|
||||
"file_path": self.file_path,
|
||||
"metadata": self.metadata,
|
||||
"created_at": self.created_at.isoformat(),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict) -> "Document":
|
||||
"""Create document from dictionary."""
|
||||
return cls(
|
||||
id=data["id"],
|
||||
content=data["content"],
|
||||
source_type=SourceType(data["source_type"]),
|
||||
title=data["title"],
|
||||
file_path=data.get("file_path", ""),
|
||||
metadata=data.get("metadata", {}),
|
||||
created_at=datetime.fromisoformat(data["created_at"]),
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SearchResult:
|
||||
"""Represents a search result with relevance score."""
|
||||
|
||||
document: Document
|
||||
score: float
|
||||
highlights: list[str] = field(default_factory=list)
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
"""Convert search result to dictionary."""
|
||||
return {
|
||||
"id": self.document.id,
|
||||
"content": self.document.content,
|
||||
"source_type": self.document.source_type.value,
|
||||
"title": self.document.title,
|
||||
"file_path": self.document.file_path,
|
||||
"score": self.score,
|
||||
"highlights": self.highlights,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class IndexStats:
|
||||
"""Statistics about the indexed collection."""
|
||||
|
||||
total_documents: int = 0
|
||||
openapi_count: int = 0
|
||||
readme_count: int = 0
|
||||
code_count: int = 0
|
||||
last_indexed: Optional[datetime] = None
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
"""Convert stats to dictionary."""
|
||||
return {
|
||||
"total_documents": self.total_documents,
|
||||
"openapi_count": self.openapi_count,
|
||||
"readme_count": self.readme_count,
|
||||
"code_count": self.code_count,
|
||||
"last_indexed": self.last_indexed.isoformat() if self.last_indexed else None,
|
||||
}
|
||||
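The dataclasses above are plain containers; a short round-trip sketch (editor's illustration, not part of this changeset) shows how to_dict and from_dict are meant to pair up:

# Editor's sketch: serialize and restore a Document.
from local_api_docs_search.models.document import Document, SourceType

doc = Document(
    id="readme_demo_0",
    content="Install the package with pip.",
    source_type=SourceType.README,
    title="Installation",
    file_path="README.md",
)
restored = Document.from_dict(doc.to_dict())
assert restored.id == doc.id and restored.source_type is SourceType.README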
1
src/local_api_docs_search/search/__init__.py
Normal file
@@ -0,0 +1 @@
"""Search package for embeddings and vector search."""
117
src/local_api_docs_search/search/embeddings.py
Normal file
@@ -0,0 +1,117 @@
|
||||
"""Embedding model management using sentence-transformers."""
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
from sentence_transformers import SentenceTransformer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EmbeddingManager:
|
||||
"""Manages local embedding models for semantic search."""
|
||||
|
||||
DEFAULT_MODEL = "all-MiniLM-L6-v2"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
model_name: Optional[str] = None,
|
||||
device: Optional[str] = None,
|
||||
cache_dir: Optional[Path] = None,
|
||||
):
|
||||
"""Initialize the embedding manager.
|
||||
|
||||
Args:
|
||||
model_name: Name of the model to use (default: all-MiniLM-L6-v2)
|
||||
device: Device to run on (cpu, cuda, auto)
|
||||
cache_dir: Directory to cache models
|
||||
"""
|
||||
self._model_name = model_name or self.DEFAULT_MODEL
|
||||
self._device = device or "cpu"
|
||||
self._cache_dir = cache_dir
|
||||
self._model: Optional[SentenceTransformer] = None
|
||||
|
||||
@property
|
||||
def model_name(self) -> str:
|
||||
"""Get the model name."""
|
||||
return self._model_name
|
||||
|
||||
@property
|
||||
def device(self) -> str:
|
||||
"""Get the device being used."""
|
||||
return self._device
|
||||
|
||||
def load_model(self, force_download: bool = False) -> SentenceTransformer:
|
||||
"""Load the embedding model.
|
||||
|
||||
Args:
|
||||
force_download: Force re-download of the model
|
||||
|
||||
Returns:
|
||||
Loaded SentenceTransformer model
|
||||
"""
|
||||
if self._model is not None and not force_download:
|
||||
return self._model
|
||||
|
||||
try:
|
||||
model_kwargs = {"device": self._device}
|
||||
if self._cache_dir:
|
||||
model_kwargs["cache_folder"] = str(self._cache_dir)
|
||||
|
||||
self._model = SentenceTransformer(self._model_name, **model_kwargs)
|
||||
logger.info(f"Loaded embedding model: {self._model_name} on {self._device}")
|
||||
return self._model
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load model {self._model_name}: {e}")
|
||||
raise
|
||||
|
||||
def embed(self, texts: List[str], show_progress: bool = False) -> List[List[float]]:
|
||||
"""Generate embeddings for a list of texts.
|
||||
|
||||
Args:
|
||||
texts: List of text strings to embed
|
||||
show_progress: Show progress bar
|
||||
|
||||
Returns:
|
||||
List of embedding vectors
|
||||
"""
|
||||
if not texts:
|
||||
return []
|
||||
|
||||
model = self.load_model()
|
||||
embeddings = model.encode(
|
||||
texts,
|
||||
show_progress_bar=show_progress,
|
||||
convert_to_numpy=True,
|
||||
)
|
||||
return embeddings.tolist()
|
||||
|
||||
def embed_query(self, query: str) -> List[float]:
|
||||
"""Generate embedding for a single query.
|
||||
|
||||
Args:
|
||||
query: Query string
|
||||
|
||||
Returns:
|
||||
Embedding vector
|
||||
"""
|
||||
return self.embed([query])[0]
|
||||
|
||||
def get_embedding_dim(self) -> int:
|
||||
"""Get the embedding dimension.
|
||||
|
||||
Returns:
|
||||
Dimension of the embedding vectors
|
||||
"""
|
||||
model = self.load_model()
|
||||
return model.get_sentence_embedding_dimension()
|
||||
|
||||
def unload_model(self) -> None:
|
||||
"""Unload the model to free memory."""
|
||||
self._model = None
|
||||
logger.info("Unloaded embedding model")
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"EmbeddingManager(model={self._model_name}, device={self._device})"
|
||||
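A short sketch of the embedding API above (editor's illustration, not part of this changeset; the first call downloads the all-MiniLM-L6-v2 weights, so it needs network access once):

# Editor's sketch: embed two strings locally with the manager defined above.
from local_api_docs_search.search.embeddings import EmbeddingManager

manager = EmbeddingManager()  # defaults: all-MiniLM-L6-v2 on CPU
vectors = manager.embed(["How do I authenticate?", "List all users"])
print(len(vectors), manager.get_embedding_dim())  # 2 vectors, each of the model's dimension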
368
src/local_api_docs_search/search/searcher.py
Normal file
@@ -0,0 +1,368 @@
|
||||
"""Search logic with semantic similarity and hybrid search."""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
from local_api_docs_search.models.document import Document, SearchResult, SourceType
|
||||
from local_api_docs_search.search.embeddings import EmbeddingManager
|
||||
from local_api_docs_search.search.vectorstore import VectorStore
|
||||
from local_api_docs_search.utils.config import get_config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SearchOptions:
|
||||
"""Options for search operations."""
|
||||
|
||||
limit: int = 10
|
||||
source_type: Optional[SourceType] = None
|
||||
min_score: float = 0.0
|
||||
include_scores: bool = True
|
||||
|
||||
|
||||
class Searcher:
|
||||
"""Main search class for semantic and hybrid search."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
embedding_manager: Optional[EmbeddingManager] = None,
|
||||
vector_store: Optional[VectorStore] = None,
|
||||
config_path: Optional[Path] = None,
|
||||
):
|
||||
"""Initialize the searcher.
|
||||
|
||||
Args:
|
||||
embedding_manager: Embedding manager instance
|
||||
vector_store: Vector store instance
|
||||
config_path: Path to configuration file
|
||||
"""
|
||||
config = get_config(config_path)
|
||||
|
||||
self._embedding_manager = embedding_manager or EmbeddingManager(
|
||||
model_name=config.model_name,
|
||||
device=config.embedding_device,
|
||||
cache_dir=config.chroma_persist_dir / ".cache",
|
||||
)
|
||||
|
||||
self._vector_store = vector_store or VectorStore(
|
||||
persist_dir=config.chroma_persist_dir,
|
||||
)
|
||||
|
||||
self._config = config
|
||||
|
||||
def search(
|
||||
self, query: str, options: Optional[SearchOptions] = None
|
||||
) -> List[SearchResult]:
|
||||
"""Perform semantic search for a query.
|
||||
|
||||
Args:
|
||||
query: Search query string
|
||||
options: Search options
|
||||
|
||||
Returns:
|
||||
List of SearchResult objects
|
||||
"""
|
||||
if options is None:
|
||||
options = SearchOptions(limit=self._config.default_limit)
|
||||
|
||||
if not query.strip():
|
||||
return []
|
||||
|
||||
try:
|
||||
query_embedding = self._embedding_manager.embed_query(query)
|
||||
|
||||
results = self._vector_store.search(
|
||||
query_embedding=query_embedding,
|
||||
n_results=options.limit * 2,
|
||||
source_type=options.source_type,
|
||||
)
|
||||
|
||||
search_results = []
|
||||
for result in results:
|
||||
if options.min_score > 0 and result["score"] < options.min_score:
|
||||
continue
|
||||
|
||||
doc = Document(
|
||||
id=result["id"],
|
||||
content=result["content"],
|
||||
source_type=SourceType(result["metadata"]["source_type"]),
|
||||
title=result["metadata"]["title"],
|
||||
file_path=result["metadata"]["file_path"],
|
||||
metadata={
|
||||
k: v
|
||||
for k, v in result["metadata"].items()
|
||||
if k not in ["source_type", "title", "file_path"]
|
||||
},
|
||||
)
|
||||
|
||||
highlights = self._generate_highlights(query, result["content"])
|
||||
|
||||
search_results.append(
|
||||
SearchResult(
|
||||
document=doc,
|
||||
score=result["score"],
|
||||
highlights=highlights,
|
||||
)
|
||||
)
|
||||
|
||||
if len(search_results) >= options.limit:
|
||||
break
|
||||
|
||||
return search_results
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Search failed for query '{query}': {e}")
|
||||
return []
|
||||
|
||||
def hybrid_search(
|
||||
self, query: str, options: Optional[SearchOptions] = None
|
||||
) -> List[SearchResult]:
|
||||
"""Perform hybrid search combining semantic and keyword search.
|
||||
|
||||
Args:
|
||||
query: Search query string
|
||||
options: Search options
|
||||
|
||||
Returns:
|
||||
List of SearchResult objects sorted by combined relevance
|
||||
"""
|
||||
if options is None:
|
||||
options = SearchOptions(limit=self._config.default_limit)
|
||||
|
||||
semantic_results = self.search(query, options)
|
||||
|
||||
if not query.strip():
|
||||
return semantic_results
|
||||
|
||||
keyword_results = self._keyword_search(query, options)
|
||||
|
||||
combined = {}
|
||||
for result in semantic_results:
|
||||
combined[result.document.id] = result
|
||||
|
||||
for result in keyword_results:
|
||||
if result.document.id in combined:
|
||||
existing = combined[result.document.id]
|
||||
combined[result.document.id] = SearchResult(
|
||||
document=result.document,
|
||||
score=(existing.score + result.score) / 2,
|
||||
highlights=list(set(existing.highlights + result.highlights)),
|
||||
)
|
||||
else:
|
||||
combined[result.document.id] = result
|
||||
|
||||
sorted_results = sorted(
|
||||
combined.values(), key=lambda r: r.score, reverse=True
|
||||
)
|
||||
|
||||
return sorted_results[: options.limit]
|
||||
|
||||
def _keyword_search(
|
||||
self, query: str, options: SearchOptions
|
||||
) -> List[SearchResult]:
|
||||
"""Perform keyword-based search.
|
||||
|
||||
Args:
|
||||
query: Search query
|
||||
options: Search options
|
||||
|
||||
Returns:
|
||||
List of SearchResult objects
|
||||
"""
|
||||
keywords = self._extract_keywords(query)
|
||||
|
||||
if not keywords:
|
||||
return []
|
||||
|
||||
try:
|
||||
all_docs = self._vector_store.get_all_documents(limit=1000)
|
||||
|
||||
results = []
|
||||
for doc in all_docs:
|
||||
if options.source_type and doc.source_type != options.source_type:
|
||||
continue
|
||||
|
||||
keyword_score = self._calculate_keyword_score(keywords, doc.content)
|
||||
if keyword_score > 0:
|
||||
highlights = self._generate_highlights(query, doc.content)
|
||||
results.append(
|
||||
SearchResult(
|
||||
document=doc,
|
||||
score=keyword_score,
|
||||
highlights=highlights,
|
||||
)
|
||||
)
|
||||
|
||||
results.sort(key=lambda r: r.score, reverse=True)
|
||||
return results[: options.limit]
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Keyword search failed: {e}")
|
||||
return []
|
||||
|
||||
def _extract_keywords(self, query: str) -> List[str]:
|
||||
"""Extract keywords from a query.
|
||||
|
||||
Args:
|
||||
query: Search query
|
||||
|
||||
Returns:
|
||||
List of keywords
|
||||
"""
|
||||
stop_words = {
|
||||
"a", "an", "the", "and", "or", "but", "in", "on", "at", "to", "for",
|
||||
"of", "with", "by", "from", "up", "about", "into", "through", "during",
|
||||
"how", "what", "when", "where", "why", "which", "who", "whom",
|
||||
"this", "that", "these", "those", "is", "are", "was", "were", "be",
|
||||
"been", "being", "have", "has", "had", "do", "does", "did", "will",
|
||||
"would", "could", "should", "may", "might", "must", "shall", "can",
|
||||
}
|
||||
|
||||
words = re.findall(r"\b\w+\b", query.lower())
|
||||
keywords = [w for w in words if w not in stop_words and len(w) > 1]
|
||||
|
||||
return keywords
|
||||
|
||||
def _calculate_keyword_score(self, keywords: List[str], content: str) -> float:
|
||||
"""Calculate keyword matching score.
|
||||
|
||||
Args:
|
||||
keywords: List of keywords
|
||||
content: Document content
|
||||
|
||||
Returns:
|
||||
Score between 0 and 1
|
||||
"""
|
||||
if not keywords:
|
||||
return 0.0
|
||||
|
||||
content_lower = content.lower()
|
||||
|
||||
matched_keywords = sum(1 for kw in keywords if kw in content_lower)
|
||||
|
||||
keyword_density = matched_keywords / len(keywords)
|
||||
|
||||
exact_phrase = " ".join(keywords)
|
||||
if exact_phrase in content_lower:
|
||||
return min(1.0, keyword_density + 0.3)
|
||||
|
||||
return keyword_density
|
||||
|
||||
def _generate_highlights(self, query: str, content: str) -> List[str]:
|
||||
"""Generate highlight snippets for a query.
|
||||
|
||||
Args:
|
||||
query: Search query
|
||||
content: Document content
|
||||
|
||||
Returns:
|
||||
List of highlight strings
|
||||
"""
|
||||
keywords = self._extract_keywords(query)
|
||||
if not keywords:
|
||||
return []
|
||||
|
||||
highlights = []
|
||||
content_lower = content.lower()
|
||||
|
||||
for keyword in keywords[:3]:
|
||||
pattern = re.compile(re.escape(keyword), re.IGNORECASE)
|
||||
for match in pattern.finditer(content_lower):
|
||||
start = max(0, match.start() - 30)
|
||||
end = min(len(content), match.end() + 30)
|
||||
snippet = content[start:end]
|
||||
if start > 0:
|
||||
snippet = "..." + snippet
|
||||
if end < len(content):
|
||||
snippet = snippet + "..."
|
||||
highlights.append(snippet)
|
||||
|
||||
return highlights[:5]
|
||||
|
||||
def index(
|
||||
self,
|
||||
path: Path,
|
||||
doc_type: str = "all",
|
||||
recursive: bool = False,
|
||||
batch_size: int = 32,
|
||||
) -> int:
|
||||
"""Index documents from a path.
|
||||
|
||||
Args:
|
||||
path: Path to file or directory
|
||||
doc_type: Type of documents (openapi, readme, code, all)
|
||||
recursive: Search recursively
|
||||
batch_size: Batch size for indexing
|
||||
|
||||
Returns:
|
||||
Number of documents indexed
|
||||
"""
|
||||
from local_api_docs_search.indexer.openapi import OpenAPIIndexer
|
||||
from local_api_docs_search.indexer.readme import READMEIndexer
|
||||
from local_api_docs_search.indexer.code import CodeIndexer
|
||||
|
||||
indexers = []
|
||||
|
||||
if doc_type in ("openapi", "all"):
|
||||
indexers.append(OpenAPIIndexer())
|
||||
if doc_type in ("readme", "all"):
|
||||
indexers.append(READMEIndexer())
|
||||
if doc_type in ("code", "all"):
|
||||
indexers.append(CodeIndexer())
|
||||
|
||||
all_documents = []
|
||||
|
||||
for indexer in indexers:
|
||||
            # Note: the indexers' index() methods shown in this PR take (path, recursive, ...)
            # but not batch_size; batch_size is only used below for embedding and storage batching.
            documents = indexer.index(path, recursive=recursive)
|
||||
all_documents.extend(documents)
|
||||
|
||||
if not all_documents:
|
||||
logger.warning("No documents found to index")
|
||||
return 0
|
||||
|
||||
texts = [doc.content for doc in all_documents]
|
||||
embeddings = self._embedding_manager.embed(texts, show_progress=True)
|
||||
|
||||
self._vector_store.add_documents(all_documents, embeddings, batch_size=batch_size)
|
||||
|
||||
logger.info(f"Indexed {len(all_documents)} documents")
|
||||
return len(all_documents)
|
||||
|
||||
def get_stats(self):
|
||||
"""Get index statistics.
|
||||
|
||||
Returns:
|
||||
IndexStats object
|
||||
"""
|
||||
return self._vector_store.get_stats()
|
||||
|
||||
def clear_index(self) -> bool:
|
||||
"""Clear the entire index.
|
||||
|
||||
Returns:
|
||||
True if successful
|
||||
"""
|
||||
return self._vector_store.delete_index()
|
||||
|
||||
def list_documents(
|
||||
self, source_type: Optional[SourceType] = None, limit: int = 100
|
||||
) -> List[Document]:
|
||||
"""List indexed documents.
|
||||
|
||||
Args:
|
||||
source_type: Optional filter by source type
|
||||
limit: Maximum results
|
||||
|
||||
Returns:
|
||||
List of Document objects
|
||||
"""
|
||||
docs = self._vector_store.get_all_documents(limit=limit * 2)
|
||||
|
||||
if source_type:
|
||||
docs = [d for d in docs if d.source_type == source_type]
|
||||
|
||||
return docs[:limit]
|
||||
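Tying the pieces together, a hedged end-to-end sketch (editor's illustration, not part of this changeset; ./docs is a placeholder directory of specs and READMEs):

# Editor's sketch: index a docs folder, then run a hybrid query against it.
from pathlib import Path

from local_api_docs_search.search.searcher import Searcher, SearchOptions

searcher = Searcher()
searcher.index(Path("./docs"), doc_type="all", recursive=True)
for result in searcher.hybrid_search("user authentication", SearchOptions(limit=5)):
    print(f"{result.score:.3f}  {result.document.title}")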
305
src/local_api_docs_search/search/vectorstore.py
Normal file
@@ -0,0 +1,305 @@
|
||||
"""Vector storage operations using ChromaDB."""
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import chromadb
|
||||
from chromadb.config import Settings
|
||||
|
||||
from local_api_docs_search.models.document import Document, IndexStats, SourceType
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class VectorStore:
|
||||
"""ChromaDB-based vector storage for document embeddings."""
|
||||
|
||||
COLLECTION_NAME = "api_docs"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
persist_dir: Path,
|
||||
collection_name: Optional[str] = None,
|
||||
):
|
||||
"""Initialize the vector store.
|
||||
|
||||
Args:
|
||||
persist_dir: Directory for persistence
|
||||
collection_name: Name of the collection (default: api_docs)
|
||||
"""
|
||||
self._persist_dir = Path(persist_dir)
|
||||
self._persist_dir.mkdir(parents=True, exist_ok=True)
|
||||
self._collection_name = collection_name or self.COLLECTION_NAME
|
||||
self._client: Optional[chromadb.Client] = None
|
||||
self._collection: Optional[chromadb.Collection] = None
|
||||
|
||||
def _get_client(self) -> chromadb.Client:
|
||||
"""Get or create the ChromaDB client."""
|
||||
if self._client is None:
|
||||
self._client = chromadb.Client(
|
||||
Settings(
|
||||
persist_directory=str(self._persist_dir),
|
||||
anonymized_telemetry=False,
|
||||
)
|
||||
)
|
||||
return self._client
|
||||
|
||||
def _get_collection(self) -> chromadb.Collection:
|
||||
"""Get or create the collection."""
|
||||
if self._collection is None:
|
||||
client = self._get_client()
|
||||
try:
|
||||
self._collection = client.get_collection(self._collection_name)
|
||||
except ValueError:
|
||||
self._collection = client.create_collection(self._collection_name)
|
||||
logger.info(f"Created new collection: {self._collection_name}")
|
||||
return self._collection
|
||||
|
||||
def add_documents(
|
||||
self,
|
||||
documents: List[Document],
|
||||
embeddings: List[List[float]],
|
||||
batch_size: int = 100,
|
||||
) -> int:
|
||||
"""Add documents and their embeddings to the store.
|
||||
|
||||
Args:
|
||||
documents: List of Document objects
|
||||
embeddings: List of embedding vectors
|
||||
batch_size: Documents per batch
|
||||
|
||||
Returns:
|
||||
Number of documents added
|
||||
"""
|
||||
if not documents:
|
||||
return 0
|
||||
|
||||
collection = self._get_collection()
|
||||
|
||||
total_added = 0
|
||||
for i in range(0, len(documents), batch_size):
|
||||
batch_docs = documents[i : i + batch_size]
|
||||
batch_embeddings = embeddings[i : i + batch_size]
|
||||
|
||||
ids = [doc.id for doc in batch_docs]
|
||||
contents = [doc.content for doc in batch_docs]
|
||||
metadatas = [
|
||||
{
|
||||
"source_type": doc.source_type.value,
|
||||
"title": doc.title,
|
||||
"file_path": doc.file_path,
|
||||
**doc.metadata,
|
||||
}
|
||||
for doc in batch_docs
|
||||
]
|
||||
|
||||
try:
|
||||
collection.add(
|
||||
ids=ids,
|
||||
documents=contents,
|
||||
embeddings=batch_embeddings,
|
||||
metadatas=metadatas,
|
||||
)
|
||||
total_added += len(batch_docs)
|
||||
logger.debug(f"Added batch of {len(batch_docs)} documents")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to add batch: {e}")
|
||||
|
||||
logger.info(f"Added {total_added} documents to collection")
|
||||
return total_added
|
||||
|
||||
def search(
|
||||
self,
|
||||
query_embedding: List[float],
|
||||
n_results: int = 10,
|
||||
source_type: Optional[SourceType] = None,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Search for similar documents.
|
||||
|
||||
Args:
|
||||
query_embedding: Query embedding vector
|
||||
n_results: Number of results to return
|
||||
source_type: Optional filter by source type
|
||||
|
||||
Returns:
|
||||
List of search results with documents and scores
|
||||
"""
|
||||
collection = self._get_collection()
|
||||
|
||||
where_filter = None
|
||||
if source_type:
|
||||
where_filter = {"source_type": source_type.value}
|
||||
|
||||
try:
|
||||
results = collection.query(
|
||||
query_embeddings=[query_embedding],
|
||||
n_results=n_results,
|
||||
where=where_filter,
|
||||
include=["documents", "metadatas", "distances"],
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Search failed: {e}")
|
||||
return []
|
||||
|
||||
search_results = []
|
||||
if results["ids"] and results["ids"][0]:
|
||||
for i in range(len(results["ids"][0])):
|
||||
result = {
|
||||
"id": results["ids"][0][i],
|
||||
"content": results["documents"][0][i],
|
||||
"metadata": results["metadatas"][0][i],
|
||||
"distance": results["distances"][0][i],
|
||||
"score": 1.0 - results["distances"][0][i],
|
||||
}
|
||||
search_results.append(result)
|
||||
|
||||
return search_results
|
||||
|
||||
def delete_index(self) -> bool:
|
||||
"""Delete the entire index.
|
||||
|
||||
Returns:
|
||||
True if successful
|
||||
"""
|
||||
try:
|
||||
client = self._get_client()
|
||||
client.delete_collection(self._collection_name)
|
||||
self._collection = None
|
||||
logger.info(f"Deleted collection: {self._collection_name}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete collection: {e}")
|
||||
return False
|
||||
|
||||
def get_stats(self) -> IndexStats:
|
||||
"""Get statistics about the index.
|
||||
|
||||
Returns:
|
||||
IndexStats object
|
||||
"""
|
||||
collection = self._get_collection()
|
||||
|
||||
total = collection.count()
|
||||
|
||||
source_counts = {type.value: 0 for type in SourceType}
|
||||
|
||||
try:
|
||||
all_metadata = collection.get(include=["metadatas"])
|
||||
for metadata in all_metadata.get("metadatas", []):
|
||||
source_type = metadata.get("source_type")
|
||||
if source_type in source_counts:
|
||||
source_counts[source_type] += 1
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to get source counts: {e}")
|
||||
|
||||
return IndexStats(
|
||||
total_documents=total,
|
||||
openapi_count=source_counts[SourceType.OPENAPI.value],
|
||||
readme_count=source_counts[SourceType.README.value],
|
||||
code_count=source_counts[SourceType.CODE.value],
|
||||
)
|
||||
|
||||
def get_all_documents(
|
||||
self, limit: int = 1000, offset: int = 0
|
||||
) -> List[Document]:
|
||||
"""Get all documents from the store.
|
||||
|
||||
Args:
|
||||
limit: Maximum number of documents
|
||||
offset: Offset for pagination
|
||||
|
||||
Returns:
|
||||
List of Document objects
|
||||
"""
|
||||
collection = self._get_collection()
|
||||
|
||||
try:
|
||||
results = collection.get(limit=limit, offset=offset, include=["documents", "metadatas"])
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get documents: {e}")
|
||||
return []
|
||||
|
||||
documents = []
|
||||
for i in range(len(results["ids"])):
|
||||
metadata = results["metadatas"][i]
|
||||
doc = Document(
|
||||
id=results["ids"][i],
|
||||
content=results["documents"][i],
|
||||
source_type=SourceType(metadata["source_type"]),
|
||||
title=metadata["title"],
|
||||
file_path=metadata["file_path"],
|
||||
metadata={k: v for k, v in metadata.items() if k not in ["source_type", "title", "file_path"]},
|
||||
)
|
||||
documents.append(doc)
|
||||
|
||||
return documents
|
||||
|
||||
def delete_by_ids(self, ids: List[str]) -> int:
|
||||
"""Delete documents by IDs.
|
||||
|
||||
Args:
|
||||
ids: List of document IDs to delete
|
||||
|
||||
Returns:
|
||||
Number of documents deleted
|
||||
"""
|
||||
if not ids:
|
||||
return 0
|
||||
|
||||
collection = self._get_collection()
|
||||
|
||||
try:
|
||||
collection.delete(ids=ids)
|
||||
logger.info(f"Deleted {len(ids)} documents")
|
||||
return len(ids)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete documents: {e}")
|
||||
return 0
|
||||
|
||||
def delete_by_source_type(self, source_type: SourceType) -> int:
|
||||
"""Delete all documents of a given source type.
|
||||
|
||||
Args:
|
||||
source_type: Source type to delete
|
||||
|
||||
Returns:
|
||||
Number of documents deleted
|
||||
"""
|
||||
collection = self._get_collection()
|
||||
|
||||
try:
|
||||
results = collection.get(where={"source_type": source_type.value})
|
||||
if results["ids"]:
|
||||
return self.delete_by_ids(results["ids"])
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete by source type: {e}")
|
||||
|
||||
return 0
|
||||
|
||||
def exists(self) -> bool:
|
||||
"""Check if the collection exists.
|
||||
|
||||
Returns:
|
||||
True if collection exists
|
||||
"""
|
||||
try:
|
||||
client = self._get_client()
|
||||
client.get_collection(self._collection_name)
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
def count(self) -> int:
|
||||
"""Get the document count.
|
||||
|
||||
Returns:
|
||||
Number of documents in the store
|
||||
"""
|
||||
collection = self._get_collection()
|
||||
return collection.count()
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close the client connection."""
|
||||
self._client = None
|
||||
self._collection = None
|
||||
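A small sketch of the store above in isolation (editor's illustration, not part of this changeset; the zero vector stands in for a real 384-dimensional embedding):

# Editor's sketch: add one document with a dummy embedding and read back stats.
from pathlib import Path

from local_api_docs_search.models.document import Document, SourceType
from local_api_docs_search.search.vectorstore import VectorStore

store = VectorStore(persist_dir=Path(".api-docs/chroma"))
doc = Document(
    id="demo_1",
    content="GET /users returns all users.",
    source_type=SourceType.OPENAPI,
    title="GET /users",
    file_path="openapi.yaml",
)
store.add_documents([doc], [[0.0] * 384])  # placeholder embedding, not a real vector
print(store.count(), store.get_stats().openapi_count)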
1
src/local_api_docs_search/utils/__init__.py
Normal file
@@ -0,0 +1 @@
"""Utility functions package."""
133
src/local_api_docs_search/utils/config.py
Normal file
@@ -0,0 +1,133 @@
|
||||
"""Configuration management for the application."""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
import yaml
|
||||
from dotenv import load_dotenv
|
||||
|
||||
|
||||
class Config:
|
||||
"""Configuration management class supporting env vars and YAML config."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
config_path: Optional[Path] = None,
|
||||
env_path: Optional[Path] = None,
|
||||
):
|
||||
self._config: dict[str, Any] = {}
|
||||
self._config_path = config_path or Path.cwd() / "config.yaml"
|
||||
self._load_env(env_path)
|
||||
self._load_config()
|
||||
|
||||
def _load_env(self, env_path: Optional[Path] = None) -> None:
|
||||
"""Load environment variables from .env file."""
|
||||
env_file = env_path or Path.cwd() / ".env"
|
||||
if env_file.exists():
|
||||
load_dotenv(env_file)
|
||||
|
||||
def _load_config(self) -> None:
|
||||
"""Load configuration from YAML file."""
|
||||
if self._config_path.exists():
|
||||
with open(self._config_path, "r") as f:
|
||||
self._config = yaml.safe_load(f) or {}
|
||||
else:
|
||||
self._config = {}
|
||||
|
||||
def get(self, key: str, default: Any = None) -> Any:
|
||||
"""Get configuration value with environment variable override."""
|
||||
env_key = f"API_DOCS_{key.upper()}"
|
||||
env_value = os.environ.get(env_key)
|
||||
|
||||
if env_value is not None:
|
||||
return self._cast_env_value(env_value)
|
||||
|
||||
return self._config.get(key, default)
|
||||
|
||||
def _cast_env_value(self, value: str) -> Any:
|
||||
"""Cast environment variable string to appropriate type."""
|
||||
if value.lower() in ("true", "false"):
|
||||
return value.lower() == "true"
|
||||
try:
|
||||
return int(value)
|
||||
except ValueError:
|
||||
pass
|
||||
try:
|
||||
return float(value)
|
||||
except ValueError:
|
||||
pass
|
||||
return value
|
||||
|
||||
@property
|
||||
def index_path(self) -> Path:
|
||||
"""Get the documentation index path."""
|
||||
return Path(self.get("index_path", "./docs"))
|
||||
|
||||
@property
|
||||
def model_name(self) -> str:
|
||||
"""Get the embedding model name."""
|
||||
return self.get("model_name", "all-MiniLM-L6-v2")
|
||||
|
||||
@property
|
||||
def embedding_device(self) -> str:
|
||||
"""Get the embedding device."""
|
||||
return self.get("embedding_device", "cpu")
|
||||
|
||||
@property
|
||||
def chroma_persist_dir(self) -> Path:
|
||||
"""Get the ChromaDB persistence directory."""
|
||||
return Path(self.get("chroma_persist_dir", ".api-docs/chroma"))
|
||||
|
||||
@property
|
||||
def default_limit(self) -> int:
|
||||
"""Get the default search result limit."""
|
||||
return int(self.get("default_limit", 10))
|
||||
|
||||
@property
|
||||
def verbose(self) -> bool:
|
||||
"""Get verbose mode setting."""
|
||||
return self.get("verbose", False)
|
||||
|
||||
def set(self, key: str, value: Any) -> None:
|
||||
"""Set a configuration value."""
|
||||
self._config[key] = value
|
||||
|
||||
def save(self) -> None:
|
||||
"""Save configuration to YAML file."""
|
||||
with open(self._config_path, "w") as f:
|
||||
yaml.dump(self._config, f, default_flow_style=False)
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset configuration to defaults."""
|
||||
self._config = {}
|
||||
if self._config_path.exists():
|
||||
self._config_path.unlink()
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
"""Return configuration as dictionary."""
|
||||
return {
|
||||
"index_path": str(self.index_path),
|
||||
"model_name": self.model_name,
|
||||
"embedding_device": self.embedding_device,
|
||||
"chroma_persist_dir": str(self.chroma_persist_dir),
|
||||
"default_limit": self.default_limit,
|
||||
"verbose": self.verbose,
|
||||
}
|
||||
|
||||
|
||||
_config: Optional[Config] = None
|
||||
|
||||
|
||||
def get_config(config_path: Optional[Path] = None) -> Config:
|
||||
"""Get or create the global configuration instance."""
|
||||
global _config
|
||||
if _config is None:
|
||||
_config = Config(config_path)
|
||||
return _config
|
||||
|
||||
|
||||
def reset_config() -> None:
|
||||
"""Reset the global configuration instance."""
|
||||
global _config
|
||||
_config = None
|
||||
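A sketch of the override behaviour implemented above (editor's illustration, not part of this changeset): any configuration key can be overridden with an API_DOCS_<KEY> environment variable.

# Editor's sketch: override "default_limit" via its environment-variable form.
import os

from local_api_docs_search.utils.config import get_config, reset_config

os.environ["API_DOCS_DEFAULT_LIMIT"] = "25"
reset_config()  # drop any cached instance so the override is picked up
config = get_config()
print(config.default_limit, config.model_name)  # 25 all-MiniLM-L6-v2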
122
src/local_api_docs_search/utils/formatters.py
Normal file
@@ -0,0 +1,122 @@
|
||||
"""Output formatting utilities using Rich."""
|
||||
|
||||
from typing import Any
|
||||
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
from rich.text import Text
|
||||
from rich.theme import Theme
|
||||
|
||||
from local_api_docs_search.models.document import Document, SearchResult, SourceType
|
||||
|
||||
CUSTOM_THEME = Theme({
    "title": "bold cyan",
    "subtitle": "dim white",
    "highlight": "yellow",
    "source_openapi": "green",
    "source_readme": "blue",
    "source_code": "magenta",
})

# The shared console is created with the custom theme so that the source_* style
# names returned by get_source_style() below resolve when results are rendered.
console = Console(theme=CUSTOM_THEME)
|
||||
|
||||
|
||||
def format_document_for_display(doc: Document, score: float = 0.0) -> Table:
|
||||
"""Format a document for display in a table."""
|
||||
table = Table(show_header=False, box=None, padding=(0, 1))
|
||||
table.add_column("Label", style="dim")
|
||||
table.add_column("Value")
|
||||
|
||||
source_style = get_source_style(doc.source_type)
|
||||
|
||||
table.add_row("Title", Text(doc.title, style="bold"))
|
||||
table.add_row("Type", Text(doc.source_type.value, style=source_style))
|
||||
table.add_row("File", Text(doc.file_path, style="dim"))
|
||||
|
||||
if score > 0:
|
||||
table.add_row("Score", f"{score:.4f}")
|
||||
|
||||
content_preview = doc.content[:200] + "..." if len(doc.content) > 200 else doc.content
|
||||
table.add_row("Content", content_preview)
|
||||
|
||||
return table
|
||||
|
||||
|
||||
def get_source_style(source_type: SourceType) -> str:
|
||||
"""Get the Rich style for a source type."""
|
||||
style_map = {
|
||||
SourceType.OPENAPI: "source_openapi",
|
||||
SourceType.README: "source_readme",
|
||||
SourceType.CODE: "source_code",
|
||||
}
|
||||
return style_map.get(source_type, "white")
|
||||
|
||||
|
||||
def format_search_results(results: list[SearchResult], show_scores: bool = True) -> Table:
|
||||
"""Format search results as a table."""
|
||||
table = Table(title="Search Results", show_lines=True)
|
||||
table.add_column("#", width=4, style="dim")
|
||||
table.add_column("Title", style="bold")
|
||||
table.add_column("Type", width=8)
|
||||
table.add_column("Preview")
|
||||
|
||||
for i, result in enumerate(results, 1):
|
||||
source_style = get_source_style(result.document.source_type)
|
||||
preview = result.document.content[:150]
|
||||
if len(result.document.content) > 150:
|
||||
preview += "..."
|
||||
|
||||
table.add_row(
|
||||
str(i),
|
||||
Text(result.document.title, style="bold"),
|
||||
Text(result.document.source_type.value, style=source_style),
|
||||
preview,
|
||||
)
|
||||
|
||||
return table
|
||||
|
||||
|
||||
def format_index_summary(
|
||||
total: int, openapi: int, readme: int, code: int
|
||||
) -> Table:
|
||||
"""Format index statistics as a table."""
|
||||
table = Table(title="Index Summary", show_header=False)
|
||||
table.add_column("Metric", style="dim")
|
||||
table.add_column("Count", justify="right")
|
||||
|
||||
table.add_row("Total Documents", str(total))
|
||||
table.add_row("OpenAPI Specs", str(openapi))
|
||||
table.add_row("README Files", str(readme))
|
||||
table.add_row("Code Comments", str(code))
|
||||
|
||||
return table
|
||||
|
||||
|
||||
def format_error(message: str) -> Text:
|
||||
"""Format an error message."""
|
||||
return Text(f"Error: {message}", style="red bold")
|
||||
|
||||
|
||||
def format_success(message: str) -> Text:
|
||||
"""Format a success message."""
|
||||
return Text(message, style="green bold")
|
||||
|
||||
|
||||
def format_info(message: str) -> Text:
|
||||
"""Format an info message."""
|
||||
return Text(message, style="cyan")
|
||||
|
||||
|
||||
def print_json(data: Any) -> None:
|
||||
"""Print data as JSON."""
|
||||
console.print_json(data=data)
|
||||
|
||||
|
||||
def format_help_header(command: str, description: str) -> Text:
|
||||
"""Format a help header for a command."""
|
||||
header = Text.assemble(
|
||||
(f"$ api-docs {command}", "bold yellow"),
|
||||
" — ",
|
||||
(description, "italic"),
|
||||
)
|
||||
return header
|
||||
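A rendering sketch for the helpers above (editor's illustration, not part of this changeset):

# Editor's sketch: print a one-row result table using the formatters defined above.
from local_api_docs_search.models.document import Document, SearchResult, SourceType
from local_api_docs_search.utils.formatters import console, format_search_results

doc = Document(
    id="demo",
    content="Run `api-docs search <query>` to search the index.",
    source_type=SourceType.README,
    title="Usage",
    file_path="README.md",
)
console.print(format_search_results([SearchResult(document=doc, score=0.92)]))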
@@ -1,7 +1,6 @@
|
||||
"""CLI entry point."""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
@@ -66,8 +66,6 @@ def format_search_results(results: list[SearchResult], show_scores: bool = True)
|
||||
if len(result.document.content) > 150:
|
||||
preview += "..."
|
||||
|
||||
score_str = f"{result.score:.4f}" if show_scores else ""
|
||||
|
||||
table.add_row(
|
||||
str(i),
|
||||
Text(result.document.title, style="bold"),
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
"""Pytest configuration and fixtures."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
209
tests/fixtures/sample_code.py
vendored
Normal file
@@ -0,0 +1,209 @@
|
||||
"""Sample Python module for testing the code indexer."""
|
||||
|
||||
|
||||
def add(a, b):
|
||||
"""Add two numbers together.
|
||||
|
||||
Args:
|
||||
a: First number to add
|
||||
b: Second number to add
|
||||
|
||||
Returns:
|
||||
The sum of a and b
|
||||
|
||||
Example:
|
||||
>>> add(2, 3)
|
||||
5
|
||||
"""
|
||||
return a + b
|
||||
|
||||
|
||||
def multiply(a, b):
|
||||
"""Multiply two numbers.
|
||||
|
||||
Args:
|
||||
a: First number
|
||||
b: Second number
|
||||
|
||||
Returns:
|
||||
The product of a and b
|
||||
"""
|
||||
return a * b
|
||||
|
||||
|
||||
def greet(name: str, greeting: str = "Hello") -> str:
|
||||
"""Generate a greeting message.
|
||||
|
||||
Args:
|
||||
name: Name of the person to greet
|
||||
greeting: Greeting word to use
|
||||
|
||||
Returns:
|
||||
A formatted greeting string
|
||||
|
||||
Raises:
|
||||
ValueError: If name is empty
|
||||
"""
|
||||
if not name:
|
||||
raise ValueError("Name cannot be empty")
|
||||
return f"{greeting}, {name}!"
|
||||
|
||||
|
||||
class Calculator:
|
||||
"""A simple calculator class for basic arithmetic operations.
|
||||
|
||||
This class provides methods for performing addition, subtraction,
|
||||
multiplication, and division operations.
|
||||
|
||||
Attributes:
|
||||
memory: Current memory value for accumulator operations
|
||||
|
||||
Example:
|
||||
>>> calc = Calculator()
|
||||
>>> calc.add(5)
|
||||
>>> calc.multiply(2)
|
||||
>>> calc.get_memory()
|
||||
10
|
||||
"""
|
||||
|
||||
def __init__(self, initial_value: float = 0.0) -> None:
|
||||
"""Initialize the calculator with an optional starting value.
|
||||
|
||||
Args:
|
||||
initial_value: The starting value for the calculator
|
||||
"""
|
||||
self.memory = initial_value
|
||||
|
||||
def add(self, value: float) -> None:
|
||||
"""Add a value to the current memory.
|
||||
|
||||
Args:
|
||||
value: Number to add to memory
|
||||
"""
|
||||
self.memory += value
|
||||
|
||||
def subtract(self, value: float) -> None:
|
||||
"""Subtract a value from the current memory.
|
||||
|
||||
Args:
|
||||
value: Number to subtract from memory
|
||||
"""
|
||||
self.memory -= value
|
||||
|
||||
def multiply(self, value: float) -> None:
|
||||
"""Multiply the current memory by a value.
|
||||
|
||||
Args:
|
||||
value: Number to multiply by
|
||||
"""
|
||||
self.memory *= value
|
||||
|
||||
def divide(self, value: float) -> None:
|
||||
"""Divide the current memory by a value.
|
||||
|
||||
Args:
|
||||
value: Number to divide by
|
||||
|
||||
Raises:
|
||||
ZeroDivisionError: If value is zero
|
||||
"""
|
||||
if value == 0:
|
||||
raise ZeroDivisionError("Cannot divide by zero")
|
||||
self.memory /= value
|
||||
|
||||
def get_memory(self) -> float:
|
||||
"""Get the current memory value.
|
||||
|
||||
Returns:
|
||||
The current memory value
|
||||
"""
|
||||
return self.memory
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset the memory to zero."""
|
||||
self.memory = 0.0
|
||||
|
||||
|
||||
class DataProcessor:
|
||||
"""A class for processing data with various operations.
|
||||
|
||||
This class supports filtering, mapping, and aggregating data
|
||||
from various input sources.
|
||||
|
||||
Attributes:
|
||||
data: Internal data storage
|
||||
processed_count: Number of items processed
|
||||
|
||||
Methods:
|
||||
load: Load data from a source
|
||||
filter: Filter data based on criteria
|
||||
map: Transform data elements
|
||||
aggregate: Calculate aggregate statistics
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
"""Initialize the data processor."""
|
||||
self.data = []
|
||||
self.processed_count = 0
|
||||
|
||||
def load(self, items: list) -> None:
|
||||
"""Load data into the processor.
|
||||
|
||||
Args:
|
||||
items: List of items to process
|
||||
"""
|
||||
self.data = list(items)
|
||||
|
||||
def filter(self, predicate) -> list:
|
||||
"""Filter data based on a predicate function.
|
||||
|
||||
Args:
|
||||
predicate: Function that returns True for items to keep
|
||||
|
||||
Returns:
|
||||
Filtered list of items
|
||||
"""
|
||||
result = [item for item in self.data if predicate(item)]
|
||||
self.processed_count += len(result)
|
||||
return result
|
||||
|
||||
def map(self, transform) -> list:
|
||||
"""Transform data using a function.
|
||||
|
||||
Args:
|
||||
transform: Function to apply to each item
|
||||
|
||||
Returns:
|
||||
List of transformed items
|
||||
"""
|
||||
result = [transform(item) for item in self.data]
|
||||
self.processed_count += len(result)
|
||||
return result
|
||||
|
||||
def aggregate(self, func, initial=None):
|
||||
"""Aggregate data using a function.
|
||||
|
||||
Args:
|
||||
func: Aggregation function (e.g., sum, max, min)
|
||||
initial: Initial value for the aggregation
|
||||
|
||||
Returns:
|
||||
Aggregated result
|
||||
"""
|
||||
if initial is not None:
|
||||
result = func(self.data, initial)
|
||||
else:
|
||||
result = func(self.data)
|
||||
self.processed_count += 1
|
||||
return result
|
||||
|
||||
def get_stats(self) -> dict:
|
||||
"""Get processing statistics.
|
||||
|
||||
Returns:
|
||||
Dictionary with processing stats
|
||||
"""
|
||||
return {
|
||||
"total_items": len(self.data),
|
||||
"processed_count": self.processed_count,
|
||||
}
|
||||
@@ -4,8 +4,7 @@ import pytest
|
||||
from unittest.mock import Mock, patch
|
||||
from click.testing import CliRunner
|
||||
|
||||
from src.cli.commands import cli, index_command, search_command, list_command
|
||||
from src.cli.interactive import run_interactive
|
||||
from src.cli.commands import cli
|
||||
|
||||
|
||||
class TestCLIBasics:
|
||||
@@ -237,6 +236,6 @@ class TestInteractiveCommand:
|
||||
with patch("src.cli.interactive.run_interactive") as mock_run:
|
||||
mock_run.side_effect = (KeyboardInterrupt, SystemExit(0))
|
||||
|
||||
result = runner.invoke(cli, ["interactive"])
|
||||
runner.invoke(cli, ["interactive"])
|
||||
|
||||
mock_run.assert_called_once()
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
"""Tests for the indexers."""
|
||||
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
"""Integration tests for the complete workflow."""
|
||||
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from src.cli.commands import cli
|
||||
from src.search.searcher import Searcher
|
||||
from src.models.document import Document, SourceType, SearchResult
|
||||
|
||||
|
||||
|
||||