Skip to content

Commit

Permalink
[Misc] Optimize ray worker initialization time (vllm-project#11275)
Browse files Browse the repository at this point in the history
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Co-authored-by: Cody Yu <hao.yu.cody@gmail.com>
  • Loading branch information
2 people authored and BKitor committed Dec 30, 2024
1 parent 3c57a07 commit 7bbd9e0
Showing 1 changed file with 22 additions and 13 deletions.
35 changes: 22 additions & 13 deletions vllm/executor/ray_gpu_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def _init_workers_ray(self, placement_group: "PlacementGroup",

# Create the workers.
driver_ip = get_ip()
workers = []
for bundle_id, bundle in enumerate(placement_group.bundle_specs):
if not bundle.get("GPU", 0):
continue
Expand All @@ -138,20 +139,30 @@ def _init_workers_ray(self, placement_group: "PlacementGroup",
scheduling_strategy=scheduling_strategy,
**ray_remote_kwargs,
)(RayWorkerWrapper).remote(vllm_config=self.vllm_config)
workers.append(worker)

if self.use_ray_spmd_worker:
self.workers.append(worker)
else:
worker_ip = ray.get(worker.get_node_ip.remote())
if worker_ip == driver_ip and self.driver_dummy_worker is None:
worker_ip_refs = [
worker.get_node_ip.remote() # type: ignore[attr-defined]
for worker in workers
]
worker_ips = ray.get(worker_ip_refs)

if not self.use_ray_spmd_worker:
for i in range(len(workers)):
worker = workers[i]
worker_ip = worker_ips[i]
if self.driver_dummy_worker is None and worker_ip == driver_ip:
# If the worker is on the same node as the driver, we use it
# as the resource holder for the driver process.
self.driver_dummy_worker = worker
self.driver_worker = RayWorkerWrapper(
vllm_config=self.vllm_config)
else:
# Else, added to the list of workers.
self.workers.append(worker)
workers.pop(i)
worker_ips.pop(i)
self.workers = workers
break
else:
self.workers = workers

logger.debug("workers: %s", self.workers)
logger.debug("driver_dummy_worker: %s", self.driver_dummy_worker)
Expand All @@ -161,14 +172,12 @@ def _init_workers_ray(self, placement_group: "PlacementGroup",
"adjusting the Ray placement group or running the driver on a "
"GPU node.")

worker_ips = [
ray.get(worker.get_node_ip.remote()) # type: ignore[attr-defined]
for worker in self.workers
]
ip_counts: Dict[str, int] = {}
for ip in worker_ips:
ip_counts[ip] = ip_counts.get(ip, 0) + 1

worker_to_ip = dict(zip(self.workers, worker_ips))

def sort_by_driver_then_worker_ip(worker):
"""
Sort the workers based on 3 properties:
Expand All @@ -179,7 +188,7 @@ def sort_by_driver_then_worker_ip(worker):
3. Finally, if the work is on a node with smaller IP address, it
should be placed first.
"""
ip = ray.get(worker.get_node_ip.remote())
ip = worker_to_ip[worker]
return (ip != driver_ip, ip_counts[ip], ip)

# After sorting, the workers on the same node will be
Expand Down

0 comments on commit 7bbd9e0

Please sign in to comment.