Add SLURM Ray smoke kit#2010
Conversation
…ster-ray-slurm # Conflicts: # Dockerfile
Greptile SummaryThis PR adds a
|
| Filename | Overview |
|---|---|
| nemo_retriever/src/nemo_retriever/txt/ray_data.py | Adds _coerce_binary_payload helper and integrates it into TxtSplitCPUActor.process; the bare except Exception: continue silently drops rows without logging, and the helper has unbounded recursion for nested as_py() wrappers. |
| nemo_retriever/src/nemo_retriever/txt/split.py | Adds local-path detection to _get_tokenizer to skip HF revision lookup when a local directory is supplied; clean and well-contained change. |
| nemo_retriever/tests/test_actor_operators.py | Adds parametrised tests for binary payload coercion; assert_called_once_with(..., params=actor._params) compares against None from the archetype wrapper, while the delegate normalises None to TextChunkParams(), so the assertion will always fail. |
| nemo_retriever/tests/test_txt_split.py | Adds test_get_tokenizer_local_path_skips_revision_lookup to verify the local-path branch; straightforward and correct. |
| tools/slurm-ray-smoke/smokes/_common.py | New shared helpers for JSON output, Ray state introspection, and cluster readiness assertions; missing SPDX license header. |
| tools/slurm-ray-smoke/smokes/nemo_executor_smoke.py | Direct RayDataExecutor smoke that pins stages to custom resources and asserts cross-node execution; missing SPDX license header. |
| tools/slurm-ray-smoke/smokes/nemo_graph_ingestor_smoke.py | GraphIngestor batch-mode smoke; missing SPDX license header. |
| tools/slurm-ray-smoke/submit.sh | Profile-driven launcher that validates inputs, stages tarballs, renders job.sbatch, and submits over SSH; input validation is thorough (allowlist, regex, quote checks). |
| tools/slurm-ray-smoke/templates/job.sbatch.sh | SLURM sbatch template that bootstraps Ray across nodes; installs uv via `curl |
Comments Outside Diff (1)
-
nemo_retriever/src/nemo_retriever/txt/ray_data.py, line 121-131 (link)Bare except silently drops rows
Every failure inside the
tryblock (coercion error, tokenizer crash, or anytxt_bytes_to_chunks_dfexception) is swallowed withcontinueand produces no log output. In production this means documents disappear from the pipeline without any trace, violating theno-bare-exceptrule which requires at least logging with full context at an exception boundary.Prompt To Fix With AI
This is a comment left during a code review. Path: nemo_retriever/src/nemo_retriever/txt/ray_data.py Line: 121-131 Comment: **Bare except silently drops rows** Every failure inside the `try` block (coercion error, tokenizer crash, or any `txt_bytes_to_chunks_df` exception) is swallowed with `continue` and produces no log output. In production this means documents disappear from the pipeline without any trace, violating the `no-bare-except` rule which requires at least logging with full context at an exception boundary. How can I resolve this? If you propose a fix, please make it concise.
Prompt To Fix All With AI
Fix the following 5 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 5
nemo_retriever/src/nemo_retriever/txt/ray_data.py:121-131
**Bare except silently drops rows**
Every failure inside the `try` block (coercion error, tokenizer crash, or any `txt_bytes_to_chunks_df` exception) is swallowed with `continue` and produces no log output. In production this means documents disappear from the pipeline without any trace, violating the `no-bare-except` rule which requires at least logging with full context at an exception boundary.
### Issue 2 of 5
nemo_retriever/tests/test_actor_operators.py:634-641
**`params=actor._params` asserts `None`, not the delegate's default**
`actor = TxtSplitActor()` sets `actor._params = None`. `ArchetypeOperator._resolve_delegate` constructs `TxtSplitCPUActor(params=None)`, whose `__init__` normalises `None` via `params or TextChunkParams()`, so `delegate._params = TextChunkParams()`. The actual call to `txt_bytes_to_chunks_df` passes `params=TextChunkParams()`, but `assert_called_once_with(..., params=actor._params)` asserts `params=None`. The assertion will always fail unless `TextChunkParams() == None`, which it does not. Use `params=TextChunkParams()` (or the equivalent default instance) as the expected value.
### Issue 3 of 5
tools/slurm-ray-smoke/smokes/_common.py:1-3
**Missing SPDX license header**
All three new Python files in `smokes/` (`_common.py`, `nemo_executor_smoke.py`, `nemo_graph_ingestor_smoke.py`) are missing the required SPDX header block. Every `.py` file added to this repository must begin with:
```
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
```
### Issue 4 of 5
tools/slurm-ray-smoke/templates/job.sbatch.sh:80
**Remote script piped to shell without integrity check**
`curl -LsSf https://astral.sh/uv/install.sh | ... sh` downloads and executes arbitrary code from the network at job runtime with no checksum or signature verification. If the CDN is compromised or the URL redirected, every cluster node running this job would execute malicious code. Consider pinning the installer to a specific version digest, vendoring the `uv` binary into the repository, or downloading to a file first so the contents can be inspected before execution.
### Issue 5 of 5
nemo_retriever/src/nemo_retriever/txt/ray_data.py:24-42
**Potential infinite recursion via `as_py()` chain**
`_coerce_binary_payload` calls itself recursively when `as_py()` is present. If `as_py()` returns an object that also exposes `as_py()` (e.g. a nested Arrow scalar wrapper), this recurses without bound. Adding a depth guard or converting only one level would prevent an unexpected `RecursionError` crashing a Ray worker.
Reviews (1): Last reviewed commit: "Merge remote-tracking branch 'upstream/m..." | Re-trigger Greptile
| def test_process_coerces_binary_payload_variants(self, mock_fn, raw, expected_bytes): | ||
| expected = pd.DataFrame({"text": ["chunk"], "path": ["/a.txt"], "page_number": [0], "metadata": [{}]}) | ||
| mock_fn.return_value = expected | ||
| actor = self._make() | ||
| df = pd.DataFrame({"bytes": [raw], "path": ["/a.txt"]}) | ||
| result = actor.process(df) | ||
| mock_fn.assert_called_once_with(expected_bytes, "/a.txt", params=actor._params) | ||
| pd.testing.assert_frame_equal(result, expected) |
There was a problem hiding this comment.
params=actor._params asserts None, not the delegate's default
actor = TxtSplitActor() sets actor._params = None. ArchetypeOperator._resolve_delegate constructs TxtSplitCPUActor(params=None), whose __init__ normalises None via params or TextChunkParams(), so delegate._params = TextChunkParams(). The actual call to txt_bytes_to_chunks_df passes params=TextChunkParams(), but assert_called_once_with(..., params=actor._params) asserts params=None. The assertion will always fail unless TextChunkParams() == None, which it does not. Use params=TextChunkParams() (or the equivalent default instance) as the expected value.
Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/tests/test_actor_operators.py
Line: 634-641
Comment:
**`params=actor._params` asserts `None`, not the delegate's default**
`actor = TxtSplitActor()` sets `actor._params = None`. `ArchetypeOperator._resolve_delegate` constructs `TxtSplitCPUActor(params=None)`, whose `__init__` normalises `None` via `params or TextChunkParams()`, so `delegate._params = TextChunkParams()`. The actual call to `txt_bytes_to_chunks_df` passes `params=TextChunkParams()`, but `assert_called_once_with(..., params=actor._params)` asserts `params=None`. The assertion will always fail unless `TextChunkParams() == None`, which it does not. Use `params=TextChunkParams()` (or the equivalent default instance) as the expected value.
How can I resolve this? If you propose a fix, please make it concise.| from __future__ import annotations | ||
|
|
||
| import json |
There was a problem hiding this comment.
All three new Python files in smokes/ (_common.py, nemo_executor_smoke.py, nemo_graph_ingestor_smoke.py) are missing the required SPDX header block. Every .py file added to this repository must begin with:
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
Rule Used: Python files added in this PR must include the SPD... (source)
Prompt To Fix With AI
This is a comment left during a code review.
Path: tools/slurm-ray-smoke/smokes/_common.py
Line: 1-3
Comment:
**Missing SPDX license header**
All three new Python files in `smokes/` (`_common.py`, `nemo_executor_smoke.py`, `nemo_graph_ingestor_smoke.py`) are missing the required SPDX header block. Every `.py` file added to this repository must begin with:
```
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
```
**Rule Used:** Python files added in this PR must include the SPD... ([source](https://app.greptile.com/review/custom-context?memory=spdx-license-header))
How can I resolve this? If you propose a fix, please make it concise.| tar -xzf "$SOURCE_TARBALL" -C "$SOURCE_ROOT" | ||
|
|
||
| if [[ ! -x "$UV_BIN" ]]; then | ||
| curl -LsSf https://astral.sh/uv/install.sh | /usr/bin/env UV_INSTALL_DIR="$UV_INSTALL_DIR" sh |
There was a problem hiding this comment.
Remote script piped to shell without integrity check
curl -LsSf https://astral.sh/uv/install.sh | ... sh downloads and executes arbitrary code from the network at job runtime with no checksum or signature verification. If the CDN is compromised or the URL redirected, every cluster node running this job would execute malicious code. Consider pinning the installer to a specific version digest, vendoring the uv binary into the repository, or downloading to a file first so the contents can be inspected before execution.
Prompt To Fix With AI
This is a comment left during a code review.
Path: tools/slurm-ray-smoke/templates/job.sbatch.sh
Line: 80
Comment:
**Remote script piped to shell without integrity check**
`curl -LsSf https://astral.sh/uv/install.sh | ... sh` downloads and executes arbitrary code from the network at job runtime with no checksum or signature verification. If the CDN is compromised or the URL redirected, every cluster node running this job would execute malicious code. Consider pinning the installer to a specific version digest, vendoring the `uv` binary into the repository, or downloading to a file first so the contents can be inspected before execution.
How can I resolve this? If you propose a fix, please make it concise.| def _coerce_binary_payload(value: Any) -> Any: | ||
| if value is None: | ||
| return None | ||
| if isinstance(value, bytes): | ||
| return value | ||
| if isinstance(value, bytearray): | ||
| return bytes(value) | ||
| if isinstance(value, memoryview): | ||
| return value.tobytes() | ||
|
|
||
| as_py = getattr(value, "as_py", None) | ||
| if callable(as_py): | ||
| return _coerce_binary_payload(as_py()) | ||
|
|
||
| tobytes = getattr(value, "tobytes", None) | ||
| if callable(tobytes): | ||
| return tobytes() | ||
|
|
||
| return value |
There was a problem hiding this comment.
Potential infinite recursion via
as_py() chain
_coerce_binary_payload calls itself recursively when as_py() is present. If as_py() returns an object that also exposes as_py() (e.g. a nested Arrow scalar wrapper), this recurses without bound. Adding a depth guard or converting only one level would prevent an unexpected RecursionError crashing a Ray worker.
Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/txt/ray_data.py
Line: 24-42
Comment:
**Potential infinite recursion via `as_py()` chain**
`_coerce_binary_payload` calls itself recursively when `as_py()` is present. If `as_py()` returns an object that also exposes `as_py()` (e.g. a nested Arrow scalar wrapper), this recurses without bound. Adding a depth guard or converting only one level would prevent an unexpected `RecursionError` crashing a Ray worker.
How can I resolve this? If you propose a fix, please make it concise.
Description
Adds a SLURM Ray smoke kit for validating NeMo-Retriever against an externally started intra-cluster Ray cluster.
This includes:
tools/slurm-ray-smoke/submit.shlauncher that stages the current source tree, rendersjob.sbatch, and submits it over SSHRayDataExecutorandGraphIngestor(run_mode="batch", ray_address=...)Validation:
RayDataExecutorsmoke succeeded with three rows and split/chunk work placed on different nodesGraphIngestorsmoke succeeded with three rows against the same external Ray addressnemo_headandnemo_workercustom resourcesChecklist
No docker-compose or Helm values changes are included.