[Datasets] Track bundles' object store utilization as soon as they're added to an operator (#32482)

This PR ensures that the object store utilization of a bundle is still tracked while the bundle is queued internally by an operator, e.g. when MapOperator queues bundles to reach a minimum bundle size, or when workers aren't yet ready for dispatch.
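For context, here is a minimal sketch of the accounting this change performs (the `cur`/`peak` fields mirror the diff below, but the `Metrics` class and `on_bundle_added` method are simplified stand-ins for illustration, not Ray's actual internals):

```python
from dataclasses import dataclass


@dataclass
class Metrics:
    """Simplified stand-in for an operator's object store metrics (assumed
    shape for illustration, not Ray's actual class)."""

    cur: int = 0   # bytes currently attributed to this operator
    peak: int = 0  # high-water mark of `cur`

    def on_bundle_added(self, size_bytes: int) -> None:
        # Charge the bundle as soon as the operator receives it, even if it
        # only sits in an internal queue (e.g. awaiting a minimum bundle size).
        self.cur += size_bytes
        if self.cur > self.peak:
            self.peak = self.cur
```

Before this change, the equivalent update ran only in `_handle_task_submitted`, so bundles parked in the bundler (or waiting on actors) were invisible to the operator's metrics.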
clarkzinzow authored Feb 13, 2023
1 parent 6de3cbe commit 80f2161
Showing 2 changed files with 134 additions and 8 deletions.
8 changes: 4 additions & 4 deletions python/ray/data/_internal/execution/operators/map_operator.py
@@ -147,6 +147,10 @@ def start(self, options: "ExecutionOptions"):

     def add_input(self, refs: RefBundle, input_index: int):
         assert input_index == 0, input_index
+        # Add ref bundle allocation to operator's object store metrics.
+        self._metrics.cur += refs.size_bytes()
+        if self._metrics.cur > self._metrics.peak:
+            self._metrics.peak = self._metrics.cur
         # Add RefBundle to the bundler.
         self._block_ref_bundler.add_bundle(refs)
         if self._block_ref_bundler.has_bundle():
@@ -181,10 +185,6 @@ def _handle_task_submitted(self, task: "_TaskState"):
         """
         # Notify output queue that this task is pending.
         self._output_queue.notify_pending_task(task)
-        # Update object store metrics.
-        self._metrics.cur += task.inputs.size_bytes()
-        if self._metrics.cur > self._metrics.peak:
-            self._metrics.peak = self._metrics.cur

     @abstractmethod
     def notify_work_completed(
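A hedged walkthrough of what this move changes (the numbers are assumptions chosen to mirror the tests below: 80-byte single-row bundles and a minimum bundle size of 2 rows; this is a toy model, not Ray code):

```python
# Old behavior: usage charged only at task submission.
# New behavior: usage charged on add_input.
old_cur, new_cur = 0, 0

# add_input #1: bundle is queued in the bundler, no task submitted yet.
new_cur += 80           # new: the queued bundle is tracked immediately
assert old_cur == 0     # old: the queued bundle was invisible to metrics

# add_input #2: minimum bundle size reached, task submitted.
new_cur += 80
old_cur += 160          # old: both bundles charged only now, at submission
assert old_cur == new_cur == 160
```

Removing the old update in `_handle_task_submitted` (rather than keeping both) avoids charging the same bundle twice once `add_input` starts tracking it.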
134 changes: 130 additions & 4 deletions python/ray/data/tests/test_executor_resource_management.py
@@ -97,6 +97,41 @@ def test_task_pool_resource_reporting(ray_start_10_cpus_shared):
     assert usage.object_store_memory == pytest.approx(128, abs=50), usage


+def test_task_pool_resource_reporting_with_bundling(ray_start_10_cpus_shared):
+    input_op = InputDataBuffer(make_ref_bundles([[i] for i in range(100)]))
+    op = MapOperator.create(
+        _mul2_transform,
+        input_op=input_op,
+        name="TestMapper",
+        compute_strategy=TaskPoolStrategy(),
+        min_rows_per_bundle=3,
+    )
+    assert op.current_resource_usage() == ExecutionResources(
+        cpu=0, gpu=0, object_store_memory=0
+    )
+    op.start(ExecutionOptions())
+    op.add_input(input_op.get_next(), 0)
+    usage = op.current_resource_usage()
+    # No tasks submitted yet due to bundling.
+    assert usage.cpu == 0, usage
+    assert usage.gpu == 0, usage
+    # Queued bundles (in the bundler) still count against object store usage.
+    assert usage.object_store_memory == pytest.approx(80, abs=10), usage
+    op.add_input(input_op.get_next(), 0)
+    usage = op.current_resource_usage()
+    # No tasks submitted yet due to bundling.
+    assert usage.cpu == 0, usage
+    assert usage.gpu == 0, usage
+    # Queued bundles (in the bundler) still count against object store usage.
+    assert usage.object_store_memory == pytest.approx(160, abs=10), usage
+    op.add_input(input_op.get_next(), 0)
+    usage = op.current_resource_usage()
+    # Task has now been submitted since we've met the minimum bundle size.
+    assert usage.cpu == 1, usage
+    assert usage.gpu == 0, usage
+    assert usage.object_store_memory == pytest.approx(240, abs=10), usage
+
+
 def test_actor_pool_resource_reporting(ray_start_10_cpus_shared):
     input_op = InputDataBuffer(make_ref_bundles([[i] for i in range(100)]))
     op = MapOperator.create(
@@ -116,15 +151,22 @@ def test_actor_pool_resource_reporting(ray_start_10_cpus_shared):
     )

     # Add inputs.
-    for _ in range(4):
+    for i in range(4):
         # Pool is still idle while waiting for actors to start, so additional tasks
         # shouldn't trigger scale-up, and incremental resource usage should still be 0.
         assert op.incremental_resource_usage() == ExecutionResources(cpu=0, gpu=0)
         op.add_input(input_op.get_next(), 0)
+        usage = op.current_resource_usage()
+        assert usage.cpu == 2, usage
+        assert usage.gpu == 0, usage
+        # Queued bundles still count against object store usage.
+        assert usage.object_store_memory == pytest.approx((i + 1) * 80, abs=10), usage
     # Pool is still idle while waiting for actors to start.
-    assert op.current_resource_usage() == ExecutionResources(
-        cpu=2, gpu=0, object_store_memory=0
-    )
+    usage = op.current_resource_usage()
+    assert usage.cpu == 2, usage
+    assert usage.gpu == 0, usage
+    # Queued bundles still count against object store usage.
+    assert usage.object_store_memory == pytest.approx(320, abs=10), usage

     # Wait for actors to start.
     work_refs = op.get_work_refs()
@@ -174,6 +216,90 @@ def test_actor_pool_resource_reporting(ray_start_10_cpus_shared):
     assert usage.object_store_memory == 0, usage


+def test_actor_pool_resource_reporting_with_bundling(ray_start_10_cpus_shared):
+    input_op = InputDataBuffer(make_ref_bundles([[i] for i in range(100)]))
+    op = MapOperator.create(
+        _mul2_transform,
+        input_op=input_op,
+        name="TestMapper",
+        compute_strategy=ActorPoolStrategy(2, 10),
+        min_rows_per_bundle=2,
+    )
+    op.start(ExecutionOptions())
+    assert op.base_resource_usage() == ExecutionResources(cpu=2, gpu=0)
+    # All actors are idle (pending creation), so submitting a new task shouldn't
+    # require scale-up, and incremental resource usage should be 0.
+    assert op.incremental_resource_usage() == ExecutionResources(cpu=0, gpu=0)
+    # Actors are pending creation, but they still count against CPU utilization.
+    assert op.current_resource_usage() == ExecutionResources(
+        cpu=2, gpu=0, object_store_memory=0
+    )
+
+    # Add inputs.
+    for i in range(4):
+        # Pool is still idle while waiting for actors to start, so additional tasks
+        # shouldn't trigger scale-up, and incremental resource usage should still be 0.
+        assert op.incremental_resource_usage() == ExecutionResources(cpu=0, gpu=0)
+        op.add_input(input_op.get_next(), 0)
+        usage = op.current_resource_usage()
+        assert usage.cpu == 2, usage
+        assert usage.gpu == 0, usage
+        # Queued bundles still count against object store usage.
+        assert usage.object_store_memory == pytest.approx((i + 1) * 80, abs=10), usage
+    # Pool is still idle while waiting for actors to start.
+    usage = op.current_resource_usage()
+    assert usage.cpu == 2, usage
+    assert usage.gpu == 0, usage
+    # Queued bundles still count against object store usage.
+    assert usage.object_store_memory == pytest.approx(320, abs=10), usage
+
+    # Wait for actors to start.
+    work_refs = op.get_work_refs()
+    assert len(work_refs) == 2
+    for work_ref in work_refs:
+        ray.get(work_ref)
+        op.notify_work_completed(work_ref)
+
+    # Now that both actors have started, a new task would trigger scale-up, so
+    # incremental resource usage should be 1 CPU.
+    inc_usage = op.incremental_resource_usage()
+    assert inc_usage.cpu == 1, inc_usage
+    assert inc_usage.gpu == 0, inc_usage
+
+    # Actors have now started and the pool is actively running tasks.
+    usage = op.current_resource_usage()
+    assert usage.cpu == 2, usage
+    assert usage.gpu == 0, usage
+    assert usage.object_store_memory == pytest.approx(320, abs=10), usage
+
+    # Indicate that no more inputs will arrive.
+    op.inputs_done()
+
+    # Wait until tasks are done.
+    work_refs = op.get_work_refs()
+    while work_refs:
+        for work_ref in work_refs:
+            ray.get(work_ref)
+            op.notify_work_completed(work_ref)
+        work_refs = op.get_work_refs()
+
+    # Work is done and the pool has been scaled down.
+    usage = op.current_resource_usage()
+    assert usage.cpu == 0, usage
+    assert usage.gpu == 0, usage
+    assert usage.object_store_memory == pytest.approx(416, abs=10), usage
+
+    # Consume task outputs.
+    while op.has_next():
+        op.get_next()
+
+    # Work is done, the pool has been scaled down, and outputs have been consumed.
+    usage = op.current_resource_usage()
+    assert usage.cpu == 0, usage
+    assert usage.gpu == 0, usage
+    assert usage.object_store_memory == 0, usage
+
+
 if __name__ == "__main__":
     import sys

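The expected values in the bundling tests follow from simple arithmetic, assuming (per the tests' own tolerances) that each single-row bundle occupies roughly 80 bytes of object store memory. A small sketch of the threshold behavior that `min_rows_per_bundle=3` exercises (`first_submit_call` is a hypothetical helper for illustration, not part of Ray):

```python
def first_submit_call(min_rows_per_bundle: int, rows_per_input: int = 1) -> int:
    """Return which add_input call first satisfies the minimum bundle size."""
    queued_rows = 0
    calls = 0
    while queued_rows < min_rows_per_bundle:
        queued_rows += rows_per_input
        calls += 1
    return calls


# Matches test_task_pool_resource_reporting_with_bundling: with
# min_rows_per_bundle=3 and one row per input bundle, the first two
# add_input calls only queue (~80 then ~160 tracked bytes, 0 CPUs), and
# the third submits a task (~240 bytes, 1 CPU).
assert first_submit_call(3) == 3
```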
