Skip to content

Commit

Permalink
[data] MapOperator.num_active_tasks should exclude pending actors (#4…
Browse files Browse the repository at this point in the history
…6364)

`MapOperator.num_active_tasks` should exclude the pending actors,
because:
1. `PhysicalOperator.completed` checks `num_active_tasks`. The operator
should be considered completed even if there are still pending actors.
2. The number of active tasks in the progress bar will be more accurate
to reflect the actual data processing tasks.

---------

Signed-off-by: Hao Chen <chenh1024@gmail.com>
  • Loading branch information
raulchen authored Jul 2, 2024
1 parent 77660e9 commit acf792e
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -351,12 +351,25 @@ def _get_next_inner(self) -> RefBundle:
raise NotImplementedError

def get_active_tasks(self) -> List[OpTask]:
"""Get a list of the active tasks of this operator."""
"""Get a list of the active tasks of this operator.
Subclasses should return *all* running normal/actor tasks. The
StreamingExecutor will wait on these tasks and trigger callbacks.
"""
return []

def num_active_tasks(self) -> int:
    """Return the number of active tasks.

    This method is used for 2 purposes:

    * Determining if this operator is completed.
    * Displaying active task info in the progress bar.

    Thus, the return value can be less than `len(get_active_tasks())`
    if some tasks are not needed for the above purposes. E.g., for the
    actor pool map operator, readiness checking tasks can be excluded
    from `num_active_tasks`, but they should be included in
    `get_active_tasks`.

    Subclasses can override this as a performance optimization.
    """
    # Default: every task reported by `get_active_tasks` counts as active.
    return len(self.get_active_tasks())
Expand Down
11 changes: 11 additions & 0 deletions python/ray/data/_internal/execution/operators/map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,17 @@ def implements_accurate_memory_accounting(self) -> bool:
def supports_fusion(self) -> bool:
return self._supports_fusion

def num_active_tasks(self) -> int:
    # Override `num_active_tasks` to only include data tasks and exclude
    # metadata tasks, which are used by the actor-pool map operator to
    # check if a newly created actor is ready.
    # The reasons are:
    # 1. `PhysicalOperator.completed` checks `num_active_tasks`. The operator
    #    should be considered completed even if there are still pending
    #    actors.
    # 2. The number of active tasks in the progress bar will be more accurate
    #    to reflect the actual data processing tasks.
    return len(self._data_tasks)


def _map_task(
map_transformer: MapTransformer,
Expand Down
6 changes: 4 additions & 2 deletions python/ray/data/tests/test_executor_resource_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,8 @@ def test_actor_pool_resource_reporting(ray_start_10_cpus_shared, restore_data_co
assert op.metrics.obj_store_mem_pending_task_outputs == 0

# Wait for actors to start.
assert op.num_active_tasks() == 2
assert op.num_active_tasks() == 0
assert op._actor_pool.num_pending_actors() == 2
run_op_tasks_sync(op, only_existing=True)

# Actors have now started and the pool is actively running tasks.
Expand Down Expand Up @@ -411,7 +412,8 @@ def test_actor_pool_resource_reporting_with_bundling(ray_start_10_cpus_shared):
assert op.metrics.obj_store_mem_pending_task_outputs == 0

# Wait for actors to start.
assert op.num_active_tasks() == 2
assert op.num_active_tasks() == 0
assert op._actor_pool.num_pending_actors() == 2
run_op_tasks_sync(op, only_existing=True)

# Actors have now started and the pool is actively running tasks.
Expand Down
67 changes: 66 additions & 1 deletion python/ray/data/tests/test_operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from ray.data.block import Block, BlockAccessor
from ray.data.context import DataContext
from ray.data.tests.util import run_one_op_task, run_op_tasks_sync
from ray.tests.client_test_utils import create_remote_signal_actor
from ray.tests.conftest import * # noqa


Expand Down Expand Up @@ -443,7 +444,7 @@ def _sleep(block_iter: Iterable[Block]) -> Iterable[Block]:

# Create with inputs.
input_op = InputDataBuffer(make_ref_bundles([[i] for i in range(10)]))
compute_strategy = ActorPoolStrategy() if use_actors else TaskPoolStrategy()
compute_strategy = ActorPoolStrategy(size=1) if use_actors else TaskPoolStrategy()
op = MapOperator.create(
create_map_transformer_from_block_fn(_sleep),
input_op=input_op,
Expand All @@ -454,6 +455,9 @@ def _sleep(block_iter: Iterable[Block]) -> Iterable[Block]:

# Start one task and then cancel.
op.start(ExecutionOptions())
if use_actors:
# Wait for the actor to start.
run_op_tasks_sync(op)
op.add_input(input_op.get_next(), 0)
assert op.num_active_tasks() == 1
op.shutdown()
Expand Down Expand Up @@ -517,6 +521,67 @@ def _sleep(block_iter: Iterable[Block]) -> Iterable[Block]:
assert not op.should_add_input()


def test_actor_pool_map_operator_num_active_tasks_and_completed(shutdown_only):
    """Tests ActorPoolMapOperator's num_active_tasks and completed methods.

    Checks that `num_active_tasks` counts only data tasks (not the pending
    actors) and that the operator reports `completed()` while actors are
    still pending.
    """
    num_actors = 2
    # Restart Ray with exactly `num_actors` CPUs so that any actors beyond
    # the first `num_actors` cannot be scheduled and stay pending.
    ray.shutdown()
    ray.init(num_cpus=num_actors)

    # The signal actor requests 0 CPUs.
    # NOTE(review): presumably so it does not take a CPU away from the
    # actor pool -- confirm.
    signal_actor = create_remote_signal_actor(ray).options(num_cpus=0).remote()

    def _map_transfom_fn(block_iter: Iterable[Block], _) -> Iterable[Block]:
        # Hold each data task until the test sends the signal below, then
        # pass the input blocks through unchanged.
        ray.get(signal_actor.wait.remote())
        yield from block_iter

    input_op = InputDataBuffer(make_ref_bundles([[i] for i in range(num_actors)]))
    compute_strategy = ActorPoolStrategy(min_size=num_actors, max_size=2 * num_actors)

    # Create an operator with [num_actors, 2 * num_actors] actors.
    # Resources are limited to num_actors, so the second half will be pending.
    op = MapOperator.create(
        create_map_transformer_from_block_fn(_map_transfom_fn),
        input_op=input_op,
        name="TestMapper",
        compute_strategy=compute_strategy,
    )
    actor_pool = op._actor_pool

    # Wait for the op to scale up to the min size.
    op.start(ExecutionOptions())
    run_op_tasks_sync(op)
    assert actor_pool.num_running_actors() == num_actors
    # No inputs have been added yet, so there are no data tasks.
    assert op.num_active_tasks() == 0

    # Scale up to the max size, the second half of the actors will be pending.
    actor_pool.scale_up(num_actors)
    assert actor_pool.num_pending_actors() == num_actors
    # `num_active_tasks` should exclude the metadata tasks for the pending actors.
    assert op.num_active_tasks() == 0

    # Add inputs.
    for _ in range(num_actors):
        assert op.should_add_input()
        op.add_input(input_op.get_next(), 0)
    # Still `num_active_tasks` should only include data tasks.
    assert op.num_active_tasks() == num_actors
    assert actor_pool.num_pending_actors() == num_actors

    # Let the data tasks complete.
    signal_actor.send.remote()
    while len(op._data_tasks) > 0:
        run_one_op_task(op)
    assert op.num_active_tasks() == 0
    # The extra actors are still pending after all data tasks finished.
    assert actor_pool.num_pending_actors() == num_actors

    # Mark the inputs done and take all outputs.
    # The operator should be completed, even if there are pending actors.
    op.all_inputs_done()
    while op.has_next():
        op.get_next()
    assert actor_pool.num_pending_actors() == num_actors
    assert op.completed()


@pytest.mark.parametrize(
"compute,expected",
[
Expand Down

0 comments on commit acf792e

Please sign in to comment.