From 6410d755299a8e8882f2738a1971589ada2168bd Mon Sep 17 00:00:00 2001 From: jioffe502 Date: Thu, 14 May 2026 18:30:41 +0000 Subject: [PATCH 1/6] Implement LanceDB hybrid retrieval --- .../src/nemo_retriever/vdb/lancedb.py | 45 +++++++++- .../src/nemo_retriever/vdb/operators.py | 3 + .../tests/test_lancedb_retrieval_where.py | 85 ++++++++++++++++++- .../tests/test_nv_ingest_vdb_operator.py | 32 +++++++ 4 files changed, 159 insertions(+), 6 deletions(-) diff --git a/nemo_retriever/src/nemo_retriever/vdb/lancedb.py b/nemo_retriever/src/nemo_retriever/vdb/lancedb.py index 74f6b509d4..5cc3c839d0 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/lancedb.py +++ b/nemo_retriever/src/nemo_retriever/vdb/lancedb.py @@ -546,10 +546,12 @@ def retrieval(self, vectors, **kwargs): ``table.search`` (e.g. ``query_type``, ``fts_columns``). Do not pass ``vector_column_name`` here; use the top-level ``vector_column_name`` retrieval argument instead. + query_texts: + Raw query strings aligned with ``vectors``. Required for + ``hybrid=True`` and ignored for dense-only retrieval. """ hybrid = kwargs.pop("hybrid", self.hybrid) - if hybrid: - raise NotImplementedError("LanceDB hybrid retrieval with precomputed vectors is not implemented yet.") + query_texts = kwargs.pop("query_texts", None) table_path = kwargs.pop("table_path", self.uri) table_name = kwargs.pop("table_name", self.table_name) @@ -567,6 +569,23 @@ def retrieval(self, vectors, **kwargs): else: search_kwargs = dict(search_kwargs_raw) + if hybrid: + if query_texts is None: + raise ValueError( + "LanceDB hybrid retrieval requires query_texts because it needs raw query text " + "in addition to precomputed vectors." + ) + query_type = search_kwargs.get("query_type") + if query_type is not None: + query_type_value = getattr(query_type, "value", query_type) + if str(query_type_value).lower() != "hybrid": + raise ValueError( + "LanceDB hybrid retrieval requires search_kwargs['query_type']='hybrid'; " + f"got {query_type!r}." + ) + search_kwargs["query_type"] = "hybrid" + search_kwargs.setdefault("fts_columns", "text") + where_clause = kwargs.pop("where", None) _filter_fallback = kwargs.pop("_filter", None) if where_clause is None: @@ -576,9 +595,27 @@ def retrieval(self, vectors, **kwargs): table = lancedb.connect(uri=table_path).open_table(table_name) + vectors_list = list(vectors) + if hybrid: + query_texts_list = [query_texts] if isinstance(query_texts, str) else list(query_texts) + if len(query_texts_list) != len(vectors_list): + raise ValueError( + "LanceDB hybrid retrieval requires query_texts length to match vectors length; " + f"got query_texts={len(query_texts_list)} vectors={len(vectors_list)}." + ) + else: + query_texts_list = [] + search_results = [] - for vector in vectors: - query = table.search([vector], vector_column_name=vector_column_name, **search_kwargs) + for idx, vector in enumerate(vectors_list): + if hybrid: + query = ( + table.search(vector_column_name=vector_column_name, **search_kwargs) + .vector(vector) + .text(str(query_texts_list[idx])) + ) + else: + query = table.search([vector], vector_column_name=vector_column_name, **search_kwargs) if where_clause is not None: query = query.where(where_clause) query = query.limit(top_k).refine_factor(refine_factor).nprobes(n_probe) diff --git a/nemo_retriever/src/nemo_retriever/vdb/operators.py b/nemo_retriever/src/nemo_retriever/vdb/operators.py index caf5c87710..07fe88baed 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/operators.py +++ b/nemo_retriever/src/nemo_retriever/vdb/operators.py @@ -147,6 +147,7 @@ def __init__( ) -> None: merged = dict(vdb_kwargs or {}) clean_kwargs, _sidecar = split_sidecar_from_vdb_kwargs(merged) + clean_kwargs.pop("query_texts", None) super().__init__(vdb=vdb, vdb_op=vdb_op, vdb_kwargs=clean_kwargs, explode_for_rerank=explode_for_rerank) self._vdb_kwargs = clean_kwargs self._retrieval_vdb_kwargs = clean_kwargs @@ -162,6 +163,8 @@ def process(self, data: Any, **kwargs: Any) -> list[list[dict[str, Any]]]: from nemo_retriever.retriever_graph_utils import filter_retrieval_kwargs retrieval_kwargs = {**self._retrieval_vdb_kwargs, **filter_retrieval_kwargs(kwargs)} + if retrieval_kwargs.get("hybrid") and "query_texts" in kwargs: + retrieval_kwargs["query_texts"] = kwargs["query_texts"] return normalize_retrieval_results(self._vdb.retrieval(data, **retrieval_kwargs)) def postprocess(self, data: Any, **kwargs: Any) -> Any: diff --git a/nemo_retriever/tests/test_lancedb_retrieval_where.py b/nemo_retriever/tests/test_lancedb_retrieval_where.py index ffdb534f2f..a0fcc7d89e 100644 --- a/nemo_retriever/tests/test_lancedb_retrieval_where.py +++ b/nemo_retriever/tests/test_lancedb_retrieval_where.py @@ -17,7 +17,7 @@ from nemo_retriever.vdb.lancedb import LanceDB -def _tiny_table(uri: str) -> None: +def _tiny_table(uri: str, *, create_fts_index: bool = False) -> None: schema = pa.schema( [ pa.field("vector", pa.list_(pa.float32(), 2)), @@ -41,7 +41,9 @@ def _tiny_table(uri: str) -> None: }, ] db = lancedb.connect(uri) - db.create_table("t", rows, schema=schema, mode="overwrite") + table = db.create_table("t", rows, schema=schema, mode="overwrite") + if create_fts_index: + table.create_fts_index("text", replace=True) def test_retrieval_where_filters_rows() -> None: @@ -101,3 +103,82 @@ def test_retrieval_search_kwargs_must_be_dict() -> None: op = LanceDB(uri=d, table_name="t", overwrite=False, vector_dim=2, validate_vector_length=False) with pytest.raises(TypeError, match="search_kwargs"): op.retrieval([[1.0, 0.0]], top_k=5, table_path=d, table_name="t", search_kwargs="bad") + + +def test_hybrid_retrieval_uses_query_texts() -> None: + d = tempfile.mkdtemp() + _tiny_table(d, create_fts_index=True) + op = LanceDB(uri=d, table_name="t", overwrite=False, vector_dim=2, validate_vector_length=False) + + results = op.retrieval( + [[1.0, 0.0]], + top_k=2, + table_path=d, + table_name="t", + hybrid=True, + query_texts=["alpha"], + ) + + assert results[0] + assert results[0][0]["text"] == "alpha" + + +def test_hybrid_retrieval_requires_query_texts() -> None: + d = tempfile.mkdtemp() + _tiny_table(d, create_fts_index=True) + op = LanceDB(uri=d, table_name="t", overwrite=False, vector_dim=2, validate_vector_length=False) + + with pytest.raises(ValueError, match="requires query_texts"): + op.retrieval([[1.0, 0.0]], top_k=2, table_path=d, table_name="t", hybrid=True) + + +def test_hybrid_retrieval_requires_query_texts_aligned_with_vectors() -> None: + d = tempfile.mkdtemp() + _tiny_table(d, create_fts_index=True) + op = LanceDB(uri=d, table_name="t", overwrite=False, vector_dim=2, validate_vector_length=False) + + with pytest.raises(ValueError, match="length to match vectors length"): + op.retrieval( + [[1.0, 0.0]], + top_k=2, + table_path=d, + table_name="t", + hybrid=True, + query_texts=["alpha", "beta"], + ) + + +def test_hybrid_retrieval_where_filters_rows() -> None: + d = tempfile.mkdtemp() + _tiny_table(d, create_fts_index=True) + op = LanceDB(uri=d, table_name="t", overwrite=False, vector_dim=2, validate_vector_length=False) + + filtered = op.retrieval( + [[1.0, 0.0]], + top_k=10, + table_path=d, + table_name="t", + hybrid=True, + query_texts=["beta"], + where="text = 'beta'", + ) + + assert len(filtered[0]) == 1 + assert filtered[0][0]["text"] == "beta" + + +def test_hybrid_retrieval_rejects_non_hybrid_query_type() -> None: + d = tempfile.mkdtemp() + _tiny_table(d, create_fts_index=True) + op = LanceDB(uri=d, table_name="t", overwrite=False, vector_dim=2, validate_vector_length=False) + + with pytest.raises(ValueError, match="query_type"): + op.retrieval( + [[1.0, 0.0]], + top_k=2, + table_path=d, + table_name="t", + hybrid=True, + query_texts=["alpha"], + search_kwargs={"query_type": "vector"}, + ) diff --git a/nemo_retriever/tests/test_nv_ingest_vdb_operator.py b/nemo_retriever/tests/test_nv_ingest_vdb_operator.py index 8213405eea..ee882d9fa0 100644 --- a/nemo_retriever/tests/test_nv_ingest_vdb_operator.py +++ b/nemo_retriever/tests/test_nv_ingest_vdb_operator.py @@ -161,6 +161,38 @@ def test_retrieve_operator_delegates_vectors_to_retrieval() -> None: assert vdb.retrieval_calls == [([[0.1, 0.2]], {"collection_name": "docs", "model_name": "embedder", "top_k": 3})] +def test_retrieve_operator_forwards_runtime_query_texts() -> None: + vdb = FakeVDB() + operator = RetrieveVdbOperator( + vdb=vdb, + vdb_kwargs={"collection_name": "docs", "model_name": "embedder", "hybrid": True, "query_texts": ["stale"]}, + ) + + operator.process([[0.1, 0.2]], top_k=3, query_texts=["current"]) + + assert vdb.retrieval_calls == [ + ( + [[0.1, 0.2]], + { + "collection_name": "docs", + "model_name": "embedder", + "hybrid": True, + "top_k": 3, + "query_texts": ["current"], + }, + ) + ] + + +def test_retrieve_operator_does_not_forward_query_texts_for_dense_retrieval() -> None: + vdb = FakeVDB() + operator = RetrieveVdbOperator(vdb=vdb, vdb_kwargs={"collection_name": "docs", "model_name": "embedder"}) + + operator.process([[0.1, 0.2]], top_k=3, query_texts=["current"]) + + assert vdb.retrieval_calls == [([[0.1, 0.2]], {"collection_name": "docs", "model_name": "embedder", "top_k": 3})] + + def test_constructor_requires_exactly_one_vdb_source() -> None: with pytest.raises(ValueError, match="Either vdb or vdb_op is required"): IngestVdbOperator() From 1bf91683211a94d0f995e6f6f54f6e17cc465f9c Mon Sep 17 00:00:00 2001 From: jioffe502 Date: Thu, 14 May 2026 21:03:41 +0000 Subject: [PATCH 2/6] Document hybrid query text ordering assumption --- nemo_retriever/src/nemo_retriever/retriever.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/nemo_retriever/src/nemo_retriever/retriever.py b/nemo_retriever/src/nemo_retriever/retriever.py index 8f6d36da58..47d991fe14 100644 --- a/nemo_retriever/src/nemo_retriever/retriever.py +++ b/nemo_retriever/src/nemo_retriever/retriever.py @@ -178,6 +178,9 @@ def _execute_queries_graph( text_col = str(embed_params.text_column) df = pd.DataFrame({text_col: query_texts}) + # Hybrid retrieval relies on these ordered query strings staying aligned + # with the embedded rows produced from ``df``. If this query graph grows + # distributed/shuffled stages, carry row-local query text or IDs instead. graph = self._get_graph(embed_extra=embed_extra) if not callable(getattr(graph, "resolve_for_local_execution", None)): raise TypeError("graph must provide resolve_for_local_execution() (e.g. pipeline_graph.Graph)") From d96dcf470b907afd9efd06486b28d74c3b850dc3 Mon Sep 17 00:00:00 2001 From: jioffe502 Date: Mon, 18 May 2026 15:07:59 +0000 Subject: [PATCH 3/6] Address LanceDB hybrid review comments --- nemo_retriever/src/nemo_retriever/vdb/lancedb.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/nemo_retriever/src/nemo_retriever/vdb/lancedb.py b/nemo_retriever/src/nemo_retriever/vdb/lancedb.py index 5cc3c839d0..9087006d3a 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/lancedb.py +++ b/nemo_retriever/src/nemo_retriever/vdb/lancedb.py @@ -7,6 +7,7 @@ import os import time +from collections.abc import Iterable, Sequence from datetime import timedelta from typing import Any, Final, FrozenSet @@ -529,7 +530,7 @@ def run(self, records): logger.info("Skipping LanceDB index creation for table %r because build_index=False.", self.table_name) return records - def retrieval(self, vectors, **kwargs): + def retrieval(self, vectors: Iterable[Sequence[float]], **kwargs: Any) -> list[list[dict[str, Any]]]: """Search LanceDB with precomputed query vectors. Keyword arguments @@ -572,8 +573,8 @@ def retrieval(self, vectors, **kwargs): if hybrid: if query_texts is None: raise ValueError( - "LanceDB hybrid retrieval requires query_texts because it needs raw query text " - "in addition to precomputed vectors." + "LanceDB hybrid retrieval requires query_texts. Pass query_texts=your_queries " + "alongside vectors when calling retrieval() with hybrid=True." ) query_type = search_kwargs.get("query_type") if query_type is not None: @@ -595,19 +596,20 @@ def retrieval(self, vectors, **kwargs): table = lancedb.connect(uri=table_path).open_table(table_name) - vectors_list = list(vectors) if hybrid: + vectors_for_search = list(vectors) query_texts_list = [query_texts] if isinstance(query_texts, str) else list(query_texts) - if len(query_texts_list) != len(vectors_list): + if len(query_texts_list) != len(vectors_for_search): raise ValueError( "LanceDB hybrid retrieval requires query_texts length to match vectors length; " - f"got query_texts={len(query_texts_list)} vectors={len(vectors_list)}." + f"got query_texts={len(query_texts_list)} vectors={len(vectors_for_search)}." ) else: + vectors_for_search = vectors query_texts_list = [] search_results = [] - for idx, vector in enumerate(vectors_list): + for idx, vector in enumerate(vectors_for_search): if hybrid: query = ( table.search(vector_column_name=vector_column_name, **search_kwargs) From 3965bfb3e7f90e455a7b7540b58e07cf175a6e1b Mon Sep 17 00:00:00 2001 From: Jacob Ioffe Date: Mon, 1 Jun 2026 20:12:17 +0000 Subject: [PATCH 4/6] Tighten hybrid retrieval VDB contract --- .../src/nemo_retriever/vdb/README.md | 11 +++++----- .../src/nemo_retriever/vdb/adt_vdb.py | 22 +++++++++++++------ .../src/nemo_retriever/vdb/operators.py | 6 ++++- .../tests/test_nv_ingest_vdb_operator.py | 18 +++++++++++++++ 4 files changed, 44 insertions(+), 13 deletions(-) diff --git a/nemo_retriever/src/nemo_retriever/vdb/README.md b/nemo_retriever/src/nemo_retriever/vdb/README.md index 205089abd0..5fc72ec20f 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/README.md +++ b/nemo_retriever/src/nemo_retriever/vdb/README.md @@ -93,16 +93,17 @@ Common constructor arguments include: `RetrieveVdbOperator` wraps the same concrete **`VDB`** instance but calls **`retrieval(vectors, **kwargs)`** instead of `run`. It merges per-call kwargs with the operator’s stored `vdb_kwargs` and returns **`normalize_retrieval_results(...)`** output (see `operators.py`, `records.py`). -Important: retrieval here expects **`vectors`** — a list of query embedding vectors — **not** raw query strings. String queries are embedded elsewhere (e.g. in `Retriever`). +Important: retrieval here expects **`vectors`** — a list of query embedding vectors — as the primary input. String queries are embedded elsewhere (e.g. in `Retriever`). Hybrid backends that need raw text receive aligned `query_texts` as execution-only call context. ### LanceDB inside `RetrieveVdbOperator` For `vdb_op="lancedb"`, **`LanceDB.retrieval`**: - Opens the table with `lancedb.connect(table_path).open_table(table_name)`. -- For each query vector: **`table.search([vector], vector_column_name=..., **search_kwargs)`**, optional **`.where(where_clause)`** (Lance / DataFusion SQL; `metadata` / `source` are stored as JSON strings), then **`.limit(top_k).refine_factor(...).nprobes(...)`**. +- For dense retrieval, each query vector uses **`table.search([vector], vector_column_name=..., **search_kwargs)`**, optional **`.where(where_clause)`** (Lance / DataFusion SQL; `metadata` / `source` are stored as JSON strings), then **`.limit(top_k).refine_factor(...).nprobes(...)`**. +- For hybrid retrieval, callers pass `hybrid=True` plus `query_texts` aligned with the vectors. LanceDB uses **`table.search(query_type="hybrid", vector_column_name=..., fts_columns="text").vector(vector).text(query_text)`** before applying the same `where`, limit, refine, probe, and select handling. -Notable kwargs: `top_k`, `refine_factor`, `n_probe` / `nprobes`, `where` or `_filter`, `table_path`, `table_name`, `search_kwargs`. **Hybrid search with precomputed vectors is not implemented** in this path (`NotImplementedError` if `hybrid=True`). +Notable kwargs: `top_k`, `refine_factor`, `n_probe` / `nprobes`, `where` or `_filter`, `table_path`, `table_name`, `search_kwargs`, `hybrid`, and `query_texts`. `query_texts` is stripped from constructor kwargs and forwarded only for retrieval calls whose effective mode is hybrid. Example of **direct** operator use (you supply vectors): @@ -224,9 +225,9 @@ hits = filter_hits_by_content_metadata( Each hit's `metadata` field is a JSON string. Use **`parse_hit_content_metadata(hit)`** to get a `dict` you can read directly (this is what `filter_hits_by_content_metadata` uses internally). Both helpers are exported from `nemo_retriever.vdb`. -### Not implemented in this path +### Hybrid retrieval -Hybrid search (`hybrid=True`) is not implemented for the precomputed-vector retrieval path — `LanceDB.retrieval` raises `NotImplementedError`. Filters above apply only to dense vector search. +Hybrid search (`hybrid=True`) is implemented for LanceDB's precomputed-vector retrieval path. It requires `query_texts` aligned one-to-one with the query vectors so the backend can combine the dense vector query with full-text search. Filters above apply to both dense and hybrid search. --- diff --git a/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py b/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py index d99378638d..1f928c0f7a 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py +++ b/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py @@ -10,7 +10,9 @@ ingestion of Nemo Retriever Library (NRL) record batches. - `retrieval` — nearest-neighbor search over **precomputed query vectors**. Query strings are embedded upstream (see `nemo_retriever.Retriever`); - the VDB only sees vectors. + the VDB search boundary receives vectors as the primary input. Backends + that combine dense and lexical evidence may also receive aligned + `query_texts` as execution-only retrieval context. Methods accept `**kwargs` so backend-specific options (e.g. LanceDB's `where` predicate for metadata filtering, refinement factors, @@ -102,9 +104,11 @@ def retrieval(self, queries: list, **kwargs): Despite the parameter name `queries` (kept for backward compatibility), this method receives a list of embedding vectors, - one per query — *not* raw text. Query text is embedded upstream, - typically inside `nemo_retriever.Retriever`, before this method - is called. + one per query. Query text is embedded upstream, typically inside + `nemo_retriever.Retriever`, before this method is called. Backends + that need the raw query string for retrieval-time lexical matching + may additionally consume `query_texts` from `kwargs`; those strings + must be aligned one-to-one with the input vectors. Implementations search the index, apply any post-filtering, and return a list of hit lists aligned with the input (one inner list @@ -120,6 +124,10 @@ def retrieval(self, queries: list, **kwargs): serialized JSON, e.g. `metadata LIKE '%"meta_a":"alpha"%'`. The `_filter` alias is accepted in addition to `where`. + - query_texts: raw query strings aligned with the input vectors. + Dense retrieval backends should not require this. LanceDB hybrid + search requires it because the backend combines the precomputed + dense vector with a full-text query at search time. - refine_factor / nprobes / search_kwargs: ANN tuning passed through to the backend. @@ -128,9 +136,9 @@ def retrieval(self, queries: list, **kwargs): for the full filter cookbook (sidecar merge, server-side vs client-side filtering, escaping). - Hybrid search with precomputed vectors is not implemented by the - reference `LanceDB` backend; passing `hybrid=True` raises - `NotImplementedError` on that path. + `query_texts` is execution-only context. Operators should avoid + persisting it in backend constructor kwargs and pass it only for + retrieval calls whose effective mode needs raw text. """ pass diff --git a/nemo_retriever/src/nemo_retriever/vdb/operators.py b/nemo_retriever/src/nemo_retriever/vdb/operators.py index cef58f8e9b..c0ca78fdf4 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/operators.py +++ b/nemo_retriever/src/nemo_retriever/vdb/operators.py @@ -217,7 +217,11 @@ def process(self, data: Any, **kwargs: Any) -> list[list[dict[str, Any]]]: from nemo_retriever.retriever_graph_utils import filter_retrieval_kwargs retrieval_kwargs = {**self._retrieval_vdb_kwargs, **filter_retrieval_kwargs(kwargs)} - if retrieval_kwargs.get("hybrid") and "query_texts" in kwargs: + if "hybrid" in retrieval_kwargs: + effective_hybrid = bool(retrieval_kwargs["hybrid"]) + else: + effective_hybrid = bool(getattr(self._vdb, "hybrid", False)) + if effective_hybrid and "query_texts" in kwargs: retrieval_kwargs["query_texts"] = kwargs["query_texts"] return normalize_retrieval_results(self._vdb.retrieval(data, **retrieval_kwargs)) diff --git a/nemo_retriever/tests/test_nv_ingest_vdb_operator.py b/nemo_retriever/tests/test_nv_ingest_vdb_operator.py index a6ab99bd41..174bdad634 100644 --- a/nemo_retriever/tests/test_nv_ingest_vdb_operator.py +++ b/nemo_retriever/tests/test_nv_ingest_vdb_operator.py @@ -191,6 +191,24 @@ def test_retrieve_operator_forwards_runtime_query_texts() -> None: ] +def test_retrieve_operator_forwards_query_texts_for_hybrid_vdb_instance() -> None: + vdb = FakeVDB(hybrid=True) + operator = RetrieveVdbOperator(vdb=vdb) + + operator.process([[0.1, 0.2]], top_k=3, query_texts=["current"]) + + assert vdb.retrieval_calls == [([[0.1, 0.2]], {"top_k": 3, "query_texts": ["current"]})] + + +def test_retrieve_operator_respects_dense_override_for_hybrid_vdb_instance() -> None: + vdb = FakeVDB(hybrid=True) + operator = RetrieveVdbOperator(vdb=vdb) + + operator.process([[0.1, 0.2]], top_k=3, hybrid=False, query_texts=["current"]) + + assert vdb.retrieval_calls == [([[0.1, 0.2]], {"top_k": 3, "hybrid": False})] + + def test_retrieve_operator_does_not_forward_query_texts_for_dense_retrieval() -> None: vdb = FakeVDB() operator = RetrieveVdbOperator(vdb=vdb, vdb_kwargs={"collection_name": "docs", "model_name": "embedder"}) From e3a5f3066ef134185640b06ef3999d52252399fe Mon Sep 17 00:00:00 2001 From: Jacob Ioffe Date: Tue, 2 Jun 2026 20:39:19 +0000 Subject: [PATCH 5/6] Clarify hybrid FTS index build --- .../src/nemo_retriever/vdb/README.md | 4 +- .../tests/test_lancedb_retrieval_where.py | 59 +++++++++++++++++++ 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/nemo_retriever/src/nemo_retriever/vdb/README.md b/nemo_retriever/src/nemo_retriever/vdb/README.md index 5fc72ec20f..b40ee9d139 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/README.md +++ b/nemo_retriever/src/nemo_retriever/vdb/README.md @@ -71,7 +71,7 @@ When `vdb_op="lancedb"` (or `vdb=LanceDB(...)` is passed explicitly), `_construc `LanceDB.run` (in `lancedb.py`) orchestrates: 1. **`create_index`** — connects with `lancedb.connect(self.uri)`, transforms ingestion batches into Arrow rows (`vector`, `text`, `metadata`, `source`), and **`db.create_table(...)`** with schema and `on_bad_vectors` policy. -2. **`write_to_index`** — builds the **vector index** (e.g. IVF/HNSW) and optionally an **FTS** index when `hybrid=True`. +2. **`write_to_index`** — builds the **vector index** (e.g. IVF/HNSW) and optionally an **FTS/BM25** index over the ingested `text` column when `hybrid=True`. Common constructor arguments include: @@ -82,7 +82,7 @@ Common constructor arguments include: | `overwrite` | Table create mode vs append | | `vector_dim` | Expected embedding dimension (default 2048) | | `index_type` / `metric` / `num_partitions` / `num_sub_vectors` | Vector index tuning | -| `hybrid` | Also build FTS on `text` | +| `hybrid` | Also build the LanceDB FTS/BM25 index on ingested `text` | | `on_bad_vectors`| `drop`, `fill`, `null`, or `error` | --- diff --git a/nemo_retriever/tests/test_lancedb_retrieval_where.py b/nemo_retriever/tests/test_lancedb_retrieval_where.py index a0fcc7d89e..76dd368ac9 100644 --- a/nemo_retriever/tests/test_lancedb_retrieval_where.py +++ b/nemo_retriever/tests/test_lancedb_retrieval_where.py @@ -123,6 +123,65 @@ def test_hybrid_retrieval_uses_query_texts() -> None: assert results[0][0]["text"] == "alpha" +def test_hybrid_ingestion_builds_searchable_fts_index_from_record_text() -> None: + """`LanceDB.run(..., hybrid=True)` builds the BM25/FTS side of hybrid search.""" + d = tempfile.mkdtemp() + records = [ + [ + { + "document_type": "text", + "metadata": { + "embedding": [1.0, 0.0], + "content": "quarterly alpha revenue outlook", + "content_metadata": {"id": "alpha", "page_number": 1}, + "source_metadata": {"source_id": "alpha.pdf"}, + }, + }, + { + "document_type": "text", + "metadata": { + "embedding": [0.0, 1.0], + "content": "beta safety compliance manual", + "content_metadata": {"id": "beta", "page_number": 2}, + "source_metadata": {"source_id": "beta.pdf"}, + }, + }, + ] + ] + op = LanceDB( + uri=d, + table_name="t", + vector_dim=2, + hybrid=True, + num_partitions=1, + num_sub_vectors=1, + ) + + op.run(records) + + table = lancedb.connect(d).open_table("t") + assert table.count_rows() == 2 + index_names = {index.name.lower() for index in table.list_indices()} + assert any("text" in name or "fts" in name for name in index_names) + + fts_results = table.search("safety compliance", fts_columns="text").limit(1).to_list() + assert fts_results + assert fts_results[0]["text"] == "beta safety compliance manual" + assert "_score" in fts_results[0] + + results = op.retrieval( + [[0.0, 1.0]], + top_k=1, + table_path=d, + table_name="t", + hybrid=True, + query_texts=["safety compliance"], + ) + + assert results[0] + assert results[0][0]["text"] == "beta safety compliance manual" + + def test_hybrid_retrieval_requires_query_texts() -> None: d = tempfile.mkdtemp() _tiny_table(d, create_fts_index=True) From 07e295a6aaea97c0336957b43045fead5b184d3a Mon Sep 17 00:00:00 2001 From: Jacob Ioffe Date: Mon, 1 Jun 2026 20:47:04 +0000 Subject: [PATCH 6/6] Add VDB runtime summary observability --- .../src/nemo_retriever/pipeline/__main__.py | 6 + .../src/nemo_retriever/vdb/__init__.py | 2 + .../src/nemo_retriever/vdb/runtime.py | 131 ++++++++++++++++++ .../tests/test_graph_pipeline_cli.py | 11 ++ nemo_retriever/tests/test_vdb_runtime.py | 62 +++++++++ 5 files changed, 212 insertions(+) create mode 100644 nemo_retriever/src/nemo_retriever/vdb/runtime.py create mode 100644 nemo_retriever/tests/test_vdb_runtime.py diff --git a/nemo_retriever/src/nemo_retriever/pipeline/__main__.py b/nemo_retriever/src/nemo_retriever/pipeline/__main__.py index 3d9ed7e981..97f952634a 100644 --- a/nemo_retriever/src/nemo_retriever/pipeline/__main__.py +++ b/nemo_retriever/src/nemo_retriever/pipeline/__main__.py @@ -75,6 +75,7 @@ from nemo_retriever.params.models import BatchTuningParams from nemo_retriever.utils.input_files import resolve_input_patterns from nemo_retriever.utils.remote_auth import resolve_remote_api_key +from nemo_retriever.vdb.runtime import describe_vdb_runtime logger = logging.getLogger(__name__) @@ -1511,6 +1512,8 @@ def run( "meta_join_key": meta_join_key, } + vdb_runtime_summary = describe_vdb_runtime(resolved_vdb_op, resolved_vdb_kwargs) + remote_api_key = resolve_remote_api_key(api_key) extract_remote_api_key = remote_api_key embed_remote_api_key = remote_api_key @@ -1765,6 +1768,7 @@ def run( "evaluation_count": None, "recall_details": bool(recall_details), "vdb_op": str(resolved_vdb_op), + "vdb": vdb_runtime_summary, "qa_sweep_exit_code": qa_code, }, ) @@ -1847,6 +1851,7 @@ def run( "evaluation_metrics": {}, "recall_details": bool(recall_details), "vdb_op": str(resolved_vdb_op), + "vdb": vdb_runtime_summary, }, ) if run_mode == "batch": @@ -1880,6 +1885,7 @@ def run( "evaluation_count": evaluation_query_count, "recall_details": bool(recall_details), "vdb_op": str(resolved_vdb_op), + "vdb": vdb_runtime_summary, }, ) diff --git a/nemo_retriever/src/nemo_retriever/vdb/__init__.py b/nemo_retriever/src/nemo_retriever/vdb/__init__.py index cd4c4f3548..b0d66d2a2a 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/__init__.py +++ b/nemo_retriever/src/nemo_retriever/vdb/__init__.py @@ -8,6 +8,7 @@ from nemo_retriever.vdb.factory import get_vdb_op_cls from nemo_retriever.vdb.operators import IngestVdbOperator, PutVdbOperator, RetrieveVdbOperator from nemo_retriever.vdb.records import RetrievalHit, normalize_retrieval_results, to_client_vdb_records +from nemo_retriever.vdb.runtime import describe_vdb_runtime from nemo_retriever.vdb.sidecar_metadata import ( apply_sidecar_metadata_to_client_batches, build_sidecar_lookup, @@ -27,6 +28,7 @@ "RetrieveVdbOperator", "normalize_retrieval_results", "to_client_vdb_records", + "describe_vdb_runtime", "apply_sidecar_metadata_to_client_batches", "build_sidecar_lookup", "filter_hits_by_content_metadata", diff --git a/nemo_retriever/src/nemo_retriever/vdb/runtime.py b/nemo_retriever/src/nemo_retriever/vdb/runtime.py new file mode 100644 index 0000000000..b10f94e21e --- /dev/null +++ b/nemo_retriever/src/nemo_retriever/vdb/runtime.py @@ -0,0 +1,131 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Runtime observability helpers for VDB-backed retrieval.""" + +from __future__ import annotations + +from collections.abc import Mapping, Sequence +from pathlib import Path +from typing import Any + +_SENSITIVE_KEY_PARTS = frozenset( + { + "api_key", + "apikey", + "auth", + "bearer", + "credential", + "password", + "secret", + "token", + } +) +_EXECUTION_ONLY_CONFIG_KEYS = frozenset({"query_texts"}) + + +def _is_sensitive_key(key: object) -> bool: + normalized = str(key).strip().lower() + return any(part in normalized for part in _SENSITIVE_KEY_PARTS) + + +def _json_safe(value: Any) -> Any: + if value is None or isinstance(value, (str, int, float, bool)): + return value + if isinstance(value, Path): + return str(value) + if isinstance(value, Mapping): + return {str(k): _sanitize_config_value(k, v) for k, v in value.items()} + if isinstance(value, Sequence) and not isinstance(value, (str, bytes, bytearray)): + return [_json_safe(v) for v in value] + return str(value) + + +def _sanitize_config_value(key: object, value: Any) -> Any: + if _is_sensitive_key(key): + return "" + return _json_safe(value) + + +def _sanitize_config(config: Mapping[str, Any]) -> dict[str, Any]: + return { + str(key): _sanitize_config_value(key, value) + for key, value in config.items() + if str(key).strip().lower() not in _EXECUTION_ONLY_CONFIG_KEYS + } + + +def _first_present(config: Mapping[str, Any], *keys: str) -> Any: + for key in keys: + value = config.get(key) + if value is not None and value != "": + return value + return None + + +def _target_summary(vdb_op: str, config: Mapping[str, Any]) -> dict[str, Any]: + target: dict[str, Any] = {} + + uri = _first_present(config, "uri", "lancedb_uri") + table_name = _first_present(config, "table_name", "lancedb_table") + collection_name = _first_present(config, "collection_name", "index_name") + + if uri is None and vdb_op == "lancedb": + uri = "lancedb" + if table_name is None and vdb_op == "lancedb": + table_name = "nv-ingest" + + if uri is not None: + target["uri"] = _json_safe(uri) + if table_name is not None: + target["table_name"] = _json_safe(table_name) + if collection_name is not None and collection_name != table_name: + target["collection_name"] = _json_safe(collection_name) + + return target + + +def _retrieval_summary(config: Mapping[str, Any]) -> dict[str, Any]: + hybrid = bool(config.get("hybrid", False)) + summary: dict[str, Any] = { + "mode": "hybrid" if hybrid else "dense", + "signals": ["dense_vector", "lexical_text"] if hybrid else ["dense_vector"], + "uses_query_texts": hybrid, + } + + top_k = _first_present(config, "top_k") + refine_factor = _first_present(config, "refine_factor") + nprobes = _first_present(config, "n_probe", "nprobes") + + if top_k is not None: + summary["top_k"] = _json_safe(top_k) + if refine_factor is not None: + summary["refine_factor"] = _json_safe(refine_factor) + if nprobes is not None: + summary["nprobes"] = _json_safe(nprobes) + + search_kwargs = config.get("search_kwargs") + if isinstance(search_kwargs, Mapping): + summary["search_kwargs"] = _sanitize_config(search_kwargs) + + return summary + + +def describe_vdb_runtime(vdb_op: str, vdb_kwargs: Mapping[str, Any] | None = None) -> dict[str, Any]: + """Return a JSON-safe, VDB-neutral runtime summary. + + The summary records the selected ADT VDB backend, target, normalized + retrieval mode, and sanitized config. It deliberately does not inspect a + live database; backend-specific health/index details can be layered in by + future helpers without changing this base contract. + """ + op = str(vdb_op or "").strip().lower() or "unknown" + config = dict(vdb_kwargs or {}) + + return { + "op": op, + "target": _target_summary(op, config), + "retrieval": _retrieval_summary(config), + "config": _sanitize_config(config), + } diff --git a/nemo_retriever/tests/test_graph_pipeline_cli.py b/nemo_retriever/tests/test_graph_pipeline_cli.py index 43b75aa23c..b10b79ba7c 100644 --- a/nemo_retriever/tests/test_graph_pipeline_cli.py +++ b/nemo_retriever/tests/test_graph_pipeline_cli.py @@ -464,6 +464,8 @@ def open_table(self, _name): str(runtime_dir), "--runtime-metrics-prefix", "sample-run", + "--vdb-kwargs-json", + '{"uri":"./kb","table_name":"docs","hybrid":true,"refine_factor":50}', "--no-recall-details", ], ) @@ -474,6 +476,15 @@ def open_table(self, _name): payload = json.loads(summary_path.read_text(encoding="utf-8")) assert payload["recall_details"] is False assert payload["evaluation_mode"] == "beir" + assert payload["vdb_op"] == "lancedb" + assert payload["vdb"]["op"] == "lancedb" + assert payload["vdb"]["target"] == {"uri": "./kb", "table_name": "docs"} + assert payload["vdb"]["retrieval"]["mode"] == "hybrid" + assert payload["vdb"]["retrieval"]["signals"] == ["dense_vector", "lexical_text"] + assert payload["vdb"]["retrieval"]["uses_query_texts"] is True + assert payload["vdb"]["retrieval"]["refine_factor"] == 50 + assert payload["vdb"]["config"]["hybrid"] is True + assert payload["vdb"]["config"]["overwrite"] is True def test_graph_pipeline_cli_service_mode_rejects_ingest_flag(tmp_path) -> None: diff --git a/nemo_retriever/tests/test_vdb_runtime.py b/nemo_retriever/tests/test_vdb_runtime.py new file mode 100644 index 0000000000..3926cf409b --- /dev/null +++ b/nemo_retriever/tests/test_vdb_runtime.py @@ -0,0 +1,62 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from pathlib import Path + +from nemo_retriever.vdb.runtime import describe_vdb_runtime + + +def test_describe_vdb_runtime_reports_dense_lancedb_defaults() -> None: + summary = describe_vdb_runtime("lancedb", {"uri": Path("/tmp/kb")}) + + assert summary["op"] == "lancedb" + assert summary["target"] == {"uri": "/tmp/kb", "table_name": "nv-ingest"} + assert summary["retrieval"] == { + "mode": "dense", + "signals": ["dense_vector"], + "uses_query_texts": False, + } + assert summary["config"] == {"uri": "/tmp/kb"} + + +def test_describe_vdb_runtime_preserves_backend_collection_target() -> None: + summary = describe_vdb_runtime( + "custom", + {"uri": "vdb://cluster", "table_name": "docs", "index_name": "semantic"}, + ) + + assert summary["target"] == { + "uri": "vdb://cluster", + "table_name": "docs", + "collection_name": "semantic", + } + + +def test_describe_vdb_runtime_reports_hybrid_mode_and_sanitizes_config() -> None: + summary = describe_vdb_runtime( + "lancedb", + { + "uri": "./kb", + "table_name": "docs", + "hybrid": True, + "refine_factor": 50, + "n_probe": 64, + "api_key": "secret-value", + "query_texts": ["do not persist"], + "search_kwargs": {"query_type": "hybrid", "token": "also-secret"}, + }, + ) + + assert summary["target"] == {"uri": "./kb", "table_name": "docs"} + assert summary["retrieval"]["mode"] == "hybrid" + assert summary["retrieval"]["signals"] == ["dense_vector", "lexical_text"] + assert summary["retrieval"]["uses_query_texts"] is True + assert summary["retrieval"]["refine_factor"] == 50 + assert summary["retrieval"]["nprobes"] == 64 + assert summary["retrieval"]["search_kwargs"] == { + "query_type": "hybrid", + "token": "", + } + assert summary["config"]["api_key"] == "" + assert "query_texts" not in summary["config"]