Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

138 changes: 74 additions & 64 deletions python/ray/data/_internal/execution/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,67 @@ def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]:
return None
return self._op_resource_allocator.get_budget(op)

def is_op_eligible(self, op: PhysicalOperator) -> bool:
    """Return whether `op` is eligible for memory reservation.

    An operator is ineligible if its throttling is disabled or if it has
    already finished execution.
    """
    if op.throttling_disabled():
        return False
    # Once the op has finished execution we no longer need to allocate
    # resources for it, even if some of its outputs have not been taken yet.
    return not op.execution_finished()

def get_eligible_ops(self) -> List[PhysicalOperator]:
    """Return the operators in `self._topology` for which
    `is_op_eligible` is true, in iteration order of the topology."""
    return list(filter(self.is_op_eligible, self._topology))

def get_downstream_ineligible_ops(
    self, op: PhysicalOperator
) -> Iterable[PhysicalOperator]:
    """Yield the ineligible operators directly downstream of `op`.

    The walk follows `output_dependencies` and stops along a branch as soon
    as it reaches an eligible operator.

    E.g.,
      - "cur_map->downstream_map" yields nothing.
      - "cur_map->limit1->limit2->downstream_map" yields limit1, then limit2.
    """
    for downstream in op.output_dependencies:
        if self.is_op_eligible(downstream):
            # An eligible operator ends the chain of ineligible ones.
            continue
        yield downstream
        yield from self.get_downstream_ineligible_ops(downstream)

def get_downstream_eligible_ops(
    self, op: PhysicalOperator
) -> Iterable[PhysicalOperator]:
    """Yield the nearest eligible operators downstream of `op`, looking
    through any ineligible operators in between.

    E.g.,
      - "cur_map->downstream_map" yields downstream_map.
      - "cur_map->limit1->limit2->downstream_map" yields downstream_map.
    """
    for downstream in op.output_dependencies:
        if not self.is_op_eligible(downstream):
            # Skip over the ineligible operator and keep searching below it.
            yield from self.get_downstream_eligible_ops(downstream)
            continue
        yield downstream

def get_op_outputs_object_store_usage_with_downstream(
    self, op: PhysicalOperator
) -> int:
    """Return the outputs memory usage attributed to `op`.

    This is the operator's own entry in `_mem_op_outputs` plus the object
    store memory used by every downstream ineligible operator.
    """
    # Usage of the operator's own outputs.
    own_outputs_usage = self._mem_op_outputs[op]
    # Usage of the ineligible operators that follow it.
    downstream_usage = sum(
        self.get_op_usage(ineligible_op).object_store_memory
        for ineligible_op in self.get_downstream_ineligible_ops(op)
    )
    return own_outputs_usage + downstream_usage

def get_op_internal_object_store_usage(self, op: PhysicalOperator) -> int:
    """Return the internal object store memory usage recorded for `op`
    in `_mem_op_internal`."""
    internal_usage = self._mem_op_internal[op]
    return internal_usage


class OpResourceAllocator(ABC):
"""An interface for dynamic operator resource allocation.
Expand Down Expand Up @@ -479,20 +540,6 @@ def __init__(self, resource_manager: ResourceManager, reservation_ratio: float):

self._idle_detector = self.IdleDetector()

def _is_op_eligible(self, op: PhysicalOperator) -> bool:
    """Return whether `op` is eligible for memory reservation."""
    # An op is excluded when throttling is disabled for it, or when it has
    # finished execution: a finished op needs no further resources even if
    # some of its outputs are still waiting to be taken.
    return not (op.throttling_disabled() or op.execution_finished())

def _get_eligible_ops(self) -> List[PhysicalOperator]:
    """Return the operators of the resource manager's topology that pass
    `_is_op_eligible`."""
    eligible_ops = []
    for candidate in self._resource_manager._topology:
        if self._is_op_eligible(candidate):
            eligible_ops.append(candidate)
    return eligible_ops

def _get_ineligible_ops_with_usage(self) -> List[PhysicalOperator]:
"""
Resource reservation is based on the number of eligible operators.
Expand All @@ -519,14 +566,14 @@ def _get_ineligible_ops_with_usage(self) -> List[PhysicalOperator]:
# filter out downstream ineligible operators since they are omitted from reservation calculations.
for op in last_completed_ops:
ops_to_exclude_from_reservation.extend(
list(self._get_downstream_ineligible_ops(op))
list(self._resource_manager.get_downstream_ineligible_ops(op))
)
ops_to_exclude_from_reservation.append(op)
return list(set(ops_to_exclude_from_reservation))

def _update_reservation(self):
global_limits = self._resource_manager.get_global_limits().copy()
eligible_ops = self._get_eligible_ops()
eligible_ops = self._resource_manager.get_eligible_ops()

self._op_reserved.clear()
self._reserved_for_op_outputs.clear()
Expand Down Expand Up @@ -610,7 +657,7 @@ def _should_unblock_streaming_output_backpressure(
# launch tasks. Then we should temporarily unblock the streaming output
# backpressure by allowing reading at least 1 block. So the current operator
# can finish at least one task and yield resources to the downstream operators.
for next_op in self._get_downstream_eligible_ops(op):
for next_op in self._resource_manager.get_downstream_eligible_ops(op):
if not self._reserved_min_resources[next_op]:
# Case 1: the downstream operator hasn't reserved the minimum resources
# to run at least one task.
Expand All @@ -623,25 +670,14 @@ def _should_unblock_streaming_output_backpressure(
return True
return False

def _get_op_outputs_usage_with_downstream(self, op: PhysicalOperator) -> float:
    """Return the outputs memory usage of `op` plus the object store memory
    used by its downstream ineligible operators.
    """
    manager = self._resource_manager
    # Start from the usage of the operator's own outputs.
    total_usage = manager._mem_op_outputs[op]
    # Then charge the downstream ineligible operators to it as well.
    for ineligible_op in self._get_downstream_ineligible_ops(op):
        total_usage = total_usage + manager.get_op_usage(ineligible_op).object_store_memory
    return total_usage

def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]:
if op not in self._op_budgets:
return None
res = self._op_budgets[op].object_store_memory
# Add the remaining of `_reserved_for_op_outputs`.
op_outputs_usage = self._get_op_outputs_usage_with_downstream(op)
op_outputs_usage = (
self._resource_manager.get_op_outputs_object_store_usage_with_downstream(op)
)
res += max(self._reserved_for_op_outputs[op] - op_outputs_usage, 0)
if math.isinf(res):
self._output_budgets[op] = res
Expand All @@ -654,41 +690,11 @@ def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]:
self._output_budgets[op] = res
return res

def _get_downstream_ineligible_ops(
    self, op: PhysicalOperator
) -> Iterable[PhysicalOperator]:
    """Yield the chain of ineligible operators that immediately follows `op`.

    E.g.,
      - "cur_map->downstream_map" yields nothing.
      - "cur_map->limit1->limit2->downstream_map" yields limit1, then limit2.
    """
    for successor in op.output_dependencies:
        if self._is_op_eligible(successor):
            # Eligible successors (and anything below them) are not reported.
            continue
        yield successor
        yield from self._get_downstream_ineligible_ops(successor)

def _get_downstream_eligible_ops(
    self, op: PhysicalOperator
) -> Iterable[PhysicalOperator]:
    """Yield the first eligible operators found downstream of `op`,
    skipping over intermediate ineligible operators.

    E.g.,
      - "cur_map->downstream_map" yields downstream_map.
      - "cur_map->limit1->limit2->downstream_map" yields downstream_map.
    """
    for successor in op.output_dependencies:
        if not self._is_op_eligible(successor):
            # Ineligible: continue the search through its own successors.
            yield from self._get_downstream_eligible_ops(successor)
            continue
        yield successor

def update_usages(self):
self._update_reservation()

self._op_budgets.clear()
eligible_ops = self._get_eligible_ops()
eligible_ops = self._resource_manager.get_eligible_ops()
if len(eligible_ops) == 0:
return

Expand All @@ -699,10 +705,14 @@ def update_usages(self):
op_mem_usage = 0
# Add the memory usage of the operator itself,
# excluding `_reserved_for_op_outputs`.
op_mem_usage += self._resource_manager._mem_op_internal[op]
op_mem_usage += self._resource_manager.get_op_internal_object_store_usage(
op
)
# Add the portion of op outputs usage that has
# exceeded `_reserved_for_op_outputs`.
op_outputs_usage = self._get_op_outputs_usage_with_downstream(op)
op_outputs_usage = self._resource_manager.get_op_outputs_object_store_usage_with_downstream(
op
)
op_mem_usage += max(op_outputs_usage - self._reserved_for_op_outputs[op], 0)
op_usage = self._resource_manager.get_op_usage(op).copy(
object_store_memory=op_mem_usage
Expand Down
11 changes: 11 additions & 0 deletions python/ray/data/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ class ShuffleStrategy(str, enum.Enum):
)


# Default value for `DataContext.enable_dynamic_output_queue_size_backpressure`.
# NOTE(review): presumably `env_bool` reads the named environment variable and
# falls back to the second argument (False) when it is unset — confirm against
# the `env_bool` helper.
DEFAULT_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE: bool = env_bool(
"RAY_DATA_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE", False
)


@DeveloperAPI
@dataclass
class AutoscalingConfig:
Expand Down Expand Up @@ -453,6 +458,8 @@ class DataContext:
later. If `None`, this type of backpressure is disabled.
downstream_capacity_backpressure_max_queued_bundles: Maximum number of queued
bundles before applying backpressure. If `None`, no limit is applied.
enable_dynamic_output_queue_size_backpressure: Whether to cap the concurrency
of an operator based on its own and its downstream operators' queue sizes.
enforce_schemas: Whether to enforce schema consistency across dataset operations.
pandas_block_ignore_metadata: Whether to ignore pandas metadata when converting
between Arrow and pandas formats for better type inference.
Expand Down Expand Up @@ -591,6 +598,10 @@ class DataContext:
downstream_capacity_backpressure_ratio: float = None
downstream_capacity_backpressure_max_queued_bundles: int = None

enable_dynamic_output_queue_size_backpressure: bool = (
DEFAULT_ENABLE_DYNAMIC_OUTPUT_QUEUE_SIZE_BACKPRESSURE
)

enforce_schemas: bool = DEFAULT_ENFORCE_SCHEMAS

pandas_block_ignore_metadata: bool = DEFAULT_PANDAS_BLOCK_IGNORE_METADATA
Expand Down
Loading