Merged
7 changes: 7 additions & 0 deletions .github/workflows/unittest.yaml
```diff
@@ -97,6 +97,13 @@ jobs:
           fi
         fi

+      - name: Clean checkpoint dir
+        working-directory: trinity-${{ github.run_id }}/.github/workflows/docker
+        if: always()
+        run: |
+          docker compose exec trinity-node-1 rm -rf /mnt/checkpoints/*
+        continue-on-error: true
+
       - name: Upload test results
         if: env.tests_run == 'true' || failure()
         uses: actions/upload-artifact@v4
```
7 changes: 4 additions & 3 deletions benchmark/config/countdown-template.yaml
```diff
@@ -42,10 +42,11 @@ buffer:
     experience_buffer:
       name: experience_buffer
       storage_type: queue
-      use_priority_queue: true
-      replay_buffer_kwargs:
+      replay_buffer:
+        enable: true
         priority_fn: linear_decay
-        decay: 0.1
+        priority_fn_args:
+          decay: 0.1
 explorer:
   runner_per_model: 8
   max_timeout: 900
```
7 changes: 4 additions & 3 deletions benchmark/config/gsm8k-template.yaml
```diff
@@ -47,10 +47,11 @@ buffer:
     experience_buffer:
       name: experience_buffer
       storage_type: queue
-      use_priority_queue: true
-      replay_buffer_kwargs:
+      replay_buffer:
+        enable: true
         priority_fn: linear_decay
-        decay: 0.1
+        priority_fn_args:
+          decay: 0.1
 explorer:
   runner_per_model: 8
   max_timeout: 900
```
8 changes: 4 additions & 4 deletions docs/sphinx_doc/source/tutorial/develop_operator.md
````diff
@@ -6,9 +6,9 @@
 In Trinity-RFT, the operator module is responsible for processing experience data in the buffer module. It supports existing data processing capabilities from [Data-Juicer](https://github.com/modelscope/data-juicer) naturally, and allows developers to implement their own operators as well.
 By customizing operators, developers can implement various data processing functionalities, such as data augmentation, filtering, and transformation. You can even implement advantage/return calculation as operators, as shown in the {ref}`Algorithms <Algorithms>` section.

-- **DataJuicerOperator** ({class}`trinity.data.operators.DataJuicerOperator`): The operator that wraps the data processing operators from Data-Juicer. It provides a simple interface for developers to list the Data-Juicer operators they want to use. The full list of Data-Juicer operators can be found [here](https://modelscope.github.io/data-juicer/en/main/docs/Operators.html).
-- **ExperienceOperator** ({class}`trinity.data.operators.ExperienceOperator`): The base class for all operators used in experience data processing. It defines the interface and common functionalities that all operators should have. Each operator processes a batch of experience data and returns the processed data with metrics for logging.
-- **ExperiencePipeline** ({class}`trinity.data.pipelines.ExperiencePipeline`): The experience data processing pipeline that manages a sequence of operators. It takes raw experiences from the `Explorer`, passes them through each operator in the pipeline, and writes the final processed experiences into the input buffer of the `Trainer`.
+- **DataJuicerOperator** ({class}`trinity.buffer.operators.DataJuicerOperator`): The operator that wraps the data processing operators from Data-Juicer. It provides a simple interface for developers to list the Data-Juicer operators they want to use. The full list of Data-Juicer operators can be found [here](https://modelscope.github.io/data-juicer/en/main/docs/Operators.html).
+- **ExperienceOperator** ({class}`trinity.buffer.operators.ExperienceOperator`): The base class for all operators used in experience data processing. It defines the interface and common functionalities that all operators should have. Each operator processes a batch of experience data and returns the processed data with metrics for logging.
+- **ExperiencePipeline** ({class}`trinity.buffer.pipelines.ExperiencePipeline`): The experience data processing pipeline that manages a sequence of operators. It takes raw experiences from the `Explorer`, passes them through each operator in the pipeline, and writes the final processed experiences into the input buffer of the `Trainer`.

 ```{note}
 In addition to `ExperiencePipeline`, Trinity-RFT also provides `TaskPipeline` for task data processing.
@@ -56,7 +56,7 @@ class RewardFilter(ExperienceOperator):
         return filtered_exps, metrics
 ```

-After implementation, you need to register this module through {class}`trinity.data.operators.EXPERIENCE_OPERATORS`. Once registered, the module can be configured in the configuration file using the registered name.
+After implementation, you need to register this module through {class}`trinity.buffer.operators.EXPERIENCE_OPERATORS`. Once registered, the module can be configured in the configuration file using the registered name.

 ### Step 2: Use Your Operator
````
1 change: 0 additions & 1 deletion docs/sphinx_doc/source/tutorial/develop_selector.md
````diff
@@ -1,4 +1,3 @@
-
 # 🧪 Experimental: Task Selection & Scheduling System

 ```{note}
````
15 changes: 7 additions & 8 deletions docs/sphinx_doc/source/tutorial/example_mix_algo.md
```diff
@@ -85,22 +85,21 @@ class MixSampleStrategy(SampleStrategy):
         expert_batch_size = ceil(self.expert_data_ratio * tot_batch_size)

         # experience buffer
-        usual_buffer_config = copy.deepcopy(buffer_config)
-        usual_buffer_config.train_batch_size = tot_batch_size - expert_batch_size
-        self.usual_exp_buffer = get_buffer_reader(
-            buffer_config.trainer_input.experience_buffer, usual_buffer_config  # type: ignore
-        )
+        usual_buffer_config = copy.deepcopy(buffer_config.trainer_input.experience_buffer)
+        usual_buffer_config.batch_size = tot_batch_size - expert_batch_size
+        self.usual_exp_buffer = get_buffer_reader(usual_buffer_config)

         if buffer_config.trainer_input.auxiliary_buffers is None:
             raise ValueError(
                 "`buffer_config.trainer_input.auxiliary_buffers` is required in MIX algorithm"
             )

         # expert experience buffer
-        expert_buffer_config = copy.deepcopy(buffer_config)
-        expert_buffer_config.train_batch_size = expert_batch_size
+        expert_buffer_config = copy.deepcopy(
+            buffer_config.trainer_input.auxiliary_buffers[self.sft_dataset_name]
+        )
+        expert_buffer_config.batch_size = expert_batch_size
         self.expert_exp_buffer = get_buffer_reader(
-            buffer_config.trainer_input.auxiliary_buffers[self.sft_dataset_name],
             expert_buffer_config,
         )

```
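The rewritten strategy above splits one training batch between expert (SFT) and usual experiences with `ceil`. A quick standalone sanity check of that arithmetic, with made-up numbers (not code from the PR):

```python
from math import ceil

def split_batch(tot_batch_size: int, expert_data_ratio: float) -> tuple[int, int]:
    """Split a training batch between expert and usual experiences,
    mirroring how MixSampleStrategy sizes its two buffer readers."""
    expert = ceil(expert_data_ratio * tot_batch_size)
    usual = tot_batch_size - expert
    return expert, usual

expert, usual = split_batch(96, 0.25)
print(expert, usual)  # 24 72
assert expert + usual == 96
```

Because `ceil` rounds up, the two sub-batch sizes always sum exactly to `tot_batch_size`, and the expert share never rounds down to zero for any nonzero ratio.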
8 changes: 6 additions & 2 deletions docs/sphinx_doc/source/tutorial/example_step_wise.md
```diff
@@ -81,7 +81,7 @@ In general multi-step scenarios, each run may generate various number of experiences

 - `buffer.train_batch_size`: The number of experiences to be sampled from the buffer for training, which can be different from the number of generated experiences in each explore step.

-- `buffer.trainer_input.use_priority_queue = true`: Using `PriorityQueue` allows the model to use the experiences with higher priority, which prefers newly-generated experiences by default.
+- `buffer.trainer_input.experience_buffer.replay_buffer`: Using `PriorityQueue` allows the model to use the experiences with higher priority, which prefers newly-generated experiences by default.

 - `synchronizer.sync_style = dynamic_by_explorer`: The explorer determines when to synchronize the model weights with the trainer.

@@ -126,7 +126,11 @@ buffer:
     experience_buffer:
       name: alfworld_buffer
       storage_type: queue
-      use_priority_queue: true
+      replay_buffer:
+        enable: true
+        priority_fn: linear_decay
+        priority_fn_args:
+          decay: 0.1
 explorer:
   max_repeat_times_per_runner: 1
   runner_per_model: 32
```
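The tutorial text above notes that `buffer.train_batch_size` can differ from the number of experiences generated per explore step. A minimal sketch of that decoupling, using a plain `deque` as a stand-in (the real `queue` storage blocks until data arrives or `max_read_timeout` expires, and may apply priorities):

```python
from collections import deque

def read_batch(queue: deque, batch_size: int) -> list:
    """Pop up to `batch_size` experiences for one training step; a stand-in
    for the buffer reader, which in the real queue storage would block
    until enough data arrives or a timeout elapses."""
    return [queue.popleft() for _ in range(min(batch_size, len(queue)))]

q = deque()
# Each explore step may append a different number of experiences...
for run, n in enumerate([3, 5, 2]):
    q.extend((run, i) for i in range(n))

# ...while the trainer always consumes a fixed train_batch_size.
print(len(read_batch(q, 4)))  # 4
print(len(read_batch(q, 4)))  # 4
print(len(read_batch(q, 4)))  # 2
```

The last read returns an incomplete batch, which mirrors the documented timeout behavior of the queue storage.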
15 changes: 8 additions & 7 deletions docs/sphinx_doc/source/tutorial/trinity_configs.md
```diff
@@ -273,14 +273,12 @@ The configuration for each task dataset is defined as follows:
 - `name`: Name of the dataset. This name will be used as the Ray actor's name, so it must be unique.
 - `storage_type`: How the dataset is stored. Options: `file`, `queue`, `sql`.
   - `file`: The dataset is stored in `jsonl`/`parquet` files. The data file organization is required to meet the huggingface standard. *We recommend using this storage type for most cases.*
-  - `queue`: The dataset is stored in a queue. The queue is a simple FIFO queue that stores the task dataset. *Do not use this storage type for task dataset unless you know what you are doing.*
-  - `sql`: The dataset is stored in a SQL database. *This type is unstable and will be optimized in the future versions.*
 - `path`: The path to the task dataset.
   - For `file` storage type, the path points to the directory that contains the task dataset files.
   - For `queue` storage type, the path is optional. You can back up the data in the queue by specifying a sqlite database path here.
   - For `sql` storage type, the path points to the sqlite database file.
-- `subset_name`: The subset name of the task dataset. Default is `None`.
-- `split`: The split of the task dataset. Default is `train`.
+- `subset_name`: The subset name of the task dataset, corresponding to the `name` parameter in the huggingface datasets `load_dataset` function. Default is `None`.
+- `split`: The split of the task dataset, corresponding to the `split` parameter in the huggingface datasets `load_dataset` function. Default is `train`.
 - `repeat_times`: The number of rollouts generated for a task. If not set, it will be automatically set to `algorithm.repeat_times` for `taskset`, and `1` for `eval_tasksets`.
 - `rollout_args`: The parameters for rollout.
   - `temperature`: The temperature for sampling.
@@ -324,7 +322,7 @@ buffer:
   - For `queue` storage type, this field is optional. You can specify a SQLite database or JSON file path here to back up the queue data.
   - For `file` storage type, the path points to the directory containing the dataset files.
   - For `sql` storage type, the path points to the SQLite database file.
-- `format`: Defines keys for prompts and responses in the dataset.
+- `format`: Mainly for SFT and DPO algorithm datasets; used to format the extracted data.
   - `prompt_type`: Specifies the type of prompts in the dataset. We support `plaintext`, `messages` for now.
     - `plaintext`: The prompt is in string format.
     - `messages`: The prompt is organized as a message list.
@@ -339,8 +337,11 @@ buffer:
   - `enable_concatenated_multi_turn`: Enable concatenated multi-turn SFT data preprocessing. Only for `messages`, and only takes effect with the SFT algorithm.
   - `chat_template`: Specifies the chat template in string format. If not provided, use `model.custom_chat_template`.
 - `max_read_timeout`: The maximum waiting time (in seconds) to read new experience data. If exceeded, an incomplete batch will be returned directly. Only takes effect when `storage_type` is `queue`. Default is 1800 seconds (30 minutes).
-- `use_priority_queue`: Only take effect when `storage_type` is `queue`. If set to `True`, the queue will be a priority queue, which allows for prioritizing certain experiences over others. Default is `False`.
-- `reuse_cooldown_time`: Only take effect when `storage_type` is `queue` and `use_priority_queue` is `True`. If set, it specifies the cooldown time (in seconds) for reusing experiences. If not specified, the default value is `None`, meaning experiences can not be reused.
+- `replay_buffer`: Only takes effect when `storage_type` is `queue`. Used to configure the replay buffer for experience reuse.
+  - `enable`: Whether to enable the replay buffer. Default is `false`.
+  - `reuse_cooldown_time`: Cooldown time (in seconds) for reusing experiences. If not specified, the default value is `None`, meaning experiences cannot be reused.
+  - `priority_fn`: Experience priority function used to determine the order of experience reuse. Currently supports `linear_decay` and `linear_decay_use_count_control_randomization`.
+  - `priority_fn_args`: A dictionary of arguments passed to the priority function; the specific parameters depend on the selected priority function.
 - `auxiliary_buffers`: Optional buffers used for trainer. It is a dictionary where each key is the buffer name and the value is the buffer configuration. Each buffer configuration is similar to the `experience_buffer`.

 ---
```
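The `linear_decay` priority function added above is only named in these docs, not shown. A minimal sketch of how a decay-style priority might interact with a priority queue — illustrative only, assuming priority falls off linearly with experience age; the actual implementation in Trinity-RFT may differ:

```python
import heapq
import itertools

def linear_decay(model_version: int, current_version: int, decay: float = 0.1) -> float:
    """Priority decays linearly with the age of the experience: newer
    experiences keep a higher priority; `decay` sets how fast stale ones
    fall behind. A hypothetical stand-in, not the library's code."""
    return 1.0 - decay * (current_version - model_version)

# heapq is a min-heap, so negate the priority (and add a tiebreaker
# counter) to pop the highest-priority experience first.
counter = itertools.count()
heap = []
current = 5
for version in [1, 3, 5]:
    prio = linear_decay(version, current, decay=0.1)
    heapq.heappush(heap, (-prio, next(counter), f"exp@v{version}"))

# The freshest experience is reused first.
print(heapq.heappop(heap)[2])  # exp@v5
```

Under this sketch, `priority_fn_args: {decay: 0.1}` simply feeds the `decay` keyword, and `reuse_cooldown_time` would gate how soon a popped experience may be re-inserted.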
8 changes: 4 additions & 4 deletions docs/sphinx_doc/source_zh/tutorial/develop_operator.md
````diff
@@ -7,9 +7,9 @@
 The Operator module is responsible for processing the trajectory data generated by the Explorer (which we call `Experience`). It natively supports the data processing capabilities of [Data-Juicer](https://github.com/modelscope/data-juicer), and also allows developers to implement their own operators.
 By customizing data processing operators, developers can implement various data processing functionalities such as data augmentation, filtering, and transformation. You can even implement advantage/return calculation as an Operator, as shown in the {ref}`Algorithms <Algorithms>` section.

-- **DataJuicerOperator** ({class}`trinity.data.operators.DataJuicerOperator`): Wrapped Data-Juicer operators; to use them, simply list the desired Data-Juicer operators in the configuration file. The full list of Data-Juicer operators can be found [here](https://modelscope.github.io/data-juicer/en/main/docs/Operators.html).
-- **ExperienceOperator** ({class}`trinity.data.operators.ExperienceOperator`): The base class for all operators used in experience data processing. It defines the interface and common functionality that every operator should have. Each operator processes a batch of experience data and returns the processed data along with metrics for logging.
-- **ExperiencePipeline** ({class}`trinity.data.pipelines.ExperiencePipeline`): An experience data processing pipeline that manages a sequence of operators. It takes raw experiences from the `Explorer`, passes them through each operator in the pipeline, and finally writes the processed experiences into the input buffer of the `Trainer`.
+- **DataJuicerOperator** ({class}`trinity.buffer.operators.DataJuicerOperator`): Wrapped Data-Juicer operators; to use them, simply list the desired Data-Juicer operators in the configuration file. The full list of Data-Juicer operators can be found [here](https://modelscope.github.io/data-juicer/en/main/docs/Operators.html).
+- **ExperienceOperator** ({class}`trinity.buffer.operators.ExperienceOperator`): The base class for all operators used in experience data processing. It defines the interface and common functionality that every operator should have. Each operator processes a batch of experience data and returns the processed data along with metrics for logging.
+- **ExperiencePipeline** ({class}`trinity.buffer.pipelines.ExperiencePipeline`): An experience data processing pipeline that manages a sequence of operators. It takes raw experiences from the `Explorer`, passes them through each operator in the pipeline, and finally writes the processed experiences into the input buffer of the `Trainer`.

 ```{note}
 In addition to `ExperiencePipeline`, Trinity-RFT also provides `TaskPipeline` for task data processing.
@@ -57,7 +57,7 @@ class RewardFilter(ExperienceOperator):
         return filtered_exps, metrics
 ```

-After implementation, you need to register this module through {class}`trinity.data.operators.EXPERIENCE_OPERATORS`. Once registered, the module can be configured in the configuration file using its registered name.
+After implementation, you need to register this module through {class}`trinity.buffer.operators.EXPERIENCE_OPERATORS`. Once registered, the module can be configured in the configuration file using its registered name.

 ### Step 2: Use Your Operator
````
15 changes: 7 additions & 8 deletions docs/sphinx_doc/source_zh/tutorial/example_mix_algo.md
```diff
@@ -77,22 +77,21 @@ class MixSampleStrategy(SampleStrategy):
         expert_batch_size = ceil(self.expert_data_ratio * tot_batch_size)

         # experience buffer
-        usual_buffer_config = copy.deepcopy(buffer_config)
-        usual_buffer_config.train_batch_size = tot_batch_size - expert_batch_size
-        self.usual_exp_buffer = get_buffer_reader(
-            buffer_config.trainer_input.experience_buffer, usual_buffer_config  # type: ignore
-        )
+        usual_buffer_config = copy.deepcopy(buffer_config.trainer_input.experience_buffer)
+        usual_buffer_config.batch_size = tot_batch_size - expert_batch_size
+        self.usual_exp_buffer = get_buffer_reader(usual_buffer_config)

         if buffer_config.trainer_input.auxiliary_buffers is None:
             raise ValueError(
                 "`buffer_config.trainer_input.auxiliary_buffers` is required in MIX algorithm"
             )

         # expert experience buffer
-        expert_buffer_config = copy.deepcopy(buffer_config)
-        expert_buffer_config.train_batch_size = expert_batch_size
+        expert_buffer_config = copy.deepcopy(
+            buffer_config.trainer_input.auxiliary_buffers[self.sft_dataset_name]
+        )
+        expert_buffer_config.batch_size = expert_batch_size
         self.expert_exp_buffer = get_buffer_reader(
-            buffer_config.trainer_input.auxiliary_buffers[self.sft_dataset_name],
             expert_buffer_config,
         )

```
8 changes: 5 additions & 3 deletions docs/sphinx_doc/source_zh/tutorial/example_step_wise.md
````diff
@@ -80,7 +80,7 @@ class StepWiseAlfworldWorkflow(RewardPropagationWorkflow):

 - `buffer.train_batch_size`: The number of experiences sampled from the buffer for training, which can differ from the number of experiences generated in each explore step.

-- `buffer.trainer_input.use_priority_queue = true`: Using `PriorityQueue` lets the model preferentially use experiences with higher priority (by default, more recently generated experiences).
+- `buffer.trainer_input.experience_buffer.replay_buffer`: Using `PriorityQueue` lets the model preferentially use experiences with higher priority (by default, more recently generated experiences).

 - `synchronizer.sync_style = dynamic_by_explorer`: The explorer decides when to synchronize model weights with the trainer.

@@ -124,7 +124,8 @@ buffer:
     experience_buffer:
       name: alfworld_buffer
       storage_type: queue
-      use_priority_queue: true
+      replay_buffer:
+        enable: true
 explorer:
   max_repeat_times_per_runner: 1
   runner_per_model: 16
@@ -154,11 +155,12 @@ trainer:
     ulysses_sequence_parallel_size: 1
 ```

-
 Below, we provide the commands for running the ALFWorld task.

 ## Example: Multi-step ALFWorld

+### Environment Setup
+
 To install the ALFWorld environment, follow the instructions below.

 1. Install with pip: `pip install alfworld[full]`
````