[data] [streaming] Fixes to autoscaling actor pool streaming op #32023

Merged · 9 commits · Jan 30, 2023
7 changes: 7 additions & 0 deletions python/ray/data/_internal/execution/interfaces.py
@@ -293,6 +293,13 @@ def num_active_work_refs(self) -> int:
"""
return len(self.get_work_refs())

def internal_queue_size(self) -> int:
"""If the operator has an internal input queue, return its size.

This is used to report tasks pending submission to actor pools.
"""
return 0

def notify_work_completed(self, work_ref: ray.ObjectRef) -> None:
"""Executor calls this when the given work is completed and local.

@@ -60,14 +60,17 @@ def __init__(
             ObjectRef[ObjectRefGenerator], Tuple[_TaskState, ray.actor.ActorHandle]
         ] = {}
         # A pool of running actors on which we can execute mapper tasks.
-        self._actor_pool = _ActorPool()
+        self._actor_pool = _ActorPool(autoscaling_policy._config.max_tasks_in_flight)
         # A queue of bundles awaiting dispatch to actors.
         self._bundle_queue = collections.deque()
         # Cached actor class.
         self._cls = None
         # Whether no more submittable bundles will be added.
         self._inputs_done = False

+    def internal_queue_size(self) -> int:
+        return len(self._bundle_queue)
+
     def start(self, options: ExecutionOptions):
         super().start(options)

@@ -197,10 +200,11 @@ def num_active_work_refs(self) -> int:
         return len(self._tasks)

     def progress_str(self) -> str:
-        return (
-            f"{self._actor_pool.num_running_actors()} "
-            f"({self._actor_pool.num_pending_actors()} pending)"
-        )
+        base = f"{self._actor_pool.num_running_actors()} actors"
+        pending = self._actor_pool.num_pending_actors()
+        if pending:
+            base += f" ({pending} pending)"
+        return base

     def base_resource_usage(self) -> ExecutionResources:
         min_workers = self._autoscaling_policy.min_workers
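The rewrite drops the noisy "(0 pending)" suffix when no actors are pending. A standalone restatement of the formatting logic, with expectations matching the updated tests at the end of this diff (a sketch, not library code):

def format_progress(running: int, pending: int) -> str:
    # Mirrors progress_str above: show the pending count only when nonzero.
    base = f"{running} actors"
    if pending:
        base += f" ({pending} pending)"
    return base

assert format_progress(0, 1) == "0 actors (1 pending)"
assert format_progress(1, 0) == "1 actors"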
@@ -60,9 +60,9 @@ def num_queued(self) -> int:
"""Return the number of queued bundles across all inqueues."""
return sum(len(q) for q in self.inqueues)

def num_active_tasks(self):
"""Return the number of Ray futures pending for this operator."""
return self.op.num_active_work_refs()
def num_processing(self):
"""Return the number of bundles currently in processing for this operator."""
return self.op.num_active_work_refs() + self.op.internal_queue_size()

def add_output(self, ref: RefBundle) -> None:
"""Move a bundle produced by the operator to its outqueue."""
@@ -77,8 +77,9 @@ def refresh_progress_bar(self) -> None:
         self.progress_bar.set_description(self.summary_str())

     def summary_str(self) -> str:
-        queued = self.num_queued()
-        desc = f"{self.op.name}: {self.num_active_tasks()} active, {queued} queued"
+        queued = self.num_queued() + self.op.internal_queue_size()
+        active = self.op.num_active_work_refs()
+        desc = f"{self.op.name}: {active} active, {queued} queued"
         suffix = self.op.progress_str()
         if suffix:
             desc += f", {suffix}"
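A worked example of the resulting summary line, using hypothetical numbers: an operator named "Map" with 4 active work refs, 2 bundles across its inqueues, 3 bundles in its internal queue, and an actor pool reporting 2 running and 1 pending would render as:

# queued = num_queued() + internal_queue_size() = 2 + 3 = 5
# active = num_active_work_refs()               = 4
# suffix = progress_str()                       = "2 actors (1 pending)"
# desc   = "Map: 4 active, 5 queued, 2 actors (1 pending)"

Counting the internal queue as queued rather than active keeps the bar honest about how much work has actually been dispatched.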
@@ -209,7 +210,7 @@ def select_operator_to_run(

     This is currently implemented by applying backpressure on operators that are
     producing outputs faster than they are consumed (`len(outqueue)`), as well as
-    operators with a large number of running tasks (`num_active_tasks()`).
+    operators with a large number of bundles in flight (`num_processing()`).

     Note that memory limits also apply to the outqueue of the output operator. This
     provides backpressure if the consumer is slow. However, once a bundle is returned
@@ -237,9 +238,9 @@
     if not ops:
         return None

-    # Equally penalize outqueue length and active tasks for backpressure.
+    # Equally penalize outqueue length and bundles being processed for backpressure.
     return min(
-        ops, key=lambda op: len(topology[op].outqueue) + topology[op].num_active_tasks()
+        ops, key=lambda op: len(topology[op].outqueue) + topology[op].num_processing()
    )
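A self-contained toy illustration of the selection rule (operator names and counts are invented):

from collections import namedtuple

OpState = namedtuple("OpState", ["name", "outqueue_len", "num_processing"])

def backlog(state: OpState) -> int:
    # Same score as above: pending outputs plus bundles being processed.
    return state.outqueue_len + state.num_processing

ops = [OpState("ReadParquet", 5, 2), OpState("MapBatches", 1, 3)]
assert min(ops, key=backlog).name == "MapBatches"  # scores: 7 vs. 4

Because `num_processing()` now includes the internal queue, an actor-pool operator with a long dispatch backlog stops being fed new inputs even when few of its Ray tasks are actually running.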


9 changes: 9 additions & 0 deletions python/ray/data/tests/conftest.py
@@ -1,3 +1,4 @@
+import copy
 import os
 import posixpath

@@ -272,6 +273,14 @@ def _assert_base_partitioned_ds(
     yield _assert_base_partitioned_ds


+@pytest.fixture
+def restore_dataset_context(request):
+    """Restore any DatasetContext changes after the test runs."""
+    original = copy.deepcopy(ray.data.context.DatasetContext.get_current())
+    yield
+    ray.data.context.DatasetContext._set_current(original)
+
+
 @pytest.fixture(params=[True, False])
 def use_push_based_shuffle(request):
     ctx = ray.data.context.DatasetContext.get_current()
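A sketch of how a test opts in to the new fixture; the test body and the setting it overrides are illustrative, not taken from this PR:

def test_with_custom_context(restore_dataset_context):
    # Mutations made here are undone by the fixture's deepcopy/restore,
    # so they cannot leak into later tests in the same session.
    ctx = ray.data.context.DatasetContext.get_current()
    ctx.target_max_block_size = 1024  # hypothetical override for this test

Deep-copying the whole context, rather than saving individual fields, means the fixture keeps working as new settings are added.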
22 changes: 17 additions & 5 deletions python/ray/data/tests/test_operators.py
@@ -101,7 +101,9 @@ def dummy_all_transform(bundles: List[RefBundle]):
 def test_map_operator_bulk(ray_start_regular_shared, use_actors):
     # Create with inputs.
     input_op = InputDataBuffer(make_ref_bundles([[i] for i in range(100)]))
-    compute_strategy = ActorPoolStrategy() if use_actors else TaskPoolStrategy()
+    compute_strategy = (
+        ActorPoolStrategy(max_size=1) if use_actors else TaskPoolStrategy()
+    )
     op = MapOperator.create(
         _mul2_transform,
         input_op=input_op,
@@ -113,9 +115,16 @@ def test_map_operator_bulk(ray_start_regular_shared, use_actors):
     op.start(ExecutionOptions())
     if use_actors:
         # Actor will be pending after starting the operator.
-        assert op.progress_str() == "0 (1 pending)"
+        assert op.progress_str() == "0 actors (1 pending)"
+    assert op.internal_queue_size() == 0
+    i = 0
     while input_op.has_next():
         op.add_input(input_op.get_next(), 0)
+        i += 1
+        if use_actors:
+            assert op.internal_queue_size() == i
+        else:
+            assert op.internal_queue_size() == 0
     op.inputs_done()
     work_refs = op.get_work_refs()
     while work_refs:
@@ -126,10 +135,11 @@ def test_map_operator_bulk(ray_start_regular_shared, use_actors):
         if use_actors and work_refs:
             # After actor is ready (first work ref resolved), actor will remain ready
             # while there is work to do.
-            assert op.progress_str() == "1 (0 pending)"
+            assert op.progress_str() == "1 actors"
+            assert op.internal_queue_size() == 0
     if use_actors:
         # After all work is done, the actor will have been killed to free up resources.
-        assert op.progress_str() == "0 (0 pending)"
+        assert op.progress_str() == "0 actors"
     else:
         assert op.progress_str() == ""
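Pinning the pool to a single actor makes the queue assertions deterministic. In compressed form, what the input loop above verifies (assuming the lone actor is still pending while inputs are added, as it is right after start()):

# With ActorPoolStrategy(max_size=1) and no actor ready yet, every
# add_input() lands in the operator's internal queue:
#   after k calls, op.internal_queue_size() == k
# Task-based ops submit work immediately, so their queue size stays 0.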

@@ -227,7 +237,9 @@ def test_map_operator_ray_args(shutdown_only, use_actors):
     ray.init(num_cpus=0, num_gpus=1)
     # Create with inputs.
     input_op = InputDataBuffer(make_ref_bundles([[i] for i in range(10)]))
-    compute_strategy = ActorPoolStrategy() if use_actors else TaskPoolStrategy()
+    compute_strategy = (
+        ActorPoolStrategy(max_size=1) if use_actors else TaskPoolStrategy()
+    )
     op = MapOperator.create(
         _mul2_transform,
         input_op=input_op,