From 697e674f5efb5fc5cc8b1aff4a453a1082a1b58d Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 27 Jan 2023 19:35:00 -0800 Subject: [PATCH 1/9] fix wiring Signed-off-by: Eric Liang --- .../execution/operators/actor_pool_map_operator.py | 4 ++-- python/ray/data/tests/test_operators.py | 14 +++++++++----- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py index 2e67572edbf7..fba9289690b5 100644 --- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py @@ -60,7 +60,7 @@ def __init__( ObjectRef[ObjectRefGenerator], Tuple[_TaskState, ray.actor.ActorHandle] ] = {} # A pool of running actors on which we can execute mapper tasks. - self._actor_pool = _ActorPool() + self._actor_pool = _ActorPool(autoscaling_policy._config.max_tasks_in_flight) # A queue of bundles awaiting dispatch to actors. self._bundle_queue = collections.deque() # Cached actor class. @@ -198,7 +198,7 @@ def num_active_work_refs(self) -> int: def progress_str(self) -> str: return ( - f"{self._actor_pool.num_running_actors()} " + f"{self._actor_pool.num_running_actors()} actors " f"({self._actor_pool.num_pending_actors()} pending)" ) diff --git a/python/ray/data/tests/test_operators.py b/python/ray/data/tests/test_operators.py index d616d71d0215..6957012dd761 100644 --- a/python/ray/data/tests/test_operators.py +++ b/python/ray/data/tests/test_operators.py @@ -101,7 +101,9 @@ def dummy_all_transform(bundles: List[RefBundle]): def test_map_operator_bulk(ray_start_regular_shared, use_actors): # Create with inputs. input_op = InputDataBuffer(make_ref_bundles([[i] for i in range(100)])) - compute_strategy = ActorPoolStrategy() if use_actors else TaskPoolStrategy() + compute_strategy = ( + ActorPoolStrategy(max_size=1) if use_actors else TaskPoolStrategy() + ) op = MapOperator.create( _mul2_transform, input_op=input_op, @@ -113,7 +115,7 @@ def test_map_operator_bulk(ray_start_regular_shared, use_actors): op.start(ExecutionOptions()) if use_actors: # Actor will be pending after starting the operator. - assert op.progress_str() == "0 (1 pending)" + assert op.progress_str() == "0 actors (1 pending)" while input_op.has_next(): op.add_input(input_op.get_next(), 0) op.inputs_done() @@ -126,10 +128,10 @@ def test_map_operator_bulk(ray_start_regular_shared, use_actors): if use_actors and work_refs: # After actor is ready (first work ref resolved), actor will remain ready # while there is work to do. - assert op.progress_str() == "1 (0 pending)" + assert op.progress_str() == "1 actors (0 pending)" if use_actors: # After all work is done, actor will have been killed to free up resources.. - assert op.progress_str() == "0 (0 pending)" + assert op.progress_str() == "0 actors (0 pending)" else: assert op.progress_str() == "" @@ -227,7 +229,9 @@ def test_map_operator_ray_args(shutdown_only, use_actors): ray.init(num_cpus=0, num_gpus=1) # Create with inputs. 
input_op = InputDataBuffer(make_ref_bundles([[i] for i in range(10)])) - compute_strategy = ActorPoolStrategy() if use_actors else TaskPoolStrategy() + compute_strategy = ( + ActorPoolStrategy(max_size=1) if use_actors else TaskPoolStrategy() + ) op = MapOperator.create( _mul2_transform, input_op=input_op, From 2d6fcbb82ecb18662fa64c21ddccceb8a519db0e Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 27 Jan 2023 19:45:18 -0800 Subject: [PATCH 2/9] update Signed-off-by: Eric Liang --- .../execution/operators/actor_pool_map_operator.py | 9 +++++---- python/ray/data/tests/test_operators.py | 4 ++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py index fba9289690b5..f429a00818cd 100644 --- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py @@ -197,10 +197,11 @@ def num_active_work_refs(self) -> int: return len(self._tasks) def progress_str(self) -> str: - return ( - f"{self._actor_pool.num_running_actors()} actors " - f"({self._actor_pool.num_pending_actors()} pending)" - ) + base = f"{self._actor_pool.num_running_actors()} actors" + pending = self._actor_pool.num_pending_actors() + if pending: + base += f" ({pending} pending)" + return base def base_resource_usage(self) -> ExecutionResources: min_workers = self._autoscaling_policy.min_workers diff --git a/python/ray/data/tests/test_operators.py b/python/ray/data/tests/test_operators.py index 6957012dd761..34ad4a1e9775 100644 --- a/python/ray/data/tests/test_operators.py +++ b/python/ray/data/tests/test_operators.py @@ -128,10 +128,10 @@ def test_map_operator_bulk(ray_start_regular_shared, use_actors): if use_actors and work_refs: # After actor is ready (first work ref resolved), actor will remain ready # while there is work to do. - assert op.progress_str() == "1 actors (0 pending)" + assert op.progress_str() == "1 actors" if use_actors: # After all work is done, actor will have been killed to free up resources.. - assert op.progress_str() == "0 actors (0 pending)" + assert op.progress_str() == "0 actors" else: assert op.progress_str() == "" From 7d3598360d641b7d99e797c2e5338ad5705b9e02 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 27 Jan 2023 20:01:54 -0800 Subject: [PATCH 3/9] fix queue size reporting for actor pool ops Signed-off-by: Eric Liang --- python/ray/data/_internal/execution/interfaces.py | 7 +++++++ .../execution/operators/actor_pool_map_operator.py | 3 +++ .../_internal/execution/streaming_executor_state.py | 11 ++++++++--- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/python/ray/data/_internal/execution/interfaces.py b/python/ray/data/_internal/execution/interfaces.py index 3f05eb7e05cb..899b6ac9379f 100644 --- a/python/ray/data/_internal/execution/interfaces.py +++ b/python/ray/data/_internal/execution/interfaces.py @@ -293,6 +293,13 @@ def num_active_work_refs(self) -> int: """ return len(self.get_work_refs()) + def internal_queue_size(self) -> int: + """If the operator has an internal input queue, return its size. + + This is used to report tasks pending submission to actor pools. + """ + return 0 + def notify_work_completed(self, work_ref: ray.ObjectRef) -> None: """Executor calls this when the given work is completed and local. 
diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py index f429a00818cd..831e4190ba3c 100644 --- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py @@ -68,6 +68,9 @@ def __init__( # Whether no more submittable bundles will be added. self._inputs_done = False + def internal_queue_size(self) -> int: + return len(self._bundle_queue) + def start(self, options: ExecutionOptions): super().start(options) diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index a208dc5ecb96..e8f0bf1a8b9c 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -77,7 +77,7 @@ def refresh_progress_bar(self) -> None: self.progress_bar.set_description(self.summary_str()) def summary_str(self) -> str: - queued = self.num_queued() + queued = self.num_queued() + self.op.internal_queue_size() desc = f"{self.op.name}: {self.num_active_tasks()} active, {queued} queued" suffix = self.op.progress_str() if suffix: @@ -237,9 +237,14 @@ def select_operator_to_run( if not ops: return None - # Equally penalize outqueue length and active tasks for backpressure. + # Equally penalize outqueue length and active tasks for backpressure. Note that + # for actor pool ops, we must also include the internal queue size, since + # num_active_tasks() does not reflect tasks pending submission to the actor pool. return min( - ops, key=lambda op: len(topology[op].outqueue) + topology[op].num_active_tasks() + ops, + key=lambda op: len(topology[op].outqueue) + + topology[op].num_active_tasks() + + op.internal_queue_size(), ) From d15db36db08971ea371863223962e62728563988 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 27 Jan 2023 22:22:43 -0800 Subject: [PATCH 4/9] add e2e autoscaling sanity checks --- python/ray/data/tests/test_operators.py | 8 ++ .../ray/data/tests/test_streaming_executor.py | 116 +++++++++++++++++- 2 files changed, 121 insertions(+), 3 deletions(-) diff --git a/python/ray/data/tests/test_operators.py b/python/ray/data/tests/test_operators.py index 34ad4a1e9775..3b433448f4e9 100644 --- a/python/ray/data/tests/test_operators.py +++ b/python/ray/data/tests/test_operators.py @@ -116,8 +116,15 @@ def test_map_operator_bulk(ray_start_regular_shared, use_actors): if use_actors: # Actor will be pending after starting the operator. assert op.progress_str() == "0 actors (1 pending)" + assert op.internal_queue_size() == 0 + i = 0 while input_op.has_next(): op.add_input(input_op.get_next(), 0) + i += 1 + if use_actors: + assert op.internal_queue_size() == i + else: + assert op.internal_queue_size() == 0 op.inputs_done() work_refs = op.get_work_refs() while work_refs: @@ -129,6 +136,7 @@ def test_map_operator_bulk(ray_start_regular_shared, use_actors): # After actor is ready (first work ref resolved), actor will remain ready # while there is work to do. assert op.progress_str() == "1 actors" + assert op.internal_queue_size() == 0 if use_actors: # After all work is done, actor will have been killed to free up resources.. 
assert op.progress_str() == "0 actors" diff --git a/python/ray/data/tests/test_streaming_executor.py b/python/ray/data/tests/test_streaming_executor.py index 2762ec0a69f5..1013c8f3bd96 100644 --- a/python/ray/data/tests/test_streaming_executor.py +++ b/python/ray/data/tests/test_streaming_executor.py @@ -144,11 +144,26 @@ def test_select_operator_to_run(ray_start_10_cpus_shared): # Test backpressure includes num active tasks as well. topo[o3].num_active_tasks = MagicMock(return_value=2) + o3.internal_queue_size = MagicMock(return_value=0) + assert ( + select_operator_to_run(topo, ExecutionResources(), ExecutionResources(), True) + == o2 + ) + # Internal queue size is added to num active tasks. + topo[o3].num_active_tasks = MagicMock(return_value=0) + o3.internal_queue_size = MagicMock(return_value=2) assert ( select_operator_to_run(topo, ExecutionResources(), ExecutionResources(), True) == o2 ) topo[o2].num_active_tasks = MagicMock(return_value=2) + o2.internal_queue_size = MagicMock(return_value=0) + assert ( + select_operator_to_run(topo, ExecutionResources(), ExecutionResources(), True) + == o3 + ) + topo[o2].num_active_tasks = MagicMock(return_value=0) + o2.internal_queue_size = MagicMock(return_value=2) assert ( select_operator_to_run(topo, ExecutionResources(), ExecutionResources(), True) == o3 @@ -321,9 +336,12 @@ def run(): ) run() - DatasetContext.get_current().execution_options.resource_limits.cpu = 1 - with pytest.raises(ValueError): - run() + try: + DatasetContext.get_current().execution_options.resource_limits.cpu = 1 + with pytest.raises(ValueError): + run() + finally: + DatasetContext.get_current().execution_options.resource_limits.cpu = None def test_configure_spread_e2e(ray_start_10_cpus_shared): @@ -464,6 +482,98 @@ def test_configure_output_locality(ray_start_10_cpus_shared): ) +def test_e2e_autoscaling_up(ray_start_10_cpus_shared): + DatasetContext.get_current().new_execution_backend = True + DatasetContext.get_current().use_streaming_executor = True + + @ray.remote(max_concurrency=10) + class Barrier: + def __init__(self, n, delay=0): + self.n = n + self.delay = delay + self.max_waiters = 0 + self.cur_waiters = 0 + + def wait(self): + self.cur_waiters += 1 + if self.cur_waiters > self.max_waiters: + self.max_waiters = self.cur_waiters + self.n -= 1 + print("wait", self.n) + while self.n > 0: + time.sleep(0.1) + time.sleep(self.delay) + print("wait done") + self.cur_waiters -= 1 + + def get_max_waiters(self): + return self.max_waiters + + b1 = Barrier.remote(6) + + def barrier1(x): + ray.get(b1.wait.remote(), timeout=10) + return x + + # Tests that we autoscale up to necessary size. + # 6 tasks + 1 tasks in flight per actor => need at least 6 actors to run. + ray.data.range(6, parallelism=6).map_batches( + barrier1, + compute=ray.data.ActorPoolStrategy(1, 6, max_tasks_in_flight_per_actor=1), + batch_size=None, + ).take_all() + assert ray.get(b1.get_max_waiters.remote()) == 6 + + b2 = Barrier.remote(3, delay=2) + + def barrier2(x): + ray.get(b2.wait.remote(), timeout=10) + return x + + # Tests that we don't over-scale up. + # 6 tasks + 2 tasks in flight per actor => only scale up to 3 actors + ray.data.range(6, parallelism=6).map_batches( + barrier2, + compute=ray.data.ActorPoolStrategy(1, 3, max_tasks_in_flight_per_actor=2), + batch_size=None, + ).take_all() + assert ray.get(b2.get_max_waiters.remote()) == 3 + + # Tests that the max pool size is respected. 
+ b3 = Barrier.remote(6) + + def barrier3(x): + ray.get(b3.wait.remote(), timeout=2) + return x + + # This will hang, since the actor pool is too small. + with pytest.raises(ray.exceptions.RayTaskError): + ray.data.range(6, parallelism=6).map( + barrier3, compute=ray.data.ActorPoolStrategy(1, 2) + ).take_all() + + +def test_e2e_autoscaling_down(ray_start_10_cpus_shared): + DatasetContext.get_current().new_execution_backend = True + DatasetContext.get_current().use_streaming_executor = True + + def f(x): + time.sleep(1) + return x + + # Tests that autoscaling works even when resource constrained via actor killing. + # To pass this, we need to autoscale down to free up slots for task execution. + try: + DatasetContext.get_current().execution_options.resource_limits.cpu = 2 + ray.data.range(5, parallelism=5).map_batches( + f, + compute=ray.data.ActorPoolStrategy(1, 2), + batch_size=None, + ).map_batches(lambda x: x, batch_size=None, num_cpus=2).take_all() + finally: + DatasetContext.get_current().execution_options.resource_limits.cpu = None + + if __name__ == "__main__": import sys From 53c712bd744f3f254ea3fa14fbbb050fb53154e3 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 27 Jan 2023 22:30:02 -0800 Subject: [PATCH 5/9] update tests Signed-off-by: Eric Liang --- .../ray/data/tests/test_streaming_executor.py | 284 +---------------- .../data/tests/test_streaming_integration.py | 291 ++++++++++++++++++ 2 files changed, 301 insertions(+), 274 deletions(-) create mode 100644 python/ray/data/tests/test_streaming_integration.py diff --git a/python/ray/data/tests/test_streaming_executor.py b/python/ray/data/tests/test_streaming_executor.py index 1013c8f3bd96..8ca97507f001 100644 --- a/python/ray/data/tests/test_streaming_executor.py +++ b/python/ray/data/tests/test_streaming_executor.py @@ -2,18 +2,13 @@ import time from unittest.mock import MagicMock -from typing import List, Any - import ray -from ray.data.context import DatasetContext from ray.data._internal.execution.interfaces import ( ExecutionOptions, ExecutionResources, - RefBundle, PhysicalOperator, ) from ray.data._internal.execution.streaming_executor import ( - StreamingExecutor, _debug_dump_topology, _validate_topology, ) @@ -24,13 +19,10 @@ select_operator_to_run, _execution_allowed, ) -from ray.data._internal.execution.operators.all_to_all_operator import AllToAllOperator from ray.data._internal.execution.operators.map_operator import MapOperator from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer from ray.data._internal.execution.util import make_ref_bundles from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy -from ray._private.test_utils import wait_for_condition -from ray.data.tests.conftest import * # noqa @ray.remote @@ -46,15 +38,7 @@ def map_fn(block_iter): return map_fn -def ref_bundles_to_list(bundles: List[RefBundle]) -> List[List[Any]]: - output = [] - for bundle in bundles: - for block, _ in bundle.blocks: - output.append(ray.get(block)) - return output - - -def test_build_streaming_topology(ray_start_10_cpus_shared): +def test_build_streaming_topology(): inputs = make_ref_bundles([[x] for x in range(20)]) o1 = InputDataBuffer(inputs) o2 = MapOperator.create(make_transform(lambda block: [b * -1 for b in block]), o1) @@ -68,7 +52,7 @@ def test_build_streaming_topology(ray_start_10_cpus_shared): assert list(topo) == [o1, o2, o3] -def test_disallow_non_unique_operators(ray_start_10_cpus_shared): +def test_disallow_non_unique_operators(): inputs = 
make_ref_bundles([[x] for x in range(20)]) # An operator [o1] cannot used in the same DAG twice. o1 = InputDataBuffer(inputs) @@ -79,7 +63,7 @@ def test_disallow_non_unique_operators(ray_start_10_cpus_shared): build_streaming_topology(o4, ExecutionOptions()) -def test_process_completed_tasks(ray_start_10_cpus_shared): +def test_process_completed_tasks(): inputs = make_ref_bundles([[x] for x in range(20)]) o1 = InputDataBuffer(inputs) o2 = MapOperator.create(make_transform(lambda block: [b * -1 for b in block]), o1) @@ -111,7 +95,7 @@ def test_process_completed_tasks(ray_start_10_cpus_shared): o2.inputs_done.assert_called_once() -def test_select_operator_to_run(ray_start_10_cpus_shared): +def test_select_operator_to_run(): opt = ExecutionOptions() inputs = make_ref_bundles([[x] for x in range(20)]) o1 = InputDataBuffer(inputs) @@ -170,7 +154,7 @@ def test_select_operator_to_run(ray_start_10_cpus_shared): ) -def test_dispatch_next_task(ray_start_10_cpus_shared): +def test_dispatch_next_task(): inputs = make_ref_bundles([[x] for x in range(20)]) o1 = InputDataBuffer(inputs) o1_state = OpState(o1, []) @@ -190,7 +174,7 @@ def test_dispatch_next_task(ray_start_10_cpus_shared): assert o2.add_input.called_once_with("dummy2") -def test_debug_dump_topology(ray_start_10_cpus_shared): +def test_debug_dump_topology(): opt = ExecutionOptions() inputs = make_ref_bundles([[x] for x in range(20)]) o1 = InputDataBuffer(inputs) @@ -201,7 +185,7 @@ def test_debug_dump_topology(ray_start_10_cpus_shared): _debug_dump_topology(topo) -def test_validate_topology(ray_start_10_cpus_shared): +def test_validate_topology(): opt = ExecutionOptions() inputs = make_ref_bundles([[x] for x in range(20)]) o1 = InputDataBuffer(inputs) @@ -223,7 +207,7 @@ def test_validate_topology(ray_start_10_cpus_shared): _validate_topology(topo, ExecutionResources(cpu=10)) -def test_execution_allowed(ray_start_10_cpus_shared): +def test_execution_allowed(): op = InputDataBuffer([]) # CPU. 
@@ -268,7 +252,7 @@ def test_execution_allowed(ray_start_10_cpus_shared): ) -def test_select_ops_ensure_at_least_one_live_operator(ray_start_10_cpus_shared): +def test_select_ops_ensure_at_least_one_live_operator(): opt = ExecutionOptions() inputs = make_ref_bundles([[x] for x in range(20)]) o1 = InputDataBuffer(inputs) @@ -304,163 +288,7 @@ def test_select_ops_ensure_at_least_one_live_operator(ray_start_10_cpus_shared): ) -def test_pipelined_execution(ray_start_10_cpus_shared): - executor = StreamingExecutor(ExecutionOptions()) - inputs = make_ref_bundles([[x] for x in range(20)]) - o1 = InputDataBuffer(inputs) - o2 = MapOperator.create(make_transform(lambda block: [b * -1 for b in block]), o1) - o3 = MapOperator.create(make_transform(lambda block: [b * 2 for b in block]), o2) - - def reverse_sort(inputs: List[RefBundle]): - reversed_list = inputs[::-1] - return reversed_list, {} - - o4 = AllToAllOperator(reverse_sort, o3) - it = executor.execute(o4) - output = ref_bundles_to_list(it) - expected = [[x * -2] for x in range(20)][::-1] - assert output == expected, (output, expected) - - -def test_e2e_option_propagation(ray_start_10_cpus_shared): - DatasetContext.get_current().new_execution_backend = True - DatasetContext.get_current().use_streaming_executor = True - - def run(): - ray.data.range(5, parallelism=5).map( - lambda x: x, compute=ray.data.ActorPoolStrategy(2, 2) - ).take_all() - - DatasetContext.get_current().execution_options.resource_limits = ( - ExecutionResources() - ) - run() - - try: - DatasetContext.get_current().execution_options.resource_limits.cpu = 1 - with pytest.raises(ValueError): - run() - finally: - DatasetContext.get_current().execution_options.resource_limits.cpu = None - - -def test_configure_spread_e2e(ray_start_10_cpus_shared): - from ray import remote_function - - tasks = [] - - def _test_hook(fn, args, strategy): - if "map_task" in str(fn): - tasks.append(strategy) - - remote_function._task_launch_hook = _test_hook - DatasetContext.get_current().use_streaming_executor = True - DatasetContext.get_current().execution_options.preserve_order = True - - # Simple 2-stage pipeline. - ray.data.range(2, parallelism=2).map(lambda x: x, num_cpus=2).take_all() - - # Read tasks get SPREAD by default, subsequent ones use default policy. - tasks = sorted(tasks) - assert tasks == ["DEFAULT", "DEFAULT", "SPREAD", "SPREAD"] - - -def test_scheduling_progress_when_output_blocked(): - # Processing stages should fully finish even if output is completely stalled. - - @ray.remote - class Counter: - def __init__(self): - self.i = 0 - - def inc(self): - self.i += 1 - - def get(self): - return self.i - - counter = Counter.remote() - - def func(x): - ray.get(counter.inc.remote()) - return x - - DatasetContext.get_current().use_streaming_executor = True - DatasetContext.get_current().execution_options.preserve_order = True - - # Only take the first item from the iterator. - it = iter( - ray.data.range(100, parallelism=100) - .map_batches(func, batch_size=None) - .iter_batches(batch_size=None) - ) - next(it) - # The pipeline should fully execute even when the output iterator is blocked. - wait_for_condition(lambda: ray.get(counter.get.remote()) == 100) - # Check we can take the rest. - assert list(it) == [[x] for x in range(1, 100)] - - -def test_backpressure_from_output(): - # Here we set the memory limit low enough so the output getting blocked will - # actually stall execution. 
- - @ray.remote - class Counter: - def __init__(self): - self.i = 0 - - def inc(self): - self.i += 1 - - def get(self): - return self.i - - counter = Counter.remote() - - def func(x): - ray.get(counter.inc.remote()) - return x - - ctx = DatasetContext.get_current() - try: - ctx.use_streaming_executor = True - ctx.execution_options.resource_limits.object_store_memory = 10000 - - # Only take the first item from the iterator. - it = iter( - ray.data.range(100000, parallelism=100) - .map_batches(func, batch_size=None) - .iter_batches(batch_size=None) - ) - next(it) - num_finished = ray.get(counter.get.remote()) - assert num_finished < 5, num_finished - - # Check we can get the rest. - for rest in it: - pass - assert ray.get(counter.get.remote()) == 100 - finally: - ctx.execution_options.resource_limits.object_store_memory = None - - -def test_e2e_liveness_with_output_backpressure_edge_case(): - # At least one operator is ensured to be running, if the output becomes idle. - ctx = DatasetContext.get_current() - ctx.use_streaming_executor = True - ctx.execution_options.preserve_order = True - try: - ctx.execution_options.resource_limits.object_store_memory = 1 - ds = ray.data.range(10000, parallelism=100).map(lambda x: x, num_cpus=2) - # This will hang forever if the liveness logic is wrong, since the output - # backpressure will prevent any operators from running at all. - assert ds.take_all() == list(range(10000)) - finally: - ctx.execution_options.resource_limits.object_store_memory = None - - -def test_configure_output_locality(ray_start_10_cpus_shared): +def test_configure_output_locality(): inputs = make_ref_bundles([[x] for x in range(20)]) o1 = InputDataBuffer(inputs) o2 = MapOperator.create(make_transform(lambda block: [b * -1 for b in block]), o1) @@ -482,98 +310,6 @@ def test_configure_output_locality(ray_start_10_cpus_shared): ) -def test_e2e_autoscaling_up(ray_start_10_cpus_shared): - DatasetContext.get_current().new_execution_backend = True - DatasetContext.get_current().use_streaming_executor = True - - @ray.remote(max_concurrency=10) - class Barrier: - def __init__(self, n, delay=0): - self.n = n - self.delay = delay - self.max_waiters = 0 - self.cur_waiters = 0 - - def wait(self): - self.cur_waiters += 1 - if self.cur_waiters > self.max_waiters: - self.max_waiters = self.cur_waiters - self.n -= 1 - print("wait", self.n) - while self.n > 0: - time.sleep(0.1) - time.sleep(self.delay) - print("wait done") - self.cur_waiters -= 1 - - def get_max_waiters(self): - return self.max_waiters - - b1 = Barrier.remote(6) - - def barrier1(x): - ray.get(b1.wait.remote(), timeout=10) - return x - - # Tests that we autoscale up to necessary size. - # 6 tasks + 1 tasks in flight per actor => need at least 6 actors to run. - ray.data.range(6, parallelism=6).map_batches( - barrier1, - compute=ray.data.ActorPoolStrategy(1, 6, max_tasks_in_flight_per_actor=1), - batch_size=None, - ).take_all() - assert ray.get(b1.get_max_waiters.remote()) == 6 - - b2 = Barrier.remote(3, delay=2) - - def barrier2(x): - ray.get(b2.wait.remote(), timeout=10) - return x - - # Tests that we don't over-scale up. - # 6 tasks + 2 tasks in flight per actor => only scale up to 3 actors - ray.data.range(6, parallelism=6).map_batches( - barrier2, - compute=ray.data.ActorPoolStrategy(1, 3, max_tasks_in_flight_per_actor=2), - batch_size=None, - ).take_all() - assert ray.get(b2.get_max_waiters.remote()) == 3 - - # Tests that the max pool size is respected. 
- b3 = Barrier.remote(6) - - def barrier3(x): - ray.get(b3.wait.remote(), timeout=2) - return x - - # This will hang, since the actor pool is too small. - with pytest.raises(ray.exceptions.RayTaskError): - ray.data.range(6, parallelism=6).map( - barrier3, compute=ray.data.ActorPoolStrategy(1, 2) - ).take_all() - - -def test_e2e_autoscaling_down(ray_start_10_cpus_shared): - DatasetContext.get_current().new_execution_backend = True - DatasetContext.get_current().use_streaming_executor = True - - def f(x): - time.sleep(1) - return x - - # Tests that autoscaling works even when resource constrained via actor killing. - # To pass this, we need to autoscale down to free up slots for task execution. - try: - DatasetContext.get_current().execution_options.resource_limits.cpu = 2 - ray.data.range(5, parallelism=5).map_batches( - f, - compute=ray.data.ActorPoolStrategy(1, 2), - batch_size=None, - ).map_batches(lambda x: x, batch_size=None, num_cpus=2).take_all() - finally: - DatasetContext.get_current().execution_options.resource_limits.cpu = None - - if __name__ == "__main__": import sys diff --git a/python/ray/data/tests/test_streaming_integration.py b/python/ray/data/tests/test_streaming_integration.py new file mode 100644 index 000000000000..91c27c8816bf --- /dev/null +++ b/python/ray/data/tests/test_streaming_integration.py @@ -0,0 +1,291 @@ +import pytest +import time + +from typing import List, Any + +import ray +from ray.data.context import DatasetContext +from ray.data._internal.execution.interfaces import ( + ExecutionOptions, + ExecutionResources, + RefBundle, +) +from ray.data._internal.execution.streaming_executor import ( + StreamingExecutor, +) +from ray.data._internal.execution.operators.all_to_all_operator import AllToAllOperator +from ray.data._internal.execution.operators.map_operator import MapOperator +from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer +from ray.data._internal.execution.util import make_ref_bundles +from ray._private.test_utils import wait_for_condition +from ray.data.tests.conftest import * # noqa + + +def make_transform(block_fn): + def map_fn(block_iter): + for block in block_iter: + yield block_fn(block) + + return map_fn + + +def ref_bundles_to_list(bundles: List[RefBundle]) -> List[List[Any]]: + output = [] + for bundle in bundles: + for block, _ in bundle.blocks: + output.append(ray.get(block)) + return output + + +def test_pipelined_execution(ray_start_10_cpus_shared): + executor = StreamingExecutor(ExecutionOptions()) + inputs = make_ref_bundles([[x] for x in range(20)]) + o1 = InputDataBuffer(inputs) + o2 = MapOperator.create(make_transform(lambda block: [b * -1 for b in block]), o1) + o3 = MapOperator.create(make_transform(lambda block: [b * 2 for b in block]), o2) + + def reverse_sort(inputs: List[RefBundle]): + reversed_list = inputs[::-1] + return reversed_list, {} + + o4 = AllToAllOperator(reverse_sort, o3) + it = executor.execute(o4) + output = ref_bundles_to_list(it) + expected = [[x * -2] for x in range(20)][::-1] + assert output == expected, (output, expected) + + +def test_e2e_option_propagation(ray_start_10_cpus_shared): + DatasetContext.get_current().new_execution_backend = True + DatasetContext.get_current().use_streaming_executor = True + + def run(): + ray.data.range(5, parallelism=5).map( + lambda x: x, compute=ray.data.ActorPoolStrategy(2, 2) + ).take_all() + + DatasetContext.get_current().execution_options.resource_limits = ( + ExecutionResources() + ) + run() + + try: + 
DatasetContext.get_current().execution_options.resource_limits.cpu = 1 + with pytest.raises(ValueError): + run() + finally: + DatasetContext.get_current().execution_options.resource_limits.cpu = None + + +def test_configure_spread_e2e(ray_start_10_cpus_shared): + from ray import remote_function + + tasks = [] + + def _test_hook(fn, args, strategy): + if "map_task" in str(fn): + tasks.append(strategy) + + remote_function._task_launch_hook = _test_hook + DatasetContext.get_current().use_streaming_executor = True + DatasetContext.get_current().execution_options.preserve_order = True + + # Simple 2-stage pipeline. + ray.data.range(2, parallelism=2).map(lambda x: x, num_cpus=2).take_all() + + # Read tasks get SPREAD by default, subsequent ones use default policy. + tasks = sorted(tasks) + assert tasks == ["DEFAULT", "DEFAULT", "SPREAD", "SPREAD"] + + +def test_scheduling_progress_when_output_blocked(): + # Processing stages should fully finish even if output is completely stalled. + + @ray.remote + class Counter: + def __init__(self): + self.i = 0 + + def inc(self): + self.i += 1 + + def get(self): + return self.i + + counter = Counter.remote() + + def func(x): + ray.get(counter.inc.remote()) + return x + + DatasetContext.get_current().use_streaming_executor = True + DatasetContext.get_current().execution_options.preserve_order = True + + # Only take the first item from the iterator. + it = iter( + ray.data.range(100, parallelism=100) + .map_batches(func, batch_size=None) + .iter_batches(batch_size=None) + ) + next(it) + # The pipeline should fully execute even when the output iterator is blocked. + wait_for_condition(lambda: ray.get(counter.get.remote()) == 100) + # Check we can take the rest. + assert list(it) == [[x] for x in range(1, 100)] + + +def test_backpressure_from_output(): + # Here we set the memory limit low enough so the output getting blocked will + # actually stall execution. + + @ray.remote + class Counter: + def __init__(self): + self.i = 0 + + def inc(self): + self.i += 1 + + def get(self): + return self.i + + counter = Counter.remote() + + def func(x): + ray.get(counter.inc.remote()) + return x + + ctx = DatasetContext.get_current() + try: + ctx.use_streaming_executor = True + ctx.execution_options.resource_limits.object_store_memory = 10000 + + # Only take the first item from the iterator. + it = iter( + ray.data.range(100000, parallelism=100) + .map_batches(func, batch_size=None) + .iter_batches(batch_size=None) + ) + next(it) + num_finished = ray.get(counter.get.remote()) + assert num_finished < 5, num_finished + + # Check we can get the rest. + for rest in it: + pass + assert ray.get(counter.get.remote()) == 100 + finally: + ctx.execution_options.resource_limits.object_store_memory = None + + +def test_e2e_liveness_with_output_backpressure_edge_case(): + # At least one operator is ensured to be running, if the output becomes idle. + ctx = DatasetContext.get_current() + ctx.use_streaming_executor = True + ctx.execution_options.preserve_order = True + try: + ctx.execution_options.resource_limits.object_store_memory = 1 + ds = ray.data.range(10000, parallelism=100).map(lambda x: x, num_cpus=2) + # This will hang forever if the liveness logic is wrong, since the output + # backpressure will prevent any operators from running at all. 
+ assert ds.take_all() == list(range(10000)) + finally: + ctx.execution_options.resource_limits.object_store_memory = None + + +def test_e2e_autoscaling_up(ray_start_10_cpus_shared): + DatasetContext.get_current().new_execution_backend = True + DatasetContext.get_current().use_streaming_executor = True + + @ray.remote(max_concurrency=10) + class Barrier: + def __init__(self, n, delay=0): + self.n = n + self.delay = delay + self.max_waiters = 0 + self.cur_waiters = 0 + + def wait(self): + self.cur_waiters += 1 + if self.cur_waiters > self.max_waiters: + self.max_waiters = self.cur_waiters + self.n -= 1 + print("wait", self.n) + while self.n > 0: + time.sleep(0.1) + time.sleep(self.delay) + print("wait done") + self.cur_waiters -= 1 + + def get_max_waiters(self): + return self.max_waiters + + b1 = Barrier.remote(6) + + def barrier1(x): + ray.get(b1.wait.remote(), timeout=10) + return x + + # Tests that we autoscale up to necessary size. + # 6 tasks + 1 tasks in flight per actor => need at least 6 actors to run. + ray.data.range(6, parallelism=6).map_batches( + barrier1, + compute=ray.data.ActorPoolStrategy(1, 6, max_tasks_in_flight_per_actor=1), + batch_size=None, + ).take_all() + assert ray.get(b1.get_max_waiters.remote()) == 6 + + b2 = Barrier.remote(3, delay=2) + + def barrier2(x): + ray.get(b2.wait.remote(), timeout=10) + return x + + # Tests that we don't over-scale up. + # 6 tasks + 2 tasks in flight per actor => only scale up to 3 actors + ray.data.range(6, parallelism=6).map_batches( + barrier2, + compute=ray.data.ActorPoolStrategy(1, 3, max_tasks_in_flight_per_actor=2), + batch_size=None, + ).take_all() + assert ray.get(b2.get_max_waiters.remote()) == 3 + + # Tests that the max pool size is respected. + b3 = Barrier.remote(6) + + def barrier3(x): + ray.get(b3.wait.remote(), timeout=2) + return x + + # This will hang, since the actor pool is too small. + with pytest.raises(ray.exceptions.RayTaskError): + ray.data.range(6, parallelism=6).map( + barrier3, compute=ray.data.ActorPoolStrategy(1, 2) + ).take_all() + + +def test_e2e_autoscaling_down(ray_start_10_cpus_shared): + DatasetContext.get_current().new_execution_backend = True + DatasetContext.get_current().use_streaming_executor = True + + def f(x): + time.sleep(1) + return x + + # Tests that autoscaling works even when resource constrained via actor killing. + # To pass this, we need to autoscale down to free up slots for task execution. 
+ try: + DatasetContext.get_current().execution_options.resource_limits.cpu = 2 + ray.data.range(5, parallelism=5).map_batches( + f, + compute=ray.data.ActorPoolStrategy(1, 2), + batch_size=None, + ).map_batches(lambda x: x, batch_size=None, num_cpus=2).take_all() + finally: + DatasetContext.get_current().execution_options.resource_limits.cpu = None + + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-v", __file__])) From 6f80d4defcae066d58860264dba49d6f17a7b6fa Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 27 Jan 2023 23:15:02 -0800 Subject: [PATCH 6/9] rename for clarity Signed-off-by: Eric Liang --- .../execution/streaming_executor_state.py | 20 ++++++++----------- .../ray/data/tests/test_streaming_executor.py | 10 +++++----- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index e8f0bf1a8b9c..4b6e6e739242 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -60,9 +60,9 @@ def num_queued(self) -> int: """Return the number of queued bundles across all inqueues.""" return sum(len(q) for q in self.inqueues) - def num_active_tasks(self): - """Return the number of Ray futures pending for this operator.""" - return self.op.num_active_work_refs() + def num_processing(self): + """Return the number of bundles currently in processing for this operator.""" + return self.op.num_active_work_refs() + self.op.internal_queue_size() def add_output(self, ref: RefBundle) -> None: """Move a bundle produced by the operator to its outqueue.""" @@ -78,7 +78,8 @@ def refresh_progress_bar(self) -> None: def summary_str(self) -> str: queued = self.num_queued() + self.op.internal_queue_size() - desc = f"{self.op.name}: {self.num_active_tasks()} active, {queued} queued" + active = self.op.num_active_work_refs() + desc = f"{self.op.name}: {active} active, {queued} queued" suffix = self.op.progress_str() if suffix: desc += f", {suffix}" @@ -209,7 +210,7 @@ def select_operator_to_run( This is currently implemented by applying backpressure on operators that are producing outputs faster than they are consuming them `len(outqueue)`, as well as - operators with a large number of running tasks `num_active_tasks()`. + operators with a large number of running tasks `num_processing()`. Note that memory limits also apply to the outqueue of the output operator. This provides backpressure if the consumer is slow. However, once a bundle is returned @@ -237,14 +238,9 @@ def select_operator_to_run( if not ops: return None - # Equally penalize outqueue length and active tasks for backpressure. Note that - # for actor pool ops, we must also include the internal queue size, since - # num_active_tasks() does not reflect tasks pending submission to the actor pool. + # Equally penalize outqueue length and num bundles processing for backpressure. 
return min( - ops, - key=lambda op: len(topology[op].outqueue) - + topology[op].num_active_tasks() - + op.internal_queue_size(), + ops, key=lambda op: len(topology[op].outqueue) + topology[op].num_processing() ) diff --git a/python/ray/data/tests/test_streaming_executor.py b/python/ray/data/tests/test_streaming_executor.py index 8ca97507f001..02ecefeed691 100644 --- a/python/ray/data/tests/test_streaming_executor.py +++ b/python/ray/data/tests/test_streaming_executor.py @@ -127,26 +127,26 @@ def test_select_operator_to_run(): ) # Test backpressure includes num active tasks as well. - topo[o3].num_active_tasks = MagicMock(return_value=2) + o3.num_active_work_refs = MagicMock(return_value=2) o3.internal_queue_size = MagicMock(return_value=0) assert ( select_operator_to_run(topo, ExecutionResources(), ExecutionResources(), True) == o2 ) - # Internal queue size is added to num active tasks. - topo[o3].num_active_tasks = MagicMock(return_value=0) + # nternal queue size is added to num active tasks. + o3.num_active_work_refs = MagicMock(return_value=0) o3.internal_queue_size = MagicMock(return_value=2) assert ( select_operator_to_run(topo, ExecutionResources(), ExecutionResources(), True) == o2 ) - topo[o2].num_active_tasks = MagicMock(return_value=2) + o2.num_active_work_refs = MagicMock(return_value=2) o2.internal_queue_size = MagicMock(return_value=0) assert ( select_operator_to_run(topo, ExecutionResources(), ExecutionResources(), True) == o3 ) - topo[o2].num_active_tasks = MagicMock(return_value=0) + o2.num_active_work_refs = MagicMock(return_value=0) o2.internal_queue_size = MagicMock(return_value=2) assert ( select_operator_to_run(topo, ExecutionResources(), ExecutionResources(), True) From 4b0006ccadde7dbb5312455e3ff5947a777512a4 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 30 Jan 2023 11:50:40 -0800 Subject: [PATCH 7/9] wip Signed-off-by: Eric Liang --- python/ray/data/tests/test_streaming_integration.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/data/tests/test_streaming_integration.py b/python/ray/data/tests/test_streaming_integration.py index 91c27c8816bf..a79884960141 100644 --- a/python/ray/data/tests/test_streaming_integration.py +++ b/python/ray/data/tests/test_streaming_integration.py @@ -98,7 +98,7 @@ def _test_hook(fn, args, strategy): assert tasks == ["DEFAULT", "DEFAULT", "SPREAD", "SPREAD"] -def test_scheduling_progress_when_output_blocked(): +def test_scheduling_progress_when_output_blocked(ray_start_10_cpus_shared): # Processing stages should fully finish even if output is completely stalled. @ray.remote @@ -134,7 +134,7 @@ def func(x): assert list(it) == [[x] for x in range(1, 100)] -def test_backpressure_from_output(): +def test_backpressure_from_output(ray_start_10_cpus_shared): # Here we set the memory limit low enough so the output getting blocked will # actually stall execution. @@ -178,7 +178,7 @@ def func(x): ctx.execution_options.resource_limits.object_store_memory = None -def test_e2e_liveness_with_output_backpressure_edge_case(): +def test_e2e_liveness_with_output_backpressure_edge_case(ray_start_10_cpus_shared): # At least one operator is ensured to be running, if the output becomes idle. 
ctx = DatasetContext.get_current() ctx.use_streaming_executor = True From 1899c639486224832f0cd31fc3c2f1c8b8b0a13a Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 30 Jan 2023 12:11:56 -0800 Subject: [PATCH 8/9] context manager Signed-off-by: Eric Liang --- python/ray/data/tests/conftest.py | 9 ++ .../data/tests/test_streaming_integration.py | 90 +++++++++---------- 2 files changed, 49 insertions(+), 50 deletions(-) diff --git a/python/ray/data/tests/conftest.py b/python/ray/data/tests/conftest.py index abc66d8ac51e..de089750c052 100644 --- a/python/ray/data/tests/conftest.py +++ b/python/ray/data/tests/conftest.py @@ -1,3 +1,4 @@ +import copy import os import posixpath @@ -272,6 +273,14 @@ def _assert_base_partitioned_ds( yield _assert_base_partitioned_ds +@pytest.fixture +def restore_dataset_context(request): + """Restore any DatasetContext changes after the test runs""" + original = copy.deepcopy(ray.data.context.DatasetContext.get_current()) + yield + ray.data.context.DatasetContext._set_current(original) + + @pytest.fixture(params=[True, False]) def use_push_based_shuffle(request): ctx = ray.data.context.DatasetContext.get_current() diff --git a/python/ray/data/tests/test_streaming_integration.py b/python/ray/data/tests/test_streaming_integration.py index a79884960141..a8decfc849dd 100644 --- a/python/ray/data/tests/test_streaming_integration.py +++ b/python/ray/data/tests/test_streaming_integration.py @@ -55,7 +55,7 @@ def reverse_sort(inputs: List[RefBundle]): assert output == expected, (output, expected) -def test_e2e_option_propagation(ray_start_10_cpus_shared): +def test_e2e_option_propagation(ray_start_10_cpus_shared, restore_dataset_context): DatasetContext.get_current().new_execution_backend = True DatasetContext.get_current().use_streaming_executor = True @@ -69,12 +69,9 @@ def run(): ) run() - try: - DatasetContext.get_current().execution_options.resource_limits.cpu = 1 - with pytest.raises(ValueError): - run() - finally: - DatasetContext.get_current().execution_options.resource_limits.cpu = None + DatasetContext.get_current().execution_options.resource_limits.cpu = 1 + with pytest.raises(ValueError): + run() def test_configure_spread_e2e(ray_start_10_cpus_shared): @@ -134,7 +131,7 @@ def func(x): assert list(it) == [[x] for x in range(1, 100)] -def test_backpressure_from_output(ray_start_10_cpus_shared): +def test_backpressure_from_output(ray_start_10_cpus_shared, restore_dataset_context): # Here we set the memory limit low enough so the output getting blocked will # actually stall execution. @@ -156,44 +153,40 @@ def func(x): return x ctx = DatasetContext.get_current() - try: - ctx.use_streaming_executor = True - ctx.execution_options.resource_limits.object_store_memory = 10000 - - # Only take the first item from the iterator. - it = iter( - ray.data.range(100000, parallelism=100) - .map_batches(func, batch_size=None) - .iter_batches(batch_size=None) - ) - next(it) - num_finished = ray.get(counter.get.remote()) - assert num_finished < 5, num_finished - - # Check we can get the rest. - for rest in it: - pass - assert ray.get(counter.get.remote()) == 100 - finally: - ctx.execution_options.resource_limits.object_store_memory = None - - -def test_e2e_liveness_with_output_backpressure_edge_case(ray_start_10_cpus_shared): + ctx.use_streaming_executor = True + ctx.execution_options.resource_limits.object_store_memory = 10000 + + # Only take the first item from the iterator. 
+ it = iter( + ray.data.range(100000, parallelism=100) + .map_batches(func, batch_size=None) + .iter_batches(batch_size=None) + ) + next(it) + num_finished = ray.get(counter.get.remote()) + assert num_finished < 5, num_finished + + # Check we can get the rest. + for rest in it: + pass + assert ray.get(counter.get.remote()) == 100 + + +def test_e2e_liveness_with_output_backpressure_edge_case( + ray_start_10_cpus_shared, restore_dataset_context +): # At least one operator is ensured to be running, if the output becomes idle. ctx = DatasetContext.get_current() ctx.use_streaming_executor = True ctx.execution_options.preserve_order = True - try: - ctx.execution_options.resource_limits.object_store_memory = 1 - ds = ray.data.range(10000, parallelism=100).map(lambda x: x, num_cpus=2) - # This will hang forever if the liveness logic is wrong, since the output - # backpressure will prevent any operators from running at all. - assert ds.take_all() == list(range(10000)) - finally: - ctx.execution_options.resource_limits.object_store_memory = None + ctx.execution_options.resource_limits.object_store_memory = 1 + ds = ray.data.range(10000, parallelism=100).map(lambda x: x, num_cpus=2) + # This will hang forever if the liveness logic is wrong, since the output + # backpressure will prevent any operators from running at all. + assert ds.take_all() == list(range(10000)) -def test_e2e_autoscaling_up(ray_start_10_cpus_shared): +def test_e2e_autoscaling_up(ray_start_10_cpus_shared, restore_dataset_context): DatasetContext.get_current().new_execution_backend = True DatasetContext.get_current().use_streaming_executor = True @@ -264,7 +257,7 @@ def barrier3(x): ).take_all() -def test_e2e_autoscaling_down(ray_start_10_cpus_shared): +def test_e2e_autoscaling_down(ray_start_10_cpus_shared, restore_dataset_context): DatasetContext.get_current().new_execution_backend = True DatasetContext.get_current().use_streaming_executor = True @@ -274,15 +267,12 @@ def f(x): # Tests that autoscaling works even when resource constrained via actor killing. # To pass this, we need to autoscale down to free up slots for task execution. 
- try: - DatasetContext.get_current().execution_options.resource_limits.cpu = 2 - ray.data.range(5, parallelism=5).map_batches( - f, - compute=ray.data.ActorPoolStrategy(1, 2), - batch_size=None, - ).map_batches(lambda x: x, batch_size=None, num_cpus=2).take_all() - finally: - DatasetContext.get_current().execution_options.resource_limits.cpu = None + DatasetContext.get_current().execution_options.resource_limits.cpu = 2 + ray.data.range(5, parallelism=5).map_batches( + f, + compute=ray.data.ActorPoolStrategy(1, 2), + batch_size=None, + ).map_batches(lambda x: x, batch_size=None, num_cpus=2).take_all() if __name__ == "__main__": From 03a5e210f3597a93d717400348d20c80602528fe Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 30 Jan 2023 12:26:25 -0800 Subject: [PATCH 9/9] fix test Signed-off-by: Eric Liang --- python/ray/data/tests/test_streaming_integration.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/ray/data/tests/test_streaming_integration.py b/python/ray/data/tests/test_streaming_integration.py index a8decfc849dd..3512cbe68dfd 100644 --- a/python/ray/data/tests/test_streaming_integration.py +++ b/python/ray/data/tests/test_streaming_integration.py @@ -74,7 +74,7 @@ def run(): run() -def test_configure_spread_e2e(ray_start_10_cpus_shared): +def test_configure_spread_e2e(ray_start_10_cpus_shared, restore_dataset_context): from ray import remote_function tasks = [] @@ -95,7 +95,9 @@ def _test_hook(fn, args, strategy): assert tasks == ["DEFAULT", "DEFAULT", "SPREAD", "SPREAD"] -def test_scheduling_progress_when_output_blocked(ray_start_10_cpus_shared): +def test_scheduling_progress_when_output_blocked( + ray_start_10_cpus_shared, restore_dataset_context +): # Processing stages should fully finish even if output is completely stalled. @ray.remote
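For readers outside the Ray codebase, here is a minimal standalone sketch of the scheduling idea these commits converge on: once an operator reports an internal queue (PATCH 3), the executor's backpressure heuristic counts queued-but-unsubmitted bundles alongside active Ray futures when picking the next operator to run (PATCH 6, `OpState.num_processing()` and `select_operator_to_run()` in streaming_executor_state.py). The class and field names below (`FakeOp`, `active_work_refs`, `internal_queue`, `outqueue`) and the demo values are invented for illustration only; the real implementation also enforces resource limits and liveness rules that this sketch omits.

# Illustrative sketch only -- not part of the patch and not runnable against Ray.
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FakeOp:
    """Stand-in for an operator plus its OpState bookkeeping."""

    name: str
    active_work_refs: int = 0        # Ray futures already submitted.
    internal_queue: int = 0          # Bundles queued inside the op (e.g. actor pool ops).
    outqueue: List[int] = field(default_factory=list)  # Outputs not yet consumed.

    def num_processing(self) -> int:
        # Mirrors OpState.num_processing() after PATCH 6: active work refs
        # plus bundles still waiting in the operator's internal queue.
        return self.active_work_refs + self.internal_queue


def select_operator_to_run(ops: List[FakeOp]) -> Optional[FakeOp]:
    # Equally penalize output backlog and in-flight/queued work, as in the
    # patched select_operator_to_run() (resource-limit checks omitted here).
    if not ops:
        return None
    return min(ops, key=lambda op: len(op.outqueue) + op.num_processing())


if __name__ == "__main__":
    map1 = FakeOp("Map1", internal_queue=2)      # Two bundles awaiting dispatch to actors.
    map2 = FakeOp("Map2", active_work_refs=1)    # One task already running.
    # Before internal_queue_size() was wired in, Map1's queued bundles were
    # invisible to the scheduler; with the patched heuristic, Map2 (score 1)
    # is chosen over Map1 (score 2).
    assert select_operator_to_run([map1, map2]) is map2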