Skip to content

Add operator interface and experience pipeline#160

Merged
yanxi-chen merged 32 commits intoagentscope-ai:feature/data_processorfrom
pan-x-c:feature/experience_pipeline
Aug 8, 2025
Merged

Add operator interface and experience pipeline#160
yanxi-chen merged 32 commits intoagentscope-ai:feature/data_processorfrom
pan-x-c:feature/experience_pipeline

Conversation

@pan-x-c
Copy link
Collaborator

@pan-x-c pan-x-c commented Aug 4, 2025

Description

  • Add ExperienceOperator interface to support user-defined Experience processing logic
  • Add ExperiencePipeline for automatical operator execution
  • Refactor existing AddStrategy into Operator
  • Modify BufferConfig and DataProcessorConfig to be compatible with the above features.
image

Checklist

Please check the following items before code is ready to be reviewed.

  • Code has passed all tests
  • Docstrings have been added/updated in Google Style
  • Documentation has been updated
  • Code is ready for review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of Changes

Hello @pan-x-c, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a new framework for processing "experiences" within the system. It defines an Experience_Operator interface for custom processing logic and an ExperiencePipeline to automate the execution of these operators, aiming to provide a flexible and extensible way to handle data transformations for reinforcement learning or similar data processing workflows.

Highlights

  • Configuration for Experience Processing: New configuration classes OperatorConfig and ExperiencePipelineConfig have been added to trinity/common/config.py to define the structure for operators and the experience processing pipeline.
  • Abstract Experience Operator: An abstract base class ExperienceOperator is introduced in trinity/data/operators/experience_operator.py, providing a standardized interface for custom experience processing logic, along with a registry for managing different operator implementations.
  • Distributed Experience Pipeline: A new ExperiencePipeline class is implemented in trinity/data/pipelines/experience_pipeline.py to orchestrate the distributed processing of experiences using Ray, allowing for reading from input buffers, applying a sequence of operators, and writing to an output buffer.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in issue comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments or fill out our survey to provide feedback.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces an ExperienceOperator interface and an ExperiencePipeline to allow for user-defined processing of experiences. The changes include new configuration options, the operator base class, and the pipeline implementation. My review focuses on improving type safety and fixing several critical issues in the ExperiencePipeline implementation related to buffer handling and Ray actor creation. These issues could lead to runtime errors and incorrect behavior.

@pan-x-c pan-x-c changed the title [WIP] Add operator interface and experience pipeline Add operator interface and experience pipeline Aug 6, 2025
@pan-x-c pan-x-c changed the base branch from experience_pipeline_dev to main August 6, 2025 12:20
@pan-x-c pan-x-c requested a review from Copilot August 6, 2025 12:42

This comment was marked as outdated.

@pan-x-c
Copy link
Collaborator Author

pan-x-c commented Aug 6, 2025

/unittest-all

@pan-x-c
Copy link
Collaborator Author

pan-x-c commented Aug 7, 2025

/unittest-all

@pan-x-c
Copy link
Collaborator Author

pan-x-c commented Aug 8, 2025

/unittest-all

@pan-x-c
Copy link
Collaborator Author

pan-x-c commented Aug 8, 2025

/unittest-all

@pan-x-c pan-x-c changed the base branch from main to feature/data_processor August 8, 2025 06:14
@pan-x-c
Copy link
Collaborator Author

pan-x-c commented Aug 8, 2025

/unittest-all

@pan-x-c pan-x-c requested a review from Copilot August 8, 2025 07:04
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds comprehensive support for experience data processing through a new operator interface and pipeline architecture, while deprecating the old AddStrategy pattern. The changes introduce a more flexible system for processing experiences between the Explorer and Trainer.

  • Introduces ExperienceOperator interface and ExperiencePipeline for automatic operator execution
  • Migrates from AddStrategy to AdvantageFn operators for advantage computation
  • Refactors buffer configuration to support the new pipeline architecture

Reviewed Changes

Copilot reviewed 36 out of 37 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
trinity/buffer/pipelines/experience_pipeline.py New experience pipeline implementation for automatic operator execution
trinity/buffer/operators/experience_operator.py Base interface for experience processing operators
trinity/algorithm/advantage_fn/*.py Refactored advantage functions to support both trainer and pipeline execution
trinity/common/config.py Updated config structure to support experience pipeline and operator configurations
trinity/explorer/explorer.py Modified to use experience pipeline instead of add strategy
trinity/common/experience.py Added utility functions for grouping experiences
trinity/utils/annotations.py New decorator annotations for experimental and deprecated features

@pan-x-c
Copy link
Collaborator Author

pan-x-c commented Aug 8, 2025

/unittest-all

@pan-x-c
Copy link
Collaborator Author

pan-x-c commented Aug 8, 2025

/unittest-all

@github-actions
Copy link

github-actions bot commented Aug 8, 2025

Summary

Tests 📝 Passed ✅ Failed ❌ Skipped ⏭️ Other ❓ Flaky 🍂 Duration ⏱️
94 94 0 0 0 0 1.5s

Tests

Test Name Status Flaky Duration
tests/algorithm/add_strategy_test.py::TestAddStrategy::test_correct_bias_strategy 1ms
tests/algorithm/add_strategy_test.py::TestAddStrategy::test_duplicate_add_strategy 1ms
tests/algorithm/add_strategy_test.py::TestAddStrategy::test_grpo_args 1ms
tests/algorithm/add_strategy_test.py::TestAddStrategy::test_reward_variance_strategy 1ms
tests/algorithm/add_strategy_test.py::TestAddStrategy::test_step_wise_grpo_strategy 1ms
tests/algorithm/advantage_fn_test.py::TestGroupedAdvantageFn::test_duplicate_grpo 1ms
tests/algorithm/advantage_fn_test.py::TestGroupedAdvantageFn::test_grpo_advantage 1ms
tests/algorithm/advantage_fn_test.py::TestGroupedAdvantageFn::test_grpo_correct_bias 1ms
tests/algorithm/advantage_fn_test.py::TestGroupedAdvantageFn::test_grpo_reward_std 1ms
tests/algorithm/advantage_fn_test.py::TestGroupedAdvantageFn::test_step_wise_grpo_advantage 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_dpo_policy_loss 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_gspo_policy_loss 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_mix_policy_loss 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_opmd_policy_loss 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_ppo_policy_loss 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_sft_policy_loss 1ms
tests/buffer/experience_pipeline_test.py::TestExperiencePipeline::test_experience_pipeline 11ms
tests/buffer/file_test.py::TestFileBuffer::test_file_buffer 2ms
tests/buffer/file_test.py::TestFileBuffer::test_file_reader 1ms
tests/buffer/file_test.py::TestFileBuffer::test_file_writer 2ms
tests/buffer/queue_test.py::TestQueueBuffer::test_priority_queue_buffer_reuse 7ms
tests/buffer/queue_test.py::TestQueueBuffer::test_priority_queue_capacity 3ms
tests/buffer/queue_test.py::TestQueueBuffer::test_queue_buffer_0_queue 4ms
tests/buffer/queue_test.py::TestQueueBuffer::test_queue_buffer_1_priority_queue 4ms
tests/buffer/queue_test.py::TestQueueBuffer::test_queue_buffer_capacity 4ms
tests/buffer/sql_test.py::TestSQLBuffer::test_create_sql_buffer 5ms
tests/common/config_test.py::TestConfig::test_all_examples_are_valid 1ms
tests/common/config_test.py::TestConfig::test_continue_from_checkpoint_is_valid 1ms
tests/common/config_test.py::TestConfig::test_load_default_config 4ms
tests/common/experience_test.py::TestEID::test_eid_properties 1ms
tests/common/experience_test.py::TestExperience::test_action_mask_and_logprobs_type 1ms
tests/common/experience_test.py::TestExperience::test_assertions 1ms
tests/common/experience_test.py::TestExperience::test_dpo_experience 1ms
tests/common/experience_test.py::TestExperience::test_gather 1ms
tests/common/experience_test.py::TestExperience::test_multi_turn_experience 1ms
tests/common/experience_test.py::TestExperience::test_serialize_deserialize 1ms
tests/common/experience_test.py::TestExperience::test_single_turn_experience 1ms
tests/common/experience_test.py::TestExperience::test_to_dict 1ms
tests/common/experience_test.py::TestExperienceConversion::test_batch_conversion 1ms
tests/common/experience_test.py::TestExperienceConversion::test_dpo_experience_batch_conversion 1ms
tests/common/experience_test.py::TestExperienceConversion::test_experience_model_experience_conversion 1ms
tests/common/experience_test.py::TestExperienceConversion::test_multiturn_experience_batch_converstion 1ms
tests/common/vllm_test.py::ModelWrapperTest_0::test_generate 39ms
tests/common/vllm_test.py::ModelWrapperTest_1::test_generate 49ms
tests/common/vllm_test.py::ModelWrapperTest_2::test_generate 51ms
tests/common/vllm_test.py::ModelWrapperTest_3::test_generate 38ms
tests/common/vllm_test.py::ModelWrapperTest_4::test_generate 49ms
tests/common/vllm_test.py::TestAPIServer::test_api 26ms
tests/common/vllm_test.py::TestTokenizer::test_assistant_token_mask 1ms
tests/common/vllm_test.py::TestAPIServerToolCall_0_deepseek_r1::test_api_tool_calls 23ms
tests/common/vllm_test.py::TestAPIServerToolCall_1::test_api_tool_calls 21ms
tests/explorer/explorer_test.py::BaseExplorerCase::test_explorer 1ms
tests/explorer/explorer_test.py::TestExplorerCountdownEval::test_explorer 51ms
tests/explorer/explorer_test.py::TestExplorerCountdownNoEval::test_explorer 46ms
tests/explorer/explorer_test.py::TestExplorerGSM8k::test_explorer 31ms
tests/explorer/scheduler_test.py::SchedulerTest::test_concurrent_operations 4ms
tests/explorer/scheduler_test.py::SchedulerTest::test_get_results 20ms
tests/explorer/scheduler_test.py::SchedulerTest::test_multi_step_execution 5ms
tests/explorer/scheduler_test.py::SchedulerTest::test_non_repeatable_workflow 5ms
tests/explorer/scheduler_test.py::SchedulerTest::test_scheduler_all_methods 14ms
tests/explorer/scheduler_test.py::SchedulerTest::test_scheduler_restart_after_stop 8ms
tests/explorer/scheduler_test.py::SchedulerTest::test_split_tasks 7ms
tests/explorer/scheduler_test.py::SchedulerTest::test_stepwise_experience_eid 5ms
tests/explorer/scheduler_test.py::SchedulerTest::test_wait_all 7ms
tests/explorer/scheduler_test.py::SchedulerTest::test_wait_all_timeout_with_multi_batch 13ms
tests/explorer/workflow_test.py::WorkflowTest::test_gsm8k_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_math_boxed_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_math_complex_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_math_eval_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_math_fraction_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_math_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_rm_gallery_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_workflow_repeatable 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_workflow_resettable 1ms
tests/manager/synchronizer_test.py::TestSynchronizerExit::test_synchronizer 31ms
tests/manager/synchronizer_test.py::TestStateDictBasedSynchronizer_0::test_synchronizer 63ms
tests/manager/synchronizer_test.py::TestStateDictBasedSynchronizer_1::test_synchronizer 65ms
tests/manager/synchronizer_test.py::TestStateDictBasedSynchronizer_2::test_synchronizer 71ms
tests/manager/synchronizer_test.py::TestStateDictBasedSynchronizer_3::test_synchronizer 68ms
tests/manager/synchronizer_test.py::TestNCCLBasedSynchronizer_0::test_synchronizer 56ms
tests/manager/synchronizer_test.py::TestNCCLBasedSynchronizer_1::test_synchronizer 57ms
tests/trainer/trainer_test.py::BaseTrainerCase::test_trainer 1ms
tests/trainer/trainer_test.py::TestTrainerCountdown::test_trainer 132ms
tests/trainer/trainer_test.py::TestStepAheadAsyncRL::test_trainer 54ms
tests/trainer/trainer_test.py::TestTrainerGSM8K::test_trainer 52ms
tests/trainer/trainer_test.py::TestTrainerSFTWarmupGSM8K::test_trainer 64ms
tests/trainer/trainer_test.py::TestTrainerDPO::test_trainer 32ms
tests/trainer/trainer_test.py::TestTrainerSFT::test_trainer 31ms
tests/trainer/trainer_test.py::TestFullyAsyncMode::test_fully_async_mode_0_queue 76ms
tests/trainer/trainer_test.py::TestFullyAsyncMode::test_fully_async_mode_1_priority_queue 67ms
tests/utils/eval_utils_test.py::TestMathEvalUtils::test_extract_answer 1ms
tests/utils/eval_utils_test.py::TestMathEvalUtils::test_verify_math_answer 1ms
tests/utils/eval_utils_test.py::TestEvalUtils::test_is_equiv 1ms
tests/utils/plugin_test.py::TestPluginLoader::test_load_plugins 5ms

Github Test Reporter by CTRF 💚

@yanxi-chen yanxi-chen merged commit e4b1e8c into agentscope-ai:feature/data_processor Aug 8, 2025
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants

Comments