Gather GPUs per Ray node to create placement groups #848

Merged 1 commit on Aug 1, 2024
src/distilabel/pipeline/ray.py: 30 changes (23 additions & 7 deletions)
@@ -175,7 +175,13 @@ def _init_ray(self) -> None:
         else:
             ray.init(**self._ray_init_kwargs)

-        self._ray_node_ids = {node["NodeID"]: False for node in ray.nodes()}
+        # Get the number of GPUs per Ray node
+        for node in ray.nodes():
+            node_id = node["NodeID"]
+            gpus = node["Resources"].get("GPU", 0)
+            self._ray_node_ids[node_id] = gpus
+
+        self._logger.info(f"Ray nodes GPUs: {self._ray_node_ids}")

     @property
     def QueueClass(self) -> Callable:
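For reference, a minimal standalone sketch of the same gathering step, assuming a local Ray runtime is available. The node dictionaries returned by `ray.nodes()` only contain a "GPU" resource key when the node actually has GPUs, hence the `.get("GPU", 0)` default used in the hunk above:

import ray

ray.init()  # or ray.init(address="auto") to attach to a running cluster

# Map each alive Ray node ID to the number of GPUs it advertises.
gpus_per_node = {
    node["NodeID"]: node["Resources"].get("GPU", 0)
    for node in ray.nodes()
    if node["Alive"]
}
print(gpus_per_node)  # e.g. {"6f8a...": 2.0, "b41c...": 0}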
@@ -290,11 +296,19 @@ def _create_vllm_placement_group(
"pipeline_parallel_size", 1
)

node_id = next(
node_id for node_id, used in self._ray_node_ids.items() if not used
)

self._ray_node_ids[node_id] = True
# TODO: this is suboptimal as placement groups can get distributed in a suboptimal
# way (some GPUs are not used and some LLMs cannot be loaded because they need 2
# GPUs but there is 1 GPU in one node and 1 GPU in another node)
selected_node_id = None
for node_id, gpus in self._ray_node_ids.items():
if gpus >= tensor_parallel_size:
self._logger.info(
f"Ray node with ID '{node_id}' has enough GPUs (needed: {tensor_parallel_size},"
f" available: {gpus}) for allocating `vLLM` used by '{step.name}' step."
)
self._ray_node_ids[node_id] -= tensor_parallel_size
selected_node_id = node_id
break

# Create a placement group
pg = ray.util.placement_group(
@@ -303,7 +317,9 @@
             # https://docs.ray.io/en/latest/ray-core/scheduling/placement-group.html#schedule-tasks-and-actors-to-placement-groups-use-reserved-resources
             bundles=[{"CPU": 1}] + [{"GPU": 1}] * tensor_parallel_size,
             strategy="SPREAD" if pipeline_parallel_size > 1 else "STRICT_PACK",
-            _soft_target_node_id=node_id if pipeline_parallel_size is None else None,
+            _soft_target_node_id=selected_node_id
+            if pipeline_parallel_size is None
+            else None,
         )

         self._logger.info(
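As a rough illustration of the reservation made above, here is a self-contained sketch (with assumed values; the private `_soft_target_node_id` argument is omitted) that creates an equivalent placement group for `tensor_parallel_size=2` and pins an actor to it. `STRICT_PACK` forces every bundle onto a single node, while `SPREAD` lets bundles land on different nodes, which is what the pipeline-parallel case needs; note that `ready()` only resolves once a node with enough free GPUs exists:

import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init()

tensor_parallel_size = 2  # assumed value for the example

# One CPU bundle for the actor driving the engine, plus one GPU bundle
# per tensor-parallel worker, all packed onto the same node.
pg = placement_group(
    bundles=[{"CPU": 1}] + [{"GPU": 1}] * tensor_parallel_size,
    strategy="STRICT_PACK",  # "SPREAD" when pipeline_parallel_size > 1
)
ray.get(pg.ready())  # blocks until the resources are reserved


@ray.remote(num_cpus=1)
class LLMActor:  # hypothetical stand-in for the step's vLLM wrapper
    def ping(self) -> str:
        return "ready"


actor = LLMActor.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(
        placement_group=pg,
        placement_group_capture_child_tasks=True,
    )
).remote()
print(ray.get(actor.ping.remote()))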