@@ -212,6 +212,7 @@ Trinity-RFT uses a unified config file to manage all config items. For the data
In this example, assume that you need to select the chosen and rejected responses for the DPO method. You can set the relevant config items as in the following example:

```yaml
# using task pipeline to decide the chosen and rejected from human preference
data_processor:
  # task pipeline related
  task_pipeline:
@@ -239,7 +240,7 @@
          chosen_key: "chosen" # Chosen field
          rejected_key: "rejected" # Rejected field
    inputs: # the output will be set to the explorer input automatically
      - /PATH/TO/DATA/FILE/TO/BE/ANNOTATED
      - 'examples/dpo_human_in_the_loop/demo-data.jsonl'
    target_fields: ["prompt"]
  service:
    data_juicer:
@@ -252,6 +253,8 @@ The difference is that we use the data-juicer OP `human_preference_annotation_mapper`

You can set more config items for this OP (e.g., a notification when annotation finishes). For more details, please refer to this [doc](https://github.com/modelscope/data-juicer/tree/main/configs/annotation).

All config items in the `data_processor` section can be found [here](trinity_configs.md). A prepared config file for this example is available at [dpo.yaml](https://github.com/modelscope/Trinity-RFT/tree/main/examples/dpo_human_in_the_loop/dpo.yaml).

### Start Running

When you start running with the RFT config, the data processor will start the OP `human_preference_annotation_mapper`, and then you can find a new project on the "Projects" page of the label-studio server.
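In practice this means bringing the annotation backend up before launching the run. As a rough sketch (the exact entrypoints may differ across versions, so verify against your installation): start the server with `label-studio start --port 7070` so the port matches the `api_url` in the config, then launch Trinity-RFT with `trinity run --config examples/dpo_human_in_the_loop/dpo.yaml`.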
7 changes: 7 additions & 0 deletions examples/dpo_human_in_the_loop/README.md
@@ -0,0 +1,7 @@
# DPO with Human in the Loop

This example shows the usage of DPO with a human in the loop on a simple example dataset.

For more detailed information, please refer to the [documentation](../../docs/sphinx_doc/source/tutorial/example_data_functionalities.md#example-human-in-the-loop).

The config files are located at [`dpo.yaml`](dpo.yaml) and [`train_dpo.yaml`](train_dpo.yaml). The example dataset is located at [`demo-data.jsonl`](demo-data.jsonl).
10 changes: 10 additions & 0 deletions examples/dpo_human_in_the_loop/demo-data.jsonl
@@ -0,0 +1,10 @@
{"prompt": "What is the capital of France?", "answer1": "Paris", "answer2": "Lyon"}
{"prompt": "Which planet is known as the Red Planet?", "answer1": "Mars", "answer2": "Venus"}
{"prompt": "What is the chemical symbol for gold?", "answer1": "Au", "answer2": "Ag"}
{"prompt": "Who wrote 'Romeo and Juliet'?", "answer1": "William Shakespeare", "answer2": "Christopher Marlowe"}
{"prompt": "What is the largest mammal on Earth?", "answer1": "Blue Whale", "answer2": "African Elephant"}
{"prompt": "In which year did World War II end?", "answer1": "1945", "answer2": "1944"}
{"prompt": "What is the square root of 64?", "answer1": "8", "answer2": "6"}
{"prompt": "Who painted the Mona Lisa?", "answer1": "Leonardo da Vinci", "answer2": "Michelangelo"}
{"prompt": "What is the main component of the Sun?", "answer1": "Hydrogen", "answer2": "Helium"}
{"prompt": "Which programming language was created by Guido van Rossum?", "answer1": "Python", "answer2": "Java"}
73 changes: 73 additions & 0 deletions examples/dpo_human_in_the_loop/dpo.yaml
@@ -0,0 +1,73 @@
project: "dpo_example"
name: "trinity_dpo"
mode: train

# using task pipeline to decide the chosen and rejected from human preference
data_processor:
  # task pipeline related
  task_pipeline:
    num_process: 1
    operators:
      - name: "human_preference_annotation_mapper"
        args:
          # general annotation project settings
          project_name_prefix: "Human_Preference_Annotation_Demo"
          wait_for_annotations: true # Whether to wait for annotations to complete
          timeout: 3600 # Maximum time to wait for annotations in seconds (1 hour)
          poll_interval: 10 # Time between annotation status checks in seconds
          max_tasks_per_batch: 10 # Maximum number of tasks in a single batch
          notification_config:
            enabled: false

          # label studio connection settings
          api_url: "http://localhost:7070" # Default Label Studio URL
          api_key: "YOUR_API_KEY" # Your API key for Label Studio authentication, which can be set when starting the label-studio service

          # human preference annotation settings
          prompt_key: "prompt" # Prompt field
          answer1_key: "answer1" # First answer option
          answer2_key: "answer2" # Second answer option
          chosen_key: "chosen" # Chosen field
          rejected_key: "rejected" # Rejected field
    inputs: # the output will be set to the explorer input automatically
      - 'examples/dpo_human_in_the_loop/demo-data.jsonl'
    target_fields: ["prompt"]
  service:
    data_juicer:
      auto_start: true

algorithm:
  algorithm_type: dpo
  kl_loss_fn: k1
  kl_loss_fn_args:
    kl_coef: 0.1
checkpoint_root_dir: /PATH/TO/CHECKPOINT/
model:
  model_path: /PATH/TO/MODEL
  max_response_tokens: 1024
  max_model_len: 1536
cluster:
  node_num: 1
  gpu_per_node: 8
buffer:
  total_epochs: 2
  train_batch_size: 64
  trainer_input:
    experience_buffer:
      name: dpo_buffer
      storage_type: file
      enable_progress_bar: True
      path: ./outputs/human_annotation_output/ # the results after human preference annotation are stored here
      format:
        prompt_type: plaintext # plaintext/messages
        prompt_key: prompt
        chosen_key: chosen
        rejected_key: rejected
synchronizer:
  sync_method: 'checkpoint'
  sync_interval: 30
  sync_timeout: 1200
trainer:
  trainer_type: 'verl'
  trainer_config_path: 'examples/dpo_human_in_the_loop/train_dpo.yaml'
  save_interval: 30
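Once annotation finishes, the records written under `./outputs/human_annotation_output/` should carry the `chosen` and `rejected` fields configured above. A minimal sketch to spot-check the buffer before training, assuming the `file` storage type writes JSONL files (verify the actual output layout in your setup):

```python
import glob
import json

# Spot-check annotated records before training. Assumes JSONL files under
# the configured experience_buffer path; adjust the pattern if it differs.
paths = glob.glob("./outputs/human_annotation_output/*.jsonl")
for path in paths:
    with open(path, encoding="utf-8") as f:
        for line in f:
            record = json.loads(line)
            missing = {"prompt", "chosen", "rejected"} - record.keys()
            assert not missing, f"{path}: record missing {sorted(missing)}"
print(f"checked {len(paths)} file(s); all records contain prompt/chosen/rejected")
```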
51 changes: 51 additions & 0 deletions examples/dpo_human_in_the_loop/train_dpo.yaml
@@ -0,0 +1,51 @@
actor_rollout_ref:
  hybrid_engine: True
  model:
    external_lib: null
    override_config: { }
    enable_gradient_checkpointing: True
    use_remove_padding: False
  actor:
    strategy: fsdp # This is for backward-compatibility
    ppo_micro_batch_size_per_gpu: 2
    use_dynamic_bsz: False
    ppo_max_token_len_per_gpu: 16384
    grad_clip: 1.0
    ppo_epochs: 1
    shuffle: False
    ulysses_sequence_parallel_size: 1 # sp size
    optim:
      lr: 5e-7
      lr_warmup_steps_ratio: 0.03 # the total steps will be injected during runtime
      min_lr_ratio: 0.1 # only useful for warmup with cosine
      warmup_style: cosine # select from constant/cosine
      total_training_steps: 783
      betas: [0.9, 0.95]
    fsdp_config:
      wrap_policy:
        # transformer_layer_cls_to_wrap: None
        min_num_params: 0
      param_offload: False
      optimizer_offload: False
      fsdp_size: -1
  ref:
    fsdp_config:
      param_offload: False
      wrap_policy:
        # transformer_layer_cls_to_wrap: None
        min_num_params: 0
    # log_prob_micro_batch_size: 4 # will be deprecated, use log_prob_micro_batch_size_per_gpu
    log_prob_micro_batch_size_per_gpu: 2
    log_prob_use_dynamic_bsz: ${actor_rollout_ref.actor.use_dynamic_bsz}
    log_prob_max_token_len_per_gpu: ${actor_rollout_ref.actor.ppo_max_token_len_per_gpu}
    ulysses_sequence_parallel_size: ${actor_rollout_ref.actor.ulysses_sequence_parallel_size} # sp size

trainer:
  balance_batch: False
  total_training_steps: 783
  # auto: find the last ckpt to resume. If can't find, start from scratch
  resume_mode: auto # or disable, or resume_path if resume_from_path is set
  default_hdfs_dir: null
  remove_previous_ckpt_in_save: False
  del_local_ckpt_after_load: False
  val_before_train: False
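For intuition about the optimizer settings above: with `lr_warmup_steps_ratio: 0.03` and `total_training_steps: 783`, roughly 23 steps warm the learning rate up to the peak `lr` of 5e-7, after which the cosine schedule decays it toward `min_lr_ratio * lr = 5e-8`. A standalone approximation of that warmup-plus-cosine shape (a sketch of the standard schedule, not verl's actual scheduler implementation):

```python
import math

LR, MIN_LR_RATIO, TOTAL_STEPS, WARMUP_RATIO = 5e-7, 0.1, 783, 0.03
WARMUP_STEPS = int(TOTAL_STEPS * WARMUP_RATIO)  # ~23 steps

def lr_at(step: int) -> float:
    """Linear warmup to LR, then cosine decay down to MIN_LR_RATIO * LR."""
    if step < WARMUP_STEPS:
        return LR * (step + 1) / WARMUP_STEPS  # linear warmup
    progress = (step - WARMUP_STEPS) / max(1, TOTAL_STEPS - WARMUP_STEPS)
    cosine = 0.5 * (1.0 + math.cos(math.pi * progress))  # decays 1 -> 0
    return LR * (MIN_LR_RATIO + (1.0 - MIN_LR_RATIO) * cosine)

for s in (0, WARMUP_STEPS, TOTAL_STEPS // 2, TOTAL_STEPS - 1):
    print(s, f"{lr_at(s):.2e}")
```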
3 changes: 0 additions & 3 deletions trinity/buffer/pipelines/task_pipeline.py
@@ -5,9 +5,6 @@


def check_and_run_task_pipeline(config: Config) -> Dict:
    if not (config.mode == "explore" or config.mode == "both"):
        # task pipeline is only available when using Explorer
        return {}
    if config.data_processor.task_pipeline is None:
        return {}

16 changes: 15 additions & 1 deletion trinity/common/config.py
@@ -688,7 +688,21 @@ def _check_buffer(self) -> None: # noqa: C901
            )
        if self.data_processor.task_pipeline is not None:
            if self.data_processor.task_pipeline.output is None:
                self.data_processor.task_pipeline.output = self.buffer.explorer_input.taskset
                if self.buffer.explorer_input.taskset.path is not None:
                    self.data_processor.task_pipeline.output = self.buffer.explorer_input.taskset
                elif (
                    self.buffer.trainer_input.experience_buffer.schema_type in {"dpo", "sft"}
                    and self.buffer.trainer_input.experience_buffer.path is not None
                ):
                    self.data_processor.task_pipeline.output = (
                        self.buffer.trainer_input.experience_buffer
                    )
                else:
                    raise ValueError(
                        "`data_processor.task_pipeline.output` is required when both "
                        "`buffer.explorer_input.taskset.path` and `buffer.trainer_input.experience_buffer.path` are "
                        "None"
                    )
            if self.data_processor.task_pipeline.output.path and os.path.exists(
                self.data_processor.task_pipeline.output.path
            ):
4 changes: 4 additions & 0 deletions trinity/service/data_juicer/server/utils.py
@@ -114,6 +114,8 @@ def _parse_task_pipeline_config(config: DJConfig) -> Namespace:


def group_scores(dataset: Dataset) -> Dataset:
    if Fields.stats not in dataset.features:
        return dataset
    # for perplexity, normalize them with the max value.
    stats_min_max = {}
    for stats in dataset.features[Fields.stats]:
@@ -165,6 +167,8 @@ def compute_priority_scores(

    from data_juicer.utils.constant import Fields

    if Fields.stats not in sample:
        return sample
    stats = sample[Fields.stats]
    if isinstance(stats, list):
        stats = stats[0]