[data] [streaming] Fixes to autoscaling actor pool streaming op (ray-project#32023)

Fixes:
- Properly wire max tasks per actor to pool
- Account for internal queue size in scheduling algorithm
- Small improvements to progress bar UX
ericl authored and clarence-wu committed Jan 31, 2023
1 parent a7de8fa commit 68f2ef8
Showing 7 changed files with 361 additions and 199 deletions.
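
The first fix in the commit message (wiring the per-actor task cap into the pool) can be illustrated with a small, self-contained sketch. The class and method names below (_PoolSketch, pick_actor, return_actor) are simplified stand-ins, not the actual Ray Data internals; the point is only that the pool is constructed with max_tasks_in_flight from the autoscaling config and refuses to dispatch to an actor that is already at the cap.

from typing import Any, Dict, Optional


class _PoolSketch:
    """Toy actor pool that honors a per-actor in-flight task cap."""

    def __init__(self, max_tasks_in_flight: int):
        self._max_tasks_in_flight = max_tasks_in_flight
        # Actor handle (any hashable stand-in here) -> number of tasks in flight.
        self._in_flight: Dict[Any, int] = {}

    def add_actor(self, actor: Any) -> None:
        self._in_flight[actor] = 0

    def pick_actor(self) -> Optional[Any]:
        # Only actors below the cap are eligible; prefer the least loaded one.
        eligible = [a for a, n in self._in_flight.items() if n < self._max_tasks_in_flight]
        if not eligible:
            return None
        actor = min(eligible, key=self._in_flight.__getitem__)
        self._in_flight[actor] += 1
        return actor

    def return_actor(self, actor: Any) -> None:
        # Called when a task finishes; frees one slot on that actor.
        self._in_flight[actor] -= 1


pool = _PoolSketch(max_tasks_in_flight=2)
pool.add_actor("actor-1")
assert pool.pick_actor() == "actor-1"
assert pool.pick_actor() == "actor-1"
assert pool.pick_actor() is None  # cap of 2 reached until return_actor() is called
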
7 changes: 7 additions & 0 deletions python/ray/data/_internal/execution/interfaces.py
@@ -293,6 +293,13 @@ def num_active_work_refs(self) -> int:
"""
return len(self.get_work_refs())

def internal_queue_size(self) -> int:
"""If the operator has an internal input queue, return its size.
This is used to report tasks pending submission to actor pools.
"""
return 0

def notify_work_completed(self, work_ref: ray.ObjectRef) -> None:
"""Executor calls this when the given work is completed and local.
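
A minimal sketch of how this new hook is meant to be used, with made-up class names (OperatorSketch, QueueingOperatorSketch) rather than the real PhysicalOperator hierarchy: the base implementation reports zero, an operator with an internal input queue overrides it, and the caller adds it to the number of active work refs to get the total work in progress.

import collections


class OperatorSketch:
    def num_active_work_refs(self) -> int:
        return 0

    def internal_queue_size(self) -> int:
        # Default: no internal queue.
        return 0


class QueueingOperatorSketch(OperatorSketch):
    """Operator that buffers bundles until a worker is free."""

    def __init__(self):
        self._bundle_queue = collections.deque()

    def add_input(self, bundle) -> None:
        self._bundle_queue.append(bundle)

    def internal_queue_size(self) -> int:
        return len(self._bundle_queue)


op = QueueingOperatorSketch()
op.add_input("bundle-0")
# Bundles pending submission count toward the operator's processing load.
assert op.num_active_work_refs() + op.internal_queue_size() == 1
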
@@ -60,14 +60,17 @@ def __init__(
ObjectRef[ObjectRefGenerator], Tuple[_TaskState, ray.actor.ActorHandle]
] = {}
# A pool of running actors on which we can execute mapper tasks.
self._actor_pool = _ActorPool()
self._actor_pool = _ActorPool(autoscaling_policy._config.max_tasks_in_flight)
# A queue of bundles awaiting dispatch to actors.
self._bundle_queue = collections.deque()
# Cached actor class.
self._cls = None
# Whether no more submittable bundles will be added.
self._inputs_done = False

def internal_queue_size(self) -> int:
return len(self._bundle_queue)

def start(self, options: ExecutionOptions):
super().start(options)

@@ -197,10 +200,11 @@ def num_active_work_refs(self) -> int:
return len(self._tasks)

def progress_str(self) -> str:
return (
f"{self._actor_pool.num_running_actors()} "
f"({self._actor_pool.num_pending_actors()} pending)"
)
base = f"{self._actor_pool.num_running_actors()} actors"
pending = self._actor_pool.num_pending_actors()
if pending:
base += f" ({pending} pending)"
return base

def base_resource_usage(self) -> ExecutionResources:
min_workers = self._autoscaling_policy.min_workers
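
The progress-bar change above can be summarized with a tiny standalone helper (progress_str_sketch is hypothetical; the real method reads its counts from the actor pool): the "(N pending)" suffix is only shown while actors are actually pending, which matches the strings asserted in the updated tests.

def progress_str_sketch(num_running: int, num_pending: int) -> str:
    # Hide the pending suffix entirely when nothing is pending.
    base = f"{num_running} actors"
    if num_pending:
        base += f" ({num_pending} pending)"
    return base


assert progress_str_sketch(0, 1) == "0 actors (1 pending)"
assert progress_str_sketch(1, 0) == "1 actors"
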
17 changes: 9 additions & 8 deletions python/ray/data/_internal/execution/streaming_executor_state.py
@@ -60,9 +60,9 @@ def num_queued(self) -> int:
"""Return the number of queued bundles across all inqueues."""
return sum(len(q) for q in self.inqueues)

def num_active_tasks(self):
"""Return the number of Ray futures pending for this operator."""
return self.op.num_active_work_refs()
def num_processing(self):
"""Return the number of bundles currently in processing for this operator."""
return self.op.num_active_work_refs() + self.op.internal_queue_size()

def add_output(self, ref: RefBundle) -> None:
"""Move a bundle produced by the operator to its outqueue."""
@@ -77,8 +77,9 @@ def refresh_progress_bar(self) -> None:
self.progress_bar.set_description(self.summary_str())

def summary_str(self) -> str:
queued = self.num_queued()
desc = f"{self.op.name}: {self.num_active_tasks()} active, {queued} queued"
queued = self.num_queued() + self.op.internal_queue_size()
active = self.op.num_active_work_refs()
desc = f"{self.op.name}: {active} active, {queued} queued"
suffix = self.op.progress_str()
if suffix:
desc += f", {suffix}"
@@ -209,7 +210,7 @@ def select_operator_to_run(
This is currently implemented by applying backpressure on operators that are
producing outputs faster than they are consuming them `len(outqueue)`, as well as
operators with a large number of running tasks `num_active_tasks()`.
operators with a large number of running tasks `num_processing()`.
Note that memory limits also apply to the outqueue of the output operator. This
provides backpressure if the consumer is slow. However, once a bundle is returned
Expand Down Expand Up @@ -237,9 +238,9 @@ def select_operator_to_run(
if not ops:
return None

# Equally penalize outqueue length and active tasks for backpressure.
# Equally penalize outqueue length and num bundles processing for backpressure.
return min(
ops, key=lambda op: len(topology[op].outqueue) + topology[op].num_active_tasks()
ops, key=lambda op: len(topology[op].outqueue) + topology[op].num_processing()
)


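
A toy version of the selection rule after this change (OpStateSketch is an illustrative stand-in for the executor's per-operator state, not the real OpState class): an operator's backpressure score is its outqueue length plus everything it is still processing, where "processing" now includes bundles sitting in the operator's internal queue, not just running Ray tasks.

from dataclasses import dataclass, field
from typing import List


@dataclass
class OpStateSketch:
    name: str
    outqueue: List[object] = field(default_factory=list)
    active_work_refs: int = 0
    internal_queue: int = 0

    def num_processing(self) -> int:
        # Running Ray tasks plus bundles waiting in the operator's own queue.
        return self.active_work_refs + self.internal_queue


def select_operator_to_run_sketch(ops: List[OpStateSketch]) -> OpStateSketch:
    # Equally penalize outqueue length and number of bundles processing.
    return min(ops, key=lambda op: len(op.outqueue) + op.num_processing())


a = OpStateSketch("map_a", outqueue=[0, 1], active_work_refs=1)
b = OpStateSketch("map_b", active_work_refs=1, internal_queue=3)
assert select_operator_to_run_sketch([a, b]) is a  # score 3 beats score 4
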
9 changes: 9 additions & 0 deletions python/ray/data/tests/conftest.py
@@ -1,3 +1,4 @@
import copy
import os
import posixpath

@@ -272,6 +273,14 @@ def _assert_base_partitioned_ds(
yield _assert_base_partitioned_ds


@pytest.fixture
def restore_dataset_context(request):
"""Restore any DatasetContext changes after the test runs"""
original = copy.deepcopy(ray.data.context.DatasetContext.get_current())
yield
ray.data.context.DatasetContext._set_current(original)


@pytest.fixture(params=[True, False])
def use_push_based_shuffle(request):
ctx = ray.data.context.DatasetContext.get_current()
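
A hedged usage sketch for the new fixture (the test body below is invented for illustration; use_push_based_shuffle is the context flag already used elsewhere in this conftest): a test that requests restore_dataset_context can mutate the global DatasetContext freely, and the deep-copied original is reinstated once the test finishes.

import ray


def test_something_with_modified_context(ray_start_regular_shared, restore_dataset_context):
    ctx = ray.data.context.DatasetContext.get_current()
    # Mutate global state for this test only; the fixture restores the
    # deep-copied original after the test completes.
    ctx.use_push_based_shuffle = True
    ...  # exercise code that depends on the modified context
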
22 changes: 17 additions & 5 deletions python/ray/data/tests/test_operators.py
@@ -101,7 +101,9 @@ def dummy_all_transform(bundles: List[RefBundle]):
def test_map_operator_bulk(ray_start_regular_shared, use_actors):
# Create with inputs.
input_op = InputDataBuffer(make_ref_bundles([[i] for i in range(100)]))
compute_strategy = ActorPoolStrategy() if use_actors else TaskPoolStrategy()
compute_strategy = (
ActorPoolStrategy(max_size=1) if use_actors else TaskPoolStrategy()
)
op = MapOperator.create(
_mul2_transform,
input_op=input_op,
@@ -113,9 +115,16 @@ def test_map_operator_bulk(ray_start_regular_shared, use_actors):
op.start(ExecutionOptions())
if use_actors:
# Actor will be pending after starting the operator.
assert op.progress_str() == "0 (1 pending)"
assert op.progress_str() == "0 actors (1 pending)"
assert op.internal_queue_size() == 0
i = 0
while input_op.has_next():
op.add_input(input_op.get_next(), 0)
i += 1
if use_actors:
assert op.internal_queue_size() == i
else:
assert op.internal_queue_size() == 0
op.inputs_done()
work_refs = op.get_work_refs()
while work_refs:
Expand All @@ -126,10 +135,11 @@ def test_map_operator_bulk(ray_start_regular_shared, use_actors):
if use_actors and work_refs:
# After actor is ready (first work ref resolved), actor will remain ready
# while there is work to do.
assert op.progress_str() == "1 (0 pending)"
assert op.progress_str() == "1 actors"
assert op.internal_queue_size() == 0
if use_actors:
# After all work is done, the actor will have been killed to free up resources.
assert op.progress_str() == "0 (0 pending)"
assert op.progress_str() == "0 actors"
else:
assert op.progress_str() == ""

@@ -227,7 +237,9 @@ def test_map_operator_ray_args(shutdown_only, use_actors):
ray.init(num_cpus=0, num_gpus=1)
# Create with inputs.
input_op = InputDataBuffer(make_ref_bundles([[i] for i in range(10)]))
compute_strategy = ActorPoolStrategy() if use_actors else TaskPoolStrategy()
compute_strategy = (
ActorPoolStrategy(max_size=1) if use_actors else TaskPoolStrategy()
)
op = MapOperator.create(
_mul2_transform,
input_op=input_op,