
Commit fc498c2

kouroshHakha authored and dstrodtman committed
[serve.llm] Fixed DP DSV3 issues (#55802)
Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
Signed-off-by: Douglas Strodtman <douglas@anyscale.com>
1 parent e81a25a · commit fc498c2

File tree

6 files changed (+94, -28 lines)


python/ray/llm/_internal/serve/configs/server_models.py

Lines changed: 39 additions & 5 deletions
@@ -424,6 +424,38 @@ def update_engine_kwargs(self, **kwargs: Any) -> None:
         if self._engine_config:
             self._engine_config.engine_kwargs.update(kwargs)
 
+    def _merge_replica_actor_and_child_actor_bundles(
+        self,
+        child_actor_bundles: List[Dict[str, float]],
+        replica_actor_bundle: Dict[str, float],
+    ) -> List[Dict[str, float]]:
+        """Sum the replica actor bundle into the first child actor bundle.
+
+        The replica actor uses the first bundle in the list, and we want to
+        collocate it with the first child actor, so the two are grouped together.
+
+        For example:
+            child_actor_bundles = [{"GPU": 1, "CPU": 1}, {"GPU": 1, "CPU": 1}]
+            replica_actor_bundle = {"GPU": 0, "CPU": 1, "memory": 100}
+            returns [{"GPU": 1, "CPU": 2, "memory": 100}, {"GPU": 1, "CPU": 1}]
+        """
+
+        if not child_actor_bundles:
+            return [replica_actor_bundle]
+
+        if not replica_actor_bundle:
+            return child_actor_bundles
+
+        first_bundle = child_actor_bundles[0]
+        bundle_key_set = set(first_bundle.keys()) | set(replica_actor_bundle.keys())
+
+        for key in bundle_key_set:
+            first_bundle[key] = replica_actor_bundle.get(key, 0) + first_bundle.get(
+                key, 0
+            )
+
+        return [first_bundle] + child_actor_bundles[1:]
+
     def _set_deployment_placement_options(self) -> Dict[str, Any]:
         deployment_config = self.deployment_config
         engine_config = self.get_engine_config()
@@ -449,15 +481,17 @@ def _set_deployment_placement_options(self) -> Dict[str, Any]:
         )
 
         try:
-            bundles = engine_config.placement_bundles
+            child_actor_bundles = engine_config.placement_bundles
         except ValueError:
             # May happen if all bundles are empty.
-            bundles = []
+            child_actor_bundles = []
 
-        bundles = [replica_actor_resources] + bundles
+        pg_bundles = self._merge_replica_actor_and_child_actor_bundles(
+            child_actor_bundles, replica_actor_resources
+        )
         deployment_config.update(
             {
-                "placement_group_bundles": bundles,
+                "placement_group_bundles": pg_bundles,
                 "placement_group_strategy": engine_config.placement_strategy,
             }
         )
@@ -569,7 +603,7 @@ def _setup_kv_connector_backend(self):
             raise ValueError(f"Unsupported connector type: {kv_connector}")
 
         # 2. Setup the backend
-        kv_connector_backend = kv_connector_backend_class(kv_transfer_config)
+        kv_connector_backend = kv_connector_backend_class(self)
         kv_connector_backend.setup()
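For intuition, here is a minimal standalone sketch of the merge semantics above (merge_bundles is a hypothetical stand-in for the private method; the inputs come from the docstring example):

    from typing import Dict, List

    def merge_bundles(
        child_actor_bundles: List[Dict[str, float]],
        replica_actor_bundle: Dict[str, float],
    ) -> List[Dict[str, float]]:
        # Fold the replica actor's resources into the first child bundle so
        # the replica actor is collocated with the first child actor.
        if not child_actor_bundles:
            return [replica_actor_bundle]
        if not replica_actor_bundle:
            return child_actor_bundles
        first = dict(child_actor_bundles[0])
        for key in set(first) | set(replica_actor_bundle):
            first[key] = first.get(key, 0) + replica_actor_bundle.get(key, 0)
        return [first] + child_actor_bundles[1:]

    print(merge_bundles(
        [{"GPU": 1, "CPU": 1}, {"GPU": 1, "CPU": 1}],
        {"GPU": 0, "CPU": 1, "memory": 100},
    ))
    # -> [{'GPU': 1, 'CPU': 2, 'memory': 100}, {'GPU': 1, 'CPU': 1}] (key order may vary)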

python/ray/llm/_internal/serve/deployments/data_parallel/dp_server.py

Lines changed: 4 additions & 1 deletion
@@ -76,6 +76,7 @@ def build_dp_deployment(
     llm_config: LLMConfig,
     *,
     name_prefix: Optional[str] = None,
+    options_override: Optional[dict] = None,
 ) -> Application:
     """Build a data parallel LLM deployment."""
     dp_size = llm_config.engine_kwargs.get("data_parallel_size", 1)
@@ -89,10 +90,12 @@ def build_dp_deployment(
     # the number of ranks per node because that has special semantics in vLLM.
     dp_size_per_node = llm_config.experimental_configs.get("dp_size_per_node", None)
 
-    deployment_options = llm_config.get_serve_options(name_prefix=name_prefix)
     dp_rank_assigner = DPRankAssigner.bind(
         dp_size=dp_size, dp_size_per_node=dp_size_per_node
     )
+    deployment_options = llm_config.get_serve_options(name_prefix=name_prefix)
+    if options_override:
+        deployment_options.update(options_override)
 
     return DPServer.as_deployment(deployment_options).bind(
         llm_config=llm_config, dp_rank_assigner=dp_rank_assigner
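A hedged usage sketch for the new options_override hook (the model id and the override value are illustrative assumptions, not defaults):

    from ray.llm._internal.serve.configs.server_models import LLMConfig
    from ray.llm._internal.serve.deployments.data_parallel.dp_server import (
        build_dp_deployment,
    )

    llm_config = LLMConfig(
        model_loading_config=dict(model_id="test_model"),
        engine_kwargs=dict(data_parallel_size=2),
    )

    # Any key placed here wins over the serve options derived from llm_config.
    app = build_dp_deployment(
        llm_config,
        name_prefix="DP:",
        options_override={"max_ongoing_requests": 32},
    )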

python/ray/llm/_internal/serve/deployments/llm/vllm/kv_transfer_backends/base.py

Lines changed: 16 additions & 4 deletions
@@ -1,17 +1,29 @@
 import abc
 import random
 import string
-from typing import Any, Dict
+from typing import TYPE_CHECKING, Any, Dict
+
+if TYPE_CHECKING:
+    from ray.llm._internal.serve.configs.server_models import LLMConfig
 
 
 class BaseConnectorBackend(abc.ABC):
-    def __init__(self, kv_transfer_config: Dict[str, Any]):
+    def __init__(self, llm_config: "LLMConfig"):
         """Base class for connector backends.
 
         Args:
-            kv_transfer_config: Configuration for the KV transfer.
+            llm_config: The LLM configuration for this engine.
         """
-        self.kv_transfer_config = kv_transfer_config
+        self.llm_config = llm_config
+
+    @property
+    def kv_transfer_config(self) -> Dict[str, Any]:
+        engine_kwargs = self.llm_config.engine_kwargs
+        kv_transfer_config = engine_kwargs.get("kv_transfer_config")
+        assert (
+            kv_transfer_config is not None
+        ), "In Connector backend, kv_transfer_config is not set"
+        return kv_transfer_config
 
     def _get_unique_suffix(self, len: int = 6) -> str:
         """Generates unique alphanumeric suffix.

python/ray/llm/_internal/serve/deployments/llm/vllm/kv_transfer_backends/nixl_connector.py

Lines changed: 25 additions & 6 deletions
@@ -6,6 +6,28 @@
 
 
 class NixlConnectorBackend(BaseConnectorBackend):
+    def _set_side_channel_port(self):
+        from vllm import envs as vllm_envs, utils as vllm_utils
+
+        if not vllm_envs.is_set("VLLM_NIXL_SIDE_CHANNEL_PORT"):
+            base_port: int = int(
+                self.llm_config.experimental_configs.get(
+                    "NIXL_SIDE_CHANNEL_PORT_BASE", vllm_utils.get_open_port()
+                )
+            )
+            # If dp_rank is set, use base_port + dp_rank as the side channel
+            # port, due to a potential race condition when getting free ports.
+            dp_rank = self.llm_config.engine_kwargs.get("data_parallel_rank", 0)
+            port = base_port + dp_rank
+            os.environ["VLLM_NIXL_SIDE_CHANNEL_PORT"] = str(port)
+
+    def _set_side_channel_host(self):
+        from vllm import envs as vllm_envs, utils as vllm_utils
+
+        if not vllm_envs.is_set("VLLM_NIXL_SIDE_CHANNEL_HOST"):
+            os.environ["VLLM_NIXL_SIDE_CHANNEL_HOST"] = vllm_utils.get_ip()
+
     def setup(self) -> None:
         """Initialize the NIXL connector backend.
 
@@ -20,7 +42,7 @@ def setup(self) -> None:
         ValueError: If the current vLLM version doesn't support the required
             NIXL environment variables.
         """
-        from vllm import envs as vllm_envs, utils as vllm_utils
+        from vllm import envs as vllm_envs
 
         if (
             "VLLM_NIXL_SIDE_CHANNEL_PORT" not in vllm_envs.environment_variables
@@ -32,11 +54,8 @@ def setup(self) -> None:
             "that you are using an older version of vLLM."
         )
 
-        if not vllm_envs.is_set("VLLM_NIXL_SIDE_CHANNEL_PORT"):
-            port: int = vllm_utils.get_open_port()
-            os.environ["VLLM_NIXL_SIDE_CHANNEL_PORT"] = str(port)
-        if not vllm_envs.is_set("VLLM_NIXL_SIDE_CHANNEL_HOST"):
-            os.environ["VLLM_NIXL_SIDE_CHANNEL_HOST"] = vllm_utils.get_ip()
+        self._set_side_channel_port()
+        self._set_side_channel_host()
 
         # We need to overwrite the engine_id to make it unique across replicas.
         engine_id = self.kv_transfer_config.get("engine_id", self._get_unique_suffix())
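For intuition, the per-rank port derivation reduces to the following (the base port and the data-parallel size of 4 are illustrative assumptions):

    # With NIXL_SIDE_CHANNEL_PORT_BASE set in experimental_configs, each
    # data-parallel rank gets a deterministic, distinct side channel port,
    # avoiding the race where concurrent ranks grab the same "free" port.
    base_port = 6000
    for dp_rank in range(4):
        print(f"rank {dp_rank}: VLLM_NIXL_SIDE_CHANNEL_PORT={base_port + dp_rank}")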

python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py

Lines changed: 2 additions & 1 deletion
@@ -1,3 +1,4 @@
+import copy
 import dataclasses
 import os
 from typing import Any, Dict, List, Optional
@@ -201,7 +202,7 @@ def placement_bundles(self) -> List[Dict[str, float]]:
         bundle = {"GPU": 1}
         if self.accelerator_type:
             bundle[self.ray_accelerator_type()] = 0.001
-        bundles = [bundle for _ in range(self.num_devices)]
+        bundles = [copy.deepcopy(bundle) for _ in range(self.num_devices)]
 
         return bundles
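The copy.deepcopy matters because the old list comprehension repeated references to a single dict, so the in-place bundle merge introduced above would have mutated every bundle at once. A minimal repro:

    import copy

    bundle = {"GPU": 1}
    aliased = [bundle for _ in range(2)]  # two references to the same dict
    aliased[0]["CPU"] = 1
    print(aliased)  # [{'GPU': 1, 'CPU': 1}, {'GPU': 1, 'CPU': 1}] -- both changed

    independent = [copy.deepcopy(bundle) for _ in range(2)]
    independent[0]["memory"] = 100
    print(independent[1])  # {'GPU': 1, 'CPU': 1} -- unaffected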

python/ray/llm/tests/serve/cpu/configs/test_models.py

Lines changed: 8 additions & 11 deletions
@@ -178,8 +178,7 @@ def test_get_serve_options_with_accelerator_type(self):
             "max_replicas": 10,
         }
         assert serve_options["placement_group_bundles"] == [
-            {"CPU": 1, "GPU": 0},
-            {"GPU": 1, "accelerator_type:A100-40G": 0.001},
+            {"CPU": 1, "GPU": 1, "accelerator_type:A100-40G": 0.001},
         ]
         assert serve_options["placement_group_strategy"] == "STRICT_PACK"
         assert serve_options["name"] == "Test:test_model"
@@ -214,10 +213,7 @@ def test_get_serve_options_without_accelerator_type(self):
             "initial_replicas": 1,
             "max_replicas": 10,
         }
-        assert serve_options["placement_group_bundles"] == [
-            {"CPU": 1, "GPU": 0},
-            {"GPU": 1},
-        ]
+        assert serve_options["placement_group_bundles"] == [{"CPU": 1, "GPU": 1}]
         assert serve_options["placement_group_strategy"] == "STRICT_PACK"
         assert serve_options["name"] == "Test:test_model"
 
@@ -239,8 +235,9 @@ def test_resources_per_bundle(self):
             model_loading_config=dict(model_id="test_model"),
             engine_kwargs=dict(tensor_parallel_size=3, pipeline_parallel_size=2),
         ).get_serve_options(name_prefix="Test:")
-        assert serve_options["placement_group_bundles"] == [{"CPU": 1, "GPU": 0}] + [
-            {"GPU": 1} for _ in range(6)
+
+        assert serve_options["placement_group_bundles"] == [{"CPU": 1, "GPU": 1}] + [
+            {"GPU": 1} for _ in range(5)
         ]
 
         # Test the custom resource bundle
@@ -249,9 +246,9 @@ def test_resources_per_bundle(self):
             engine_kwargs=dict(tensor_parallel_size=3, pipeline_parallel_size=2),
             resources_per_bundle={"XPU": 1},
         ).get_serve_options(name_prefix="Test:")
-        assert serve_options["placement_group_bundles"] == [{"CPU": 1, "GPU": 0}] + [
-            {"XPU": 1} for _ in range(6)
-        ]
+        assert serve_options["placement_group_bundles"] == [
+            {"CPU": 1, "GPU": 0, "XPU": 1}
+        ] + [{"XPU": 1} for _ in range(5)]
 
     def test_engine_config_cached(self):
         """Test that the engine config is cached and not recreated when calling
