Skip to content

Add Task pipeline#189

Merged
pan-x-c merged 9 commits intoagentscope-ai:feature/data_processorfrom
pan-x-c:feature/task_pipeline
Aug 18, 2025
Merged

Add Task pipeline#189
pan-x-c merged 9 commits intoagentscope-ai:feature/data_processorfrom
pan-x-c:feature/task_pipeline

Conversation

@pan-x-c
Copy link
Collaborator

@pan-x-c pan-x-c commented Aug 15, 2025

Description

  • Add TaskPipeline for taskset pre-processing
  • Add process_task on data-juicer service
  • Add tests and docs

Checklist

Please check the following items before code is ready to be reviewed.

  • Code has passed all tests
  • Docstrings have been added/updated in Google Style
  • Documentation has been updated
  • Code is ready for review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of Changes

Hello @pan-x-c, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a significant architectural overhaul for data processing within the system. The core change is the implementation of new ExperiencePipeline and TaskPipeline components, designed to streamline the pre-processing of both rollout experiences and raw task datasets. This new pipeline leverages a modular 'Operator' framework, allowing for flexible and extensible data transformations. A key aspect of this refactoring is the re-integration of advantage function calculations (e.g., GRPO, OPMD) directly into this operator-based pipeline, moving away from previous 'add_strategy' mechanisms. Furthermore, the changes lay the groundwork for robust integration with external data processing services like Data-Juicer, enhancing the system's data handling capabilities. The overall impact is a more structured, efficient, and scalable approach to managing and transforming data throughout the reinforcement learning process.

Highlights

  • New Data Processing Pipelines and Operator Architecture: This PR introduces a new, modular data processing architecture with ExperiencePipeline and TaskPipeline. These pipelines orchestrate ExperienceOperators, which are reusable components for data transformations like filtering, mapping, and integrating external services.
  • Refactored Advantage Function Handling: The way advantage functions (like GRPO and OPMD) are handled has been refactored. They are now implemented as GroupAdvantage operators that run within the ExperiencePipeline, providing a more flexible and integrated approach to experience pre-processing.
  • Integration with Data-Juicer Service: A significant new feature is the integration with the Data-Juicer service. The DataJuicerOperator allows users to leverage Data-Juicer's advanced data cleaning and processing capabilities directly within the ExperiencePipeline and TaskPipeline.
  • Comprehensive Configuration and API Updates: The configuration system has been extensively updated to support the new pipeline and operator concepts. This includes new ExperiencePipelineConfig and TaskPipelineConfig classes, deprecation of old add_strategy parameters, and new service configurations for Data-Juicer.
  • HuggingFace Datasets Compatibility: Utility functions have been added to seamlessly convert Experience objects to and from HuggingFace Dataset format, enhancing compatibility and interoperability with the broader ML ecosystem.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in issue comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@pan-x-c pan-x-c changed the base branch from main to feature/data_processor August 15, 2025 02:59
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a significant refactoring by replacing add_strategy with a more flexible ExperiencePipeline composed of operators. This change improves modularity and allows for more complex data processing workflows. The PR also adds a new TaskPipeline for pre-processing task datasets, integrating with an external DataJuicer service. The changes are extensive, touching documentation, configuration, core logic, and tests. My review focuses on ensuring the new components are robust and correct.

@pan-x-c pan-x-c changed the title [WIP] Add Task pipeline Add Task pipeline Aug 15, 2025
@pan-x-c
Copy link
Collaborator Author

pan-x-c commented Aug 15, 2025

/unittest-all

@github-actions
Copy link

Summary

Tests 📝 Passed ✅ Failed ❌ Skipped ⏭️ Other ❓ Flaky 🍂 Duration ⏱️
105 102 3 0 0 0 2.8s

Failed Tests

Failed Tests ❌ Fail Message
❌ tests/common/config_test.py::TestConfig::test_all_examples_are_valid The test failed in the call phase due to an exception
❌ tests/manager/synchronizer_test.py::TestNCCLBasedSynchronizer_0::test_synchronizer The test failed in the call phase
❌ tests/manager/synchronizer_test.py::TestNCCLBasedSynchronizer_1::test_synchronizer The test failed in the call phase

Tests

Test Name Status Flaky Duration
tests/algorithm/add_strategy_test.py::TestAddStrategy::test_correct_bias_strategy 1ms
tests/algorithm/add_strategy_test.py::TestAddStrategy::test_duplicate_add_strategy 1ms
tests/algorithm/add_strategy_test.py::TestAddStrategy::test_grpo_args 1ms
tests/algorithm/add_strategy_test.py::TestAddStrategy::test_reward_variance_strategy 1ms
tests/algorithm/add_strategy_test.py::TestAddStrategy::test_step_wise_grpo_strategy 1ms
tests/algorithm/advantage_fn_test.py::TestGroupedAdvantageFn::test_duplicate_grpo 1ms
tests/algorithm/advantage_fn_test.py::TestGroupedAdvantageFn::test_grpo_advantage 1ms
tests/algorithm/advantage_fn_test.py::TestGroupedAdvantageFn::test_grpo_correct_bias 1ms
tests/algorithm/advantage_fn_test.py::TestGroupedAdvantageFn::test_grpo_reward_std 1ms
tests/algorithm/advantage_fn_test.py::TestGroupedAdvantageFn::test_step_wise_grpo_advantage 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_dpo_policy_loss 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_gspo_policy_loss 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_mix_policy_loss 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_opmd_policy_loss 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_ppo_policy_loss 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_sft_policy_loss 1ms
tests/buffer/experience_pipeline_test.py::TestExperiencePipeline::test_experience_pipeline 11ms
tests/buffer/file_test.py::TestFileBuffer::test_file_buffer 2ms
tests/buffer/file_test.py::TestFileBuffer::test_file_reader 1ms
tests/buffer/file_test.py::TestFileBuffer::test_file_writer 2ms
tests/buffer/queue_test.py::TestQueueBuffer::test_priority_queue_buffer_reuse 7ms
tests/buffer/queue_test.py::TestQueueBuffer::test_priority_queue_capacity 3ms
tests/buffer/queue_test.py::TestQueueBuffer::test_queue_buffer_0_queue 4ms
tests/buffer/queue_test.py::TestQueueBuffer::test_queue_buffer_1_priority_queue 4ms
tests/buffer/queue_test.py::TestQueueBuffer::test_queue_buffer_capacity 4ms
tests/buffer/reward_shaping_mapper_test.py::TestRewardShapingMapper::test_basic_usage 1ms
tests/buffer/sql_test.py::TestSQLBuffer::test_create_sql_buffer 4ms
tests/common/config_test.py::TestConfig::test_all_examples_are_valid 1ms
tests/common/config_test.py::TestConfig::test_config_flatten 1ms
tests/common/config_test.py::TestConfig::test_continue_from_checkpoint_is_valid 1ms
tests/common/config_test.py::TestConfig::test_load_default_config 5ms
tests/common/experience_test.py::TestEID::test_eid_properties 1ms
tests/common/experience_test.py::TestExperience::test_action_mask_and_logprobs_type 1ms
tests/common/experience_test.py::TestExperience::test_assertions 1ms
tests/common/experience_test.py::TestExperience::test_dpo_experience 1ms
tests/common/experience_test.py::TestExperience::test_gather 1ms
tests/common/experience_test.py::TestExperience::test_hf_datasets_conversion 1ms
tests/common/experience_test.py::TestExperience::test_multi_turn_experience 1ms
tests/common/experience_test.py::TestExperience::test_serialize_deserialize 1ms
tests/common/experience_test.py::TestExperience::test_single_turn_experience 1ms
tests/common/experience_test.py::TestExperience::test_to_dict 1ms
tests/common/experience_test.py::TestExperienceConversion::test_batch_conversion 1ms
tests/common/experience_test.py::TestExperienceConversion::test_dpo_experience_batch_conversion 1ms
tests/common/experience_test.py::TestExperienceConversion::test_experience_model_experience_conversion 1ms
tests/common/experience_test.py::TestExperienceConversion::test_multiturn_experience_batch_converstion 1ms
tests/common/vllm_test.py::ModelWrapperTest_0::test_generate 36ms
tests/common/vllm_test.py::ModelWrapperTest_1::test_generate 52ms
tests/common/vllm_test.py::ModelWrapperTest_2::test_generate 48ms
tests/common/vllm_test.py::ModelWrapperTest_3::test_generate 35ms
tests/common/vllm_test.py::ModelWrapperTest_4::test_generate 46ms
tests/common/vllm_test.py::TestAPIServer::test_api 24ms
tests/common/vllm_test.py::TestTokenizer::test_assistant_token_mask 1ms
tests/common/vllm_test.py::TestAPIServerToolCall_0_deepseek_r1::test_api_tool_calls 21ms
tests/common/vllm_test.py::TestAPIServerToolCall_1::test_api_tool_calls 19ms
tests/explorer/explorer_test.py::BaseExplorerCase::test_explorer 1ms
tests/explorer/explorer_test.py::TestExplorerCountdownEval::test_explorer 47ms
tests/explorer/explorer_test.py::TestExplorerCountdownNoEval::test_explorer 52ms
tests/explorer/explorer_test.py::TestExplorerGSM8k::test_explorer 199ms
tests/explorer/scheduler_test.py::SchedulerTest::test_concurrent_operations 4ms
tests/explorer/scheduler_test.py::SchedulerTest::test_get_results 19ms
tests/explorer/scheduler_test.py::SchedulerTest::test_multi_step_execution 4ms
tests/explorer/scheduler_test.py::SchedulerTest::test_non_repeatable_workflow 4ms
tests/explorer/scheduler_test.py::SchedulerTest::test_scheduler_all_methods 14ms
tests/explorer/scheduler_test.py::SchedulerTest::test_scheduler_restart_after_stop 8ms
tests/explorer/scheduler_test.py::SchedulerTest::test_split_tasks 7ms
tests/explorer/scheduler_test.py::SchedulerTest::test_stepwise_experience_eid 4ms
tests/explorer/scheduler_test.py::SchedulerTest::test_wait_all 7ms
tests/explorer/scheduler_test.py::SchedulerTest::test_wait_all_timeout_with_multi_batch 13ms
tests/explorer/step_wise_workflow_test.py::WorkflowTest::test_reward_propagation_workflow 1ms
tests/explorer/step_wise_workflow_test.py::WorkflowTest::test_step_wise_reward_workflow 1ms
tests/explorer/step_wise_workflow_test.py::WorkflowTest::test_workflows_raise_error 1ms
tests/explorer/step_wise_workflow_test.py::WorkflowTest::test_workflows_stop_at_max_env_steps 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_gsm8k_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_math_boxed_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_math_complex_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_math_eval_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_math_fraction_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_math_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_rm_gallery_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_workflow_repeatable 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_workflow_resettable 1ms
tests/manager/synchronizer_test.py::TestSynchronizerExit::test_synchronizer 29ms
tests/manager/synchronizer_test.py::TestStateDictBasedSynchronizer_0::test_synchronizer 61ms
tests/manager/synchronizer_test.py::TestStateDictBasedSynchronizer_1::test_synchronizer 64ms
tests/manager/synchronizer_test.py::TestStateDictBasedSynchronizer_2::test_synchronizer 95ms
tests/manager/synchronizer_test.py::TestStateDictBasedSynchronizer_3::test_synchronizer 103ms
tests/manager/synchronizer_test.py::TestNCCLBasedSynchronizer_0::test_synchronizer 200ms
tests/manager/synchronizer_test.py::TestNCCLBasedSynchronizer_1::test_synchronizer 5ms
tests/service/data_juicer_test.py::TestDataJuicer::test_config 1ms
tests/service/data_juicer_test.py::TestDataJuicer::test_server_start 21ms
tests/service/data_juicer_test.py::TestDataJuicerExperiencePipeline::test_data_juicer_operators 21ms
tests/service/data_juicer_test.py::TestDataJuicerTaskPipeline::test_data_juicer_task_pipeline 14ms
tests/trainer/trainer_test.py::BaseTrainerCase::test_trainer 1ms
tests/trainer/trainer_test.py::TestTrainerCountdown::test_trainer 1.1s
tests/trainer/trainer_test.py::TestStepAheadAsyncRL::test_trainer 72ms
tests/trainer/trainer_test.py::TestTrainerGSM8K::test_trainer 59ms
tests/trainer/trainer_test.py::TestTrainerSFTWarmupGSM8K::test_trainer 52ms
tests/trainer/trainer_test.py::TestTrainerDPO::test_trainer 32ms
tests/trainer/trainer_test.py::TestTrainerSFT::test_trainer 30ms
tests/trainer/trainer_test.py::TestFullyAsyncMode::test_fully_async_mode_0_queue 62ms
tests/trainer/trainer_test.py::TestFullyAsyncMode::test_fully_async_mode_1_priority_queue 62ms
tests/utils/eval_utils_test.py::TestMathEvalUtils::test_extract_answer 1ms
tests/utils/eval_utils_test.py::TestMathEvalUtils::test_verify_math_answer 1ms
tests/utils/eval_utils_test.py::TestEvalUtils::test_is_equiv 1ms
tests/utils/plugin_test.py::TestPluginLoader::test_load_plugins 4ms

Github Test Reporter by CTRF 💚

@pan-x-c
Copy link
Collaborator Author

pan-x-c commented Aug 18, 2025

/unittest-all

@github-actions
Copy link

Summary

Tests 📝 Passed ✅ Failed ❌ Skipped ⏭️ Other ❓ Flaky 🍂 Duration ⏱️
105 103 2 0 0 0 1.7s

Failed Tests

Failed Tests ❌ Fail Message
❌ tests/trainer/trainer_test.py::TestFullyAsyncMode::test_fully_async_mode_0_queue The test failed in the call phase due to an assertion error
❌ tests/trainer/trainer_test.py::TestFullyAsyncMode::test_fully_async_mode_1_priority_queue The test failed in the call phase due to an assertion error

Tests

Test Name Status Flaky Duration
tests/algorithm/add_strategy_test.py::TestAddStrategy::test_correct_bias_strategy 1ms
tests/algorithm/add_strategy_test.py::TestAddStrategy::test_duplicate_add_strategy 1ms
tests/algorithm/add_strategy_test.py::TestAddStrategy::test_grpo_args 1ms
tests/algorithm/add_strategy_test.py::TestAddStrategy::test_reward_variance_strategy 1ms
tests/algorithm/add_strategy_test.py::TestAddStrategy::test_step_wise_grpo_strategy 1ms
tests/algorithm/advantage_fn_test.py::TestGroupedAdvantageFn::test_duplicate_grpo 1ms
tests/algorithm/advantage_fn_test.py::TestGroupedAdvantageFn::test_grpo_advantage 1ms
tests/algorithm/advantage_fn_test.py::TestGroupedAdvantageFn::test_grpo_correct_bias 1ms
tests/algorithm/advantage_fn_test.py::TestGroupedAdvantageFn::test_grpo_reward_std 1ms
tests/algorithm/advantage_fn_test.py::TestGroupedAdvantageFn::test_step_wise_grpo_advantage 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_dpo_policy_loss 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_gspo_policy_loss 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_mix_policy_loss 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_opmd_policy_loss 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_ppo_policy_loss 1ms
tests/algorithm/policy_loss_test.py::VerlPolicyLossTest::test_sft_policy_loss 1ms
tests/buffer/experience_pipeline_test.py::TestExperiencePipeline::test_experience_pipeline 11ms
tests/buffer/file_test.py::TestFileBuffer::test_file_buffer 2ms
tests/buffer/file_test.py::TestFileBuffer::test_file_reader 1ms
tests/buffer/file_test.py::TestFileBuffer::test_file_writer 2ms
tests/buffer/queue_test.py::TestQueueBuffer::test_priority_queue_buffer_reuse 7ms
tests/buffer/queue_test.py::TestQueueBuffer::test_priority_queue_capacity 3ms
tests/buffer/queue_test.py::TestQueueBuffer::test_queue_buffer_0_queue 4ms
tests/buffer/queue_test.py::TestQueueBuffer::test_queue_buffer_1_priority_queue 4ms
tests/buffer/queue_test.py::TestQueueBuffer::test_queue_buffer_capacity 4ms
tests/buffer/reward_shaping_mapper_test.py::TestRewardShapingMapper::test_basic_usage 1ms
tests/buffer/sql_test.py::TestSQLBuffer::test_create_sql_buffer 4ms
tests/common/config_test.py::TestConfig::test_all_examples_are_valid 1ms
tests/common/config_test.py::TestConfig::test_config_flatten 1ms
tests/common/config_test.py::TestConfig::test_continue_from_checkpoint_is_valid 1ms
tests/common/config_test.py::TestConfig::test_load_default_config 4ms
tests/common/experience_test.py::TestEID::test_eid_properties 1ms
tests/common/experience_test.py::TestExperience::test_action_mask_and_logprobs_type 1ms
tests/common/experience_test.py::TestExperience::test_assertions 1ms
tests/common/experience_test.py::TestExperience::test_dpo_experience 1ms
tests/common/experience_test.py::TestExperience::test_gather 1ms
tests/common/experience_test.py::TestExperience::test_hf_datasets_conversion 1ms
tests/common/experience_test.py::TestExperience::test_multi_turn_experience 1ms
tests/common/experience_test.py::TestExperience::test_serialize_deserialize 1ms
tests/common/experience_test.py::TestExperience::test_single_turn_experience 1ms
tests/common/experience_test.py::TestExperience::test_to_dict 1ms
tests/common/experience_test.py::TestExperienceConversion::test_batch_conversion 1ms
tests/common/experience_test.py::TestExperienceConversion::test_dpo_experience_batch_conversion 1ms
tests/common/experience_test.py::TestExperienceConversion::test_experience_model_experience_conversion 1ms
tests/common/experience_test.py::TestExperienceConversion::test_multiturn_experience_batch_converstion 1ms
tests/common/vllm_test.py::ModelWrapperTest_0::test_generate 36ms
tests/common/vllm_test.py::ModelWrapperTest_1::test_generate 53ms
tests/common/vllm_test.py::ModelWrapperTest_2::test_generate 48ms
tests/common/vllm_test.py::ModelWrapperTest_3::test_generate 36ms
tests/common/vllm_test.py::ModelWrapperTest_4::test_generate 45ms
tests/common/vllm_test.py::TestAPIServer::test_api 24ms
tests/common/vllm_test.py::TestTokenizer::test_assistant_token_mask 1ms
tests/common/vllm_test.py::TestAPIServerToolCall_0_deepseek_r1::test_api_tool_calls 22ms
tests/common/vllm_test.py::TestAPIServerToolCall_1::test_api_tool_calls 20ms
tests/explorer/explorer_test.py::BaseExplorerCase::test_explorer 1ms
tests/explorer/explorer_test.py::TestExplorerCountdownEval::test_explorer 52ms
tests/explorer/explorer_test.py::TestExplorerCountdownNoEval::test_explorer 49ms
tests/explorer/explorer_test.py::TestExplorerGSM8k::test_explorer 199ms
tests/explorer/scheduler_test.py::SchedulerTest::test_concurrent_operations 4ms
tests/explorer/scheduler_test.py::SchedulerTest::test_get_results 19ms
tests/explorer/scheduler_test.py::SchedulerTest::test_multi_step_execution 4ms
tests/explorer/scheduler_test.py::SchedulerTest::test_non_repeatable_workflow 4ms
tests/explorer/scheduler_test.py::SchedulerTest::test_scheduler_all_methods 14ms
tests/explorer/scheduler_test.py::SchedulerTest::test_scheduler_restart_after_stop 8ms
tests/explorer/scheduler_test.py::SchedulerTest::test_split_tasks 7ms
tests/explorer/scheduler_test.py::SchedulerTest::test_stepwise_experience_eid 4ms
tests/explorer/scheduler_test.py::SchedulerTest::test_wait_all 7ms
tests/explorer/scheduler_test.py::SchedulerTest::test_wait_all_timeout_with_multi_batch 13ms
tests/explorer/step_wise_workflow_test.py::WorkflowTest::test_reward_propagation_workflow 1ms
tests/explorer/step_wise_workflow_test.py::WorkflowTest::test_step_wise_reward_workflow 1ms
tests/explorer/step_wise_workflow_test.py::WorkflowTest::test_workflows_raise_error 1ms
tests/explorer/step_wise_workflow_test.py::WorkflowTest::test_workflows_stop_at_max_env_steps 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_gsm8k_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_math_boxed_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_math_complex_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_math_eval_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_math_fraction_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_math_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_rm_gallery_workflow 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_workflow_repeatable 1ms
tests/explorer/workflow_test.py::WorkflowTest::test_workflow_resettable 1ms
tests/manager/synchronizer_test.py::TestSynchronizerExit::test_synchronizer 28ms
tests/manager/synchronizer_test.py::TestStateDictBasedSynchronizer_0::test_synchronizer 62ms
tests/manager/synchronizer_test.py::TestStateDictBasedSynchronizer_1::test_synchronizer 68ms
tests/manager/synchronizer_test.py::TestStateDictBasedSynchronizer_2::test_synchronizer 83ms
tests/manager/synchronizer_test.py::TestStateDictBasedSynchronizer_3::test_synchronizer 89ms
tests/manager/synchronizer_test.py::TestNCCLBasedSynchronizer_0::test_synchronizer 56ms
tests/manager/synchronizer_test.py::TestNCCLBasedSynchronizer_1::test_synchronizer 57ms
tests/service/data_juicer_test.py::TestDataJuicer::test_config 1ms
tests/service/data_juicer_test.py::TestDataJuicer::test_server_start 21ms
tests/service/data_juicer_test.py::TestDataJuicerExperiencePipeline::test_data_juicer_operators 21ms
tests/service/data_juicer_test.py::TestDataJuicerTaskPipeline::test_data_juicer_task_pipeline 14ms
tests/trainer/trainer_test.py::BaseTrainerCase::test_trainer 1ms
tests/trainer/trainer_test.py::TestTrainerCountdown::test_trainer 140ms
tests/trainer/trainer_test.py::TestStepAheadAsyncRL::test_trainer 67ms
tests/trainer/trainer_test.py::TestTrainerGSM8K::test_trainer 48ms
tests/trainer/trainer_test.py::TestTrainerSFTWarmupGSM8K::test_trainer 57ms
tests/trainer/trainer_test.py::TestTrainerDPO::test_trainer 33ms
tests/trainer/trainer_test.py::TestTrainerSFT::test_trainer 30ms
tests/trainer/trainer_test.py::TestFullyAsyncMode::test_fully_async_mode_0_queue 55ms
tests/trainer/trainer_test.py::TestFullyAsyncMode::test_fully_async_mode_1_priority_queue 54ms
tests/utils/eval_utils_test.py::TestMathEvalUtils::test_extract_answer 1ms
tests/utils/eval_utils_test.py::TestMathEvalUtils::test_verify_math_answer 1ms
tests/utils/eval_utils_test.py::TestEvalUtils::test_is_equiv 1ms
tests/utils/plugin_test.py::TestPluginLoader::test_load_plugins 4ms

Github Test Reporter by CTRF 💚

@pan-x-c
Copy link
Collaborator Author

pan-x-c commented Aug 18, 2025

/unittest-module-trainer

@github-actions
Copy link

Summary

Tests 📝 Passed ✅ Failed ❌ Skipped ⏭️ Other ❓ Flaky 🍂 Duration ⏱️
9 9 0 0 0 0 480ms

Tests

Test Name Status Flaky Duration
tests/trainer/trainer_test.py::BaseTrainerCase::test_trainer 2ms
tests/trainer/trainer_test.py::TestTrainerCountdown::test_trainer 121ms
tests/trainer/trainer_test.py::TestStepAheadAsyncRL::test_trainer 59ms
tests/trainer/trainer_test.py::TestTrainerGSM8K::test_trainer 48ms
tests/trainer/trainer_test.py::TestTrainerSFTWarmupGSM8K::test_trainer 58ms
tests/trainer/trainer_test.py::TestTrainerDPO::test_trainer 34ms
tests/trainer/trainer_test.py::TestTrainerSFT::test_trainer 29ms
tests/trainer/trainer_test.py::TestFullyAsyncMode::test_fully_async_mode_0_queue 60ms
tests/trainer/trainer_test.py::TestFullyAsyncMode::test_fully_async_mode_1_priority_queue 60ms

Github Test Reporter by CTRF 💚

@pan-x-c pan-x-c requested a review from Copilot August 18, 2025 03:40
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds task pipeline functionality for preprocessing tasksets using Data-Juicer operators. The task pipeline allows users to apply data processing operations to raw task files before they are used in the exploration phase.

  • Add TaskPipeline class for processing task datasets
  • Add process_task endpoint to the Data-Juicer service
  • Refactor configuration to support both task and experience pipelines

Reviewed Changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
trinity/utils/distributed.py Add port availability check utility function
trinity/service/data_juicer/server/utils.py Add task pipeline config parsing and validation
trinity/service/data_juicer/server/session.py Separate experience and task processing methods
trinity/service/data_juicer/server/server.py Add process_task endpoint
trinity/service/data_juicer/client.py Add task processing client method with improved logging
trinity/explorer/explorer.py Improve shutdown handling with proper resource cleanup
trinity/data/utils.py Remove deprecated data processor utilities
trinity/common/config.py Add TaskPipelineConfig and update data processor config
trinity/cli/launcher.py Replace deprecated data processor activation with task pipeline
trinity/buffer/pipelines/task_pipeline.py Main task pipeline implementation
trinity/buffer/pipelines/init.py Export task pipeline components
trinity/buffer/operators/data_juicer_operator.py Update to specify experience pipeline type
tests/service/data_juicer_test.py Add comprehensive task pipeline tests
examples/grpo_gsm8k_task_pipeline/gsm8k.yaml Update example to use new task pipeline config
docs/sphinx_doc/source/tutorial/trinity_programming_guide.md Update documentation

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

@pan-x-c pan-x-c merged commit 577fddc into agentscope-ai:feature/data_processor Aug 18, 2025
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants