diff --git a/tests/explorer/step_wise_workflow_test.py b/tests/explorer/step_wise_workflow_test.py
index df51989865..389b8ec01c 100644
--- a/tests/explorer/step_wise_workflow_test.py
+++ b/tests/explorer/step_wise_workflow_test.py
@@ -1,15 +1,19 @@
 # -*- coding: utf-8 -*-
 """Test for the general step-wise workflow module"""
+import asyncio
 import unittest
 from dataclasses import dataclass, field
 from typing import Dict, Optional
 from unittest.mock import MagicMock
 
+from parameterized import parameterized
 from torch import Tensor
 
 from tests.tools import get_unittest_dataset_config
 from trinity.common.experience import EID, Experience
 from trinity.common.workflows.step_wise_workflow import (
+    AsyncRewardPropagationWorkflow,
+    AsyncStepWiseRewardWorkflow,
     RewardPropagationWorkflow,
     StepWiseRewardWorkflow,
 )
@@ -46,6 +50,26 @@ def max_step_num(self):
         return self.max_env_steps
 
 
+class DummyAsyncStepWiseRewardWorkflow(AsyncStepWiseRewardWorkflow):
+    def __init__(self, model, task: Task, auxiliary_models=None):
+        super().__init__(task=task, model=model, auxiliary_models=auxiliary_models)
+        self.repeat_times = task.repeat_times
+        self.max_env_steps = task.workflow_args.get("max_env_steps", 1)
+        self.actual_steps = task.workflow_args.get("actual_steps", 1)
+
+    async def step_async(self, step_num: int):
+        await asyncio.sleep(0.1)
+        return step_num < self.actual_steps - 1
+
+    async def reward_async(self, exps: list[Experience], step_num: int):
+        await asyncio.sleep(0.1)
+        return 0.1 * step_num
+
+    @property
+    def max_step_num(self):
+        return self.max_env_steps
+
+
 class DummyRewardPropagationWorkflow(RewardPropagationWorkflow):
     def __init__(self, model, task: Task, auxiliary_models=None):
         super().__init__(task=task, model=model, auxiliary_models=auxiliary_models)
@@ -64,6 +88,34 @@ def max_step_num(self):
         return self.max_env_steps
 
 
+class DummyAsyncRewardPropagationWorkflow(AsyncRewardPropagationWorkflow):
+    def __init__(self, model, task: Task, auxiliary_models=None):
+        super().__init__(task=task, model=model, auxiliary_models=auxiliary_models)
+        self.repeat_times = task.repeat_times
+        self.max_env_steps = task.workflow_args.get("max_env_steps", 1)
+        self.actual_steps = task.workflow_args.get("actual_steps", 1)
+
+    async def step_async(self, step_num: int):
+        await asyncio.sleep(0.1)
+        return step_num < self.actual_steps - 1
+
+    async def reward_async(self, exps: list[Experience]):
+        await asyncio.sleep(0.1)
+        return 0.1 * len(exps)
+
+    @property
+    def max_step_num(self):
+        return self.max_env_steps
+
+
+_dummy_workflows = [
+    DummyStepWiseRewardWorkflow,
+    DummyAsyncStepWiseRewardWorkflow,
+    DummyRewardPropagationWorkflow,
+    DummyAsyncRewardPropagationWorkflow,
+]
+
+
 class WorkflowTest(unittest.TestCase):
     def setUp(self) -> None:
         self.model = MagicMock()
@@ -73,14 +125,18 @@ def setUp(self) -> None:
         ]
         self.taskset_config = get_unittest_dataset_config("countdown")
 
-    def test_step_wise_reward_workflow(self) -> None:
+    @parameterized.expand([(DummyStepWiseRewardWorkflow,), (DummyAsyncStepWiseRewardWorkflow,)])
+    def test_step_wise_reward_workflow(self, workflow_cls) -> None:
         task = Task(
-            workflow=DummyStepWiseRewardWorkflow,
+            workflow=workflow_cls,
             repeat_times=self.taskset_config.repeat_times,
             workflow_args={"max_env_steps": 10, "actual_steps": 5},
         )
         workflow = task.to_workflow(model=self.model)
-        experiences = workflow.run()
+        if workflow.asynchronous:
+            experiences = asyncio.run(workflow.run_async())
+        else:
+            experiences = workflow.run()
         self.assertEqual(len(experiences), 5)
 
         actual_steps = [exp.eid.step for exp in experiences]
@@ -90,14 +146,20 @@ def test_step_wise_reward_workflow(self) -> None:
         for actual, expected in zip(actual_rewards, expected_rewards):
             self.assertAlmostEqual(actual, expected)  # type: ignore
 
-    def test_reward_propagation_workflow(self) -> None:
+    @parameterized.expand(
+        [(DummyRewardPropagationWorkflow,), (DummyAsyncRewardPropagationWorkflow,)]
+    )
+    def test_reward_propagation_workflow(self, workflow_cls) -> None:
         task = Task(
-            workflow=DummyRewardPropagationWorkflow,
+            workflow=workflow_cls,
             repeat_times=self.taskset_config.repeat_times,
             workflow_args={"max_env_steps": 10, "actual_steps": 5},
         )
         workflow = task.to_workflow(model=self.model)
-        experiences = workflow.run()
+        if workflow.asynchronous:
+            experiences = asyncio.run(workflow.run_async())
+        else:
+            experiences = workflow.run()
         self.assertEqual(len(experiences), 5)
 
         actual_steps = [exp.eid.step for exp in experiences]
@@ -107,38 +169,26 @@ def test_reward_propagation_workflow(self) -> None:
             self.assertAlmostEqual(exp.reward, expected_reward)  # type: ignore
 
     def test_workflows_stop_at_max_env_steps(self) -> None:
-        task = Task(
-            workflow=DummyStepWiseRewardWorkflow,
-            repeat_times=self.taskset_config.repeat_times,
-            workflow_args={"max_env_steps": 3, "actual_steps": 100},  # actual > max
-        )
-        workflow = task.to_workflow(model=self.model)
-        experiences = workflow.run()
-        self.assertEqual(len(experiences), 3)
-
-        task = Task(
-            workflow=DummyRewardPropagationWorkflow,
-            repeat_times=self.taskset_config.repeat_times,
-            workflow_args={"max_env_steps": 3, "actual_steps": 100},  # actual > max
-        )
-        workflow = task.to_workflow(model=self.model)
-        experiences = workflow.run()
-        self.assertEqual(len(experiences), 3)
+        for workflow in _dummy_workflows:
+            task = Task(
+                workflow=workflow,
+                repeat_times=self.taskset_config.repeat_times,
+                workflow_args={"max_env_steps": 3, "actual_steps": 100},  # actual > max
+            )
+            workflow = task.to_workflow(model=self.model)
+            if workflow.asynchronous:
+                experiences = asyncio.run(workflow.run_async())  # type: ignore
+            else:
+                experiences = workflow.run()
+            self.assertEqual(len(experiences), 3)
 
     def test_workflows_raise_error(self) -> None:
         self.model.enable_history = False
-        task = Task(
-            workflow=DummyStepWiseRewardWorkflow,
-            repeat_times=self.taskset_config.repeat_times,
-            workflow_args={"max_env_steps": 10, "actual_steps": 5},
-        )
-        with self.assertRaises(AssertionError):
-            task.to_workflow(model=self.model)
-
-        task = Task(
-            workflow=DummyRewardPropagationWorkflow,
-            repeat_times=self.taskset_config.repeat_times,
-            workflow_args={"max_env_steps": 10, "actual_steps": 5},
-        )
-        with self.assertRaises(AssertionError):
-            task.to_workflow(model=self.model)
+        for workflow in _dummy_workflows:
+            task = Task(
+                workflow=workflow,
+                repeat_times=self.taskset_config.repeat_times,
+                workflow_args={"max_env_steps": 10, "actual_steps": 5},
+            )
+            with self.assertRaises(AssertionError):
+                task.to_workflow(model=self.model)
diff --git a/tests/explorer/workflow_test.py b/tests/explorer/workflow_test.py
index 9dc5a16442..c6b6019c21 100644
--- a/tests/explorer/workflow_test.py
+++ b/tests/explorer/workflow_test.py
@@ -1,14 +1,19 @@
 # -*- coding: utf-8 -*-
 """Test for the workflow module"""
+import asyncio
 import unittest
 from dataclasses import dataclass, field
 from typing import Dict, Optional
 from unittest.mock import MagicMock
 
+from parameterized import parameterized, parameterized_class
 from torch import Tensor
 
-from tests.tools import get_unittest_dataset_config
+from tests.common.vllm_test import CHAT_TEMPLATE
+from tests.tools import get_model_path, get_template_config, get_unittest_dataset_config
 from trinity.common.experience import EID
+from trinity.common.models import create_inference_models
+from trinity.common.models.model import ModelWrapper
 from trinity.common.rewards import RMGalleryFn
 from trinity.common.workflows import (
     MathBoxedWorkflow,
@@ -17,7 +22,7 @@
     MathWorkflow,
     Workflow,
 )
-from trinity.common.workflows.workflow import Task
+from trinity.common.workflows.workflow import MultiTurnWorkflow, Task
 
 
 @dataclass
@@ -68,6 +73,84 @@ def run(self):
             raise ValueError("Invalid output format")
 
 
+class DummyAsyncWorkflow(Workflow):
+    def __init__(self, model, task: Task, auxiliary_models=None):
+        super().__init__(task=task, model=model, auxiliary_models=auxiliary_models)
+        self.obj = task.raw_task
+        self.output_format = task.workflow_args["output_format"]
+        self.repeat_times = task.rollout_args.n
+
+    @property
+    def resettable(self):
+        return True
+
+    @property
+    def repeatable(self):
+        return True
+
+    @property
+    def asynchronous(self):
+        return True
+
+    def reset(self, task: Task):
+        self.obj = task.raw_task
+        self.output_format = task.workflow_args["output_format"]
+
+    def set_repeat_times(self, repeat_times, run_id_base):
+        self.repeat_times = repeat_times
+        self.run_id_base = run_id_base
+
+    async def run_async(self):
+        await asyncio.sleep(0.1)
+        if self.output_format == "json":
+            import json
+
+            return [json.dumps(self.obj)] * self.repeat_times
+        elif self.output_format == "yaml":
+            import yaml
+
+            return [yaml.safe_dump(self.obj)] * self.repeat_times
+        else:
+            raise ValueError("Invalid output format")
+
+
+class DummyMultiTurnWorkflow(MultiTurnWorkflow):
+    def __init__(self, model, task: Task, auxiliary_models=None):
+        super().__init__(task=task, model=model, auxiliary_models=auxiliary_models)
+        self.contents = task.raw_task["contents"]  # type: ignore
+
+    def run(self):
+        memory = [{"role": "system", "content": "You are a helpful assistant."}]
+        experience_list = []
+        for content in self.contents:
+            memory.append({"role": "user", "content": content})
+            memory.append({"role": "assistant", "content": content.upper()})
+            experience = self.process_messages_to_experience(memory, 0, {})
+            experience_list.append(experience)
+        return experience_list
+
+
+class DummyAsyncMultiTurnWorkflow(MultiTurnWorkflow):
+    def __init__(self, model, task: Task, auxiliary_models=None):
+        super().__init__(task=task, model=model, auxiliary_models=auxiliary_models)
+        self.contents = task.raw_task["contents"]  # type: ignore
+
+    @property
+    def asynchronous(self):
+        return True
+
+    async def run_async(self):
+        memory = [{"role": "system", "content": "You are a helpful assistant."}]
+        experience_list = []
+        for content in self.contents:
+            await asyncio.sleep(0.1)
+            memory.append({"role": "user", "content": content})
+            memory.append({"role": "assistant", "content": content.upper()})
+            experience = self.process_messages_to_experience(memory, 0, {})
+            experience_list.append(experience)
+        return experience_list
+
+
 class WorkflowTest(unittest.TestCase):
     def test_math_workflow(self) -> None:
         model = MagicMock()
@@ -323,31 +406,39 @@ def test_math_eval_workflow(self) -> None:
             assert exp.metrics is not None, f"Metrics for response {i} should not be None"
             self.assertEqual(exp.metrics["accuracy"], expected_acc)
 
-    def test_workflow_resettable(self) -> None:
+    @parameterized.expand([(DummyWorkflow,), (DummyAsyncWorkflow,)])
+    def test_workflow_resettable(self, workflow_cls) -> None:
         model = MagicMock()
         json_task = Task(
-            workflow=DummyWorkflow,
+            workflow=workflow_cls,
             repeat_times=1,
             raw_task={"a": 1},
             workflow_args={"output_format": "json"},
         )
         yaml_task = Task(
-            workflow=DummyWorkflow,
+            workflow=workflow_cls,
             repeat_times=1,
             raw_task={"a": 1},
            workflow_args={"output_format": "yaml"},
         )
         workflow = json_task.to_workflow(model)
-        answer = workflow.run()
+        if workflow.asynchronous:
+            answer = asyncio.run(workflow.run_async())
+        else:
+            answer = workflow.run()
         self.assertEqual(answer[0], '{"a": 1}')
         workflow.reset(yaml_task)
-        answer = workflow.run()
+        if workflow.asynchronous:
+            answer = asyncio.run(workflow.run_async())
+        else:
+            answer = workflow.run()
         self.assertEqual(answer[0], "a: 1\n")
 
-    def test_workflow_repeatable(self) -> None:
+    @parameterized.expand([(DummyWorkflow,), (DummyAsyncWorkflow,)])
+    def test_workflow_repeatable(self, workflow_cls) -> None:
         model = MagicMock()
         task = Task(
-            workflow=DummyWorkflow,
+            workflow=workflow_cls,
             repeat_times=3,
             raw_task={"a": 1},
             workflow_args={"output_format": "json"},
@@ -355,5 +446,47 @@ def test_workflow_repeatable(self) -> None:
         workflow = task.to_workflow(model)
         workflow.set_repeat_times(2, run_id_base=0)
         self.assertEqual(workflow.repeat_times, 2)
-        answer = workflow.run()
+        if workflow.asynchronous:
+            answer = asyncio.run(workflow.run_async())
+        else:
+            answer = workflow.run()
+        self.assertEqual(len(answer), 2)
+
+
+@parameterized_class(
+    ("workflow_cls",),
+    [
+        (DummyMultiTurnWorkflow,),
+        (DummyAsyncMultiTurnWorkflow,),
+    ],
+)
+class MultiTurnWorkflowTest(unittest.TestCase):
+    def setUp(self):
+        # configure the model
+        self.config = get_template_config()
+        self.config.mode = "explore"
+        self.config.model.model_path = get_model_path()
+        self.config.model.max_model_len = None  # self.max_model_len
+        self.config.explorer.rollout_model.engine_num = 1  # self.engine_num
+        self.config.explorer.rollout_model.tensor_parallel_size = 1  # self.tensor_parallel_size
+        self.config.explorer.rollout_model.chat_template = CHAT_TEMPLATE
+        self.config.algorithm.repeat_times = 2  # self.repeat_times
+        self.config.explorer.rollout_model.enable_history = True  # self.enable_history
+        self.config.check_and_update()
+        self.engines, self.auxiliary_engines = create_inference_models(self.config)
+        self.model_wrapper = ModelWrapper(self.engines[0], engine_type="vllm", enable_history=True)
+
+    def test_multi_turn_workflow(self):
+        task = Task(
+            workflow=self.workflow_cls,
+            repeat_times=3,
+            raw_task={"contents": ["hello world!", "how are you?"]},
+            workflow_args={"output_format": "json"},
+        )
+        workflow = task.to_workflow(self.model_wrapper)
+        workflow.set_repeat_times(2, run_id_base=0)
+        if workflow.asynchronous:
+            answer = asyncio.run(workflow.run_async())
+        else:
+            answer = workflow.run()
+        self.assertEqual(len(answer), 2)
diff --git a/trinity/common/models/model.py b/trinity/common/models/model.py
index 80f978f227..2f55c45ffd 100644
--- a/trinity/common/models/model.py
+++ b/trinity/common/models/model.py
@@ -106,7 +106,7 @@ async def prepare(self) -> None:
                 if response.status_code == 200:
                     return
             except Exception as e:
-                self.logger.info(f"API server not ready (attempt {i+1}/{max_retries}): {e}")
+                self.logger.info(f"API server not ready (attempt {i + 1}/{max_retries}): {e}")
             await asyncio.sleep(interval)
         raise RuntimeError(
             f"API server at {self.api_address} not ready after {max_retries} attempts."
diff --git a/trinity/common/workflows/step_wise_workflow.py b/trinity/common/workflows/step_wise_workflow.py
index 23ae76fbf0..608eea3bf2 100644
--- a/trinity/common/workflows/step_wise_workflow.py
+++ b/trinity/common/workflows/step_wise_workflow.py
@@ -1,5 +1,3 @@
-from abc import abstractmethod
-
 import openai
 
 from trinity.common.experience import Experience
@@ -45,7 +43,6 @@ def run(self) -> list[Experience]:
 
         return experiences
 
-    @abstractmethod
     def step(self, step_num: int) -> bool:
         """Run a single step of your agent application.
 
@@ -59,17 +56,16 @@ def step(self, step_num: int) -> bool:
         You can use the openai client (`self.client`) to migrate your existing
         applications at low cost.
         """
-        pass
+        raise NotImplementedError
 
-    @abstractmethod
     def reward(self, exps: list[Experience], step_num: int) -> float:
         """Calculate the reward for the given experiences at the specified step."""
-        pass
+        raise NotImplementedError
 
     @property
-    @abstractmethod
     def max_step_num(self):
         """Return the maximum number of steps in the task."""
+        raise NotImplementedError
 
     @property
     def repeatable(self):
@@ -104,7 +100,6 @@ async def run_async(self) -> list[Experience]:
 
         return experiences
 
-    @abstractmethod
     async def step_async(self, step_num: int) -> bool:
         """Run a single step of your agent application asynchronously.
 
@@ -118,12 +113,11 @@ async def step_async(self, step_num: int) -> bool:
         You can use the openai client (`self.client`) to migrate your existing
         applications at low cost.
         """
-        pass
+        raise NotImplementedError
 
-    @abstractmethod
     async def reward_async(self, exps: list[Experience], step_num: int) -> float:
         """Calculate the reward for the given experiences at the specified step asynchronously."""
-        pass
+        raise NotImplementedError
 
 
 class RewardPropagationWorkflow(Workflow):
@@ -166,7 +160,6 @@ def run(self) -> list[Experience]:
             exp.metrics["actual_env_steps"] = step + 1  # +1 because step starts from 0
         return experiences
 
-    @abstractmethod
     def step(self, step_num: int) -> bool:
         """Run a single step of your agent application.
 
@@ -180,17 +173,16 @@ def step(self, step_num: int) -> bool:
         You can use the openai client (`self.client`) to migrate your existing
         applications at low cost.
         """
-        pass
+        raise NotImplementedError
 
-    @abstractmethod
     def reward(self, exps: list[Experience]) -> float:
         """Calculate the reward for the given experiences of the entire run."""
-        pass
+        raise NotImplementedError
 
     @property
-    @abstractmethod
     def max_step_num(self):
         """Return the maximum number of steps in the task."""
+        raise NotImplementedError
 
     @property
     def repeatable(self):
@@ -219,7 +211,7 @@ async def run_async(self) -> list[Experience]:
             experiences.extend(exps)
             if not continue_run:
                 break
-        reward = self.reward(experiences)
+        reward = await self.reward_async(experiences)
         for exp in experiences:
             exp.reward = reward
             if exp.metrics is None:
@@ -227,7 +219,6 @@ async def run_async(self) -> list[Experience]:
             exp.metrics["actual_env_steps"] = step + 1  # +1 because step starts from 0
         return experiences
 
-    @abstractmethod
     async def step_async(self, step_num: int) -> bool:
         """Run a single step of your agent application asynchronously.
 
@@ -241,9 +232,8 @@ async def step_async(self, step_num: int) -> bool:
         You can use the openai client (`self.client`) to migrate your existing
         applications at low cost.
         """
-        pass
+        raise NotImplementedError
 
-    @abstractmethod
     async def reward_async(self, exps: list[Experience]) -> float:
         """Calculate the reward for the given experiences of the entire run asynchronously."""
-        pass
+        raise NotImplementedError
diff --git a/trinity/common/workflows/workflow.py b/trinity/common/workflows/workflow.py
index b5067d5bd8..19d27098a8 100644
--- a/trinity/common/workflows/workflow.py
+++ b/trinity/common/workflows/workflow.py
@@ -3,7 +3,6 @@
 
 from __future__ import annotations
 
-from abc import ABC, abstractmethod
 from dataclasses import asdict, dataclass, field
 from typing import Any, List, Optional, Type, Union
 
@@ -75,7 +74,7 @@ def to_dict(self) -> dict:
         return self.raw_task  # type: ignore
 
 
-class Workflow(ABC):
+class Workflow:
     """The base workflow class.
 
     A workflow is a runnable object which generates a list of experiences.
@@ -158,10 +157,6 @@ def set_repeat_times(self, repeat_times, run_id_base):
         self.repeat_times = repeat_times
         self.run_id_base = run_id_base
 
-    @abstractmethod
-    def run(self) -> List[Experience]:
-        """Run workflow and return a list of experiences."""
-
     def process_messages_to_experience(self, messages, reward, info={}) -> Experience:
         converted_experience = self.model.convert_messages_to_experience(messages)