Create PlacementGroup for steps using vLLM (#842)
* Create placement group for `vLLM`

* Use `SPREAD` if `pipeline_parallel_size>1`

* Fix bundle initialization

* Fix wrong dictionary

* Remove using `SPMD` from ray docs

* Refactor creating `PlacementGroup` for `vLLM`
gabrielmbmb authored Jul 30, 2024
1 parent 2aa977f · commit be61d20
Showing 2 changed files with 68 additions and 16 deletions.
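Before the file-by-file diff, here is a short, self-contained sketch of the scheduling rule this commit encodes; the helper function below is illustrative and not part of the codebase. For a step whose `vLLM` instance runs `tensor_parallel_size` GPU workers, one CPU bundle plus that many GPU bundles are reserved, and the bundles are kept on a single node unless `pipeline_parallel_size > 1` forces them to spread across nodes.

```python
from typing import Dict, List, Tuple


def vllm_bundles_and_strategy(
    tensor_parallel_size: int, pipeline_parallel_size: int
) -> Tuple[List[Dict[str, int]], str]:
    """Illustrative mirror of the decision made in `_create_vllm_placement_group`."""
    # One CPU bundle for the driver/step actor, one GPU bundle per tensor-parallel rank.
    bundles = [{"CPU": 1}] + [{"GPU": 1}] * tensor_parallel_size
    # Pack everything on one node unless pipeline parallelism spans several nodes.
    strategy = "SPREAD" if pipeline_parallel_size > 1 else "STRICT_PACK"
    return bundles, strategy


print(vllm_bundles_and_strategy(4, 1))
# ([{'CPU': 1}, {'GPU': 1}, {'GPU': 1}, {'GPU': 1}, {'GPU': 1}], 'STRICT_PACK')
print(vllm_bundles_and_strategy(4, 2)[1])
# SPREAD
```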
8 changes: 0 additions & 8 deletions docs/sections/how_to_guides/advanced/scaling_with_ray.md
@@ -232,12 +232,4 @@ with Pipeline(name="text-generation-ray-pipeline") as pipeline:
     load_data_from_hub >> text_generation
 ```
 
-Finally, we need to define two environment variables in our `runtime_env.yaml` file:
-
-```yaml
-env_vars:
-  VLLM_USE_RAY_COMPILED_DAG: "1"
-  VLLM_USE_RAY_SPMD_WORKER: "1"
-```
-
 More information about distributed inference with `vLLM` can be found here: [vLLM - Distributed Serving](https://docs.vllm.ai/en/latest/serving/distributed_serving.html)
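For context on what the deleted snippet configured, here is a rough, illustrative sketch of the programmatic equivalent of that `runtime_env.yaml`, which the updated docs no longer ask for:

```python
import ray

# Illustrative only: the same environment variables the removed YAML set, passed
# through Ray's runtime environment when initialising the cluster connection.
ray.init(
    runtime_env={
        "env_vars": {
            "VLLM_USE_RAY_COMPILED_DAG": "1",
            "VLLM_USE_RAY_SPMD_WORKER": "1",
        }
    }
)
```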
76 changes: 68 additions & 8 deletions src/distilabel/pipeline/ray.py
@@ -16,6 +16,7 @@
 from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union
 
 from distilabel.distiset import create_distiset
+from distilabel.llms.vllm import vLLM
 from distilabel.pipeline.base import BasePipeline
 from distilabel.pipeline.constants import INPUT_QUEUE_ATTR_NAME
 from distilabel.pipeline.step_wrapper import _StepWrapper
@@ -26,6 +27,8 @@
     from os import PathLike
     from queue import Queue
 
+    from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
+
     from distilabel.distiset import Distiset
     from distilabel.pipeline.typing import InputDataset
     from distilabel.steps.base import _Step
@@ -69,6 +72,7 @@ def __init__(

         self._ray_head_node_url = ray_head_node_url
         self._ray_init_kwargs = ray_init_kwargs or {}
+        self._ray_node_ids = {}
 
     def run(
         self,
@@ -171,6 +175,8 @@ def _init_ray(self) -> None:
         else:
             ray.init(**self._ray_init_kwargs)
 
+        self._ray_node_ids = {node["NodeID"]: False for node in ray.nodes()}
+
     @property
     def QueueClass(self) -> Callable:
         from ray.util.queue import Queue
@@ -218,17 +224,20 @@ def run(self) -> str:
"name": f"distilabel-{self.name}-{step.name}-{replica}"
}

if step.resources.cpus is not None:
resources["num_cpus"] = step.resources.cpus
if hasattr(step, "llm") and isinstance(step.llm, vLLM): # type: ignore
resources["scheduling_strategy"] = self._create_vllm_placement_group(step)
else:
if step.resources.cpus is not None:
resources["num_cpus"] = step.resources.cpus

if step.resources.gpus is not None:
resources["num_gpus"] = step.resources.gpus
if step.resources.gpus is not None:
resources["num_gpus"] = step.resources.gpus

if step.resources.memory is not None:
resources["memory"] = step.resources.memory
if step.resources.memory is not None:
resources["memory"] = step.resources.memory

if step.resources.resources is not None:
resources["resources"] = step.resources.resources
if step.resources.resources is not None:
resources["resources"] = step.resources.resources

_StepWrapperRay = _StepWrapperRay.options(**resources) # type: ignore

@@ -255,6 +264,57 @@ def run(self) -> str:
         )
         step_wrapper.run.remote()
 
+    def _create_vllm_placement_group(
+        self, step: "_Step"
+    ) -> "PlacementGroupSchedulingStrategy":
+        """Creates a Ray placement group with as many GPU bundles as `tensor_parallel_size`
+        specified in the `vLLM` initialisation. The created placement group uses the `STRICT_PACK`
+        strategy if the `pipeline_parallel_size` is less or equal to 1, otherwise it uses
+        `SPREAD` (placement group with GPU bundles in several nodes). In addition, the created
+        placement group is targeted to be created in a specific node. This avoids having
+        `vLLM` raising the exception `Ray does not allocate any GPUs on the driver node...`,
+        as it assures that the driver `_StepWrapperRay` actor created resides in the same
+        node as the ray actors created by `vLLM` for the distributed inference.
+
+        Args:
+            step: the step which uses `vLLM`.
+
+        Returns:
+            A `PlacementGroupSchedulingStrategy` using the created `PlacementGroup`.
+        """
+        import ray
+
+        llm = step.llm  # type: ignore
+        tensor_parallel_size = llm.extra_kwargs.get("tensor_parallel_size", 1)  # type: ignore
+        pipeline_parallel_size = llm.extra_kwargs.get(  # type: ignore
+            "pipeline_parallel_size", 1
+        )
+
+        node_id = next(
+            node_id for node_id, used in self._ray_node_ids.items() if not used
+        )
+
+        self._ray_node_ids[node_id] = True
+
+        # Create a placement group
+        pg = ray.util.placement_group(
+            # Create `tensor_parallel_size` GPU bundles and at least one CPU bundle
+            # so the actors can be scheduled and executed (1 CPU bundle can have infinite actors):
+            # https://docs.ray.io/en/latest/ray-core/scheduling/placement-group.html#schedule-tasks-and-actors-to-placement-groups-use-reserved-resources
+            bundles=[{"CPU": 1}] + [{"GPU": 1}] * tensor_parallel_size,
+            strategy="SPREAD" if pipeline_parallel_size > 1 else "STRICT_PACK",
+            _soft_target_node_id=node_id if pipeline_parallel_size is None else None,
+        )
+
+        self._logger.info(
+            f"Step '{step.name}' uses `vLLM`. Created a Ray placement group with bundle"
+            f" specs: {pg.bundle_specs}"
+        )
+
+        return ray.util.scheduling_strategies.PlacementGroupSchedulingStrategy(  # type: ignore
+            placement_group=pg,
+        )
+
     def _teardown(self) -> None:
         """Clean/release/stop resources reserved to run the pipeline."""
         if self._write_buffer:
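To see the new code path end to end, here is a standalone sketch (not part of the diff) of creating a placement group like the one `_create_vllm_placement_group` builds and then scheduling an actor onto it. The `tensor_parallel_size = 2` value and the `InferenceActor` class are illustrative stand-ins for the `vLLM` configuration and the `_StepWrapperRay` actor.

```python
import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init()

tensor_parallel_size = 2  # illustrative; distilabel reads this from `llm.extra_kwargs`

# One CPU bundle for the step/driver actor plus one GPU bundle per tensor-parallel
# rank, reserved on a single node (the STRICT_PACK branch, pipeline_parallel_size <= 1).
# Requires a node with at least `tensor_parallel_size` GPUs, otherwise `pg.ready()` blocks.
pg = placement_group(
    bundles=[{"CPU": 1}] + [{"GPU": 1}] * tensor_parallel_size,
    strategy="STRICT_PACK",
)
ray.get(pg.ready())  # block until the reserved resources are available


@ray.remote(num_cpus=1)
class InferenceActor:
    """Stand-in for the `_StepWrapperRay` actor that drives the `vLLM` workers."""

    def ping(self) -> str:
        return "running inside the placement group"


actor = InferenceActor.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
).remote()
print(ray.get(actor.ping.remote()))
```

Scheduling the driver actor through the same placement group is what keeps it on a node that also holds the GPU bundles, which is the situation the docstring above describes as avoiding the `Ray does not allocate any GPUs on the driver node...` error.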
