diff --git a/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py b/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py index a15f935aca4b..517aa0f81901 100644 --- a/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py +++ b/python/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py @@ -108,8 +108,11 @@ def __init__(self, *args, **kwargs): # Initialize caps from operators (infinite if unset) for op, _ in self._topology.items(): - if isinstance(op, TaskPoolMapOperator) and op.get_concurrency() is not None: - self._concurrency_caps[op] = op.get_concurrency() + if ( + isinstance(op, TaskPoolMapOperator) + and op.get_max_concurrency_limit() is not None + ): + self._concurrency_caps[op] = op.get_max_concurrency_limit() else: self._concurrency_caps[op] = float("inf") diff --git a/python/ray/data/_internal/execution/backpressure_policy/resource_budget_backpressure_policy.py b/python/ray/data/_internal/execution/backpressure_policy/resource_budget_backpressure_policy.py index b97818d7b05d..5d383e07d7d6 100644 --- a/python/ray/data/_internal/execution/backpressure_policy/resource_budget_backpressure_policy.py +++ b/python/ray/data/_internal/execution/backpressure_policy/resource_budget_backpressure_policy.py @@ -15,10 +15,10 @@ class ResourceBudgetBackpressurePolicy(BackpressurePolicy): """A backpressure policy based on resource budgets in ResourceManager.""" def can_add_input(self, op: "PhysicalOperator") -> bool: - budget = self._resource_manager.get_budget(op) - if budget is None: - return True - return op.incremental_resource_usage().satisfies_limit(budget) + if self._resource_manager._op_resource_allocator is not None: + return self._resource_manager._op_resource_allocator.can_submit_new_task(op) + + return True def max_task_output_bytes_to_read(self, op: "PhysicalOperator") -> Optional[int]: """Determine maximum bytes to read based on the resource budgets. 
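As a quick illustration of how the renamed accessor is meant to be consumed by a concurrency-cap style check, here is a minimal, self-contained sketch. It is an assumption-laden stand-in, not Ray Data's actual implementation: `OperatorStub`, its fields, and the free function `can_add_input` are hypothetical names used only to show that a `None` limit is treated as an infinite cap.

```python
from typing import Optional


class OperatorStub:
    """Hypothetical stand-in for a physical operator exposing the new accessor."""

    def __init__(self, max_concurrency: Optional[int], num_tasks_running: int):
        self._max_concurrency = max_concurrency
        self.num_tasks_running = num_tasks_running

    def get_max_concurrency_limit(self) -> Optional[int]:
        # None means the operator places no explicit cap on task concurrency.
        return self._max_concurrency


def can_add_input(op: OperatorStub) -> bool:
    # Mirrors the cap initialization above: treat a missing limit as infinite.
    cap = op.get_max_concurrency_limit()
    if cap is None:
        cap = float("inf")
    return op.num_tasks_running < cap


assert can_add_input(OperatorStub(max_concurrency=2, num_tasks_running=1))
assert not can_add_input(OperatorStub(max_concurrency=2, num_tasks_running=2))
assert can_add_input(OperatorStub(max_concurrency=None, num_tasks_running=100))
```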
diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py index 1deee5f9a574..ca656bfce66a 100644 --- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py +++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py @@ -481,7 +481,12 @@ class OpRuntimeMetrics(metaclass=OpRuntimesMetricsMeta): metrics_type=MetricsType.Histogram, metrics_args={"boundaries": histogram_buckets_s}, ) - task_completion_time_without_backpressure: float = metric_field( + task_completion_time_s: float = metric_field( + default=0, + description="Time spent running tasks to completion.", + metrics_group=MetricsGroup.TASKS, + ) + task_completion_time_excl_backpressure_s: float = metric_field( default=0, description="Time spent running tasks to completion without backpressure.", metrics_group=MetricsGroup.TASKS, @@ -678,6 +683,30 @@ def num_output_blocks_per_task_s(self) -> Optional[float]: else: return self.num_task_outputs_generated / self.block_generation_time + @metric_property( + description="Average task's completion time in seconds (including throttling).", + metrics_group=MetricsGroup.TASKS, + ) + def average_total_task_completion_time_s(self) -> Optional[float]: + """Average task's completion time in seconds (including throttling)""" + if self.num_tasks_finished == 0: + return None + else: + return self.task_completion_time_s / self.num_tasks_finished + + @metric_property( + description="Average task's completion time in seconds (excluding throttling).", + metrics_group=MetricsGroup.TASKS, + ) + def average_task_completion_excl_backpressure_time_s(self) -> Optional[float]: + """Average task's completion time in seconds (excluding throttling)""" + if self.num_tasks_finished == 0: + return None + else: + return ( + self.task_completion_time_excl_backpressure_s / self.num_tasks_finished + ) + @metric_property( description="Average size of task output in bytes.", metrics_group=MetricsGroup.OUTPUTS, @@ -981,8 +1010,10 @@ def on_task_finished(self, task_index: int, exception: Optional[Exception]): self.bytes_outputs_of_finished_tasks += task_info.bytes_outputs self.rows_outputs_of_finished_tasks += task_info.num_rows_produced + task_time_delta = time.perf_counter() - task_info.start_time + self.task_completion_time_s += task_time_delta + with self._histogram_thread_lock: - task_time_delta = time.perf_counter() - task_info.start_time bucket_index = find_bucket_index(histogram_buckets_s, task_time_delta) self.task_completion_time[bucket_index] += 1 @@ -997,7 +1028,7 @@ def on_task_finished(self, task_index: int, exception: Optional[Exception]): # NOTE: This is used for Issue Detection self._op_task_duration_stats.add_duration(task_time_delta) - self.task_completion_time_without_backpressure += task_info.cum_block_gen_time + self.task_completion_time_excl_backpressure_s += task_info.cum_block_gen_time inputs = self._running_tasks[task_index].inputs self.num_task_inputs_processed += len(inputs) total_input_size = inputs.size_bytes() diff --git a/python/ray/data/_internal/execution/interfaces/physical_operator.py b/python/ray/data/_internal/execution/interfaces/physical_operator.py index cfe03a708dc6..913cbbcc1d1d 100644 --- a/python/ray/data/_internal/execution/interfaces/physical_operator.py +++ b/python/ray/data/_internal/execution/interfaces/physical_operator.py @@ -702,13 +702,12 @@ def pending_processor_usage(self) -> ExecutionResources: def min_max_resource_requirements( self, ) -> 
Tuple[ExecutionResources, ExecutionResources]: - """Returns the min and max resources to start the operator and make progress. + """Returns lower/upper boundary of resource requirements for this operator: - For example, an operator that creates an actor pool requiring 8 GPUs could - return ExecutionResources(gpu=8) as its minimum usage. - - This method is used by the resource manager to reserve minimum resources and to - ensure that it doesn't over-provision resources. + - Minimal: lower bound (min) of resources required to start this operator + (for most operators this is 0, except the ones that utilize actors) + - Maximum: upper bound (max) of how many resources this operator could + utilize. """ return ExecutionResources.zero(), ExecutionResources.inf() @@ -806,6 +805,11 @@ def upstream_op_num_outputs(self): ) return upstream_op_num_outputs + def get_max_concurrency_limit(self) -> Optional[int]: + """Max value of how many tasks this operator could run + concurrently (if limited)""" + return None + class ReportsExtraResourceUsage(abc.ABC): @abc.abstractmethod diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py index eea97177dfa6..3d7328018c88 100644 --- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py @@ -161,6 +161,7 @@ def __init__( self._actor_cls = None # Whether no more submittable bundles will be added. self._inputs_done = False + self._actor_locality_enabled: Optional[bool] = None # Locality metrics self._locality_hits = 0 @@ -496,14 +497,13 @@ def get_autoscaling_actor_pools(self) -> List[AutoscalingActorPool]: def per_task_resource_allocation( self: "PhysicalOperator", ) -> ExecutionResources: - max_concurrency = self._ray_remote_args.get("max_concurrency", 1) + # For Actor tasks resource allocation is determined as: + # - Per actor resource allocation divided by + # - Actor's max task concurrency + max_concurrency = self._actor_pool.max_actor_concurrency() per_actor_resource_usage = self._actor_pool.per_actor_resource_usage() return per_actor_resource_usage.scale(1 / max_concurrency) - def max_task_concurrency(self: "PhysicalOperator") -> Optional[int]: - max_concurrency = self._ray_remote_args.get("max_concurrency", 1) - return max_concurrency * self._actor_pool.max_size() - def min_scheduling_resources( self: "PhysicalOperator", ) -> ExecutionResources: @@ -531,6 +531,9 @@ def get_actor_info(self) -> _ActorPoolInfo: """Returns Actor counts for Alive, Restarting and Pending Actors.""" return self._actor_pool.get_actor_info() + def get_max_concurrency_limit(self) -> Optional[int]: + return self._actor_pool.max_size() * self._actor_pool.max_actor_concurrency() + class _MapWorker: """An actor worker for MapOperator.""" diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index 505099d6ec43..8fd9ac019f60 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -235,7 +235,7 @@ def create( name=name, target_max_block_size_override=target_max_block_size_override, min_rows_per_bundle=min_rows_per_bundle, - concurrency=compute_strategy.size, + max_concurrency=compute_strategy.size, supports_fusion=supports_fusion, map_task_kwargs=map_task_kwargs, ray_remote_args_fn=ray_remote_args_fn, @@ -518,12 +518,6 
@@ def current_processor_usage(self) -> ExecutionResources: def pending_processor_usage(self) -> ExecutionResources: raise NotImplementedError - @abstractmethod - def min_max_resource_requirements( - self, - ) -> Tuple[ExecutionResources, ExecutionResources]: - ... - @abstractmethod def incremental_resource_usage(self) -> ExecutionResources: raise NotImplementedError diff --git a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py index fc7a8537cd94..bdd8e3c1dbbe 100644 --- a/python/ray/data/_internal/execution/operators/task_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/task_pool_map_operator.py @@ -1,5 +1,5 @@ import warnings -from typing import Any, Callable, Dict, Optional, Tuple +from typing import Any, Callable, Dict, Optional from ray.data._internal.execution.interfaces import ( ExecutionResources, @@ -24,7 +24,7 @@ def __init__( name: str = "TaskPoolMap", target_max_block_size_override: Optional[int] = None, min_rows_per_bundle: Optional[int] = None, - concurrency: Optional[int] = None, + max_concurrency: Optional[int] = None, supports_fusion: bool = True, map_task_kwargs: Optional[Dict[str, Any]] = None, ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None, @@ -41,7 +41,7 @@ def __init__( transform_fn, or None to use the block size. Setting the batch size is important for the performance of GPU-accelerated transform functions. The actual rows passed may be less if the dataset is small. - concurrency: The maximum number of Ray tasks to use concurrently, + max_concurrency: The maximum number of Ray tasks to use concurrently, or None to use as many tasks as possible. supports_fusion: Whether this operator supports fusion with other operators. map_task_kwargs: A dictionary of kwargs to pass to the map task. 
You can @@ -66,7 +66,11 @@ def __init__( ray_remote_args_fn, ray_remote_args, ) - self._concurrency = concurrency + + if max_concurrency is not None and max_concurrency <= 0: + raise ValueError(f"max_concurrency must be > 0 (got {max_concurrency})") + + self._max_concurrency = max_concurrency # NOTE: Unlike static Ray remote args, dynamic arguments extracted from the # blocks themselves are going to be passed inside `fn.options(...)` @@ -115,11 +119,6 @@ def _add_bundled_input(self, bundle: RefBundle): def progress_str(self) -> str: return "" - def min_max_resource_requirements( - self, - ) -> Tuple[ExecutionResources, ExecutionResources]: - return self.incremental_resource_usage(), ExecutionResources.for_limits() - def current_processor_usage(self) -> ExecutionResources: num_active_workers = self.num_active_tasks() return ExecutionResources( @@ -131,40 +130,37 @@ def pending_processor_usage(self) -> ExecutionResources: return ExecutionResources() def incremental_resource_usage(self) -> ExecutionResources: + return self.per_task_resource_allocation().copy( + object_store_memory=( + self._metrics.obj_store_mem_max_pending_output_per_task or 0 + ), + ) + + def per_task_resource_allocation(self) -> ExecutionResources: return ExecutionResources( cpu=self._ray_remote_args.get("num_cpus", 0), gpu=self._ray_remote_args.get("num_gpus", 0), memory=self._ray_remote_args.get("memory", 0), - object_store_memory=self._metrics.obj_store_mem_max_pending_output_per_task - or 0, ) - def per_task_resource_allocation( - self: "PhysicalOperator", - ) -> ExecutionResources: - return self.incremental_resource_usage() - - def max_task_concurrency(self: "PhysicalOperator") -> Optional[int]: - return self._concurrency - def min_scheduling_resources( self: "PhysicalOperator", ) -> ExecutionResources: return self.incremental_resource_usage() - def get_concurrency(self) -> Optional[int]: - return self._concurrency + def get_max_concurrency_limit(self) -> Optional[int]: + return self._max_concurrency def all_inputs_done(self): super().all_inputs_done() if ( - self._concurrency is not None - and self._metrics.num_inputs_received < self._concurrency + self._max_concurrency is not None + and self._metrics.num_inputs_received < self._max_concurrency ): warnings.warn( f"The maximum number of concurrent tasks for '{self.name}' is set to " - f"{self._concurrency}, but the operator only received " + f"{self._max_concurrency}, but the operator only received " f"{self._metrics.num_inputs_received} input(s). This means that the " f"operator can launch at most {self._metrics.num_inputs_received} " "task(s), which is less than the concurrency limit. 
You might be able " diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index b4ac32aed28b..b8f559bb7d56 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -18,6 +18,9 @@ from ray.data._internal.execution.operators.base_physical_operator import ( AllToAllOperator, ) +from ray.data._internal.execution.operators.hash_shuffle import ( + HashShufflingOperatorBase, +) from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer from ray.data._internal.execution.operators.zip_operator import ZipOperator from ray.data._internal.execution.util import memory_string @@ -41,7 +44,7 @@ class ResourceManager: """A class that manages the resource usage of a streaming executor.""" # The interval in seconds at which the global resource limits are refreshed. - GLOBAL_LIMITS_UPDATE_INTERVAL_S = 10 + GLOBAL_LIMITS_UPDATE_INTERVAL_S = 1 # The fraction of the object store capacity that will be used as the default object # store memory limit for the streaming executor, @@ -83,9 +86,6 @@ def __init__( # Whether to print debug information. self._debug = DEBUG_RESOURCE_MANAGER - self._downstream_fraction: Dict[PhysicalOperator, float] = {} - self._downstream_object_store_memory: Dict[PhysicalOperator, float] = {} - self._op_resource_allocator: Optional["OpResourceAllocator"] = None if data_context.op_resource_reservation_enabled: @@ -154,7 +154,7 @@ def _estimate_object_store_memory( mem_op_internal += op.metrics.obj_store_mem_internal_outqueue # Op's external output buffer. - mem_op_outputs = state.outqueue_memory_usage() + mem_op_outputs = state.output_queue_bytes() # Input buffers of the downstream operators. for next_op in op.output_dependencies: mem_op_outputs += ( @@ -178,12 +178,8 @@ def update_usages(self): self._op_usages.clear() self._op_running_usages.clear() self._op_pending_usages.clear() - self._downstream_fraction.clear() - self._downstream_object_store_memory.clear() # Iterate from last to first operator. - num_ops_so_far = 0 - num_ops_total = len(self._topology) for op, state in reversed(self._topology.items()): # Update `self._op_usages`, `self._op_running_usages`, # and `self._op_pending_usages`. @@ -220,21 +216,14 @@ def update_usages(self): op_pending_usage ) - # Update `self._downstream_fraction` and `_downstream_object_store_memory`. - # Subtract one from denom to account for input buffer. - f = (1.0 + num_ops_so_far) / max(1.0, num_ops_total - 1.0) - num_ops_so_far += 1 - self._downstream_fraction[op] = min(1.0, f) - self._downstream_object_store_memory[ - op - ] = self._global_usage.object_store_memory - # Update operator's object store usage, which is used by # DatasetStats and updated on the Ray Data dashboard. 
op._metrics.obj_store_mem_used = op_usage.object_store_memory if self._op_resource_allocator is not None: - self._op_resource_allocator.update_usages() + self._op_resource_allocator.update_budgets( + limits=self._global_limits, + ) def get_global_usage(self) -> ExecutionResources: """Return the global resource usage at the current time.""" @@ -286,24 +275,34 @@ def get_op_usage_str(self, op: PhysicalOperator) -> str: usage_str += ( f", {self._op_running_usages[op].object_store_memory_str()} object store" ) + if self._debug: usage_str += ( f" (in={memory_string(self._mem_op_internal[op])}," f"out={memory_string(self._mem_op_outputs[op])})" ) - if ( - isinstance(self._op_resource_allocator, ReservationOpResourceAllocator) - and op in self._op_resource_allocator._op_budgets - ): - budget = self._op_resource_allocator._op_budgets[op] - usage_str += f", budget=(cpu={budget.cpu:.1f}" - usage_str += f",gpu={budget.gpu:.1f}" - usage_str += f",obj_store={budget.object_store_memory_str()}" - # Remaining memory budget for producing new task outputs. - reserved_for_output = memory_string( - self._op_resource_allocator._output_budgets.get(op, 0) - ) - usage_str += f",out={reserved_for_output})" + if self._op_resource_allocator is not None: + allocation = self._op_resource_allocator.get_allocation(op) + if allocation: + usage_str += f", alloc=(cpu={allocation.cpu:.1f}" + usage_str += f",gpu={allocation.gpu:.1f}" + usage_str += f",obj_store={allocation.object_store_memory_str()})" + + budget = self._op_resource_allocator.get_budget(op) + if budget: + usage_str += f", budget=(cpu={budget.cpu:.1f}" + usage_str += f",gpu={budget.gpu:.1f}" + usage_str += f",obj_store={budget.object_store_memory_str()}" + + # Remaining memory budget for producing new task outputs. + if isinstance( + self._op_resource_allocator, ReservationOpResourceAllocator + ): + reserved_for_output = memory_string( + self._op_resource_allocator._output_budgets.get(op, 0) + ) + usage_str += f",out={reserved_for_output})" + return usage_str def op_resource_allocator_enabled(self) -> bool: @@ -316,12 +315,12 @@ def op_resource_allocator(self) -> "OpResourceAllocator": assert self._op_resource_allocator is not None return self._op_resource_allocator - def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]: - """Return the maximum bytes of pending task outputs can be read for - the given operator. None means no limit.""" - if self._op_resource_allocator is None: - return None - return self._op_resource_allocator.max_task_output_bytes_to_read(op) + def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> int: + return self._op_resource_allocator.max_task_output_bytes_to_read( + op, + task_resource_usage=self._op_usages, + output_object_store_usage=self._mem_op_outputs, + ) def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]: """Return the budget for the given operator, or None if the operator @@ -392,59 +391,24 @@ def get_op_internal_object_store_usage(self, op: PhysicalOperator) -> int: return self._mem_op_internal[op] -class OpResourceAllocator(ABC): - """An interface for dynamic operator resource allocation. - - This interface allows dynamically allocating available resources to each operator, - limiting how many tasks each operator can submit, and how much data each operator - can read from its running tasks. 
- """ - - def __init__(self, resource_manager: ResourceManager): - self._resource_manager = resource_manager - - @abstractmethod - def update_usages(self): - """Callback to update resource usages.""" - ... - - @abstractmethod - def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]: - """Return the maximum bytes of pending task outputs can be read for - the given operator. None means no limit.""" - ... +def _get_first_pending_shuffle_op(topology: "Topology") -> int: + for idx, op in enumerate(topology): + if _is_shuffle_op(op) and not op.completed(): + return idx - @abstractmethod - def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]: - """Return the budget for the given operator, or None if the operator - has unlimited budget.""" - ... + return -1 -class ReservationOpResourceAllocator(OpResourceAllocator): - """An OpResourceAllocator implementation that reserves resources for each operator. +def _is_shuffle_op(op: PhysicalOperator) -> bool: + return isinstance(op, (AllToAllOperator, HashShufflingOperatorBase)) - This class reserves memory and CPU resources for eligible operators, and considers - runtime resource usages to limit the resources that each operator can use. - It works in the following way: - 1. An operator is eligible for resource reservation, if it has enabled throttling - and hasn't completed. Ineligible operators are not throttled, but - their usage will be accounted for their upstream eligible operators. E.g., for - such a dataset "map1->limit->map2->streaming_split", we'll treat "map1->limit" as - a group and "map2->streaming_split" as another group. - 2. For each eligible operator, we reserve `reservation_ratio * global_resources / - num_eligible_ops` resources, half of which is reserved only for the operator - outputs, excluding pending task outputs. - 3. Non-reserved resources are shared among all operators. - 4. In each scheduling iteration, each eligible operator will get "remaining of their - own reserved resources" + "remaining of shared resources / num_eligible_ops" - resources. +class OpResourceAllocator(ABC): + """An interface for dynamic operator resource allocation. - The `reservation_ratio` is set to 50% by default. Users can tune this value to - adjust how aggressive or conservative the resource allocation is. A higher value - will make the resource allocation more even, but may lead to underutilization and - worse performance. And vice versa. + This interface allows dynamically allocating available resources to each operator, + limiting how many tasks each operator can submit, and how much data each operator + can read from its running tasks. """ class IdleDetector: @@ -487,6 +451,7 @@ def detect_idle(self, op: PhysicalOperator): op, cur_time - self.last_output_time[op] ) return True + return False @classmethod @@ -508,10 +473,156 @@ def print_warning_if_idle_for_too_long( " `DataContext.get_current().execution_options.exclude_resources`." " This message will only print once." ) + logger.warning(msg) + def __init__(self, topology: "Topology"): + self._topology = topology + self._idle_detector = self.IdleDetector() + + @abstractmethod + def update_budgets( + self, + *, + limits: ExecutionResources, + ): + """Callback to update resource usages.""" + ... + + @abstractmethod + def can_submit_new_task(self, op: PhysicalOperator) -> bool: + """Return whether the given operator can submit a new task.""" + ... 
+ + @abstractmethod + def max_task_output_bytes_to_read( + self, + op: PhysicalOperator, + *, + task_resource_usage: Dict[PhysicalOperator, ExecutionResources], + output_object_store_usage: Dict[PhysicalOperator, int], + ) -> Optional[int]: + """Return the maximum bytes of pending task outputs that can be read for + the given operator. None means no limit.""" + ... + + @abstractmethod + def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]: + """Returns the budget for the given operator or `None` if the operator + has unlimited budget. Operator's budget is defined as: + + Budget = Allocation - Usage + """ + ... + + @abstractmethod + def get_output_budget(self, op: PhysicalOperator) -> Optional[int]: + """Returns the budget for operator's outputs (in object store bytes) or + `None` if there's no limit. + """ + ... + + @abstractmethod + def get_allocation(self, op: PhysicalOperator) -> Optional[ExecutionResources]: + """Returns allocation for the given operator or `None` if operator's + allocation is unlimited.""" + ... + + def _get_eligible_ops(self) -> List[PhysicalOperator]: + first_pending_shuffle_op_idx = _get_first_pending_shuffle_op(self._topology) + return [ + op + for idx, op in enumerate(self._topology) + if self._is_op_eligible(op) + and ( + first_pending_shuffle_op_idx == -1 + or idx <= first_pending_shuffle_op_idx + ) + ] + + @staticmethod + def _is_op_eligible(op: PhysicalOperator) -> bool: + """Whether the op is eligible for memory reservation.""" + return ( + not op.throttling_disabled() + # As long as the op has finished execution, even if there are still + # non-taken outputs, we don't need to allocate resources for it. + and not op.execution_finished() + ) + + def _get_downstream_eligible_ops( + self, op: PhysicalOperator + ) -> Iterable[PhysicalOperator]: + """Get the downstream eligible operators of the given operator, ignoring + intermediate ineligible operators. + + E.g., + - "cur_map->downstream_map" will return [downstream_map]. + - "cur_map->limit1->limit2->downstream_map" will return [downstream_map]. + """ + for next_op in op.output_dependencies: + if self._is_op_eligible(next_op): + yield next_op + else: + yield from self._get_downstream_eligible_ops(next_op) + + def _should_unblock_streaming_output_backpressure( + self, op: PhysicalOperator + ) -> bool: + # NOTE: If this operator is a terminal one, extracting outputs from it + # should not be throttled + if not op.output_dependencies: + return True + + # In some edge cases, the downstream operators may not have enough resources to + # launch tasks. Then we should temporarily unblock the streaming output + # backpressure by allowing reading at least 1 block. So the current operator + # can finish at least one task and yield resources to the downstream operators. + for downstream_op in self._get_downstream_eligible_ops(op): + if not self.can_submit_new_task(downstream_op): + # Case 1: the downstream operator hasn't reserved the minimum resources + # to run at least one task. + return True + + # Case 2: the downstream operator has reserved the minimum resources, but + # the resources are preempted by non-Data tasks or actors. + # We don't have a good way to detect this case, so we'll unblock + # backpressure when the downstream operator has been idle for a while. + if self._idle_detector.detect_idle(downstream_op): + return True + + return False + + +class ReservationOpResourceAllocator(OpResourceAllocator): + """An OpResourceAllocator implementation that reserves resources for each operator. 
+ + This class reserves memory and CPU resources for eligible operators, and considers + runtime resource usages to limit the resources that each operator can use. + + It works in the following way: + 1. An operator is eligible for resource reservation, if it has enabled throttling + and hasn't completed. Ineligible operators are not throttled, but + their usage will be accounted for their upstream eligible operators. E.g., for + such a dataset "map1->limit->map2->streaming_split", we'll treat "map1->limit" as + a group and "map2->streaming_split" as another group. + 2. For each eligible operator, we reserve `reservation_ratio * global_resources / + num_eligible_ops` resources, half of which is reserved only for the operator + outputs, excluding pending task outputs. + 3. Non-reserved resources are shared among all operators. + 4. In each scheduling iteration, each eligible operator will get "remaining of their + own reserved resources" + "remaining of shared resources / num_eligible_ops" + resources. + + The `reservation_ratio` is set to 50% by default. Users can tune this value to + adjust how aggressive or conservative the resource allocation is. A higher value + will make the resource allocation more even, but may lead to underutilization and + worse performance. And vice versa. + """ + def __init__(self, resource_manager: ResourceManager, reservation_ratio: float): - super().__init__(resource_manager) + super().__init__(resource_manager._topology) + self._resource_manager = resource_manager self._reservation_ratio = reservation_ratio assert 0.0 <= self._reservation_ratio <= 1.0 # Per-op reserved resources, excluding `_reserved_for_op_outputs`. @@ -556,7 +667,7 @@ def _get_ineligible_ops_with_usage(self) -> List[PhysicalOperator]: last_completed_ops = [] ops_to_exclude_from_reservation = [] # Traverse operator tree collecting all operators that have already finished - for op in self._resource_manager._topology: + for op in self._topology: if not op.execution_finished(): for dep in op.input_dependencies: if dep.execution_finished(): @@ -571,8 +682,7 @@ def _get_ineligible_ops_with_usage(self) -> List[PhysicalOperator]: ops_to_exclude_from_reservation.append(op) return list(set(ops_to_exclude_from_reservation)) - def _update_reservation(self): - global_limits = self._resource_manager.get_global_limits().copy() + def _update_reservation(self, limits: ExecutionResources): eligible_ops = self._resource_manager.get_eligible_ops() self._op_reserved.clear() @@ -582,19 +692,11 @@ def _update_reservation(self): if len(eligible_ops) == 0: return - op_to_exclude_from_reservation = self._get_ineligible_ops_with_usage() - for completed_op in op_to_exclude_from_reservation: - global_limits = global_limits.subtract( - self._resource_manager.get_op_usage(completed_op) - ) - global_limits = global_limits.max(ExecutionResources.zero()) - remaining = global_limits.copy() + remaining = limits.copy() # Reserve `reservation_ratio * global_limits / num_ops` resources for each # operator. - default_reserved = global_limits.scale( - self._reservation_ratio / (len(eligible_ops)) - ) + default_reserved = limits.scale(self._reservation_ratio / (len(eligible_ops))) for index, op in enumerate(eligible_ops): # Reserve at least half of the default reserved resources for the outputs. 
# This makes sure that we will have enough budget to pull blocks from the @@ -603,10 +705,14 @@ def _update_reservation(self): 0, 0, max(default_reserved.object_store_memory / 2, 1) ) - min_resource_usage, max_resource_usage = op.min_max_resource_requirements() reserved_for_tasks = default_reserved.subtract(reserved_for_outputs) - reserved_for_tasks = reserved_for_tasks.max(min_resource_usage) - reserved_for_tasks = reserved_for_tasks.min(max_resource_usage) + + min_resource_usage, max_resource_usage = op.min_max_resource_requirements() + + if min_resource_usage is not None: + reserved_for_tasks = reserved_for_tasks.max(min_resource_usage) + if max_resource_usage is not None: + reserved_for_tasks = reserved_for_tasks.min(max_resource_usage) # Check if the remaining resources are enough for both reserved_for_tasks # and reserved_for_outputs. Note, we only consider CPU and GPU, but not @@ -647,30 +753,30 @@ def _update_reservation(self): self._total_shared = remaining + def can_submit_new_task(self, op: PhysicalOperator) -> bool: + """Return whether the given operator can submit a new task based on budget.""" + budget = self.get_budget(op) + if budget is None: + return True + return op.incremental_resource_usage().satisfies_limit(budget) + def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]: return self._op_budgets.get(op) - def _should_unblock_streaming_output_backpressure( - self, op: PhysicalOperator - ) -> bool: - # In some edge cases, the downstream operators may have no enough resources to - # launch tasks. Then we should temporarily unblock the streaming output - # backpressure by allowing reading at least 1 block. So the current operator - # can finish at least one task and yield resources to the downstream operators. - for next_op in self._resource_manager.get_downstream_eligible_ops(op): - if not self._reserved_min_resources[next_op]: - # Case 1: the downstream operator hasn't reserved the minimum resources - # to run at least one task. - return True - # Case 2: the downstream operator has reserved the minimum resources, but - # the resources are preempted by non-Data tasks or actors. - # We don't have a good way to detect this case, so we'll unblock - # backpressure when the downstream operator has been idle for a while. 
- if self._idle_detector.detect_idle(next_op): - return True - return False + def get_output_budget(self, op: PhysicalOperator) -> Optional[int]: + return self._output_budgets.get(op) + + def get_allocation(self, op: PhysicalOperator) -> Optional[ExecutionResources]: + # TODO fix + return ExecutionResources.zero() - def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]: + def max_task_output_bytes_to_read( + self, + op: PhysicalOperator, + *, + task_resource_usage: Dict[PhysicalOperator, ExecutionResources], + output_object_store_usage: Dict[PhysicalOperator, int], + ) -> Optional[int]: if op not in self._op_budgets: return None res = self._op_budgets[op].object_store_memory @@ -678,6 +784,7 @@ def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]: op_outputs_usage = ( self._resource_manager.get_op_outputs_object_store_usage_with_downstream(op) ) + res += max(self._reserved_for_op_outputs[op] - op_outputs_usage, 0) if math.isinf(res): self._output_budgets[op] = res @@ -690,8 +797,20 @@ def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]: self._output_budgets[op] = res return res - def update_usages(self): - self._update_reservation() + def update_budgets( + self, + *, + limits: ExecutionResources, + ): + op_to_exclude_from_reservation = self._get_ineligible_ops_with_usage() + for completed_op in op_to_exclude_from_reservation: + completed_op_usage = self._resource_manager.get_op_usage(completed_op) + + limits = limits.subtract(completed_op_usage) + limits = limits.max(ExecutionResources.zero()) + + # Remaining resources to be distributed across operators + remaining_shared = self._update_reservation(limits) self._op_budgets.clear() eligible_ops = self._resource_manager.get_eligible_ops() @@ -714,14 +833,17 @@ def update_usages(self): op ) op_mem_usage += max(op_outputs_usage - self._reserved_for_op_outputs[op], 0) + op_usage = self._resource_manager.get_op_usage(op).copy( object_store_memory=op_mem_usage ) + op_reserved = self._op_reserved[op] # How much of the reserved resources are remaining. op_reserved_remaining = op_reserved.subtract(op_usage).max( ExecutionResources.zero() ) + self._op_budgets[op] = op_reserved_remaining # How much of the reserved resources are exceeded. # If exceeded, we need to subtract from the remaining shared resources. @@ -766,8 +888,7 @@ def update_usages(self): # available num of GPUs. # 2. The cluster scales down, and the global limit decreases. 
target_num_gpu = max( - self._resource_manager.get_global_limits().gpu - - self._resource_manager.get_op_usage(op).gpu, + limits.gpu - self._resource_manager.get_op_usage(op).gpu, 0, ) else: diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index 9c9cf589b32b..e5237478ed0e 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -24,7 +24,6 @@ from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer from ray.data._internal.execution.progress_manager import RichExecutionProgressManager from ray.data._internal.execution.resource_manager import ( - ReservationOpResourceAllocator, ResourceManager, ) from ray.data._internal.execution.streaming_executor_state import ( @@ -413,14 +412,13 @@ def _update_max_bytes_to_read_metric( self, op: PhysicalOperator, tags: Dict[str, str] ): if self._resource_manager.op_resource_allocator_enabled(): - ora = self._resource_manager.op_resource_allocator - assert isinstance(ora, ReservationOpResourceAllocator) - if op in ora._output_budgets: - max_bytes_to_read = ora._output_budgets[op] - if math.isinf(max_bytes_to_read): + resource_allocator = self._resource_manager.op_resource_allocator + output_budget_bytes = resource_allocator.get_output_budget(op) + if output_budget_bytes is not None: + if math.isinf(output_budget_bytes): # Convert inf to -1 to represent unlimited bytes to read - max_bytes_to_read = -1 - self._max_bytes_to_read_gauge.set(max_bytes_to_read, tags) + output_budget_bytes = -1 + self._max_bytes_to_read_gauge.set(output_budget_bytes, tags) def get_stats(self): """Return the stats object for the streaming execution. diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index a562d8f9feb2..c49b9d06f521 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -316,6 +316,18 @@ def total_enqueued_input_blocks(self) -> int: def has_pending_bundles(self) -> bool: return any(len(q) > 0 for q in self.input_queues) + def total_enqueued_input_blocks_bytes(self) -> int: + """Total number of bytes occupied by input bundles currently enqueued among: + 1. Input queue(s) pending dispatching (``OpState.input_queues``) + 2. Operator's internal queues (like ``MapOperator``s ref-bundler, etc) + """ + internal_queue_size_bytes = ( + self.op.internal_queue_num_bytes() + if isinstance(self.op, InternalQueueOperatorMixin) + else 0 + ) + return self.input_queue_bytes() + internal_queue_size_bytes + def update_display_metrics(self, resource_manager: ResourceManager): """Update display metrics with current metrics.""" usage = resource_manager.get_op_usage(self.op) @@ -396,7 +408,7 @@ def summary_str(self, resource_manager: ResourceManager) -> str: desc += f"; {_actor_info_summary_str(self.op.get_actor_info())}" # Queued blocks - desc += f"; Queued blocks: {self.total_enqueued_input_blocks()}" + desc += f"; Queued blocks: {self.total_enqueued_input_blocks()} ({memory_string(self.total_enqueued_input_blocks_bytes())})" desc += f"; Resources: {resource_manager.get_op_usage_str(self.op)}" # Any additional operator specific information. 
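To make the new queued-bytes accounting concrete, here is a small standalone sketch under simplifying assumptions: the real `total_enqueued_input_blocks_bytes` reads `OpState.input_queue_bytes()` and the operator's internal queue object, whereas this illustrative function takes plain numbers.

```python
from typing import List, Optional


def total_enqueued_input_blocks_bytes(
    input_queue_bytes: List[int], internal_queue_bytes: Optional[int]
) -> int:
    # Bytes held in the external input queues pending dispatch, plus the
    # operator's internal queue (e.g. a MapOperator's ref-bundler); operators
    # without an internal queue contribute nothing for the second term.
    return sum(input_queue_bytes) + (internal_queue_bytes or 0)


# Two input queues holding 1 MiB and 512 KiB, plus a 256 KiB internal queue.
assert total_enqueued_input_blocks_bytes([1 << 20, 512 << 10], 256 << 10) == 1_835_008
```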
@@ -448,7 +460,7 @@ def get_output_blocking(self, output_split_idx: Optional[int]) -> RefBundle: return ref time.sleep(0.01) - def inqueue_memory_usage(self) -> int: + def input_queue_bytes(self) -> int: """Return the object store memory of this operator's inqueue.""" total = 0 for op, inq in zip(self.op.input_dependencies, self.input_queues): @@ -457,14 +469,10 @@ def inqueue_memory_usage(self) -> int: total += inq.memory_usage return total - def outqueue_memory_usage(self) -> int: + def output_queue_bytes(self) -> int: """Return the object store memory of this operator's outqueue.""" return self.output_queue.memory_usage - def outqueue_num_blocks(self) -> int: - """Return the number of blocks in this operator's outqueue.""" - return self.output_queue.num_blocks - def mark_finished(self, exception: Optional[Exception] = None): """Marks this operator as finished. Used for exiting get_output_blocking.""" if exception is None: diff --git a/python/ray/data/tests/test_backpressure_e2e.py b/python/ray/data/tests/test_backpressure_e2e.py index 64ccb8f2fffa..0c49e8196715 100644 --- a/python/ray/data/tests/test_backpressure_e2e.py +++ b/python/ray/data/tests/test_backpressure_e2e.py @@ -291,7 +291,7 @@ def range_(i): launched = ray.get(source.counter.get.remote()) # If backpressure is broken we'll launch 15+. - assert launched <= 10, launched + assert launched <= 12, launched def test_streaming_backpressure_e2e( diff --git a/python/ray/data/tests/test_backpressure_policies.py b/python/ray/data/tests/test_backpressure_policies.py index c1a103503707..cb2b6a5173d1 100644 --- a/python/ray/data/tests/test_backpressure_policies.py +++ b/python/ray/data/tests/test_backpressure_policies.py @@ -50,7 +50,7 @@ def test_basic(self): map_transformer=MagicMock(), data_context=DataContext.get_current(), input_op=map_op_no_concurrency, - concurrency=concurrency, + max_concurrency=concurrency, ) map_op.metrics.num_tasks_running = 0 map_op.metrics.num_tasks_finished = 0 diff --git a/python/ray/data/tests/test_execution_optimizer_advanced.py b/python/ray/data/tests/test_execution_optimizer_advanced.py index 624028137223..777dc95b06bb 100644 --- a/python/ray/data/tests/test_execution_optimizer_advanced.py +++ b/python/ray/data/tests/test_execution_optimizer_advanced.py @@ -163,7 +163,7 @@ def test_write_operator(ray_start_regular_shared_2_cpus, tmp_path): assert op.name == "Write" assert isinstance(physical_op, TaskPoolMapOperator) - assert physical_op._concurrency == concurrency + assert physical_op._max_concurrency == concurrency assert len(physical_op.input_dependencies) == 1 assert isinstance(physical_op.input_dependencies[0], MapOperator) diff --git a/python/ray/data/tests/test_resource_manager.py b/python/ray/data/tests/test_resource_manager.py index bc007694ed40..5dad34912b1c 100644 --- a/python/ray/data/tests/test_resource_manager.py +++ b/python/ray/data/tests/test_resource_manager.py @@ -1,10 +1,13 @@ import math import time +from typing import Any, Dict, Optional from unittest.mock import MagicMock, PropertyMock, patch import pytest import ray +from ray.data._internal.compute import ComputeStrategy +from ray.data._internal.execution.interfaces import PhysicalOperator from ray.data._internal.execution.interfaces.execution_options import ( ExecutionOptions, ExecutionResources, @@ -27,10 +30,11 @@ def mock_map_op( - input_op, - ray_remote_args=None, - compute_strategy=None, - incremental_resource_usage=None, + input_op: PhysicalOperator, + ray_remote_args: Optional[Dict[str, Any]] = None, + compute_strategy: 
Optional[ComputeStrategy] = None, + incremental_resource_usage: Optional[ExecutionResources] = None, + name="Map", ): op = MapOperator.create( MagicMock(), @@ -38,6 +42,7 @@ def mock_map_op( DataContext.get_current(), ray_remote_args=ray_remote_args or {}, compute_strategy=compute_strategy, + name=name, ) op.start = MagicMock(side_effect=lambda _: None) if incremental_resource_usage is not None: @@ -230,9 +235,16 @@ def test_update_usage(self): } for op in [o1, o2, o3]: + op.update_resource_usage = MagicMock() op.current_processor_usage = MagicMock( return_value=ExecutionResources(cpu=mock_cpu[op], gpu=0) ) + op.running_processor_usage = MagicMock( + return_value=ExecutionResources(cpu=mock_cpu[op], gpu=0) + ) + op.pending_processor_usage = MagicMock( + return_value=ExecutionResources.zero() + ) op.extra_resource_usage = MagicMock(return_value=ExecutionResources.zero()) op._metrics = MagicMock( obj_store_mem_pending_task_outputs=mock_pending_task_outputs[op], @@ -415,7 +427,9 @@ def mock_get_global_limits(): # Test initial state when no resources are used. global_limits = ExecutionResources(cpu=16, gpu=0, object_store_memory=1000) - allocator.update_usages() + allocator.update_budgets( + limits=global_limits, + ) # +-----+------------------+------------------+--------------+ # | | _op_reserved | _reserved_for | used shared | # | | (used/remaining) | _op_outputs | resources | @@ -441,8 +455,22 @@ def mock_get_global_limits(): assert allocator._op_budgets[o2] == ExecutionResources(8, 0, 375) assert allocator._op_budgets[o3] == ExecutionResources(8, 0, 375) # Test max_task_output_bytes_to_read. - assert allocator.max_task_output_bytes_to_read(o2) == 500 - assert allocator.max_task_output_bytes_to_read(o3) == 500 + assert ( + allocator.max_task_output_bytes_to_read( + o2, + task_resource_usage=op_usages, + output_object_store_usage=op_outputs_usages, + ) + == 500 + ) + assert ( + allocator.max_task_output_bytes_to_read( + o3, + task_resource_usage=op_usages, + output_object_store_usage=op_outputs_usages, + ) + == 500 + ) # Test when each operator uses some resources. op_usages[o2] = ExecutionResources(6, 0, 500) @@ -453,7 +481,9 @@ def mock_get_global_limits(): op_outputs_usages[o3] = 25 op_usages[o4] = ExecutionResources(0, 0, 50) - allocator.update_usages() + allocator.update_budgets( + limits=global_limits, + ) # +-----+------------------+------------------+--------------+ # | | _op_reserved | _reserved_for | used shared | # | | (used/remaining) | _op_outputs | resources | @@ -471,13 +501,29 @@ def mock_get_global_limits(): assert allocator._op_budgets[o3] == ExecutionResources(5, 0, 207) # Test max_task_output_bytes_to_read. # max_task_output_bytes_to_read(o2) = 112.5 + 25 = 138 (rounded up) - assert allocator.max_task_output_bytes_to_read(o2) == 138 + assert ( + allocator.max_task_output_bytes_to_read( + o2, + task_resource_usage=op_usages, + output_object_store_usage=op_outputs_usages, + ) + == 138 + ) # max_task_output_bytes_to_read(o3) = 207.5 + 50 = 257 (rounded down) - assert allocator.max_task_output_bytes_to_read(o3) == 257 + assert ( + allocator.max_task_output_bytes_to_read( + o3, + task_resource_usage=op_usages, + output_object_store_usage=op_outputs_usages, + ) + == 257 + ) # Test global_limits updated. 
global_limits = ExecutionResources(cpu=12, gpu=0, object_store_memory=800) - allocator.update_usages() + allocator.update_budgets( + limits=global_limits, + ) # +-----+------------------+------------------+--------------+ # | | _op_reserved | _reserved_for | used shared | # | | (used/remaining) | _op_outputs | resources | @@ -503,9 +549,23 @@ def mock_get_global_limits(): assert allocator._op_budgets[o3] == ExecutionResources(2.5, 0, 120) # Test max_task_output_bytes_to_read. # max_task_output_bytes_to_read(o2) = 50 + 0 = 50 - assert allocator.max_task_output_bytes_to_read(o2) == 50 + assert ( + allocator.max_task_output_bytes_to_read( + o2, + task_resource_usage=op_usages, + output_object_store_usage=op_outputs_usages, + ) + == 50 + ) # max_task_output_bytes_to_read(o3) = 120 + 25 = 145 - assert allocator.max_task_output_bytes_to_read(o3) == 145 + assert ( + allocator.max_task_output_bytes_to_read( + o3, + task_resource_usage=op_usages, + output_object_store_usage=op_outputs_usages, + ) + == 145 + ) def test_reserve_incremental_resource_usage(self, restore_data_context): """Test that we'll reserve at least incremental_resource_usage() @@ -521,6 +581,16 @@ def test_reserve_incremental_resource_usage(self, restore_data_context): o3 = mock_map_op(o2, incremental_resource_usage=incremental_usage) o4 = mock_map_op(o3, incremental_resource_usage=incremental_usage) o5 = mock_map_op(o4, incremental_resource_usage=incremental_usage) + + # Set min_max_resource_requirements to use incremental_resource_usage as minimum + for op in [o2, o3, o4, o5]: + op.min_max_resource_requirements = MagicMock( + return_value=( + incremental_usage, + ExecutionResources(cpu=100, gpu=0, object_store_memory=10000), + ) + ) + topo = build_streaming_topology(o5, ExecutionOptions()) resource_manager = ResourceManager( @@ -534,7 +604,9 @@ def test_reserve_incremental_resource_usage(self, restore_data_context): allocator = resource_manager._op_resource_allocator assert isinstance(allocator, ReservationOpResourceAllocator) - allocator.update_usages() + allocator.update_budgets( + limits=global_limits, + ) # incremental_usage should be reserved for o2. assert allocator._op_reserved[o2] == incremental_usage # Remaining resources are CPU = 7 - 3 = 4, object_store_memory = 800 - 500 = 300. @@ -588,7 +660,9 @@ def test_reserve_min_resources_for_gpu_ops( allocator = resource_manager._op_resource_allocator assert isinstance(allocator, ReservationOpResourceAllocator) - allocator.update_usages() + allocator.update_budgets( + limits=global_limits, + ) assert allocator._op_reserved[o2].object_store_memory == 800 @@ -618,7 +692,10 @@ def test_does_not_reserve_more_than_max_resource_usage(self): ) allocator = resource_manager._op_resource_allocator - allocator.update_usages() + global_limits = resource_manager.get_global_limits() + allocator.update_budgets( + limits=global_limits, + ) # The operator's max resource usage is 1 CPU and 1 byte object store memory, so # we'll reserve that despite the large global limits. 
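The tables in the tests above exercise the reservation arithmetic described in the `ReservationOpResourceAllocator` docstring. Below is a deliberately simplified, object-store-only sketch of that arithmetic; the function name and single-dimension view are illustrative assumptions, and the real allocator also tracks CPU/GPU, per-op output reservations, and reservations that exceed their share.

```python
def op_object_store_budget(
    global_mem: float,
    num_eligible_ops: int,
    reservation_ratio: float,
    op_task_usage: float,
) -> float:
    # Reserve reservation_ratio * global / num_ops per operator, half of which
    # is set aside for operator outputs rather than task usage.
    default_reserved = reservation_ratio * global_mem / num_eligible_ops
    reserved_for_tasks = default_reserved / 2
    # Resources that were not reserved are shared evenly among eligible ops.
    shared = global_mem - default_reserved * num_eligible_ops
    remaining_reserved = max(reserved_for_tasks - op_task_usage, 0)
    return remaining_reserved + shared / num_eligible_ops


# 1000 bytes of object store, 2 eligible ops, 50% reservation ratio: each op
# reserves 250 (125 for tasks, 125 for outputs), and 500 bytes stay shared.
# With 100 bytes already used by tasks: budget = (125 - 100) + 500 / 2 = 275.
assert op_object_store_budget(1000, 2, 0.5, op_task_usage=100) == 275
```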
@@ -650,13 +727,18 @@ def test_only_handle_eligible_ops(self, restore_data_context): allocator = resource_manager._op_resource_allocator assert isinstance(allocator, ReservationOpResourceAllocator) - allocator.update_usages() + global_limits = resource_manager.get_global_limits() + allocator.update_budgets( + limits=global_limits, + ) assert o1 not in allocator._op_budgets assert o2 in allocator._op_budgets assert o3 not in allocator._op_budgets o2.mark_execution_finished() - allocator.update_usages() + allocator.update_budgets( + limits=global_limits, + ) assert o2 not in allocator._op_budgets def test_gpu_allocation(self, restore_data_context): @@ -696,7 +778,9 @@ def test_gpu_allocation(self, restore_data_context): resource_manager.get_global_limits = MagicMock(return_value=global_limits) allocator = resource_manager._op_resource_allocator - allocator.update_usages() + allocator.update_budgets( + limits=global_limits, + ) # Non-GPU operator should get 0 GPU assert allocator._op_budgets[o2].gpu == 0 @@ -738,7 +822,9 @@ def test_multiple_gpu_operators(self, restore_data_context): resource_manager.get_global_limits = MagicMock(return_value=global_limits) allocator = resource_manager._op_resource_allocator - allocator.update_usages() + allocator.update_budgets( + limits=global_limits, + ) # o2: 4 total - 1 used = 3 available assert allocator._op_budgets[o2].gpu == 3 @@ -773,7 +859,9 @@ def test_gpu_usage_exceeds_global_limits(self, restore_data_context): resource_manager.get_global_limits = MagicMock(return_value=global_limits) allocator = resource_manager._op_resource_allocator - allocator.update_usages() + allocator.update_budgets( + limits=global_limits, + ) assert allocator._op_budgets[o2].gpu == 0 @@ -894,7 +982,9 @@ def test_reservation_accounts_for_completed_ops(self, restore_data_context): resource_manager.get_global_limits = MagicMock(return_value=global_limits) allocator = resource_manager._op_resource_allocator - allocator.update_usages() + allocator.update_budgets( + limits=global_limits, + ) # Check that o2's usage was subtracted from remaining resources # global_limits (10 CPU, 250 mem) - o1 usage (0) - o2 usage (2 CPU, 50 mem) = remaining (8 CPU, 200 mem) @@ -993,7 +1083,9 @@ def mock_get_global_limits(): allocator = resource_manager._op_resource_allocator global_limits = ExecutionResources(cpu=20, object_store_memory=2000) - allocator.update_usages() + allocator.update_budgets( + limits=global_limits, + ) """ global_limits (20 CPU, 2000 mem) - o2 usage (2 CPU, 150 mem) - o3 usage (2 CPU, 50 mem) - o5 usage (3 CPU, 100 mem) - o7 usage (1 CPU, 100 mem) = remaining (12 CPU, 1600 mem) +-----+------------------+------------------+--------------+ @@ -1044,7 +1136,9 @@ def mock_get_global_limits(): | op8 | 50/150 | 50/150 | 0 | +-----+------------------+------------------+--------------+ """ - allocator.update_usages() + allocator.update_budgets( + limits=global_limits, + ) assert allocator._op_budgets[o6] == ExecutionResources( cpu=4, object_store_memory=350 ) @@ -1054,7 +1148,9 @@ def mock_get_global_limits(): # Test when completed ops update the usage. 
op_usages[o5] = ExecutionResources.zero() - allocator.update_usages() + allocator.update_budgets( + limits=global_limits, + ) """ global_limits (20 CPU, 2000 mem) - o2 usage (2 CPU, 150 mem) - o3 usage (2 CPU, 50 mem) - o5 usage (0 CPU, 0 mem) - o7 usage (1 CPU, 100 mem) = remaining (15 CPU, 1700 mem) +-----+------------------+------------------+--------------+ diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index f28cf7419426..3a66d0be8c0e 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -105,6 +105,8 @@ def gen_expected_metrics( "'average_num_outputs_per_task': N", "'average_num_inputs_per_task': N", "'num_output_blocks_per_task_s': N", + "'average_total_task_completion_time_s': N", + "'average_task_completion_excl_backpressure_time_s': N", "'average_bytes_per_output': N", "'obj_store_mem_internal_inqueue': Z", "'obj_store_mem_internal_outqueue': Z", @@ -157,10 +159,8 @@ def gen_expected_metrics( "'block_completion_time': " f"{gen_histogram_values(histogram_buckets_s, 'N')}" ), - ( - "'task_completion_time_without_backpressure': " - f"{'N' if task_backpressure else 'Z'}" - ), + "'task_completion_time_s': N", + "'task_completion_time_excl_backpressure_s': N", ( "'block_size_bytes': " f"{gen_histogram_values(histogram_buckets_bytes, 'N')}" @@ -185,6 +185,8 @@ def gen_expected_metrics( "'average_num_outputs_per_task': None", "'average_num_inputs_per_task': None", "'num_output_blocks_per_task_s': None", + "'average_total_task_completion_time_s': None", + "'average_task_completion_excl_backpressure_time_s': None", "'average_bytes_per_output': None", "'obj_store_mem_internal_inqueue': Z", "'obj_store_mem_internal_outqueue': Z", @@ -237,8 +239,9 @@ def gen_expected_metrics( "'block_completion_time': " f"{gen_histogram_values(histogram_buckets_s, 'N')}" ), + ("'task_completion_time_s': " f"{'N' if task_backpressure else 'Z'}"), ( - "'task_completion_time_without_backpressure': " + "'task_completion_time_excl_backpressure_s': " f"{'N' if task_backpressure else 'Z'}" ), ( @@ -527,7 +530,7 @@ def test_large_args_scheduling_strategy( # ) map_extra_metrics = gen_extra_metrics_str( - LARGE_ARGS_EXTRA_METRICS_TASK_BACKPRESSURE, + LARGE_ARGS_EXTRA_METRICS, verbose_stats_logs, ) # if verbose_stats_logs: @@ -768,6 +771,8 @@ def test_dataset__repr__(ray_start_regular_shared, restore_data_context): " average_num_outputs_per_task: N,\n" " average_num_inputs_per_task: N,\n" " num_output_blocks_per_task_s: N,\n" + " average_total_task_completion_time_s: N,\n" + " average_task_completion_excl_backpressure_time_s: N,\n" " average_bytes_per_output: N,\n" " obj_store_mem_internal_inqueue: Z,\n" " obj_store_mem_internal_outqueue: Z,\n" @@ -808,7 +813,8 @@ def test_dataset__repr__(ray_start_regular_shared, restore_data_context): " task_output_backpressure_time: Z,\n" f" task_completion_time: {gen_histogram_metrics_value_str(histogram_buckets_s, 'N')},\n" f" block_completion_time: {gen_histogram_metrics_value_str(histogram_buckets_s, 'N')},\n" - " task_completion_time_without_backpressure: N,\n" + " task_completion_time_s: N,\n" + " task_completion_time_excl_backpressure_s: N,\n" f" block_size_bytes: {gen_histogram_metrics_value_str(histogram_buckets_bytes, 'N')},\n" f" block_size_rows: {gen_histogram_metrics_value_str(histogram_bucket_rows, 'N')},\n" " num_alive_actors: Z,\n" @@ -905,6 +911,8 @@ def check_stats(): " average_num_outputs_per_task: N,\n" " average_num_inputs_per_task: N,\n" " num_output_blocks_per_task_s: N,\n" + " 
average_total_task_completion_time_s: N,\n" + " average_task_completion_excl_backpressure_time_s: N,\n" " average_bytes_per_output: N,\n" " obj_store_mem_internal_inqueue: Z,\n" " obj_store_mem_internal_outqueue: Z,\n" @@ -945,7 +953,8 @@ def check_stats(): " task_output_backpressure_time: Z,\n" f" task_completion_time: {gen_histogram_metrics_value_str(histogram_buckets_s, 'N')},\n" f" block_completion_time: {gen_histogram_metrics_value_str(histogram_buckets_s, 'N')},\n" - " task_completion_time_without_backpressure: N,\n" + " task_completion_time_s: N,\n" + " task_completion_time_excl_backpressure_s: N,\n" f" block_size_bytes: {gen_histogram_metrics_value_str(histogram_buckets_bytes, 'N')},\n" f" block_size_rows: {gen_histogram_metrics_value_str(histogram_bucket_rows, 'N')},\n" " num_alive_actors: Z,\n" @@ -997,6 +1006,8 @@ def check_stats(): " average_num_outputs_per_task: N,\n" " average_num_inputs_per_task: N,\n" " num_output_blocks_per_task_s: N,\n" + " average_total_task_completion_time_s: N,\n" + " average_task_completion_excl_backpressure_time_s: N,\n" " average_bytes_per_output: N,\n" " obj_store_mem_internal_inqueue: Z,\n" " obj_store_mem_internal_outqueue: Z,\n" @@ -1037,7 +1048,8 @@ def check_stats(): " task_output_backpressure_time: Z,\n" f" task_completion_time: {gen_histogram_metrics_value_str(histogram_buckets_s, 'N')},\n" f" block_completion_time: {gen_histogram_metrics_value_str(histogram_buckets_s, 'N')},\n" - " task_completion_time_without_backpressure: N,\n" + " task_completion_time_s: N,\n" + " task_completion_time_excl_backpressure_s: N,\n" f" block_size_bytes: {gen_histogram_metrics_value_str(histogram_buckets_bytes, 'N')},\n" f" block_size_rows: {gen_histogram_metrics_value_str(histogram_bucket_rows, 'N')},\n" " num_alive_actors: Z,\n" @@ -1681,7 +1693,7 @@ def time_to_seconds(time_str): assert total_percent == 100 for time_s, percent in metrics_dict.values(): - assert time_s < total_time + assert time_s <= total_time # Check percentage, this is done with some expected loss of precision # due to rounding in the intital output. assert isclose(percent, time_s / total_time * 100, rel_tol=0.01) diff --git a/python/ray/data/tests/test_task_pool_map_operator.py b/python/ray/data/tests/test_task_pool_map_operator.py index fa1e41452930..700df87c7fef 100644 --- a/python/ray/data/tests/test_task_pool_map_operator.py +++ b/python/ray/data/tests/test_task_pool_map_operator.py @@ -28,7 +28,7 @@ def test_min_max_resource_requirements(ray_start_regular_shared, restore_data_co assert ( # At a minimum, you need enough processors to run one task and enough object # store memory for a pending task. - min_resource_usage_bound == ExecutionResources(cpu=1, object_store_memory=3) + min_resource_usage_bound == ExecutionResources.zero() and max_resource_usage_bound == ExecutionResources.for_limits() )
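For reference, the two new average metrics asserted in the stats tests above reduce to simple ratios over finished tasks. The sketch below uses made-up numbers and a hypothetical helper name, not values or APIs from the Ray test suite.

```python
from typing import Optional, Tuple


def average_completion_times(
    task_completion_time_s: float,
    task_completion_time_excl_backpressure_s: float,
    num_tasks_finished: int,
) -> Tuple[Optional[float], Optional[float]]:
    # Both averages are undefined until at least one task has finished.
    if num_tasks_finished == 0:
        return None, None
    return (
        task_completion_time_s / num_tasks_finished,
        task_completion_time_excl_backpressure_s / num_tasks_finished,
    )


# 4 finished tasks, 20s total wall-clock, of which 12s was spent generating
# blocks (i.e. excluding time spent blocked on output backpressure).
assert average_completion_times(20.0, 12.0, 4) == (5.0, 3.0)
```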