diff --git a/python/ray/llm/_internal/serve/configs/server_models.py b/python/ray/llm/_internal/serve/configs/server_models.py
index 910fce823c0c..3be3e246929a 100644
--- a/python/ray/llm/_internal/serve/configs/server_models.py
+++ b/python/ray/llm/_internal/serve/configs/server_models.py
@@ -424,6 +424,38 @@ def update_engine_kwargs(self, **kwargs: Any) -> None:
         if self._engine_config:
             self._engine_config.engine_kwargs.update(kwargs)

+    def _merge_replica_actor_and_child_actor_bundles(
+        self,
+        child_actor_bundles: List[Dict[str, float]],
+        replica_actor_bundle: Dict[str, float],
+    ) -> List[Dict[str, float]]:
+        """Sum the replica actor bundle into the first child actor bundle.
+
+        The replica actor uses the first bundle in the list, and we want to
+        colocate it with the first child actor, so the two bundles are grouped together.
+
+        For example:
+        child_actor_bundles = [{"GPU": 1, "CPU": 1}, {"GPU": 1, "CPU": 1}]
+        replica_actor_bundle = {"GPU": 0, "CPU": 1, "memory": 100}
+        return [{"GPU": 1, "CPU": 2, "memory": 100}, {"GPU": 1, "CPU": 1}]
+        """
+
+        if not child_actor_bundles:
+            return [replica_actor_bundle]
+
+        if not replica_actor_bundle:
+            return child_actor_bundles
+
+        first_bundle = child_actor_bundles[0]
+        bundle_key_set = set(first_bundle.keys()) | set(replica_actor_bundle.keys())
+
+        for key in bundle_key_set:
+            first_bundle[key] = replica_actor_bundle.get(key, 0) + first_bundle.get(
+                key, 0
+            )
+
+        return [first_bundle] + child_actor_bundles[1:]
+
     def _set_deployment_placement_options(self) -> Dict[str, Any]:
         deployment_config = self.deployment_config
         engine_config = self.get_engine_config()
@@ -449,15 +481,17 @@ def _set_deployment_placement_options(self) -> Dict[str, Any]:
             )

         try:
-            bundles = engine_config.placement_bundles
+            child_actor_bundles = engine_config.placement_bundles
         except ValueError:
             # May happen if all bundles are empty.
-            bundles = []
+            child_actor_bundles = []

-        bundles = [replica_actor_resources] + bundles
+        pg_bundles = self._merge_replica_actor_and_child_actor_bundles(
+            child_actor_bundles, replica_actor_resources
+        )
         deployment_config.update(
             {
-                "placement_group_bundles": bundles,
+                "placement_group_bundles": pg_bundles,
                 "placement_group_strategy": engine_config.placement_strategy,
             }
         )
@@ -569,7 +603,7 @@ def _setup_kv_connector_backend(self):
             raise ValueError(f"Unsupported connector type: {kv_connector}")

         # 2. Setup the backend
-        kv_connector_backend = kv_connector_backend_class(kv_transfer_config)
+        kv_connector_backend = kv_connector_backend_class(self)
         kv_connector_backend.setup()


diff --git a/python/ray/llm/_internal/serve/deployments/data_parallel/dp_server.py b/python/ray/llm/_internal/serve/deployments/data_parallel/dp_server.py
index 8e2bc445f3fd..11c8d8e25e98 100644
--- a/python/ray/llm/_internal/serve/deployments/data_parallel/dp_server.py
+++ b/python/ray/llm/_internal/serve/deployments/data_parallel/dp_server.py
@@ -76,6 +76,7 @@ def build_dp_deployment(
     llm_config: LLMConfig,
     *,
     name_prefix: Optional[str] = None,
+    options_override: Optional[dict] = None,
 ) -> Application:
     """Build a data parallel LLM deployment."""
     dp_size = llm_config.engine_kwargs.get("data_parallel_size", 1)
@@ -89,10 +90,12 @@ def build_dp_deployment(
     # the number of ranks per node because that has special semantics in vLLM.
     dp_size_per_node = llm_config.experimental_configs.get("dp_size_per_node", None)

-    deployment_options = llm_config.get_serve_options(name_prefix=name_prefix)
     dp_rank_assigner = DPRankAssigner.bind(
         dp_size=dp_size, dp_size_per_node=dp_size_per_node
     )
+    deployment_options = llm_config.get_serve_options(name_prefix=name_prefix)
+    if options_override:
+        deployment_options.update(options_override)

     return DPServer.as_deployment(deployment_options).bind(
         llm_config=llm_config, dp_rank_assigner=dp_rank_assigner
diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/kv_transfer_backends/base.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/kv_transfer_backends/base.py
index 420f7f44a06f..2999a147c887 100644
--- a/python/ray/llm/_internal/serve/deployments/llm/vllm/kv_transfer_backends/base.py
+++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/kv_transfer_backends/base.py
@@ -1,17 +1,29 @@
 import abc
 import random
 import string
-from typing import Any, Dict
+from typing import TYPE_CHECKING, Any, Dict
+
+if TYPE_CHECKING:
+    from ray.llm._internal.serve.configs.server_models import LLMConfig


 class BaseConnectorBackend(abc.ABC):
-    def __init__(self, kv_transfer_config: Dict[str, Any]):
+    def __init__(self, llm_config: "LLMConfig"):
         """Base class for connector backends.

         Args:
-            kv_transfer_config: Configuration for the KV transfer.
+            llm_config: The LLM config for this engine.
         """
-        self.kv_transfer_config = kv_transfer_config
+        self.llm_config = llm_config
+
+    @property
+    def kv_transfer_config(self) -> Dict[str, Any]:
+        engine_kwargs = self.llm_config.engine_kwargs
+        kv_transfer_config = engine_kwargs.get("kv_transfer_config")
+        assert (
+            kv_transfer_config is not None
+        ), "In Connector backend, kv_transfer_config is not set"
+        return kv_transfer_config

     def _get_unique_suffix(self, len: int = 6) -> str:
         """Generates unique alphanumeric suffix.
diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/kv_transfer_backends/nixl_connector.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/kv_transfer_backends/nixl_connector.py
index 5cdbb37f744e..76036a5f3117 100644
--- a/python/ray/llm/_internal/serve/deployments/llm/vllm/kv_transfer_backends/nixl_connector.py
+++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/kv_transfer_backends/nixl_connector.py
@@ -6,6 +6,28 @@


 class NixlConnectorBackend(BaseConnectorBackend):
+    def _set_side_channel_port(self):
+        from vllm import envs as vllm_envs, utils as vllm_utils
+
+        if not vllm_envs.is_set("VLLM_NIXL_SIDE_CHANNEL_PORT"):
+            base_port: int = int(
+                self.llm_config.experimental_configs.get(
+                    "NIXL_SIDE_CHANNEL_PORT_BASE", vllm_utils.get_open_port()
+                )
+            )
+            # If dp_rank is set, use base_port + dp_rank as the side channel
+            # port, to avoid a potential race condition when multiple ranks
+            # request free ports at the same time.
+            dp_rank = self.llm_config.engine_kwargs.get("data_parallel_rank", 0)
+            port = base_port + dp_rank
+            os.environ["VLLM_NIXL_SIDE_CHANNEL_PORT"] = str(port)
+
+    def _set_side_channel_host(self):
+        from vllm import envs as vllm_envs, utils as vllm_utils
+
+        if not vllm_envs.is_set("VLLM_NIXL_SIDE_CHANNEL_HOST"):
+            os.environ["VLLM_NIXL_SIDE_CHANNEL_HOST"] = vllm_utils.get_ip()
+
     def setup(self) -> None:
         """Initialize the NIXL connector backend.

@@ -20,7 +42,7 @@ def setup(self) -> None:
             ValueError: If the current vLLM version doesn't support the
                 required NIXL environment variables.
""" - from vllm import envs as vllm_envs, utils as vllm_utils + from vllm import envs as vllm_envs if ( "VLLM_NIXL_SIDE_CHANNEL_PORT" not in vllm_envs.environment_variables @@ -32,11 +54,8 @@ def setup(self) -> None: "that you are using an older version of vLLM." ) - if not vllm_envs.is_set("VLLM_NIXL_SIDE_CHANNEL_PORT"): - port: int = vllm_utils.get_open_port() - os.environ["VLLM_NIXL_SIDE_CHANNEL_PORT"] = str(port) - if not vllm_envs.is_set("VLLM_NIXL_SIDE_CHANNEL_HOST"): - os.environ["VLLM_NIXL_SIDE_CHANNEL_HOST"] = vllm_utils.get_ip() + self._set_side_channel_port() + self._set_side_channel_host() # We need to overwrite the engine_id to make it unique across replicas. engine_id = self.kv_transfer_config.get("engine_id", self._get_unique_suffix()) diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py index 081b0b2b6807..36a5444fc564 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py @@ -1,3 +1,4 @@ +import copy import dataclasses import os from typing import Any, Dict, List, Optional @@ -201,7 +202,7 @@ def placement_bundles(self) -> List[Dict[str, float]]: bundle = {"GPU": 1} if self.accelerator_type: bundle[self.ray_accelerator_type()] = 0.001 - bundles = [bundle for _ in range(self.num_devices)] + bundles = [copy.deepcopy(bundle) for _ in range(self.num_devices)] return bundles diff --git a/python/ray/llm/tests/serve/cpu/configs/test_models.py b/python/ray/llm/tests/serve/cpu/configs/test_models.py index 6f5bfab6d1b6..b88ca0524029 100644 --- a/python/ray/llm/tests/serve/cpu/configs/test_models.py +++ b/python/ray/llm/tests/serve/cpu/configs/test_models.py @@ -178,8 +178,7 @@ def test_get_serve_options_with_accelerator_type(self): "max_replicas": 10, } assert serve_options["placement_group_bundles"] == [ - {"CPU": 1, "GPU": 0}, - {"GPU": 1, "accelerator_type:A100-40G": 0.001}, + {"CPU": 1, "GPU": 1, "accelerator_type:A100-40G": 0.001}, ] assert serve_options["placement_group_strategy"] == "STRICT_PACK" assert serve_options["name"] == "Test:test_model" @@ -214,10 +213,7 @@ def test_get_serve_options_without_accelerator_type(self): "initial_replicas": 1, "max_replicas": 10, } - assert serve_options["placement_group_bundles"] == [ - {"CPU": 1, "GPU": 0}, - {"GPU": 1}, - ] + assert serve_options["placement_group_bundles"] == [{"CPU": 1, "GPU": 1}] assert serve_options["placement_group_strategy"] == "STRICT_PACK" assert serve_options["name"] == "Test:test_model" @@ -239,8 +235,9 @@ def test_resources_per_bundle(self): model_loading_config=dict(model_id="test_model"), engine_kwargs=dict(tensor_parallel_size=3, pipeline_parallel_size=2), ).get_serve_options(name_prefix="Test:") - assert serve_options["placement_group_bundles"] == [{"CPU": 1, "GPU": 0}] + [ - {"GPU": 1} for _ in range(6) + + assert serve_options["placement_group_bundles"] == [{"CPU": 1, "GPU": 1}] + [ + {"GPU": 1} for _ in range(5) ] # Test the custom resource bundle @@ -249,9 +246,9 @@ def test_resources_per_bundle(self): engine_kwargs=dict(tensor_parallel_size=3, pipeline_parallel_size=2), resources_per_bundle={"XPU": 1}, ).get_serve_options(name_prefix="Test:") - assert serve_options["placement_group_bundles"] == [{"CPU": 1, "GPU": 0}] + [ - {"XPU": 1} for _ in range(6) - ] + assert serve_options["placement_group_bundles"] == [ + {"CPU": 1, "GPU": 0, "XPU": 1} + ] + [{"XPU": 1} for _ in range(5)] def 
test_engine_config_cached(self): """Test that the engine config is cached and not recreated when calling
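
Note on the placement group change above: the replica actor's near-empty bundle is no longer prepended as its own placement group bundle; it is summed into the first child actor bundle, so a model with tensor_parallel_size=3 and pipeline_parallel_size=2 now requests 6 bundles instead of 7 and the Serve replica actor is colocated with the first engine worker. The snippet below is a minimal standalone sketch of that merge semantics, not the Ray implementation itself; the helper name merge_bundles is illustrative only, and the real logic lives in LLMConfig._merge_replica_actor_and_child_actor_bundles.

from typing import Dict, List


def merge_bundles(
    child_actor_bundles: List[Dict[str, float]],
    replica_actor_bundle: Dict[str, float],
) -> List[Dict[str, float]]:
    """Fold the replica actor's resources into the first child actor bundle."""
    if not child_actor_bundles:
        return [replica_actor_bundle]
    if not replica_actor_bundle:
        return child_actor_bundles

    # This sketch copies the first bundle; the diff above mutates it in place,
    # which is presumably why placement_bundles now deep-copies each bundle
    # instead of repeating the same dict object num_devices times.
    merged = dict(child_actor_bundles[0])
    for key, value in replica_actor_bundle.items():
        merged[key] = merged.get(key, 0) + value
    return [merged] + child_actor_bundles[1:]


# tensor_parallel_size=3 * pipeline_parallel_size=2 -> six one-GPU child bundles.
child_actor_bundles = [{"GPU": 1} for _ in range(6)]
replica_actor_bundle = {"CPU": 1, "GPU": 0}

print(merge_bundles(child_actor_bundles, replica_actor_bundle))
# -> [{'GPU': 1, 'CPU': 1}, {'GPU': 1}, {'GPU': 1}, {'GPU': 1}, {'GPU': 1}, {'GPU': 1}]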