Start Ray on the head and the worker nodes #305
Conversation
…using self.run (, node)
def _start_ray(self, host, master_host, n_hosts, ray_port):
    if host == master_host:
        # Head node
        if ray.is_initialized():
Checking is_initialized is too coarse, as it will check whether any Ray cluster is connected. Unfortunately, if we accidentally kill SkyPilot's Ray cluster, autostop breaks, and it's tricky to restart. What we ideally want is to check whether Ray has already started on the specified port (and/or maybe with our "runhouse" namespace?), and we may need to use a subprocess.run for that.
Removed the current check.
We could run

nmap -sV --reason -p 6381 127.0.0.1

and check that we get

PORT STATE SERVICE REASON VERSION
6381/tcp open redis? syn-ack

Problem: we need to install nmap.

Alternatively, we could run

nc -vv -z 127.0.0.1 6381

and expect

Connection to domain_name 6381 port [tcp/*] succeeded!

But this result is too coarse imho (not Ray-specific).
Please let me know if you have a preference between the two, but I don't think it's a blocker for a first merge.
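For reference, a minimal dependency-free sketch of the nc-style check in Python (host and port are just the placeholders from the examples above, and it carries the same caveat: it only tells us the port is occupied, not that the listener is Ray):

```python
import socket

def is_port_open(host: str = "127.0.0.1", port: int = 6381, timeout: float = 1.0) -> bool:
    # Returns True if something accepts TCP connections on host:port.
    # Like `nc -z`, this cannot tell whether the listener is actually Ray.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        return sock.connect_ex((host, port)) == 0
```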
logger.info(
    f"There is a Ray cluster already running on the head node {master_host}. Shutting it down."
)
ray.shutdown()
Ditto the above about being too coarse. Also, unfortunately there is no way to stop Ray on only a single port. That's why we use a pkill command matching the specific port today. It's not ideal, because Ray would otherwise tear down the resources more comprehensively, but it is what it is.
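For illustration, a hedged sketch of that pkill approach (the exact match pattern is an assumption, not the project's actual command; the idea is to kill only Ray processes whose command line mentions the given port, leaving SkyPilot's cluster on 6380 alone):

```python
import subprocess

def kill_ray_on_port(port: int = 6379) -> None:
    # pkill -f matches against the full command line; check=False because
    # pkill exits non-zero when nothing matched, which is fine here.
    subprocess.run(["pkill", "-f", f"ray.*--port={port}"], check=False)
```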
Yep, removed the coarse code.
self._start_ray(host, master_host, n_hosts, self.DEFAULT_RAY_PORT)

# logger.info("🎉 All workers present and accounted for 🎉")
# logger.info(ray.cluster_resources())
Good reminder, we need a better cluster.state API.
+1
# Worker node
self.run(
    commands=[
        f"sleep 10 && ray start --address={master_host}:{ray_port} --block",
We need to start the workers inside the "runhouse" namespace.
Did you mean on the worker nodes? I'm not sure that ray start supports that. ray.init(address="ray://123.45.67.89:6381", namespace="runhouse") does, but I'm not sure that's the recommended way to connect to the cluster: https://docs.ray.io/en/latest/ray-core/configure.html#cluster-resources
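To illustrate the distinction being discussed (the address and client port below are placeholders, not values from this PR): the namespace is chosen by whichever driver/client connects, not by ray start on the workers.

```python
import ray

# Workers simply join the cluster with: ray start --address=<head_ip>:6379
# The namespace is then chosen by the connecting driver/client. The Ray client
# server listens on its own port (10001 by default), which is why the ray://
# address here differs from the GCS port discussed in this PR.
ray.init(address="ray://127.0.0.1:10001", namespace="runhouse")
# Anything created from this driver (e.g. named actors) now lives in the
# "runhouse" namespace.
ray.shutdown()
```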
master_host = self.address
n_hosts = len(self.ips)
for host in self.ips:
    self._start_ray(host, master_host, n_hosts, self.DEFAULT_RAY_PORT)
I'm not sure we need to start Ray on the head node here; we already start it remotely on the head node above (also, that runs remotely, whereas this restart logic would start the head node locally, which I don't think is correct). I think we could just connect each of the workers to that, and it would be less disruptive to the existing server too.
Yes, I've updated the code to start remotely on the head node and remotely on the workers.
Without the explicit start on the head node, we have two Ray servers: one on 6380 started by SkyPilot and one on the default 6379.
Do you mean we should use the existing default one on 6379 and connect the workers to it?
Based on an offline convo, we are reusing the existing Ray instance on port 6379.
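A sketch of what reusing that instance looks like from the server process on the head node (the namespace argument follows the discussion above and is an assumption about the final code, not a quote from it):

```python
import ray

# Attach to the Ray instance already listening locally (port 6379) instead of
# starting a second one next to SkyPilot's on 6380.
ray.init(address="auto", namespace="runhouse", ignore_reinit_error=True)
```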
@@ -787,6 +815,15 @@ def restart_server(
    self.client.use_https = https_flag
    self.client.cert_path = self.cert_config.cert_path

    if restart_ray:
        # Restart Ray on the head node and each of the workers
        # TODO: kill ray on all nodes first. Need to think more of the
We should find out if it dies automatically on the workers when we kill it on the head or if we're really borking them when we pkill the head node.
Seems like 'RAY_HEAD_IP=127.0.0.1 RAY_HEAD_PORT=6379 ray stop' should stop the entire cluster with workers when run on the head node.
Nope, it doesn't seem to work.
Back to pkill for now.
The workers do not die automatically; they become orphaned. There seems to be a way to carefully send shutdown commands to them, but I need to experiment further. We can probably decouple it from the current PR?
Next to try: python3 -c "import ray; ray.init('ray://localhost:6379'); ray.shutdown()"
https://docs.ray.io/en/latest/ray-core/api/doc/ray.shutdown.html
If that doesn't work, maybe this is a more involved and explicit way:
https://stackoverflow.com/questions/69613739/how-to-kill-ray-tasks-when-the-driver-is-dead
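If neither of those pans out, a fallback in the spirit of the PR would be to stop Ray explicitly on every node. A rough sketch, assuming self.run accepts a node argument and self.ips/self.address behave as described in this PR (not the actual implementation):

```python
def _stop_ray_everywhere(self):
    master_host = self.address
    # Stop the workers first so they are not left orphaned, then the head.
    workers = [host for host in self.ips if host != master_host]
    for host in workers + [master_host]:
        # "ray stop --force" terminates the local Ray processes on that node only.
        self.run(commands=["ray stop --force"], node=host)
```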
The algorithm:
- Ray is started on the head node via _start_server_cmds and initialized inside http_server.py. The instance listens on port 6379 (note: the SkyPilot Ray instance listens on port 6380).
- In cluster.py, the workers start on the number of nodes that is derived from cluster.ips (-1 for the head node).
- In http_server.py, the workers' membership is verified by a call to ray.nodes().
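Condensed, the flow described above looks roughly like the following sketch (method names and the node keyword mirror the PR's code, but the exact signatures are assumptions):

```python
DEFAULT_RAY_PORT = 6379  # SkyPilot's own Ray instance stays on 6380

def _start_ray_cluster(self):
    master_host = self.address
    for host in self.ips:
        if host == master_host:
            # Head node: Ray is already started by _start_server_cmds and
            # initialized inside http_server.py, listening on 6379.
            continue
        # Worker node: join the head's existing cluster remotely.
        self.run(
            commands=[f"ray start --address={master_host}:{DEFAULT_RAY_PORT}"],
            node=host,
        )
    # Worker membership can then be verified on the head with ray.nodes().
```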