[WIP] [tx] Make it easy to run on a multi-node Ray cluster#955
[WIP] [tx] Make it easy to run on a multi-node Ray cluster#955Future-Outlier wants to merge 3 commits into
Conversation
Signed-off-by: Future-Outlier <eric901201@gmail.com>
|
@Future-Outlier is attempting to deploy a commit to the Tyler's projects Team on Vercel. A member of the Team first needs to authorize it. |
There was a problem hiding this comment.
Code Review
This pull request introduces support for running on a multi-node Ray cluster by adding a RayProcessManager and integrating it into the JAX backend and the Tinker engine. During the review, several issues were identified. A critical resource leak exists where Ray workers are not shut down gracefully, and there's a high-severity issue with a brittle synchronization mechanism that could cause startup failures. Furthermore, a security audit revealed two medium-severity vulnerabilities: leakage of database credentials in application logs (due to the API server logging the full command line including the database URL, potentially exposing passwords) and insecure configuration of the Ray dashboard (bound to 0.0.0.0, exposing it to the network). All these issues should be addressed.
| self._ray_process_manager = None | ||
| if hasattr(backend_config, "enable_ray") and backend_config.enable_ray: | ||
| logger.info("Starting Ray worker processes for multi-node support...") | ||
| self._ray_process_manager, coordinator_address = start_ray_workers( | ||
| backend_config | ||
| ) |
There was a problem hiding this comment.
The RayProcessManager is created and stored in self._ray_process_manager, but its shutdown() method is never called. When the engine process is terminated (e.g., by the API server's lifespan manager), the Ray worker actors will be orphaned, leading to a significant resource leak on the cluster.
To fix this, a graceful shutdown mechanism should be implemented. I recommend:
- Adding a
shutdown()method to theTinkerEngineclass that callsself._ray_process_manager.shutdown()if it exists. - In
engine.py'smain()function, add a signal handler forSIGTERMandSIGINTthat calls the newengine.shutdown()method before exiting.
| import time | ||
|
|
||
| time.sleep(2) | ||
|
|
||
| return manager, coordinator_address |
There was a problem hiding this comment.
The use of time.sleep(2) to wait for Ray workers to initialize is brittle and can lead to race conditions, especially on a loaded system where actor startup might take longer. This can cause non-deterministic startup failures if the workers are not ready when the coordinator tries to connect.
A more robust approach would be to actively poll the status of the worker actors until they all report as running, with a reasonable timeout. This ensures that the system waits just as long as necessary and provides a more reliable startup sequence.
import time
# Wait for all workers to report they are running
logger.info("Waiting for all Ray workers to start...")
start_time = time.time()
timeout_seconds = 60 # Consider making this configurable
while time.time() - start_time < timeout_seconds:
try:
statuses = ray.get([h.get_status.remote() for h in manager.worker_handles])
if all(s.get("is_running") for s in statuses):
logger.info("All Ray workers are running.")
return manager, coordinator_address
except Exception as e:
logger.warning(f"Error checking worker status, will retry: {e}")
time.sleep(1)
# If loop finishes, it's a timeout
manager.shutdown()
raise RuntimeError(f"Ray workers did not start within {timeout_seconds} seconds.")| logger.info( | ||
| f"Started background engine with PID {background_engine.pid}: {' '.join(cmd)}" | ||
| ) |
There was a problem hiding this comment.
The API server logs the full command line used to start the background engine. This command line includes all configuration parameters, including the database_url. If the database URL contains credentials (e.g., a password for a PostgreSQL or MySQL database), these credentials will be written to the application logs in plain text.
Remediation: Sanitize the command line arguments before logging them. Specifically, the database_url should be masked or excluded from the log message.
| ray.init( | ||
| include_dashboard=True, | ||
| dashboard_host="0.0.0.0", | ||
| dashboard_port=8265, | ||
| ) |
There was a problem hiding this comment.
When Ray support is enabled, the application initializes Ray with the dashboard bound to 0.0.0.0. This makes the Ray dashboard accessible from any machine on the network. The Ray dashboard can expose sensitive information about the cluster and, depending on the version and configuration, may allow for unauthorized task submission or code execution.
Remediation: Change the default dashboard_host to 127.0.0.1 to ensure it is only accessible locally. If remote access is required, it should be made configurable and the user should be warned about the security implications.
Signed-off-by: Future-Outlier <eric901201@gmail.com>
|
will come back later, recently preparing a talk for kubecon EU 2026! |
|
need 1 more week, gaining context |
Signed-off-by: Future-Outlier <eric901201@gmail.com>
|
|
||
| status_futures = [handle.get_status.remote() for handle in self.worker_handles] |
There was a problem hiding this comment.
🔴 wait_for_workers_ready() always times out due to Ray actor single-threaded execution
wait_for_workers_ready() calls handle.get_status.remote() on each worker actor, but start_worker() has already been submitted to the same actor and is blocking forever in the JAX worker loop. Since Ray actors process method calls sequentially in FIFO order, get_status() is queued behind start_worker() and will never execute.
Detailed Explanation
The sequence of events is:
- In
start_workers()atskyrl-tx/tx/tinker/backends/jax.py:1486,launcher.start_worker.remote(...)submits thestart_workertask to the actor start_worker()(skyrl-tx/tx/tinker/backends/jax.py:1313-1338) blocks forever by callingrun_worker()which enters an infinite command loop (skyrl-tx/tx/tinker/backends/jax.py:1258:while True:)- In
wait_for_workers_ready()at line 1522,handle.get_status.remote()submits aget_statustask to the same actor - Since Ray actors are single-threaded by default and process tasks in FIFO order,
get_statusis queued behind the already-runningstart_workerand can never execute
This means ray.get(status_futures, timeout=timeout) at line 1524 will always raise GetTimeoutError, causing the entire Ray worker launch flow to fail with a RuntimeError.
Impact: The Ray-based multi-node feature is completely non-functional — every attempt to use enable_ray=True will fail during startup.
Prompt for agents
In skyrl-tx/tx/tinker/backends/jax.py, the wait_for_workers_ready() method (line 1504) calls get_status() on each RayWorkerLauncher actor, but start_worker() has already been submitted and is blocking the actor forever. Since Ray actors are single-threaded, get_status() can never execute.
To fix this, you have several options:
1. Remove the wait_for_workers_ready() call entirely from start_ray_workers() (line 1758) and replace it with a simple time.sleep() or rely on JAX distributed initialization as the synchronization point.
2. Make the RayWorkerLauncher actor use Ray's async actor pattern (using async def methods) or threaded actors so get_status() can run concurrently with start_worker().
3. Instead of calling get_status() on the workers, verify readiness by checking that the Ray actor handles are alive using ray.get() with a short timeout on a separate lightweight task, or simply check that the actor objects were created successfully (which is already implied by the successful .remote() calls).
Option 1 is the simplest: in start_ray_workers() at line 1757-1758, replace manager.wait_for_workers_ready(timeout=config.ray_startup_timeout) with a brief sleep or remove it entirely, since the real synchronization happens when the coordinator calls jax.distributed.initialize().
Was this helpful? React with 👍 or 👎 to provide feedback.
| for ref in ready: | ||
| worker_idx = self.worker_futures.index(ref) | ||
| try: | ||
| result = ray.get(ref) | ||
| logger.error( | ||
| f"Worker {worker_idx + 1} exited unexpectedly with result: {result}" | ||
| ) | ||
| except Exception as e: | ||
| logger.error(f"Worker {worker_idx + 1} failed: {e}") |
There was a problem hiding this comment.
🟡 Health monitor repeatedly logs same failed workers due to never removing completed futures
In _health_monitor_loop, completed (failed) worker futures are never removed from self.worker_futures. On every iteration, ray.wait() returns the same completed refs, and ray.get(ref) re-raises the same exception, causing the same error messages to be logged every check_interval seconds indefinitely.
Root Cause
At skyrl-tx/tx/tinker/backends/jax.py:1572-1586, the loop calls ray.wait(self.worker_futures, ...) and iterates over ready refs. For each ready ref, it calls ray.get(ref) which either returns a result or raises an exception. However, the completed refs are never removed from self.worker_futures. On the next iteration (after check_interval seconds), ray.wait() returns the same completed refs again, and the same errors are logged.
For a failed worker, ray.get(ref) will re-raise the stored exception on every call, leading to the same error being logged every 5 seconds (default check_interval). This continues for the entire lifetime of the process.
Impact: Log spam that can make it difficult to diagnose other issues, and wastes CPU cycles repeatedly processing already-known failures.
| for ref in ready: | |
| worker_idx = self.worker_futures.index(ref) | |
| try: | |
| result = ray.get(ref) | |
| logger.error( | |
| f"Worker {worker_idx + 1} exited unexpectedly with result: {result}" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Worker {worker_idx + 1} failed: {e}") | |
| for ref in ready: | |
| worker_idx = self.worker_futures.index(ref) | |
| try: | |
| result = ray.get(ref) | |
| logger.error( | |
| f"Worker {worker_idx + 1} exited unexpectedly with result: {result}" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Worker {worker_idx + 1} failed: {e}") | |
| # Remove completed futures so we don't re-process them | |
| for ref in ready: | |
| self.worker_futures.remove(ref) |
Was this helpful? React with 👍 or 👎 to provide feedback.
|
if this is urgent, feel free to close this and let other do it, i will address this after kubecon, so will keep doing this from April 5 |
|
@Future-Outlier I think we can close this based on #1418 |
WIP: #935