From e66341b252b573b1e9e236f4ee62fbab51fa79a9 Mon Sep 17 00:00:00 2001 From: ehsk Date: Tue, 3 Feb 2026 20:56:49 +0000 Subject: [PATCH] vllm rolled back to 0.8.5.post1 due to stateless process group issues in multi-node settings --- README.md | 6 +++--- conf/base.yaml | 5 +++-- pipelinerl/finetune_loop.py | 25 +++++++++++++++++-------- pipelinerl/vllm0.py | 10 +++++++--- pyproject.toml | 10 +++++----- 5 files changed, 35 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 7b890b98..aae0ab24 100644 --- a/README.md +++ b/README.md @@ -195,9 +195,9 @@ cd PipelineRL Create the environments with dependencies. ```bash -conda create -n pipeline-rl -y python=3.12 +conda create -n pipeline-rl -y python=3.11 conda run --no-capture-output -n pipeline-rl pip install -e . -conda run --no-capture-output -n pipeline-rl pip install flash-attn==2.8.3 --no-build-isolation +conda run --no-capture-output -n pipeline-rl pip install flash-attn==2.7.4.post1 --no-build-isolation ``` Alternatively for `flash-attn`, you can install it via prebuilt packages (on Linux): @@ -205,7 +205,7 @@ Alternatively for `flash-attn`, you can install it via prebuilt packages (on Lin # Check your PyTorch's C++ ABI setting first: # python -c "import torch; print(torch._C._GLIBCXX_USE_CXX11_ABI)" # Use cxx11abiTRUE or cxx11abiFALSE in the URL accordingly -conda run --no-capture-output -n pipeline-rl pip install https://github.com/Dao-AILab/flash-attention/releases/download/v2.8.3/flash_attn-2.8.3+cu12torch2.7cxx11abiTRUE-cp312-cp312-linux_x86_64.whl +conda run --no-capture-output -n pipeline-rl pip install https://github.com/Dao-AILab/flash-attention/releases/download/v2.7.4.post1/flash_attn-2.7.4.post1+cu12torch2.6cxx11abiTRUE-cp311-cp311-linux_x86_64.whl ``` By default Pipeline-RL will use the file system as the medium for streaming the generated data to the trainer processes. This works on one node, but the files can get quite large. To use Redis instead you will need to install the Redis server in the same conda environment: diff --git a/conf/base.yaml b/conf/base.yaml index 5418f846..36682995 100644 --- a/conf/base.yaml +++ b/conf/base.yaml @@ -30,7 +30,7 @@ preprocess: chunk_n_groups: 2 # queue for loaded raw groups raw_queue_size: 8 - # queue for processed chunks of multiple groups + # queue for processed chunks of multiple groups input_queue_size: 32 # queue for ready chunks for multiple groups output_queue_size: 32 @@ -40,9 +40,10 @@ preprocess: ring_buffer_size: 128 # "virtual" sample queue per lead trainer max_ready_samples_per_lead: 64 - pop_old_data: ${..pop_old_data} + pop_old_data: ${..pop_old_data} shared_memory_entry_size: 100000000 log_every_n_samples: 128 + llm: parameters: # changed diff --git a/pipelinerl/finetune_loop.py b/pipelinerl/finetune_loop.py index b23d6429..b8b4e3f6 100644 --- a/pipelinerl/finetune_loop.py +++ b/pipelinerl/finetune_loop.py @@ -214,7 +214,8 @@ def send_weight_update( with deepspeed.zero.GatheredParameters([parameter]): if get_accelerator().is_main_process: # Use PyNcclCommunicator's broadcast method as torch.distributed does not work since vLLM disabled that transfer path - self.actor_update_group.broadcast(parameter.data, src=0, stream=torch.cuda.current_stream()) + # REVERTED: self.actor_update_group.broadcast(parameter.data, src=0, stream=torch.cuda.current_stream()) + dist.broadcast(parameter.data, src=0, group=self.actor_update_group) if get_accelerator().is_main_process: logger.info("Wait for HTTP requests") for future in futures: # type: ignore @@ -257,7 +258,9 @@ def send_weight_update( logger.info(f"Published weight update request for version {version}") for _, parameter in named_parameters.items(): # Use PyNcclCommunicator's broadcast method as torch.distributed does not work since vLLM disabled that transfer path - self.actor_update_group.broadcast(parameter.data, src=0, stream=torch.cuda.current_stream()) + # REVERTED: self.actor_update_group.broadcast(parameter.data, src=0, stream=torch.cuda.current_stream()) + dist.broadcast(parameter.data, src=0, group=self.actor_update_group) + dist.barrier(self.actor_update_group) for future in futures: future.result() logger.info("Finished broadcasting weights") @@ -410,15 +413,19 @@ def run_finetuning_loop( get_accelerator().wait_for_everyone() if get_accelerator().is_main_process and args.send_weight_updates: - current_device = get_accelerator().device - torch.cuda.set_device(current_device) - logger.info("Initializing actor process group using StatelessProcessGroup") - logger.info(f"Set CUDA device to {current_device} for actor process group (rank 0)") - actor_update_group = torch_utils.stateless_init_process_group( + # current_device = get_accelerator().device + # torch.cuda.set_device(current_device) + # logger.info("Initializing actor process group using StatelessProcessGroup") + # logger.info(f"Set CUDA device to {current_device} for actor process group (rank 0)") + # actor_update_group = torch_utils.stateless_init_process_group( + logger.info("Initializing actor process group") + actor_update_group = torch_utils.init_extra_process_group( + group_name="actor", + backend="nccl", init_method=cfg.me.weight_update_group_init_method, rank=0, world_size=cfg.me.weight_update_group_world_size, - device=current_device, + # device=current_device, ) logger.info("Actor process group initialized") else: @@ -498,6 +505,8 @@ def run_finetuning_loop( if weight_update_manager is not None: weight_update_manager.shutdown() # PyNcclCommunicator doesn't need explicit destroy like torch.distributed process groups + if actor_update_group: + dist.destroy_process_group(actor_update_group) def rl_finetuning_worker( diff --git a/pipelinerl/vllm0.py b/pipelinerl/vllm0.py index 54f40baf..eda23308 100644 --- a/pipelinerl/vllm0.py +++ b/pipelinerl/vllm0.py @@ -88,11 +88,14 @@ def init_actor_update_group( + f"Weight update group init method: {weight_update_group_init_method}, world size: {weight_update_group_world_size}" ) # Use StatelessProcessGroup + PyNcclCommunicator for cross-process NCCL communication - self.process_group = torch_utils.stateless_init_process_group( + # self.process_group = torch_utils.stateless_init_process_group( + self.process_group = pipelinerl.torch_utils.init_extra_process_group( + group_name="actor", + backend="nccl", init_method=weight_update_group_init_method, rank=self.pg_rank, world_size=weight_update_group_world_size, - device=self.device, + # device=self.device, ) logger.info(prefix + "Actor update process group initialized") @@ -104,7 +107,8 @@ def receive_weight_update(self, request: WeightUpdateRequest): if target_dtype not in expected_dtypes: logger.warning(f"Unexpected dtype for {info.name}: {info.dtype}") buffer = torch.empty(tuple(info.shape), dtype=target_dtype, device=self.device) - self.process_group.broadcast(buffer, src=0, stream=torch.cuda.current_stream()) + # self.process_group.broadcast(buffer, src=0, stream=torch.cuda.current_stream()) + torch.distributed.broadcast(buffer, src=0, group=self.process_group) if isinstance(self.model_runner, MultiStepModelRunner): loaded_params = self.model_runner._base_model_runner.model.load_weights( weights=[(info.name, buffer)] diff --git a/pyproject.toml b/pyproject.toml index ac0ccc28..13b5643e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ requires = [ "wheel", "numpy>=1.26.0", "packaging>=23.0", - "torch==2.7.1", + "torch~=2.6.0", ] build-backend = "setuptools.build_meta" @@ -13,19 +13,19 @@ name = "pipelinerl" version = "0.1.0" description = "A scalable asynchronous reinforcement learning implementation with in-flight weight updates." readme = "README.md" -requires-python = ">=3.12" +requires-python = ">=3.11" license = { file = "LICENSE" } authors = [ { name = "ServiceNow" }, ] dependencies = [ "aiohttp>=3.9.0", - "vllm==0.10.0", + "vllm==0.8.5.post1", "accelerate==1.12.0", - "deepspeed~=0.18.0", + "deepspeed==0.15.4", "browsergym>=0.13.0", "datasets>=2.21.0", - "transformers~=4.57.0" , + "transformers~=4.52.0" , "fastapi>=0.115.0", "joblib>=1.3.2", "jsonref>=1.1.0",