Skip to content

[WIP] [tx] Make it easy to run on a multi-node Ray cluster#955

Closed
Future-Outlier wants to merge 3 commits into
NovaSky-AI:mainfrom
Future-Outlier:tx-multi-node-ray-cluster
Closed

[WIP] [tx] Make it easy to run on a multi-node Ray cluster#955
Future-Outlier wants to merge 3 commits into
NovaSky-AI:mainfrom
Future-Outlier:tx-multi-node-ray-cluster

Conversation

@Future-Outlier
Copy link
Copy Markdown

@Future-Outlier Future-Outlier commented Jan 25, 2026

WIP: #935

cd skyrl-tx
uv run --extra ray --extra tinker -m tx.tinker.api \
    --base-model Qwen/Qwen3-0.6B \
    --backend-config '{"enable_ray": true, "ray_num_workers": 1}'

Open with Devin

Signed-off-by: Future-Outlier <eric901201@gmail.com>
@vercel
Copy link
Copy Markdown

vercel Bot commented Jan 25, 2026

@Future-Outlier is attempting to deploy a commit to the Tyler's projects Team on Vercel.

A member of the Team first needs to authorize it.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces support for running on a multi-node Ray cluster by adding a RayProcessManager and integrating it into the JAX backend and the Tinker engine. During the review, several issues were identified. A critical resource leak exists where Ray workers are not shut down gracefully, and there's a high-severity issue with a brittle synchronization mechanism that could cause startup failures. Furthermore, a security audit revealed two medium-severity vulnerabilities: leakage of database credentials in application logs (due to the API server logging the full command line including the database URL, potentially exposing passwords) and insecure configuration of the Ray dashboard (bound to 0.0.0.0, exposing it to the network). All these issues should be addressed.

Comment on lines +222 to +227
self._ray_process_manager = None
if hasattr(backend_config, "enable_ray") and backend_config.enable_ray:
logger.info("Starting Ray worker processes for multi-node support...")
self._ray_process_manager, coordinator_address = start_ray_workers(
backend_config
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The RayProcessManager is created and stored in self._ray_process_manager, but its shutdown() method is never called. When the engine process is terminated (e.g., by the API server's lifespan manager), the Ray worker actors will be orphaned, leading to a significant resource leak on the cluster.

To fix this, a graceful shutdown mechanism should be implemented. I recommend:

  1. Adding a shutdown() method to the TinkerEngine class that calls self._ray_process_manager.shutdown() if it exists.
  2. In engine.py's main() function, add a signal handler for SIGTERM and SIGINT that calls the new engine.shutdown() method before exiting.

Comment thread skyrl-tx/tx/tinker/backends/jax.py Outdated
Comment on lines +1502 to +1506
import time

time.sleep(2)

return manager, coordinator_address
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The use of time.sleep(2) to wait for Ray workers to initialize is brittle and can lead to race conditions, especially on a loaded system where actor startup might take longer. This can cause non-deterministic startup failures if the workers are not ready when the coordinator tries to connect.

A more robust approach would be to actively poll the status of the worker actors until they all report as running, with a reasonable timeout. This ensures that the system waits just as long as necessary and provides a more reliable startup sequence.

    import time

    # Wait for all workers to report they are running
    logger.info("Waiting for all Ray workers to start...")
    start_time = time.time()
    timeout_seconds = 60  # Consider making this configurable

    while time.time() - start_time < timeout_seconds:
        try:
            statuses = ray.get([h.get_status.remote() for h in manager.worker_handles])
            if all(s.get("is_running") for s in statuses):
                logger.info("All Ray workers are running.")
                return manager, coordinator_address
        except Exception as e:
            logger.warning(f"Error checking worker status, will retry: {e}")
        time.sleep(1)

    # If loop finishes, it's a timeout
    manager.shutdown()
    raise RuntimeError(f"Ray workers did not start within {timeout_seconds} seconds.")

Comment thread skyrl-tx/tx/tinker/api.py Outdated
Comment on lines +79 to +81
logger.info(
f"Started background engine with PID {background_engine.pid}: {' '.join(cmd)}"
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-medium medium

The API server logs the full command line used to start the background engine. This command line includes all configuration parameters, including the database_url. If the database URL contains credentials (e.g., a password for a PostgreSQL or MySQL database), these credentials will be written to the application logs in plain text.

Remediation: Sanitize the command line arguments before logging them. Specifically, the database_url should be masked or excluded from the log message.

Comment thread skyrl-tx/tx/tinker/backends/jax.py Outdated
Comment on lines +1481 to +1485
ray.init(
include_dashboard=True,
dashboard_host="0.0.0.0",
dashboard_port=8265,
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-medium medium

When Ray support is enabled, the application initializes Ray with the dashboard bound to 0.0.0.0. This makes the Ray dashboard accessible from any machine on the network. The Ray dashboard can expose sensitive information about the cluster and, depending on the version and configuration, may allow for unauthorized task submission or code execution.

Remediation: Change the default dashboard_host to 127.0.0.1 to ensure it is only accessible locally. If remote access is required, it should be made configurable and the user should be warned about the security implications.

@pcmoritz pcmoritz added the tx label Jan 26, 2026
Signed-off-by: Future-Outlier <eric901201@gmail.com>
@Future-Outlier
Copy link
Copy Markdown
Author

will come back later, recently preparing a talk for kubecon EU 2026!

@Future-Outlier
Copy link
Copy Markdown
Author

need 1 more week, gaining context

Signed-off-by: Future-Outlier <eric901201@gmail.com>
Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 2 potential issues.

View 7 additional findings in Devin Review.

Open in Devin Review

Comment on lines +1521 to +1522

status_futures = [handle.get_status.remote() for handle in self.worker_handles]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 wait_for_workers_ready() always times out due to Ray actor single-threaded execution

wait_for_workers_ready() calls handle.get_status.remote() on each worker actor, but start_worker() has already been submitted to the same actor and is blocking forever in the JAX worker loop. Since Ray actors process method calls sequentially in FIFO order, get_status() is queued behind start_worker() and will never execute.

Detailed Explanation

The sequence of events is:

  1. In start_workers() at skyrl-tx/tx/tinker/backends/jax.py:1486, launcher.start_worker.remote(...) submits the start_worker task to the actor
  2. start_worker() (skyrl-tx/tx/tinker/backends/jax.py:1313-1338) blocks forever by calling run_worker() which enters an infinite command loop (skyrl-tx/tx/tinker/backends/jax.py:1258: while True:)
  3. In wait_for_workers_ready() at line 1522, handle.get_status.remote() submits a get_status task to the same actor
  4. Since Ray actors are single-threaded by default and process tasks in FIFO order, get_status is queued behind the already-running start_worker and can never execute

This means ray.get(status_futures, timeout=timeout) at line 1524 will always raise GetTimeoutError, causing the entire Ray worker launch flow to fail with a RuntimeError.

Impact: The Ray-based multi-node feature is completely non-functional — every attempt to use enable_ray=True will fail during startup.

Prompt for agents
In skyrl-tx/tx/tinker/backends/jax.py, the wait_for_workers_ready() method (line 1504) calls get_status() on each RayWorkerLauncher actor, but start_worker() has already been submitted and is blocking the actor forever. Since Ray actors are single-threaded, get_status() can never execute.

To fix this, you have several options:

1. Remove the wait_for_workers_ready() call entirely from start_ray_workers() (line 1758) and replace it with a simple time.sleep() or rely on JAX distributed initialization as the synchronization point.

2. Make the RayWorkerLauncher actor use Ray's async actor pattern (using async def methods) or threaded actors so get_status() can run concurrently with start_worker().

3. Instead of calling get_status() on the workers, verify readiness by checking that the Ray actor handles are alive using ray.get() with a short timeout on a separate lightweight task, or simply check that the actor objects were created successfully (which is already implied by the successful .remote() calls).

Option 1 is the simplest: in start_ray_workers() at line 1757-1758, replace manager.wait_for_workers_ready(timeout=config.ray_startup_timeout) with a brief sleep or remove it entirely, since the real synchronization happens when the coordinator calls jax.distributed.initialize().
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Comment on lines +1578 to +1586
for ref in ready:
worker_idx = self.worker_futures.index(ref)
try:
result = ray.get(ref)
logger.error(
f"Worker {worker_idx + 1} exited unexpectedly with result: {result}"
)
except Exception as e:
logger.error(f"Worker {worker_idx + 1} failed: {e}")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Health monitor repeatedly logs same failed workers due to never removing completed futures

In _health_monitor_loop, completed (failed) worker futures are never removed from self.worker_futures. On every iteration, ray.wait() returns the same completed refs, and ray.get(ref) re-raises the same exception, causing the same error messages to be logged every check_interval seconds indefinitely.

Root Cause

At skyrl-tx/tx/tinker/backends/jax.py:1572-1586, the loop calls ray.wait(self.worker_futures, ...) and iterates over ready refs. For each ready ref, it calls ray.get(ref) which either returns a result or raises an exception. However, the completed refs are never removed from self.worker_futures. On the next iteration (after check_interval seconds), ray.wait() returns the same completed refs again, and the same errors are logged.

For a failed worker, ray.get(ref) will re-raise the stored exception on every call, leading to the same error being logged every 5 seconds (default check_interval). This continues for the entire lifetime of the process.

Impact: Log spam that can make it difficult to diagnose other issues, and wastes CPU cycles repeatedly processing already-known failures.

Suggested change
for ref in ready:
worker_idx = self.worker_futures.index(ref)
try:
result = ray.get(ref)
logger.error(
f"Worker {worker_idx + 1} exited unexpectedly with result: {result}"
)
except Exception as e:
logger.error(f"Worker {worker_idx + 1} failed: {e}")
for ref in ready:
worker_idx = self.worker_futures.index(ref)
try:
result = ray.get(ref)
logger.error(
f"Worker {worker_idx + 1} exited unexpectedly with result: {result}"
)
except Exception as e:
logger.error(f"Worker {worker_idx + 1} failed: {e}")
# Remove completed futures so we don't re-process them
for ref in ready:
self.worker_futures.remove(ref)
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

@Future-Outlier
Copy link
Copy Markdown
Author

if this is urgent, feel free to close this and let other do it, i will address this after kubecon, so will keep doing this from April 5

@andrewsykim
Copy link
Copy Markdown
Contributor

@Future-Outlier I think we can close this based on #1418

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants