@@ -108,8 +108,11 @@ def __init__(self, *args, **kwargs):

# Initialize caps from operators (infinite if unset)
for op, _ in self._topology.items():
if isinstance(op, TaskPoolMapOperator) and op.get_concurrency() is not None:
self._concurrency_caps[op] = op.get_concurrency()
if (
isinstance(op, TaskPoolMapOperator)
and op.get_max_concurrency_limit() is not None
):
self._concurrency_caps[op] = op.get_max_concurrency_limit()
else:
self._concurrency_caps[op] = float("inf")
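As a rough illustration of the cap initialization above, here is a standalone sketch of the same fallback logic. `FakeOp`, `init_concurrency_caps`, and the dict-based topology are invented stand-ins, not Ray Data classes, and the real loop additionally checks `isinstance(op, TaskPoolMapOperator)`:

```python
from typing import Dict, Optional


class FakeOp:
    """Stand-in operator exposing the get_max_concurrency_limit() accessor."""

    def __init__(self, name: str, max_concurrency: Optional[int] = None):
        self.name = name
        self._max_concurrency = max_concurrency

    def get_max_concurrency_limit(self) -> Optional[int]:
        return self._max_concurrency


def init_concurrency_caps(topology: Dict[FakeOp, object]) -> Dict[FakeOp, float]:
    caps: Dict[FakeOp, float] = {}
    for op in topology:
        limit = op.get_max_concurrency_limit()
        # Operators without an explicit limit fall back to an infinite cap.
        caps[op] = limit if limit is not None else float("inf")
    return caps


caps = init_concurrency_caps({FakeOp("read"): None, FakeOp("map", max_concurrency=4): None})
print({op.name: cap for op, cap in caps.items()})  # {'read': inf, 'map': 4}
```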

@@ -15,10 +15,10 @@ class ResourceBudgetBackpressurePolicy(BackpressurePolicy):
"""A backpressure policy based on resource budgets in ResourceManager."""

def can_add_input(self, op: "PhysicalOperator") -> bool:
budget = self._resource_manager.get_budget(op)
if budget is None:
return True
return op.incremental_resource_usage().satisfies_limit(budget)
if self._resource_manager._op_resource_allocator is not None:
return self._resource_manager._op_resource_allocator.can_submit_new_task(op)

return True

def max_task_output_bytes_to_read(self, op: "PhysicalOperator") -> Optional[int]:
"""Determine maximum bytes to read based on the resource budgets.
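The rewritten `can_add_input` no longer compares incremental usage against a budget; it defers to the operator resource allocator when one is configured and otherwise allows the input. A minimal sketch of that delegation, assuming toy stand-ins for the resource manager and allocator rather than the real Ray Data classes:

```python
from typing import Optional


class ToyAllocator:
    """Stand-in allocator: allow a task only if its CPU request fits the remaining budget."""

    def __init__(self, free_cpus: float, cpus_per_task: float):
        self.free_cpus = free_cpus
        self.cpus_per_task = cpus_per_task

    def can_submit_new_task(self, op) -> bool:
        return self.cpus_per_task <= self.free_cpus


class ToyResourceManager:
    def __init__(self, allocator: Optional[ToyAllocator] = None):
        self._op_resource_allocator = allocator


def can_add_input(resource_manager: ToyResourceManager, op) -> bool:
    # Mirrors the policy above: delegate when an allocator exists, else allow.
    if resource_manager._op_resource_allocator is not None:
        return resource_manager._op_resource_allocator.can_submit_new_task(op)
    return True


print(can_add_input(ToyResourceManager(), op=None))                        # True
print(can_add_input(ToyResourceManager(ToyAllocator(1.0, 2.0)), op=None))  # False
```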
@@ -481,7 +481,12 @@ class OpRuntimeMetrics(metaclass=OpRuntimesMetricsMeta):
metrics_type=MetricsType.Histogram,
metrics_args={"boundaries": histogram_buckets_s},
)
task_completion_time_without_backpressure: float = metric_field(
task_completion_time_s: float = metric_field(
default=0,
description="Time spent running tasks to completion.",
metrics_group=MetricsGroup.TASKS,
)
task_completion_time_excl_backpressure_s: float = metric_field(
default=0,
description="Time spent running tasks to completion without backpressure.",
metrics_group=MetricsGroup.TASKS,
@@ -678,6 +683,30 @@ def num_output_blocks_per_task_s(self) -> Optional[float]:
else:
return self.num_task_outputs_generated / self.block_generation_time

@metric_property(
description="Average task's completion time in seconds (including throttling).",
metrics_group=MetricsGroup.TASKS,
)
def average_total_task_completion_time_s(self) -> Optional[float]:
"""Average task's completion time in seconds (including throttling)"""
if self.num_tasks_finished == 0:
return None
else:
return self.task_completion_time_s / self.num_tasks_finished

@metric_property(
description="Average task's completion time in seconds (excluding throttling).",
metrics_group=MetricsGroup.TASKS,
)
def average_task_completion_excl_backpressure_time_s(self) -> Optional[float]:
"""Average task's completion time in seconds (excluding throttling)"""
if self.num_tasks_finished == 0:
return None
else:
return (
self.task_completion_time_excl_backpressure_s / self.num_tasks_finished
)
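Both new average properties are simple ratios over finished tasks, returning None before any task completes. A small worked example with invented numbers:

```python
from typing import Optional


def average(total_time_s: float, num_tasks_finished: int) -> Optional[float]:
    # Both properties guard against division by zero by returning None.
    if num_tasks_finished == 0:
        return None
    return total_time_s / num_tasks_finished


# Example: 10 finished tasks, 120 s wall-clock vs. 90 s excluding backpressure.
print(average(120.0, 10))  # 12.0 -> average_total_task_completion_time_s
print(average(90.0, 10))   # 9.0  -> average_task_completion_excl_backpressure_time_s
print(average(0.0, 0))     # None -> no finished tasks yet
```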

@metric_property(
description="Average size of task output in bytes.",
metrics_group=MetricsGroup.OUTPUTS,
@@ -981,8 +1010,10 @@ def on_task_finished(self, task_index: int, exception: Optional[Exception]):
self.bytes_outputs_of_finished_tasks += task_info.bytes_outputs
self.rows_outputs_of_finished_tasks += task_info.num_rows_produced

task_time_delta = time.perf_counter() - task_info.start_time
self.task_completion_time_s += task_time_delta

with self._histogram_thread_lock:
task_time_delta = time.perf_counter() - task_info.start_time
bucket_index = find_bucket_index(histogram_buckets_s, task_time_delta)
self.task_completion_time[bucket_index] += 1
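`find_bucket_index` is not shown in this diff; the sketch below is a hypothetical reimplementation that assumes conventional "first boundary not exceeded" bucketing with an overflow bucket after the last boundary, and uses invented boundary values:

```python
import bisect
from typing import List


def find_bucket_index(boundaries_s: List[float], value_s: float) -> int:
    # Hypothetical stand-in: index of the first boundary the value does not exceed;
    # values past the last boundary land in the trailing overflow bucket.
    return bisect.bisect_left(boundaries_s, value_s)


histogram_buckets_s = [0.1, 1.0, 10.0, 60.0]  # assumed boundaries
task_completion_time = [0] * (len(histogram_buckets_s) + 1)

for task_time_delta in (0.05, 2.3, 75.0):
    task_completion_time[find_bucket_index(histogram_buckets_s, task_time_delta)] += 1

print(task_completion_time)  # [1, 0, 1, 0, 1]
```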

@@ -997,7 +1028,7 @@ def on_task_finished(self, task_index: int, exception: Optional[Exception]):
# NOTE: This is used for Issue Detection
self._op_task_duration_stats.add_duration(task_time_delta)

self.task_completion_time_without_backpressure += task_info.cum_block_gen_time
self.task_completion_time_excl_backpressure_s += task_info.cum_block_gen_time
inputs = self._running_tasks[task_index].inputs
self.num_task_inputs_processed += len(inputs)
total_input_size = inputs.size_bytes()
@@ -702,13 +702,12 @@ def pending_processor_usage(self) -> ExecutionResources:
def min_max_resource_requirements(
self,
) -> Tuple[ExecutionResources, ExecutionResources]:
"""Returns the min and max resources to start the operator and make progress.
"""Returns lower/upper boundary of resource requirements for this operator:

For example, an operator that creates an actor pool requiring 8 GPUs could
return ExecutionResources(gpu=8) as its minimum usage.

This method is used by the resource manager to reserve minimum resources and to
ensure that it doesn't over-provision resources.
- Minimum: lower bound of the resources required to start this operator
(for most operators this is 0, except the ones that utilize actors)
- Maximum: upper bound of the resources this operator could
utilize.
"""
return ExecutionResources.zero(), ExecutionResources.inf()
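The example from the old docstring still illustrates the intent: an operator that must start an actor pool requiring 8 GPUs can report that as its minimum. A hedged sketch of such an override, using a simplified stand-in for `ExecutionResources` rather than the real class:

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass
class Resources:
    """Simplified stand-in for ExecutionResources."""
    cpu: float = 0.0
    gpu: float = 0.0

    @classmethod
    def zero(cls) -> "Resources":
        return cls()

    @classmethod
    def inf(cls) -> "Resources":
        return cls(cpu=float("inf"), gpu=float("inf"))


class DefaultOperator:
    def min_max_resource_requirements(self) -> Tuple[Resources, Resources]:
        # Most operators need nothing up front and declare no upper bound.
        return Resources.zero(), Resources.inf()


class GpuActorPoolOperator(DefaultOperator):
    def min_max_resource_requirements(self) -> Tuple[Resources, Resources]:
        # An actor pool that needs 8 GPUs just to start reports that as its minimum.
        return Resources(gpu=8), Resources.inf()


print(GpuActorPoolOperator().min_max_resource_requirements())
# (Resources(cpu=0.0, gpu=8), Resources(cpu=inf, gpu=inf))
```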

@@ -806,6 +805,11 @@ def upstream_op_num_outputs(self):
)
return upstream_op_num_outputs

def get_max_concurrency_limit(self) -> Optional[int]:
"""Max value of how many tasks this operator could run
concurrently (if limited)"""
return None


class ReportsExtraResourceUsage(abc.ABC):
@abc.abstractmethod
@@ -161,6 +161,7 @@ def __init__(
self._actor_cls = None
# Whether no more submittable bundles will be added.
self._inputs_done = False
self._actor_locality_enabled: Optional[bool] = None

# Locality metrics
self._locality_hits = 0
@@ -496,14 +497,13 @@ def get_autoscaling_actor_pools(self) -> List[AutoscalingActorPool]:
def per_task_resource_allocation(
self: "PhysicalOperator",
) -> ExecutionResources:
max_concurrency = self._ray_remote_args.get("max_concurrency", 1)
# For Actor tasks resource allocation is determined as:
# - Per actor resource allocation divided by
# - Actor's max task concurrency
max_concurrency = self._actor_pool.max_actor_concurrency()
per_actor_resource_usage = self._actor_pool.per_actor_resource_usage()
return per_actor_resource_usage.scale(1 / max_concurrency)
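The per-task allocation above divides the per-actor reservation by each actor's task concurrency. A worked example with invented numbers, using a stand-in for `ExecutionResources`: an actor reserving 4 CPUs and 1 GPU that runs up to 4 tasks concurrently gets 1 CPU and 0.25 GPU per task.

```python
from dataclasses import dataclass


@dataclass
class Resources:
    """Stand-in for ExecutionResources, supporting the scale() call used above."""
    cpu: float = 0.0
    gpu: float = 0.0

    def scale(self, factor: float) -> "Resources":
        return Resources(cpu=self.cpu * factor, gpu=self.gpu * factor)


per_actor_resource_usage = Resources(cpu=4, gpu=1)  # assumed per-actor reservation
max_actor_concurrency = 4                           # assumed max_concurrency per actor

per_task = per_actor_resource_usage.scale(1 / max_actor_concurrency)
print(per_task)  # Resources(cpu=1.0, gpu=0.25)
```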

def max_task_concurrency(self: "PhysicalOperator") -> Optional[int]:
max_concurrency = self._ray_remote_args.get("max_concurrency", 1)
return max_concurrency * self._actor_pool.max_size()

def min_scheduling_resources(
self: "PhysicalOperator",
) -> ExecutionResources:
@@ -531,6 +531,9 @@ def get_actor_info(self) -> _ActorPoolInfo:
"""Returns Actor counts for Alive, Restarting and Pending Actors."""
return self._actor_pool.get_actor_info()

def get_max_concurrency_limit(self) -> Optional[int]:
return self._actor_pool.max_size() * self._actor_pool.max_actor_concurrency()
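For the actor-pool operator, the concurrency limit is the pool's maximum size times each actor's task concurrency. A standalone sketch with invented numbers and a toy stand-in for the actor pool:

```python
from typing import Optional


class ToyActorPool:
    """Stand-in for the operator's actor pool."""

    def __init__(self, max_size: int, max_actor_concurrency: int):
        self._max_size = max_size
        self._max_actor_concurrency = max_actor_concurrency

    def max_size(self) -> int:
        return self._max_size

    def max_actor_concurrency(self) -> int:
        return self._max_actor_concurrency


def get_max_concurrency_limit(pool: ToyActorPool) -> Optional[int]:
    # Mirrors the override above: pool size x per-actor task concurrency.
    return pool.max_size() * pool.max_actor_concurrency()


print(get_max_concurrency_limit(ToyActorPool(max_size=10, max_actor_concurrency=4)))  # 40
```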


class _MapWorker:
"""An actor worker for MapOperator."""
@@ -235,7 +235,7 @@ def create(
name=name,
target_max_block_size_override=target_max_block_size_override,
min_rows_per_bundle=min_rows_per_bundle,
concurrency=compute_strategy.size,
max_concurrency=compute_strategy.size,
supports_fusion=supports_fusion,
map_task_kwargs=map_task_kwargs,
ray_remote_args_fn=ray_remote_args_fn,
@@ -518,12 +518,6 @@ def current_processor_usage(self) -> ExecutionResources:
def pending_processor_usage(self) -> ExecutionResources:
raise NotImplementedError

@abstractmethod
def min_max_resource_requirements(
self,
) -> Tuple[ExecutionResources, ExecutionResources]:
...

@abstractmethod
def incremental_resource_usage(self) -> ExecutionResources:
raise NotImplementedError
@@ -1,5 +1,5 @@
import warnings
from typing import Any, Callable, Dict, Optional, Tuple
from typing import Any, Callable, Dict, Optional

from ray.data._internal.execution.interfaces import (
ExecutionResources,
@@ -24,7 +24,7 @@ def __init__(
name: str = "TaskPoolMap",
target_max_block_size_override: Optional[int] = None,
min_rows_per_bundle: Optional[int] = None,
concurrency: Optional[int] = None,
max_concurrency: Optional[int] = None,
supports_fusion: bool = True,
map_task_kwargs: Optional[Dict[str, Any]] = None,
ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
@@ -41,7 +41,7 @@ def __init__(
transform_fn, or None to use the block size. Setting the batch size is
important for the performance of GPU-accelerated transform functions.
The actual rows passed may be less if the dataset is small.
concurrency: The maximum number of Ray tasks to use concurrently,
max_concurrency: The maximum number of Ray tasks to use concurrently,
or None to use as many tasks as possible.
supports_fusion: Whether this operator supports fusion with other operators.
map_task_kwargs: A dictionary of kwargs to pass to the map task. You can
@@ -66,7 +66,7 @@ def __init__(
ray_remote_args_fn,
ray_remote_args,
)
self._concurrency = concurrency

if max_concurrency is not None and max_concurrency <= 0:
raise ValueError(f"max_concurrency has to be > 0 (got {max_concurrency})")

self._max_concurrency = max_concurrency
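The added validation rejects non-positive limits while None keeps its "as many tasks as possible" meaning. A standalone sketch of that behavior, using a toy class rather than the real operator:

```python
from typing import Optional


class ToyTaskPoolOperator:
    """Stand-in for the task-pool operator's constructor validation."""

    def __init__(self, max_concurrency: Optional[int] = None):
        # None means "as many tasks as possible"; explicit limits must be positive.
        if max_concurrency is not None and max_concurrency <= 0:
            raise ValueError(f"max_concurrency has to be > 0 (got {max_concurrency})")
        self._max_concurrency = max_concurrency

    def get_max_concurrency_limit(self) -> Optional[int]:
        return self._max_concurrency


print(ToyTaskPoolOperator(max_concurrency=8).get_max_concurrency_limit())  # 8
print(ToyTaskPoolOperator().get_max_concurrency_limit())                   # None
try:
    ToyTaskPoolOperator(max_concurrency=0)
except ValueError as e:
    print(e)  # max_concurrency has to be > 0 (got 0)
```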

# NOTE: Unlike static Ray remote args, dynamic arguments extracted from the
# blocks themselves are going to be passed inside `fn.options(...)`
@@ -115,11 +119,6 @@ def _add_bundled_input(self, bundle: RefBundle):
def progress_str(self) -> str:
return ""

def min_max_resource_requirements(
self,
) -> Tuple[ExecutionResources, ExecutionResources]:
return self.incremental_resource_usage(), ExecutionResources.for_limits()

def current_processor_usage(self) -> ExecutionResources:
num_active_workers = self.num_active_tasks()
return ExecutionResources(
@@ -131,40 +130,37 @@ def pending_processor_usage(self) -> ExecutionResources:
return ExecutionResources()

def incremental_resource_usage(self) -> ExecutionResources:
return self.per_task_resource_allocation().copy(
object_store_memory=(
self._metrics.obj_store_mem_max_pending_output_per_task or 0
),
)

def per_task_resource_allocation(self) -> ExecutionResources:
return ExecutionResources(
cpu=self._ray_remote_args.get("num_cpus", 0),
gpu=self._ray_remote_args.get("num_gpus", 0),
memory=self._ray_remote_args.get("memory", 0),
object_store_memory=self._metrics.obj_store_mem_max_pending_output_per_task
or 0,
)

def per_task_resource_allocation(
self: "PhysicalOperator",
) -> ExecutionResources:
return self.incremental_resource_usage()

def max_task_concurrency(self: "PhysicalOperator") -> Optional[int]:
return self._concurrency

def min_scheduling_resources(
self: "PhysicalOperator",
) -> ExecutionResources:
return self.incremental_resource_usage()

def get_concurrency(self) -> Optional[int]:
return self._concurrency
def get_max_concurrency_limit(self) -> Optional[int]:
return self._max_concurrency

def all_inputs_done(self):
super().all_inputs_done()

if (
self._concurrency is not None
and self._metrics.num_inputs_received < self._concurrency
self._max_concurrency is not None
and self._metrics.num_inputs_received < self._max_concurrency
):
warnings.warn(
f"The maximum number of concurrent tasks for '{self.name}' is set to "
f"{self._concurrency}, but the operator only received "
f"{self._max_concurrency}, but the operator only received "
f"{self._metrics.num_inputs_received} input(s). This means that the "
f"operator can launch at most {self._metrics.num_inputs_received} "
"task(s), which is less than the concurrency limit. You might be able "