44 changes: 39 additions & 5 deletions python/ray/llm/_internal/serve/configs/server_models.py
@@ -424,6 +424,38 @@ def update_engine_kwargs(self, **kwargs: Any) -> None:
        if self._engine_config:
            self._engine_config.engine_kwargs.update(kwargs)

    def _merge_replica_actor_and_child_actor_bundles(
        self,
        child_actor_bundles: List[Dict[str, float]],
        replica_actor_bundle: Dict[str, float],
Comment on lines +429 to +430

Contributor: nit: switch the order

Contributor Author: it's fine.
    ) -> List[Dict[str, float]]:
"""Sum up the bundles from replica actor bundles with the first bundle from child actor bundles.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not fully getting the intention here: the placement strategy is STRICT_PACK (at least for TP only), why do we need this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was hanging when deployment was

[{CPU: 1, GPU:0}] + [{GPU: 1}] * tp

Also, in case of PACK, because replicas are not limited to be scheduled on the same node as their child RayWorkers I was always confounded. Modifying to this form of placement ensures the replica actor is scheduled on the same node as its own RayWorker


This is because the replica actor will use the first bundle in the list, and we want to collocate the replica actor with the child actor.
So we need to group them together.

So for example:
child_actor_bundles = [{"GPU": 1, "CPU": 1}, {"GPU": 1, "CPU": 1}]
replica_actor_bundle = {"GPU": 0, "CPU": 1, "memory": 100}
return [{"GPU": 1, "CPU": 2, "memory": 100}, {"GPU": 1, "CPU": 1}]
"""

        if not child_actor_bundles:
            return [replica_actor_bundle]

        if not replica_actor_bundle:
            return child_actor_bundles

        first_bundle = child_actor_bundles[0]
        bundle_key_set = set(first_bundle.keys()) | set(replica_actor_bundle.keys())

        for key in bundle_key_set:
            first_bundle[key] = replica_actor_bundle.get(key, 0) + first_bundle.get(
                key, 0
            )

        return [first_bundle] + child_actor_bundles[1:]
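For illustration, here is a standalone sketch of the same merge applied to the docstring's example (merge_bundles is a hypothetical helper mirroring the method above, except that it copies the first bundle instead of mutating it):

from typing import Dict, List

def merge_bundles(
    child_actor_bundles: List[Dict[str, float]],
    replica_actor_bundle: Dict[str, float],
) -> List[Dict[str, float]]:
    # Fold the replica actor's resources into a copy of the first child
    # bundle so the replica actor is colocated with the first RayWorker.
    if not child_actor_bundles:
        return [replica_actor_bundle]
    if not replica_actor_bundle:
        return child_actor_bundles
    first = dict(child_actor_bundles[0])
    for key in set(first) | set(replica_actor_bundle):
        first[key] = replica_actor_bundle.get(key, 0) + first.get(key, 0)
    return [first] + child_actor_bundles[1:]

child = [{"GPU": 1, "CPU": 1}, {"GPU": 1, "CPU": 1}]
replica = {"GPU": 0, "CPU": 1, "memory": 100}
print(merge_bundles(child, replica))
# -> [{'GPU': 1, 'CPU': 2, 'memory': 100}, {'GPU': 1, 'CPU': 1}]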

    def _set_deployment_placement_options(self) -> Dict[str, Any]:
        deployment_config = self.deployment_config
        engine_config = self.get_engine_config()
@@ -449,15 +481,17 @@ def _set_deployment_placement_options(self) -> Dict[str, Any]:
        )

        try:
-            bundles = engine_config.placement_bundles
+            child_actor_bundles = engine_config.placement_bundles
        except ValueError:
            # May happen if all bundles are empty.
-            bundles = []
+            child_actor_bundles = []

-        bundles = [replica_actor_resources] + bundles
+        pg_bundles = self._merge_replica_actor_and_child_actor_bundles(
+            child_actor_bundles, replica_actor_resources
+        )
        deployment_config.update(
            {
-                "placement_group_bundles": bundles,
+                "placement_group_bundles": pg_bundles,
                "placement_group_strategy": engine_config.placement_strategy,
            }
        )
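To make the effect concrete, here are illustrative before/after bundles for tensor_parallel_size=2 (the values mirror the updated tests at the bottom of this diff):

# Illustrative values only; see the test_models.py changes below.
replica_actor_resources = {"CPU": 1, "GPU": 0}
child_actor_bundles = [{"GPU": 1}, {"GPU": 1}]
# Before this PR: [{"CPU": 1, "GPU": 0}, {"GPU": 1}, {"GPU": 1}]  (replica bundle prepended)
# After this PR:  [{"CPU": 1, "GPU": 1}, {"GPU": 1}]              (replica bundle merged into the first)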
@@ -569,7 +603,7 @@ def _setup_kv_connector_backend(self):
raise ValueError(f"Unsupported connector type: {kv_connector}")

# 2. Setup the backend
kv_connector_backend = kv_connector_backend_class(kv_transfer_config)
kv_connector_backend = kv_connector_backend_class(self)
kv_connector_backend.setup()


@@ -76,6 +76,7 @@ def build_dp_deployment(
    llm_config: LLMConfig,
    *,
    name_prefix: Optional[str] = None,
+    options_override: Optional[dict] = None,
Contributor: QQ: what do you have in mind for using this?

Contributor Author: placement groups / deployment name (full name), etc.

) -> Application:
    """Build a data parallel LLM deployment."""
    dp_size = llm_config.engine_kwargs.get("data_parallel_size", 1)
@@ -89,10 +90,12 @@
    # the number of ranks per node because that has special semantics in vLLM.
    dp_size_per_node = llm_config.experimental_configs.get("dp_size_per_node", None)

-    deployment_options = llm_config.get_serve_options(name_prefix=name_prefix)
    dp_rank_assigner = DPRankAssigner.bind(
        dp_size=dp_size, dp_size_per_node=dp_size_per_node
    )
+    deployment_options = llm_config.get_serve_options(name_prefix=name_prefix)
+    if options_override:
+        deployment_options.update(options_override)

    return DPServer.as_deployment(deployment_options).bind(
        llm_config=llm_config, dp_rank_assigner=dp_rank_assigner
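A hypothetical call showing how the author's reply could look in practice (the override keys are standard Ray Serve deployment options; the values here are made up):

app = build_dp_deployment(
    llm_config,
    name_prefix="DP:",
    options_override={
        "name": "DP:my_full_deployment_name",  # override the generated full name
        "placement_group_strategy": "PACK",    # or custom placement_group_bundles
    },
)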
@@ -1,17 +1,29 @@
import abc
import random
import string
-from typing import Any, Dict
+from typing import TYPE_CHECKING, Any, Dict

if TYPE_CHECKING:
    from ray.llm._internal.serve.configs.server_models import LLMConfig


class BaseConnectorBackend(abc.ABC):
-    def __init__(self, kv_transfer_config: Dict[str, Any]):
+    def __init__(self, llm_config: "LLMConfig"):
        """Base class for connector backends.

        Args:
-            kv_transfer_config: Configuration for the KV transfer.
+            llm_config: The LLM configuration for this engine.
        """
-        self.kv_transfer_config = kv_transfer_config
+        self.llm_config = llm_config

    @property
    def kv_transfer_config(self) -> Dict[str, Any]:
        engine_kwargs = self.llm_config.engine_kwargs
        kv_transfer_config = engine_kwargs.get("kv_transfer_config")
        assert (
            kv_transfer_config is not None
        ), "In Connector backend, kv_transfer_config is not set"
        return kv_transfer_config

Comment on lines +23 to +25

Contributor: better to validate it early in the constructor, and validate only once?
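A minimal sketch of the reviewer's suggestion, assuming the validation moves into the constructor so a missing kv_transfer_config fails fast and is checked only once:

class BaseConnectorBackend(abc.ABC):
    def __init__(self, llm_config: "LLMConfig"):
        self.llm_config = llm_config
        # Validate once, at construction time, instead of on every property access.
        kv_transfer_config = llm_config.engine_kwargs.get("kv_transfer_config")
        if kv_transfer_config is None:
            raise ValueError("In Connector backend, kv_transfer_config is not set")
        self.kv_transfer_config = kv_transfer_config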

    def _get_unique_suffix(self, len: int = 6) -> str:
        """Generates unique alphanumeric suffix.
@@ -6,6 +6,28 @@


class NixlConnectorBackend(BaseConnectorBackend):
    def _set_side_channel_port(self):
        from vllm import envs as vllm_envs, utils as vllm_utils

        if not vllm_envs.is_set("VLLM_NIXL_SIDE_CHANNEL_PORT"):
            base_port: int = int(
                self.llm_config.experimental_configs.get(
                    "NIXL_SIDE_CHANNEL_PORT_BASE", vllm_utils.get_open_port()
                )
            )
            # If dp_rank is set, use base_port + dp_rank as the side channel
            # port, due to a potential race condition when getting free ports.
            dp_rank = self.llm_config.engine_kwargs.get("data_parallel_rank", 0)
            port = base_port + dp_rank
            os.environ["VLLM_NIXL_SIDE_CHANNEL_PORT"] = str(port)

    def _set_side_channel_host(self):
        from vllm import envs as vllm_envs, utils as vllm_utils

        if not vllm_envs.is_set("VLLM_NIXL_SIDE_CHANNEL_HOST"):
            os.environ["VLLM_NIXL_SIDE_CHANNEL_HOST"] = vllm_utils.get_ip()

    def setup(self) -> None:
        """Initialize the NIXL connector backend.

@@ -20,7 +42,7 @@ def setup(self) -> None:
            ValueError: If the current vLLM version doesn't support the required
                NIXL environment variables.
        """
-        from vllm import envs as vllm_envs, utils as vllm_utils
+        from vllm import envs as vllm_envs

        if (
            "VLLM_NIXL_SIDE_CHANNEL_PORT" not in vllm_envs.environment_variables
@@ -32,11 +54,8 @@
"that you are using an older version of vLLM."
)

if not vllm_envs.is_set("VLLM_NIXL_SIDE_CHANNEL_PORT"):
port: int = vllm_utils.get_open_port()
os.environ["VLLM_NIXL_SIDE_CHANNEL_PORT"] = str(port)
if not vllm_envs.is_set("VLLM_NIXL_SIDE_CHANNEL_HOST"):
os.environ["VLLM_NIXL_SIDE_CHANNEL_HOST"] = vllm_utils.get_ip()
self._set_side_channel_port()
self._set_side_channel_host()

# We need to overwrite the engine_id to make it unique across replicas.
engine_id = self.kv_transfer_config.get("engine_id", self._get_unique_suffix())
@@ -1,3 +1,4 @@
+import copy
import dataclasses
import os
from typing import Any, Dict, List, Optional
@@ -201,7 +202,7 @@ def placement_bundles(self) -> List[Dict[str, float]]:
bundle = {"GPU": 1}
if self.accelerator_type:
bundle[self.ray_accelerator_type()] = 0.001
bundles = [bundle for _ in range(self.num_devices)]
bundles = [copy.deepcopy(bundle) for _ in range(self.num_devices)]

return bundles
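The copy.deepcopy matters because the original list comprehension repeats the same dict object, so an in-place mutation of one bundle (exactly what _merge_replica_actor_and_child_actor_bundles now performs) would leak into every bundle. A minimal repro:

import copy

bundle = {"GPU": 1}
aliased = [bundle for _ in range(3)]  # three references to one dict
aliased[0]["CPU"] = 2
print(aliased)  # [{'GPU': 1, 'CPU': 2}, {'GPU': 1, 'CPU': 2}, {'GPU': 1, 'CPU': 2}]

independent = [copy.deepcopy(bundle) for _ in range(3)]  # three distinct dicts
independent[0]["memory"] = 100
print(independent[1])  # {'GPU': 1, 'CPU': 2} -- unaffected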

19 changes: 8 additions & 11 deletions python/ray/llm/tests/serve/cpu/configs/test_models.py
@@ -178,8 +178,7 @@ def test_get_serve_options_with_accelerator_type(self):
"max_replicas": 10,
}
assert serve_options["placement_group_bundles"] == [
{"CPU": 1, "GPU": 0},
{"GPU": 1, "accelerator_type:A100-40G": 0.001},
{"CPU": 1, "GPU": 1, "accelerator_type:A100-40G": 0.001},
]
assert serve_options["placement_group_strategy"] == "STRICT_PACK"
assert serve_options["name"] == "Test:test_model"
@@ -214,10 +213,7 @@ def test_get_serve_options_without_accelerator_type(self):
"initial_replicas": 1,
"max_replicas": 10,
}
assert serve_options["placement_group_bundles"] == [
{"CPU": 1, "GPU": 0},
{"GPU": 1},
]
assert serve_options["placement_group_bundles"] == [{"CPU": 1, "GPU": 1}]
assert serve_options["placement_group_strategy"] == "STRICT_PACK"
assert serve_options["name"] == "Test:test_model"

@@ -239,8 +235,9 @@ def test_resources_per_bundle(self):
model_loading_config=dict(model_id="test_model"),
engine_kwargs=dict(tensor_parallel_size=3, pipeline_parallel_size=2),
).get_serve_options(name_prefix="Test:")
assert serve_options["placement_group_bundles"] == [{"CPU": 1, "GPU": 0}] + [
{"GPU": 1} for _ in range(6)

assert serve_options["placement_group_bundles"] == [{"CPU": 1, "GPU": 1}] + [
{"GPU": 1} for _ in range(5)
]

# Test the custom resource bundle
@@ -249,9 +246,9 @@
            engine_kwargs=dict(tensor_parallel_size=3, pipeline_parallel_size=2),
            resources_per_bundle={"XPU": 1},
        ).get_serve_options(name_prefix="Test:")
-        assert serve_options["placement_group_bundles"] == [{"CPU": 1, "GPU": 0}] + [
-            {"XPU": 1} for _ in range(6)
-        ]
+        assert serve_options["placement_group_bundles"] == [
+            {"CPU": 1, "GPU": 0, "XPU": 1}
+        ] + [{"XPU": 1} for _ in range(5)]

    def test_engine_config_cached(self):
        """Test that the engine config is cached and not recreated when calling