6 changes: 3 additions & 3 deletions README.md
@@ -195,17 +195,17 @@ cd PipelineRL

Create the environments with dependencies.
```bash
-conda create -n pipeline-rl -y python=3.12
+conda create -n pipeline-rl -y python=3.11
conda run --no-capture-output -n pipeline-rl pip install -e .
-conda run --no-capture-output -n pipeline-rl pip install flash-attn==2.8.3 --no-build-isolation
+conda run --no-capture-output -n pipeline-rl pip install flash-attn==2.7.4.post1 --no-build-isolation
```

Alternatively for `flash-attn`, you can install it via prebuilt packages (on Linux):
```bash
# Check your PyTorch's C++ ABI setting first:
# python -c "import torch; print(torch._C._GLIBCXX_USE_CXX11_ABI)"
# Use cxx11abiTRUE or cxx11abiFALSE in the URL accordingly
-conda run --no-capture-output -n pipeline-rl pip install https://github.com/Dao-AILab/flash-attention/releases/download/v2.8.3/flash_attn-2.8.3+cu12torch2.7cxx11abiTRUE-cp312-cp312-linux_x86_64.whl
+conda run --no-capture-output -n pipeline-rl pip install https://github.com/Dao-AILab/flash-attention/releases/download/v2.7.4.post1/flash_attn-2.7.4.post1+cu12torch2.6cxx11abiTRUE-cp311-cp311-linux_x86_64.whl
```
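The wheel filename encodes the flash-attn version, the CUDA major version, the torch minor version, the C++ ABI flag, and the CPython tag, so all of them have to match the environment. A small sketch (not part of the documented setup) that prints the values to compare against the wheel name; run it inside the `pipeline-rl` environment:
```python
# Print the values that the prebuilt wheel name must match
# (e.g. cu12, torch2.6, cxx11abiTRUE, cp311).
import sys
import torch

print("python tag:", f"cp{sys.version_info.major}{sys.version_info.minor}")
print("torch:", torch.__version__, "cuda:", torch.version.cuda)
print("cxx11 abi:", torch._C._GLIBCXX_USE_CXX11_ABI)
```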

By default Pipeline-RL will use the file system as the medium for streaming the generated data to the trainer processes. This works on one node, but the files can get quite large. To use Redis instead you will need to install the Redis server in the same conda environment:
5 changes: 3 additions & 2 deletions conf/base.yaml
@@ -30,7 +30,7 @@ preprocess:
chunk_n_groups: 2
# queue for loaded raw groups
raw_queue_size: 8
-# queue for processed chunks of multiple groups
+# queue for processed chunks of multiple groups
input_queue_size: 32
# queue for ready chunks for multiple groups
output_queue_size: 32
@@ -40,9 +40,10 @@ preprocess:
ring_buffer_size: 128
# "virtual" sample queue per lead trainer
max_ready_samples_per_lead: 64
-pop_old_data: ${..pop_old_data}
+pop_old_data: ${..pop_old_data}
shared_memory_entry_size: 100000000
log_every_n_samples: 128

llm:
parameters:
# changed
25 changes: 17 additions & 8 deletions pipelinerl/finetune_loop.py
@@ -214,7 +214,8 @@ def send_weight_update(
with deepspeed.zero.GatheredParameters([parameter]):
if get_accelerator().is_main_process:
# Use PyNcclCommunicator's broadcast method as torch.distributed does not work since vLLM disabled that transfer path
-self.actor_update_group.broadcast(parameter.data, src=0, stream=torch.cuda.current_stream())
+# REVERTED: self.actor_update_group.broadcast(parameter.data, src=0, stream=torch.cuda.current_stream())
+dist.broadcast(parameter.data, src=0, group=self.actor_update_group)
if get_accelerator().is_main_process:
logger.info("Wait for HTTP requests")
for future in futures: # type: ignore
@@ -257,7 +258,9 @@ def send_weight_update(
logger.info(f"Published weight update request for version {version}")
for _, parameter in named_parameters.items():
# Use PyNcclCommunicator's broadcast method as torch.distributed does not work since vLLM disabled that transfer path
-self.actor_update_group.broadcast(parameter.data, src=0, stream=torch.cuda.current_stream())
+# REVERTED: self.actor_update_group.broadcast(parameter.data, src=0, stream=torch.cuda.current_stream())
+dist.broadcast(parameter.data, src=0, group=self.actor_update_group)
+dist.barrier(self.actor_update_group)
for future in futures:
future.result()
logger.info("Finished broadcasting weights")
@@ -410,15 +413,19 @@ def run_finetuning_loop(
get_accelerator().wait_for_everyone()

if get_accelerator().is_main_process and args.send_weight_updates:
-current_device = get_accelerator().device
-torch.cuda.set_device(current_device)
-logger.info("Initializing actor process group using StatelessProcessGroup")
-logger.info(f"Set CUDA device to {current_device} for actor process group (rank 0)")
-actor_update_group = torch_utils.stateless_init_process_group(
+# current_device = get_accelerator().device
+# torch.cuda.set_device(current_device)
+# logger.info("Initializing actor process group using StatelessProcessGroup")
+# logger.info(f"Set CUDA device to {current_device} for actor process group (rank 0)")
+# actor_update_group = torch_utils.stateless_init_process_group(
+logger.info("Initializing actor process group")
+actor_update_group = torch_utils.init_extra_process_group(
+group_name="actor",
+backend="nccl",
init_method=cfg.me.weight_update_group_init_method,
rank=0,
world_size=cfg.me.weight_update_group_world_size,
-device=current_device,
+# device=current_device,
)
logger.info("Actor process group initialized")
else:
@@ -498,6 +505,8 @@ def run_finetuning_loop(
if weight_update_manager is not None:
weight_update_manager.shutdown()
# PyNcclCommunicator doesn't need explicit destroy like torch.distributed process groups
+if actor_update_group:
+dist.destroy_process_group(actor_update_group)


def rl_finetuning_worker(
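Taken together, the changes in this file swap the PyNcclCommunicator broadcast for a plain `torch.distributed` broadcast over a dedicated "actor" process group, and destroy that group on shutdown. A minimal sketch of the sender-side pattern under DeepSpeed ZeRO-3, assuming the extra group already exists (the function name below is illustrative, not PipelineRL's API):
```python
import deepspeed
import torch
import torch.distributed as dist

def broadcast_weights_to_actors(model: torch.nn.Module, actor_update_group) -> None:
    """Sketch: the lead trainer (rank 0) streams every parameter to the
    inference workers that share `actor_update_group`."""
    for _, param in model.named_parameters():
        # Under ZeRO-3 each parameter is sharded across trainer ranks;
        # GatheredParameters temporarily materializes the full tensor.
        with deepspeed.zero.GatheredParameters([param]):
            if dist.get_rank() == 0:  # only the lead trainer is in the actor group
                dist.broadcast(param.data, src=0, group=actor_update_group)
    if dist.get_rank() == 0:
        # Wait for all receivers before training continues, as in the diff above.
        dist.barrier(group=actor_update_group)
```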
10 changes: 7 additions & 3 deletions pipelinerl/vllm0.py
@@ -88,11 +88,14 @@ def init_actor_update_group(
+ f"Weight update group init method: {weight_update_group_init_method}, world size: {weight_update_group_world_size}"
)
# Use StatelessProcessGroup + PyNcclCommunicator for cross-process NCCL communication
-self.process_group = torch_utils.stateless_init_process_group(
+# self.process_group = torch_utils.stateless_init_process_group(
+self.process_group = pipelinerl.torch_utils.init_extra_process_group(
+group_name="actor",
+backend="nccl",
init_method=weight_update_group_init_method,
rank=self.pg_rank,
world_size=weight_update_group_world_size,
-device=self.device,
+# device=self.device,
)
logger.info(prefix + "Actor update process group initialized")

@@ -104,7 +107,8 @@ def receive_weight_update(self, request: WeightUpdateRequest):
if target_dtype not in expected_dtypes:
logger.warning(f"Unexpected dtype for {info.name}: {info.dtype}")
buffer = torch.empty(tuple(info.shape), dtype=target_dtype, device=self.device)
-self.process_group.broadcast(buffer, src=0, stream=torch.cuda.current_stream())
+# self.process_group.broadcast(buffer, src=0, stream=torch.cuda.current_stream())
+torch.distributed.broadcast(buffer, src=0, group=self.process_group)
if isinstance(self.model_runner, MultiStepModelRunner):
loaded_params = self.model_runner._base_model_runner.model.load_weights(
weights=[(info.name, buffer)]
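On the vLLM worker side, the same group is joined with a non-zero rank, and each tensor named in the weight-update request is received into a freshly allocated buffer before being passed to the model's `load_weights`. A rough sketch of that receive loop (the `info.name` / `info.shape` / `info.dtype` fields mirror the usage above; the container name `request.parameters` and the dtype handling are assumptions):
```python
import torch
import torch.distributed as dist

def receive_weight_update(request, model, process_group, device: torch.device) -> None:
    """Sketch of the receiver loop running inside each vLLM worker."""
    for info in request.parameters:  # assumed container of (name, shape, dtype) entries
        dtype = getattr(torch, info.dtype) if isinstance(info.dtype, str) else info.dtype
        # The buffer must match what the lead trainer (rank 0) broadcasts.
        buffer = torch.empty(tuple(info.shape), dtype=dtype, device=device)
        dist.broadcast(buffer, src=0, group=process_group)
        # Hand the received tensor to the model's weight loader, as in the diff above.
        model.load_weights(weights=[(info.name, buffer)])
```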
10 changes: 5 additions & 5 deletions pyproject.toml
@@ -4,7 +4,7 @@ requires = [
"wheel",
"numpy>=1.26.0",
"packaging>=23.0",
"torch==2.7.1",
"torch~=2.6.0",
]
build-backend = "setuptools.build_meta"

@@ -13,19 +13,19 @@ name = "pipelinerl"
version = "0.1.0"
description = "A scalable asynchronous reinforcement learning implementation with in-flight weight updates."
readme = "README.md"
requires-python = ">=3.12"
requires-python = ">=3.11"
license = { file = "LICENSE" }
authors = [
{ name = "ServiceNow" },
]
dependencies = [
"aiohttp>=3.9.0",
"vllm==0.10.0",
"vllm==0.8.5.post1",
"accelerate==1.12.0",
"deepspeed~=0.18.0",
"deepspeed==0.15.4",
"browsergym>=0.13.0",
"datasets>=2.21.0",
"transformers~=4.57.0" ,
"transformers~=4.52.0" ,
"fastapi>=0.115.0",
"joblib>=1.3.2",
"jsonref>=1.1.0",
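Since the pinned stack (torch ~2.6, vLLM 0.8.5.post1, DeepSpeed 0.15.4, Transformers ~4.52) has to stay mutually compatible, a quick post-install check of the resolved versions can catch mismatches early. A small sketch:
```python
# Print the resolved versions after `pip install -e .`
from importlib.metadata import version

for dist_name in ("torch", "vllm", "deepspeed", "transformers", "accelerate"):
    print(f"{dist_name}: {version(dist_name)}")
```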