-
Notifications
You must be signed in to change notification settings - Fork 233
[WIP] [tx] Make it easy to run on a multi-node Ray cluster #955
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
[WIP] [tx] Make it easy to run on a multi-node Ray cluster #955
Conversation
Signed-off-by: Future-Outlier <eric901201@gmail.com>
|
@Future-Outlier is attempting to deploy a commit to the Tyler's projects Team on Vercel. A member of the Team first needs to authorize it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces support for running on a multi-node Ray cluster by adding a RayProcessManager and integrating it into the JAX backend and the Tinker engine. During the review, several issues were identified. A critical resource leak exists where Ray workers are not shut down gracefully, and there's a high-severity issue with a brittle synchronization mechanism that could cause startup failures. Furthermore, a security audit revealed two medium-severity vulnerabilities: leakage of database credentials in application logs (due to the API server logging the full command line including the database URL, potentially exposing passwords) and insecure configuration of the Ray dashboard (bound to 0.0.0.0, exposing it to the network). All these issues should be addressed.
| self._ray_process_manager = None | ||
| if hasattr(backend_config, "enable_ray") and backend_config.enable_ray: | ||
| logger.info("Starting Ray worker processes for multi-node support...") | ||
| self._ray_process_manager, coordinator_address = start_ray_workers( | ||
| backend_config | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The RayProcessManager is created and stored in self._ray_process_manager, but its shutdown() method is never called. When the engine process is terminated (e.g., by the API server's lifespan manager), the Ray worker actors will be orphaned, leading to a significant resource leak on the cluster.
To fix this, a graceful shutdown mechanism should be implemented. I recommend:
- Adding a
shutdown()method to theTinkerEngineclass that callsself._ray_process_manager.shutdown()if it exists. - In
engine.py'smain()function, add a signal handler forSIGTERMandSIGINTthat calls the newengine.shutdown()method before exiting.
| import time | ||
|
|
||
| time.sleep(2) | ||
|
|
||
| return manager, coordinator_address |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The use of time.sleep(2) to wait for Ray workers to initialize is brittle and can lead to race conditions, especially on a loaded system where actor startup might take longer. This can cause non-deterministic startup failures if the workers are not ready when the coordinator tries to connect.
A more robust approach would be to actively poll the status of the worker actors until they all report as running, with a reasonable timeout. This ensures that the system waits just as long as necessary and provides a more reliable startup sequence.
import time
# Wait for all workers to report they are running
logger.info("Waiting for all Ray workers to start...")
start_time = time.time()
timeout_seconds = 60 # Consider making this configurable
while time.time() - start_time < timeout_seconds:
try:
statuses = ray.get([h.get_status.remote() for h in manager.worker_handles])
if all(s.get("is_running") for s in statuses):
logger.info("All Ray workers are running.")
return manager, coordinator_address
except Exception as e:
logger.warning(f"Error checking worker status, will retry: {e}")
time.sleep(1)
# If loop finishes, it's a timeout
manager.shutdown()
raise RuntimeError(f"Ray workers did not start within {timeout_seconds} seconds.")| logger.info( | ||
| f"Started background engine with PID {background_engine.pid}: {' '.join(cmd)}" | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The API server logs the full command line used to start the background engine. This command line includes all configuration parameters, including the database_url. If the database URL contains credentials (e.g., a password for a PostgreSQL or MySQL database), these credentials will be written to the application logs in plain text.
Remediation: Sanitize the command line arguments before logging them. Specifically, the database_url should be masked or excluded from the log message.
| ray.init( | ||
| include_dashboard=True, | ||
| dashboard_host="0.0.0.0", | ||
| dashboard_port=8265, | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When Ray support is enabled, the application initializes Ray with the dashboard bound to 0.0.0.0. This makes the Ray dashboard accessible from any machine on the network. The Ray dashboard can expose sensitive information about the cluster and, depending on the version and configuration, may allow for unauthorized task submission or code execution.
Remediation: Change the default dashboard_host to 127.0.0.1 to ensure it is only accessible locally. If remote access is required, it should be made configurable and the user should be warned about the security implications.
WIP: #935