@@ -59,7 +59,7 @@ def _actor_pool_should_scale_up(
         op_state: "OpState",
     ):
         # Do not scale up, if the op is completed or no more inputs are coming.
-        if op.completed() or (op._inputs_complete and op.internal_queue_size() == 0):
+        if op.completed() or (op_state.total_enqueued_input_bundles() == 0):
             return False
         if actor_pool.current_size() < actor_pool.min_size():
             # Scale up, if the actor pool is below min size.
@@ -71,7 +71,7 @@ def _actor_pool_should_scale_up(
         if not op_state._scheduling_status.under_resource_limits:
             return False
         # Do not scale up, if the op has enough free slots for the existing inputs.
-        if op_state.total_input_enqueued() <= actor_pool.num_free_task_slots():
+        if op_state.total_enqueued_input_bundles() <= actor_pool.num_free_task_slots():
             return False
         # Determine whether to scale up based on the actor pool utilization.
         util = self._calculate_actor_pool_util(actor_pool)
@@ -81,9 +81,10 @@ def _actor_pool_should_scale_down(
         self,
         actor_pool: AutoscalingActorPool,
         op: "PhysicalOperator",
+        op_state: "OpState",
     ):
         # Scale down, if the op is completed or no more inputs are coming.
-        if op.completed() or (op._inputs_complete and op.internal_queue_size() == 0):
+        if op.completed() or (op_state.total_enqueued_input_bundles() == 0):
             return True
         if actor_pool.current_size() > actor_pool.max_size():
             # Scale down, if the actor pool is above max size.
@@ -107,7 +108,9 @@ def _try_scale_up_or_down_actor_pool(self):
                 state,
             )
             should_scale_down = self._actor_pool_should_scale_down(
-                actor_pool, op
+                actor_pool,
+                op,
+                state,
             )
             if should_scale_up and not should_scale_down:
                 if actor_pool.scale_up(1) == 0:
@@ -143,7 +146,7 @@ def _try_scale_up_cluster(self):
             for _, op_state in self._topology.items()
         )
         any_has_input = any(
-            op_state.total_input_enqueued() > 0
+            op_state._pending_dispatch_input_bundles_count() > 0
             for _, op_state in self._topology.items()
         )
         if not (no_runnable_op and any_has_input):
@@ -169,7 +172,7 @@ def to_bundle(resource: ExecutionResources) -> Dict:
             resource_request.extend([task_bundle] * op.num_active_tasks())
             # Only include incremental resource usage for ops that are ready for
             # dispatch.
-            if state.total_input_enqueued() > 0:
+            if state._pending_dispatch_input_bundles_count() > 0:
                 # TODO(Clark): Scale up more aggressively by adding incremental resource
                 # usage for more than one bundle in the queue for this op?
                 resource_request.append(task_bundle)
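
Note: combined with the streaming_executor.py change at the end of this diff (where `op.internal_queue_size() + op_state.total_input_enqueued()` collapses into `op_state.total_enqueued_input_bundles()`), the new accessor appears to fold the operator's internal queue into the previously reported count of enqueued inputs. A rough sketch of that relationship, under that assumption (a free function for illustration only, not the actual OpState implementation):

    def total_enqueued_input_bundles(op, op_state) -> int:
        # Bundles buffered inside the operator itself (bundlers, input
        # buffers, ...); zero for operators without an internal queue.
        internal = (
            op.internal_queue_size()
            if isinstance(op, InternalQueueOperatorMixin)
            else 0
        )
        # Plus bundles held by the executor, not yet dispatched to the op.
        return internal + op_state._pending_dispatch_input_bundles_count()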
@@ -294,6 +294,14 @@ def set_target_max_block_size(self, target_max_block_size: Optional[int]):
             self._output_block_size_option = None
 
     def mark_execution_finished(self):
         """Manually mark that this operator has finished execution."""
+        from ..operators.base_physical_operator import InternalQueueOperatorMixin
+
+        if isinstance(self, InternalQueueOperatorMixin):
+            assert self.internal_queue_size() == 0, (
+                "Operator is marked as finished execution, but internal queue is "
+                f"non-empty (got {self.internal_queue_size()} bundles)!"
+            )
+
         self._execution_finished = True
@@ -305,17 +313,32 @@ def execution_finished(self) -> bool:
         return self._execution_finished
 
     def completed(self) -> bool:
-        """Return True when this operator is completed.
+        """Returns whether this operator has been fully completed.
 
-        An operator is completed when all these conditions hold true:
-        * The operator has finished execution (i.e., `execution_finished()` is True).
-        * All outputs have been taken (i.e., `has_next()` is False).
+        An operator is completed iff:
+        * The operator has finished execution (i.e., `execution_finished()` is True).
+        * All outputs have been taken (i.e., `has_next()` is False) from it.
         """
+        from ..operators.base_physical_operator import InternalQueueOperatorMixin
+
+        internal_queue_size = (
+            self.internal_queue_size()
+            if isinstance(self, InternalQueueOperatorMixin)
+            else 0
+        )
+
         if not self._execution_finished:
-            if self._inputs_complete and self.num_active_tasks() == 0:
-                # If all inputs are complete and there are no active tasks,
-                # then the operator has completed execution.
+            if (
+                self._inputs_complete
+                and internal_queue_size == 0
+                and self.num_active_tasks() == 0
+            ):
+                # NOTE: Operator is considered completed iff
+                #   - All input blocks have been ingested
+                #   - Internal queue is empty
+                #   - There are no active or pending tasks
                 self._execution_finished = True
 
         return self._execution_finished and not self.has_next()
 
     def get_stats(self) -> StatsDict:
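
Note: the tightened completion rule above can be restated as a small standalone predicate. This is an illustrative sketch only (a hypothetical stand-in class, not the actual PhysicalOperator):

    class CompletionSketch:
        def __init__(self):
            self._inputs_complete = False
            self._execution_finished = False
            self._internal_queue = []  # stands in for internal_queue_size()
            self._active_tasks = 0     # stands in for num_active_tasks()
            self._outputs = []         # bundles not yet taken via has_next()

        def completed(self) -> bool:
            # Execution finishes only once inputs are complete, the internal
            # queue is drained, and nothing is in flight.
            if not self._execution_finished:
                if (
                    self._inputs_complete
                    and len(self._internal_queue) == 0
                    and self._active_tasks == 0
                ):
                    self._execution_finished = True
            # Fully completed only once every output has been taken as well.
            return self._execution_finished and len(self._outputs) == 0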
@@ -479,13 +502,6 @@ def throttling_disabled(self) -> bool:
         """
         return False
 
-    def internal_queue_size(self) -> int:
-        """If the operator has an internal input queue, return its size.
-
-        This is used to report tasks pending submission to actor pools.
-        """
-        return 0
-
     def shutdown(self, timer: Timer, force: bool = False) -> None:
         """Abort execution and release all resources used by this operator.
@@ -134,7 +134,10 @@ def __init__(
         self._inputs_done = False
 
     def internal_queue_size(self) -> int:
-        return len(self._bundle_queue)
+        # NOTE: The internal queue size for ``ActorPoolMapOperator`` includes both
+        #   - The input block-ref bundler's buffer
+        #   - Its own bundle queue
+        return self._block_ref_bundler.num_bundles() + len(self._bundle_queue)
 
     def completed(self) -> bool:
         # TODO separate marking as completed from the check
@@ -1,3 +1,4 @@
+import abc
 from typing import List, Optional
 
 from ray.data._internal.execution.interfaces import (
@@ -12,6 +13,13 @@
 from ray.data.context import DataContext
 
 
+class InternalQueueOperatorMixin(PhysicalOperator, abc.ABC):
+    @abc.abstractmethod
+    def internal_queue_size(self) -> int:
+        """Returns the operator's internal queue size."""
+        ...
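
Note: a minimal sketch of how callers are expected to consume the mixin, mirroring the isinstance checks that completed() and mark_execution_finished() perform elsewhere in this diff (the helper name is hypothetical):

    from ray.data._internal.execution.operators.base_physical_operator import (
        InternalQueueOperatorMixin,
    )

    def queued_bundle_count(op) -> int:
        # Operators that maintain an internal queue report its size; all
        # others implicitly report zero, without being forced to implement
        # internal_queue_size().
        if isinstance(op, InternalQueueOperatorMixin):
            return op.internal_queue_size()
        return 0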

Contributor: What's the benefit of introducing this Mixin? I feel this makes the dependency more convoluted. If we want to force subclasses to implement this, we can just make it abstract in PhysicalOperator.

Contributor (author): It's a typical mixin use-case -- to abstract and inject a scope of functionality into the class:

  • We don't want to force every class to implement it (it's only relevant for a handful of classes)
  • This mixin will be expanded, with the queue itself offloaded to it (at a later point, to avoid overloading this PR)

Contributor: > This mixin will be expanded with the queue itself offloaded to it (at a later point to avoid overloading this PR)

This is a good point. Let's leave a TODO here?

 class OneToOneOperator(PhysicalOperator):
     """An operator that has one input and one output dependency.
 
@@ -39,7 +47,7 @@ def input_dependency(self) -> PhysicalOperator:
         return self.input_dependencies[0]
 
 
-class AllToAllOperator(PhysicalOperator):
+class AllToAllOperator(InternalQueueOperatorMixin, PhysicalOperator):
     """A blocking operator that executes once its inputs are complete.
 
     This operator implements distributed sort / shuffle operations, etc.
@@ -96,6 +104,9 @@ def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
         self._input_buffer.append(refs)
         self._metrics.on_input_queued(refs)
 
+    def internal_queue_size(self) -> int:
+        return len(self._input_buffer)
+
     def all_inputs_done(self) -> None:
         ctx = TaskContext(
             task_idx=self._next_task_index,
44 changes: 25 additions & 19 deletions python/ray/data/_internal/execution/operators/map_operator.py
@@ -38,6 +38,7 @@
     OpTask,
 )
 from ray.data._internal.execution.operators.base_physical_operator import (
+    InternalQueueOperatorMixin,
     OneToOneOperator,
 )
 from ray.data._internal.execution.operators.map_transformer import (
@@ -61,7 +62,7 @@
 logger = logging.getLogger(__name__)
 
 
-class MapOperator(OneToOneOperator, ABC):
+class MapOperator(OneToOneOperator, InternalQueueOperatorMixin, ABC):
     """A streaming operator that maps input bundles 1:1 to output bundles.
 
     This operator implements the distributed map operation, supporting both task
@@ -145,6 +146,9 @@ def get_additional_split_factor(self) -> int:
     def set_additional_split_factor(self, k: int):
         self._additional_split_factor = k
 
+    def internal_queue_size(self) -> int:
+        return self._block_ref_bundler.num_bundles()
+
     @property
     def name(self) -> str:
         name = super().name
@@ -575,6 +579,9 @@ def __init__(self, min_rows_per_bundle: Optional[int]):
         self._bundle_buffer_size = 0
         self._finalized = False
 
+    def num_bundles(self):
+        return len(self._bundle_buffer)
+
     def add_bundle(self, bundle: RefBundle):
         """Add a bundle to the bundler."""
         self._bundle_buffer.append(bundle)
@@ -585,7 +592,7 @@ def has_bundle(self) -> bool:
         return self._bundle_buffer and (
             self._min_rows_per_bundle is None
             or self._bundle_buffer_size >= self._min_rows_per_bundle
-            or (self._finalized and self._bundle_buffer_size > 0)
+            or (self._finalized and self._bundle_buffer_size >= 0)
         )
 
     def get_next_bundle(self) -> Tuple[List[RefBundle], RefBundle]:
@@ -596,40 +603,39 @@ def get_next_bundle(self) -> Tuple[List[RefBundle], RefBundle]:
             the output bundle. The second element is the output bundle.
         """
         assert self.has_bundle()
+
         if self._min_rows_per_bundle is None:
             # Short-circuit if no bundle row target was defined.
             assert len(self._bundle_buffer) == 1
             bundle = self._bundle_buffer[0]
             self._bundle_buffer = []
             self._bundle_buffer_size = 0
             return [bundle], bundle
-        leftover = []
+
+        remainder = []
         output_buffer = []
         output_buffer_size = 0
-        buffer_filled = False
-        for bundle in self._bundle_buffer:
+
+        for idx, bundle in enumerate(self._bundle_buffer):
             bundle_size = self._get_bundle_size(bundle)
-            if buffer_filled:
-                # Buffer has been filled, save it in the leftovers.
-                leftover.append(bundle)
-            elif (
-                output_buffer_size + bundle_size <= self._min_rows_per_bundle
+
+            # Add the bundle to the output buffer so long as either
+            #   - The output buffer is still empty, or
+            #   - The output buffer doesn't exceed the `_min_rows_per_bundle`
+            #     threshold yet
+            if (
+                output_buffer_size < self._min_rows_per_bundle
                 or output_buffer_size == 0
             ):
-                # Bundle fits in buffer, or bundle doesn't fit but the buffer still
-                # needs a non-empty bundle.
                 output_buffer.append(bundle)
                 output_buffer_size += bundle_size
             else:
-                # Bundle doesn't fit in a buffer that already has at least one non-empty
-                # bundle, so we add it to the leftovers.
-                leftover.append(bundle)
-                # Add all remaining bundles to the leftovers.
-                buffer_filled = True
-        self._bundle_buffer = leftover
+                # Everything from this bundle onward stays queued for the
+                # next output bundle.
+                remainder = self._bundle_buffer[idx:]
+                break
+
+        self._bundle_buffer = remainder
         self._bundle_buffer_size = sum(
-            self._get_bundle_size(bundle) for bundle in leftover
+            self._get_bundle_size(bundle) for bundle in remainder
        )
+
         return list(output_buffer), _merge_ref_bundles(*output_buffer)
 
     def done_adding_bundles(self):
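
Note: to make the reworked loop concrete, here is a standalone sketch of the same splitting rule, using plain row counts in place of RefBundles (split_bundles is a hypothetical helper, not part of the PR):

    def split_bundles(buffer, min_rows):
        """Split `buffer` into one output batch plus the queued remainder."""
        output, output_size = [], 0
        remainder = []
        for idx, size in enumerate(buffer):
            # Keep filling until the output first reaches `min_rows` rows.
            if output_size < min_rows or output_size == 0:
                output.append(size)
                output_size += size
            else:
                remainder = buffer[idx:]
                break
        return output, remainder

For example, with min_rows=10, split_bundles([4, 4, 4, 4], 10) returns ([4, 4, 4], [4]): bundles are drained until the output first reaches the row threshold, and everything after that stays queued.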
@@ -9,6 +9,9 @@
     PhysicalOperator,
     RefBundle,
 )
+from ray.data._internal.execution.operators.base_physical_operator import (
+    InternalQueueOperatorMixin,
+)
 from ray.data._internal.execution.util import locality_string
 from ray.data._internal.remote_fn import cached_remote_fn
 from ray.data._internal.stats import StatsDict
@@ -17,7 +20,7 @@
 from ray.types import ObjectRef
 
 
-class OutputSplitter(PhysicalOperator):
+class OutputSplitter(InternalQueueOperatorMixin, PhysicalOperator):
     """An operator that splits the given data into `n` output splits.
 
     The output bundles of this operator will have a `bundle.output_split_idx` attr
@@ -6,12 +6,15 @@
     PhysicalOperator,
     RefBundle,
 )
-from ray.data._internal.execution.operators.base_physical_operator import NAryOperator
+from ray.data._internal.execution.operators.base_physical_operator import (
+    InternalQueueOperatorMixin,
+    NAryOperator,
+)
 from ray.data._internal.stats import StatsDict
 from ray.data.context import DataContext
 
 
-class UnionOperator(NAryOperator):
+class UnionOperator(InternalQueueOperatorMixin, NAryOperator):
     """An operator that combines output blocks from
     two or more input operators into a single output."""
@@ -69,6 +72,9 @@ def num_output_rows_total(self) -> Optional[int]:
             total_rows += input_num_rows
         return total_rows
 
+    def internal_queue_size(self) -> int:
+        return sum([len(buf) for buf in self._input_buffers])
+
     def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
         assert not self.completed()
         assert 0 <= input_index <= len(self._input_dependencies), input_index
@@ -4,6 +4,9 @@
 import ray
 from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
 from ray.data._internal.execution.interfaces import PhysicalOperator, RefBundle
+from ray.data._internal.execution.operators.base_physical_operator import (
+    InternalQueueOperatorMixin,
+)
 from ray.data._internal.remote_fn import cached_remote_fn
 from ray.data._internal.split import _split_at_indices
 from ray.data._internal.stats import StatsDict
@@ -18,7 +21,7 @@
 from ray.data.context import DataContext
 
 
-class ZipOperator(PhysicalOperator):
+class ZipOperator(InternalQueueOperatorMixin, PhysicalOperator):
     """An operator that zips its inputs together.
 
     NOTE: the implementation is bulk for now, which materializes all its inputs in
@@ -69,6 +72,9 @@ def num_output_rows_total(self) -> Optional[int]:
         else:
             return right_num_rows
 
+    def internal_queue_size(self) -> int:
+        return len(self._left_buffer) + len(self._right_buffer)
+
     def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
         assert not self.completed()
         assert input_index == 0 or input_index == 1, input_index
4 changes: 1 addition & 3 deletions python/ray/data/_internal/execution/streaming_executor.py
@@ -453,9 +453,7 @@ def _get_state_dict(self, state):
             "progress": op_state.num_completed_tasks,
             "total": op.num_outputs_total(),
             "total_rows": op.num_output_rows_total(),
-            "queued_blocks": (
-                op.internal_queue_size() + op_state.total_input_enqueued()
-            ),
+            "queued_blocks": op_state.total_enqueued_input_bundles(),
             "state": DatasetState.FINISHED.name
             if op.execution_finished()
             else state,