1 change: 1 addition & 0 deletions .github/workflows/docker/docker-compose.yaml
@@ -9,6 +9,7 @@ services:
- CHECKPOINT_ROOT_DIR=/mnt/checkpoints
- DATA_ROOT_DIR=/mnt/data
- MODEL_PATH=/mnt/models/Qwen3-0.6B
- API_MODEL_PATH=/mnt/models/Qwen3-1.7B
- CHECKPOINT_PATH=/mnt/checkpoints
working_dir: /workspace
networks:
2 changes: 1 addition & 1 deletion docs/sphinx_doc/source/tutorial/example_async_mode.md
@@ -74,7 +74,7 @@ cluster:
gpu_per_node: 4
buffer:
total_epochs: 1
batch_size: 64
train_batch_size: 512
explorer_input:
taskset:
name: gsm8k
4 changes: 2 additions & 2 deletions docs/sphinx_doc/source/tutorial/example_dpo.md
@@ -61,7 +61,7 @@ cluster:
gpu_per_node: 8
buffer:
total_epochs: 2
batch_size: 64
train_batch_size: 64
trainer_input:
experience_buffer:
name: human_like_dpo
@@ -95,7 +95,7 @@ cluster:
gpu_per_node: 2
buffer:
total_epochs: 5
batch_size: 64
train_batch_size: 64
trainer_input:
experience_buffer:
name: <sft_dataset_name>
41 changes: 19 additions & 22 deletions docs/sphinx_doc/source/tutorial/example_mix_algo.md
@@ -80,12 +80,12 @@ class MixSampleStrategy(SampleStrategy):
def __init__(self, buffer_config: BufferConfig, **kwargs):
super().__init__(buffer_config)
self.expert_data_ratio = kwargs.get("expert_data_ratio", 0.5)
tot_batch_size = buffer_config.read_batch_size
tot_batch_size = buffer_config.train_batch_size
expert_batch_size = ceil(self.expert_data_ratio * tot_batch_size)

# experience buffer
usual_buffer_config = copy.deepcopy(buffer_config)
usual_buffer_config.read_batch_size = tot_batch_size - expert_batch_size
usual_buffer_config.train_batch_size = tot_batch_size - expert_batch_size
self.usual_exp_buffer = get_buffer_reader(
buffer_config.trainer_input.experience_buffer, usual_buffer_config # type: ignore
)
@@ -97,7 +97,7 @@ class MixSampleStrategy(SampleStrategy):

# expert experience buffer
expert_buffer_config = copy.deepcopy(buffer_config)
expert_buffer_config.read_batch_size = expert_batch_size
expert_buffer_config.train_batch_size = expert_batch_size
self.expert_exp_buffer = get_buffer_reader(
buffer_config.trainer_input.sft_warmup_dataset, expert_buffer_config
)
@@ -157,23 +157,20 @@ class MIXPolicyLossFn(PolicyLossFn):
clip_range_low: Optional[float] = None,
clip_range_high: Optional[float] = None,
use_dynamic_bsz: Optional[bool] = None,
repeat_times: Optional[int] = None,
ppo_mini_batch_size: Optional[int] = None,
ppo_micro_batch_size_per_gpu: Optional[int] = None,
ngpus_trainer: Optional[int] = None,
read_batch_size_usual: Optional[int] = None,
read_batch_size_expert: Optional[int] = None,
ppo_mini_batch_size: int = 1,
ppo_micro_batch_size_per_gpu: int = 1,
ngpus_trainer: int = 1,
train_batch_size_usual: int = 1,
train_batch_size_expert: int = 1,
use_token_level_loss_in_sft: bool = True,
) -> None:
super().__init__(backend=backend)
self.mu = mu
self.use_dynamic_bsz = use_dynamic_bsz
self.experience_per_gpu = ppo_mini_batch_size * repeat_times // ngpus_trainer # type: ignore
self.gradient_accumulation = (
ppo_mini_batch_size * repeat_times // ppo_micro_batch_size_per_gpu # type: ignore
)
self.read_batch_size_usual = read_batch_size_usual
self.read_batch_size_expert = read_batch_size_expert
self.experience_per_gpu = ppo_mini_batch_size // ngpus_trainer
self.gradient_accumulation = ppo_mini_batch_size // ppo_micro_batch_size_per_gpu
self.train_batch_size_usual = train_batch_size_usual
self.train_batch_size_expert = train_batch_size_expert
self.grpo_loss_fn = PPOPolicyLossFn(
clip_range=clip_range,
clip_range_low=clip_range_low,
@@ -199,14 +196,14 @@ class MIXPolicyLossFn(PolicyLossFn):

if self.use_dynamic_bsz:
per_micro_batch_weight_usual = self.experience_per_gpu / (
logprob.shape[0] * self.read_batch_size_usual
logprob.shape[0] * self.train_batch_size_usual
)
per_micro_batch_weight_expert = self.experience_per_gpu / (
logprob.shape[0] * self.read_batch_size_expert
logprob.shape[0] * self.train_batch_size_expert
)
else:
per_micro_batch_weight_usual = self.gradient_accumulation / self.read_batch_size_usual # type: ignore
per_micro_batch_weight_expert = self.gradient_accumulation / self.read_batch_size_expert # type: ignore
per_micro_batch_weight_usual = self.gradient_accumulation / self.train_batch_size_usual # type: ignore
per_micro_batch_weight_expert = self.gradient_accumulation / self.train_batch_size_expert # type: ignore

if n_usual_exp > 0:
grpo_loss, grpo_metrics = self.grpo_loss_fn(
@@ -272,11 +269,11 @@ algorithm:
use_token_level_loss_in_sft: False
use_dynamic_bsz: False
repeat_times: 8
ppo_mini_batch_size: 32
ppo_mini_batch_size: 256
ppo_micro_batch_size_per_gpu: 4
ngpus_trainer: 4
read_batch_size_expert: 64
read_batch_size_usual: 192
train_batch_size_expert: 64
train_batch_size_usual: 192
```
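For reference, these values are mutually consistent with the splitting logic shown earlier in `MixSampleStrategy` and `MIXPolicyLossFn`. The following is a minimal sketch of that arithmetic, not part of the example itself; the `expert_data_ratio` of 0.25 is an assumption chosen to reproduce the `train_batch_size_expert`/`train_batch_size_usual` split above (the strategy itself defaults to 0.5).

```python
from math import ceil

# Values taken from the config above; expert_data_ratio is assumed.
train_batch_size = 256                 # experiences per training step
expert_data_ratio = 0.25               # assumed so that 0.25 * 256 == train_batch_size_expert
ppo_mini_batch_size = 256
ppo_micro_batch_size_per_gpu = 4
ngpus_trainer = 4

# MixSampleStrategy splits the total batch between the expert and usual buffers.
expert_batch_size = ceil(expert_data_ratio * train_batch_size)   # 64  -> train_batch_size_expert
usual_batch_size = train_batch_size - expert_batch_size          # 192 -> train_batch_size_usual

# MIXPolicyLossFn derives its per-GPU and gradient-accumulation sizes from the mini-batch.
experience_per_gpu = ppo_mini_batch_size // ngpus_trainer                     # 64
gradient_accumulation = ppo_mini_batch_size // ppo_micro_batch_size_per_gpu   # 64

print(expert_batch_size, usual_batch_size, experience_per_gpu, gradient_accumulation)
```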

With the above configurations, the experiment can be run with the following command:
4 changes: 2 additions & 2 deletions docs/sphinx_doc/source/tutorial/faq.md
@@ -18,14 +18,14 @@ For users' convenience, future versions will gradually reduce parameters in `tra
**A:** The following parameters are closely related:

- `buffer.batch_size`: The number of tasks in a batch, effective for both the explorer and the trainer.
- `actor_rollout_ref.actor.ppo_mini_batch_size`: In the configuration, this value represents the number of tasks in a mini-batch, overridden by `buffer.batch_size`; but in the `update_policy` function, its value becomes the number of experiences in a mini-batch per GPU, i.e., `buffer.batch_size * algorithm.repeat_times (/ ngpus_trainer)`. The expression of dividing `ngpus_trainer` is caused by implict data allocation to GPUs, but this do not affects the result after gradient accumulation.
- `actor_rollout_ref.actor.ppo_mini_batch_size`: The number of experiences in a mini-batch, overridden by `buffer.train_batch_size`; in the `update_policy` function, however, its value becomes the number of experiences in a mini-batch per GPU, i.e., `buffer.train_batch_size (/ ngpus_trainer)`. The division by `ngpus_trainer` comes from the implicit allocation of data across GPUs, but it does not affect the result after gradient accumulation.
- `actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu`: The number of experiences in a micro-batch per GPU.

A minimal example showing their usage is as follows:

```python
def update_policy(batch_exps):
dataloader = batch_epxs.split(ppo_mini_batch_size) # here `ppo_mini_batch_size` is in terms of experiences
dataloader = batch_exps.split(ppo_mini_batch_size)
for _ in range(ppo_epochs):
for batch_idx, data in enumerate(dataloader):
# Split data
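To make the relationship concrete, here is a small numeric sketch; the values are illustrative assumptions, not defaults mandated by the FAQ:

```python
# Assumed illustrative values: 32 tasks per step, 8 rollouts per task, 4 trainer GPUs.
batch_size = 32                      # buffer.batch_size (tasks per step)
repeat_times = 8                     # algorithm.repeat_times
train_batch_size = batch_size * repeat_times    # 256 experiences (default buffer.train_batch_size)

ngpus_trainer = 4
ppo_micro_batch_size_per_gpu = 4

mini_batch_per_gpu = train_batch_size // ngpus_trainer                        # 64 experiences per GPU
micro_batches_per_gpu = mini_batch_per_gpu // ppo_micro_batch_size_per_gpu    # 16 gradient-accumulation steps
```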
4 changes: 2 additions & 2 deletions docs/sphinx_doc/source/tutorial/trinity_configs.md
@@ -162,6 +162,7 @@ Configures the data buffers used by the explorer and trainer.
```yaml
buffer:
batch_size: 32
train_batch_size: 256
total_epochs: 100

explorer_input:
Expand All @@ -184,6 +185,7 @@ buffer:
```

- `batch_size`: Number of tasks used per training step. *Please do not multiply this value by the `algorithm.repeat_times` manually*.
- `train_batch_size`: Number of experiences used per training step. Defaults to `batch_size` * `algorithm.repeat_times`.
- `total_epochs`: Total number of training epochs.
- `total_steps`: Optional. The total number of training steps. If specified, `total_epochs` will be ignored.

@@ -440,7 +442,6 @@ actor_rollout_ref:
impl_backend: None
actor:
strategy: fsdp # This is for backward-compatibility
ppo_mini_batch_size: 128
# ppo_micro_batch_size: 8 # will be deprecated, use ppo_micro_batch_size_per_gpu
ppo_micro_batch_size_per_gpu: 4
use_dynamic_bsz: True
@@ -505,7 +506,6 @@ critic:
min_num_params: 0
fsdp_size: -1
forward_prefetch: False
ppo_mini_batch_size: ${actor_rollout_ref.actor.ppo_mini_batch_size}
ppo_micro_batch_size_per_gpu: 8
forward_micro_batch_size_per_gpu: ${critic.ppo_micro_batch_size_per_gpu}
use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz}
2 changes: 1 addition & 1 deletion examples/async_gsm8k/trainer.yaml
@@ -14,7 +14,7 @@ cluster:
gpu_per_node: 4
buffer:
total_epochs: 1
batch_size: 96
train_batch_size: 768
max_retry_times: 3
max_retry_interval: 1
explorer_input:
2 changes: 1 addition & 1 deletion examples/dpo_humanlike/dpo.yaml
@@ -16,7 +16,7 @@ cluster:
gpu_per_node: 8
buffer:
total_epochs: 2
batch_size: 32
train_batch_size: 64
max_retry_times: 3
max_retry_interval: 1
trainer_input:
6 changes: 3 additions & 3 deletions examples/mix_math/mix_math.yaml
@@ -12,11 +12,11 @@ algorithm:
use_token_level_loss_in_sft: False
use_dynamic_bsz: False
repeat_times: 8
ppo_mini_batch_size: 32
ppo_mini_batch_size: 256
ppo_micro_batch_size_per_gpu: 4
ngpus_trainer: 4
read_batch_size_expert: 64
read_batch_size_usual: 192
train_batch_size_expert: 64
train_batch_size_usual: 192
model:
model_path: /PATH/TO/MODEL/
max_response_tokens: 10240
2 changes: 1 addition & 1 deletion examples/sft_mot/sft.yaml
@@ -13,7 +13,7 @@ cluster:
gpu_per_node: 8
buffer:
total_epochs: 1
batch_size: 32
train_batch_size: 64
max_retry_times: 3
max_retry_interval: 1
trainer_input:
14 changes: 7 additions & 7 deletions tests/buffer/queue_test.py
@@ -55,10 +55,10 @@ async def test_queue_buffer(self, name, use_priority_queue):
exp.info = {"model_version": 0, "use_count": 0}
for _ in range(self.total_num // self.put_batch_size):
await writer.write_async(exps)
for _ in range(self.total_num // self.read_batch_size):
for _ in range(self.total_num // self.train_batch_size):
exps = reader.read()
self.assertEqual(len(exps), self.read_batch_size)
print(f"finish read {self.read_batch_size} experience")
self.assertEqual(len(exps), self.train_batch_size)
print(f"finish read {self.train_batch_size} experience")
exps = [
Experience(
tokens=torch.tensor([float(j) for j in range(i + 1)]),
@@ -94,13 +94,13 @@ def thread_read(reader, result_queue):

async def test_priority_queue_capacity(self):
# test queue capacity
self.config.read_batch_size = 4
self.config.train_batch_size = 4
meta = StorageConfig(
name="test_buffer_small",
algorithm_type="ppo",
storage_type=StorageType.QUEUE,
max_read_timeout=1,
capacity=100, # priority will use 2 * read_batch_size as capacity (8)
capacity=100, # priority will use 2 * train_batch_size as capacity (8)
path=BUFFER_FILE_PATH,
use_priority_queue=True,
replay_buffer_kwargs={"priority_fn": "linear_decay", "decay": 0.6},
@@ -303,12 +303,12 @@ def replace_call():
def setUp(self):
self.total_num = 8
self.put_batch_size = 2
self.read_batch_size = 4
self.train_batch_size = 4

self.config = BufferConfig(
max_retry_times=3,
max_retry_interval=1,
read_batch_size=self.read_batch_size,
train_batch_size=self.train_batch_size,
)
if os.path.exists(BUFFER_FILE_PATH):
os.remove(BUFFER_FILE_PATH)
2 changes: 1 addition & 1 deletion tests/buffer/sql_test.py
@@ -28,7 +28,7 @@ async def test_create_sql_buffer(self) -> None:
config = BufferConfig(
max_retry_times=3,
max_retry_interval=1,
read_batch_size=read_batch_size,
train_batch_size=read_batch_size,
)
sql_writer = SQLWriter(meta, config)
sql_reader = SQLReader(meta, config)
17 changes: 14 additions & 3 deletions tests/common/vllm_test.py
@@ -23,6 +23,15 @@ def get_model_path() -> str:
return path


def get_api_model_path() -> str:
path = os.environ.get("API_MODEL_PATH")
if not path:
raise EnvironmentError(
"Please set `export API_MODEL_PATH=<your_api_model_checkpoint_dir>` before running this test."
)
return path


DEBUG = False


@@ -322,7 +331,7 @@ class TestAPIServerToolCall(RayUnittestBase):
def setUp(self):
self.config = get_template_config()
self.config.mode = "explore"
self.config.model.model_path = get_model_path()
self.config.model.model_path = get_api_model_path()
self.config.explorer.rollout_model.engine_type = "vllm_async"
self.config.explorer.rollout_model.engine_num = 1
self.config.explorer.rollout_model.tensor_parallel_size = 1
@@ -345,11 +354,13 @@ def setUp(self):
)

def test_api_tool_calls(self):
"""Tests the full conversation flow of a tool call via the OpenAI API."""
"""Tests the full conversation flow of a tool call via the OpenAI API.
Note: This test requires a model that supports tool calls and thinking mode, e.g., Qwen3-1.7B.
"""
import json
import time

tokenizer = AutoTokenizer.from_pretrained(get_model_path())
tokenizer = AutoTokenizer.from_pretrained(get_api_model_path())
print_debug("\n\n" + "=" * 30 + " Running test_api_tool_calls " + "=" * 30)
start_time = time.time()

14 changes: 8 additions & 6 deletions tests/explorer/scheduler_test.py
@@ -207,7 +207,7 @@ def setUp(self):
self.config.explorer.max_retry_times = 1
self.config.explorer.max_timeout = 5
self.config.explorer.runner_per_model = 2
self.config.buffer.read_batch_size = 2
self.config.buffer.train_batch_size = 2
self.config.buffer.pad_token_id = 0
self.config.buffer.explorer_output = (
self.config.buffer.trainer_input.experience_buffer
@@ -568,11 +568,13 @@ async def test_non_repeatable_workflow(self):
)

async def test_stepwise_experience_eid(self):
task_num, repeat_times, step_num = 2, 4, 3
self.config.buffer.batch_size = task_num
self.config.buffer.train_batch_size = task_num * repeat_times * step_num
self.config.explorer.max_repeat_times_per_runner = 2
self.config.check_and_update()
scheduler = Scheduler(self.config, [DummyModel.remote(), DummyModel.remote()])
await scheduler.start()
task_num, repeat_times, step_num = 2, 4, 3
batch_num = 2

# repeatable stepwise workflow
@@ -584,8 +586,8 @@
scheduler.schedule(tasks, batch_id=i)
statuses, _ = await scheduler.get_results(batch_id=i)
self.assertEqual(len(statuses), task_num * repeat_times / 2)
exps = self.queue.read(batch_size=task_num * repeat_times * step_num)
self.assertEqual(len(exps), task_num * repeat_times * step_num)
exps = self.queue.read(batch_size=self.config.buffer.train_batch_size)
self.assertEqual(len(exps), self.config.buffer.train_batch_size)
exp_list.extend(exps)

# test task_id, run_id and unique_id
@@ -605,8 +607,8 @@
scheduler.schedule(tasks, batch_id=i)
statuses, _ = await scheduler.get_results(batch_id=i)
self.assertEqual(len(statuses), task_num * repeat_times / 2)
exps = self.queue.read(batch_size=task_num * repeat_times * step_num)
self.assertEqual(len(exps), task_num * repeat_times * step_num)
exps = self.queue.read(batch_size=self.config.buffer.train_batch_size)
self.assertEqual(len(exps), self.config.buffer.train_batch_size)
exp_list.extend(exps)

# test task_id, run_id and unique_id
2 changes: 2 additions & 0 deletions tests/manager/synchronizer_test.py
@@ -129,6 +129,7 @@ def test_synchronizer(self):
config.monitor.monitor_type = "tensorboard"
trainer_config = deepcopy(config)
trainer_config.mode = "train"
trainer_config.buffer.train_batch_size = 4
trainer_config.check_and_update()

explorer1_config = deepcopy(config)
@@ -253,6 +254,7 @@ def test_synchronizer(self):
config.monitor.monitor_type = "tensorboard"
trainer_config = deepcopy(config)
trainer_config.mode = "train"
trainer_config.buffer.train_batch_size = 4
trainer_config.check_and_update()

explorer1_config = deepcopy(config)
1 change: 0 additions & 1 deletion tests/template/config.yaml
@@ -14,7 +14,6 @@ algorithm:
lam: 1.0
kl_penalty_fn: k3
kl_loss_fn: k2

model:
model_path: ''
max_response_tokens: 2048
2 changes: 0 additions & 2 deletions tests/template/verl_config.yaml
@@ -7,7 +7,6 @@ actor_rollout_ref:
use_remove_padding: True
actor:
strategy: fsdp # This is for backward-compatibility
ppo_mini_batch_size: 4
ppo_micro_batch_size_per_gpu: 1
use_dynamic_bsz: True
ppo_max_token_len_per_gpu: 16384
@@ -70,7 +69,6 @@ critic:
min_num_params: 0
fsdp_size: -1
forward_prefetch: False
ppo_mini_batch_size: ${actor_rollout_ref.actor.ppo_mini_batch_size}
ppo_micro_batch_size_per_gpu: 1
forward_micro_batch_size_per_gpu: ${critic.ppo_micro_batch_size_per_gpu}
use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz}