Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Revert "[Data] Add BundleQueue abstraction (#48503)" (#48612)" #48686

Merged
merged 3 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions python/ray/data/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,14 @@ py_test(
deps = ["//:ray_lib", ":conftest"],
)

# Registers tests/test_bundle_queue.py as a small py_test target, tagged
# "team:data" and "exclusive", with the same deps as the neighboring test rules.
py_test(
name = "test_bundle_queue",
size = "small",
srcs = ["tests/test_bundle_queue.py"],
tags = ["team:data", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_autoscaler",
size = "small",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from .bundle_queue import BundleQueue
from .fifo_bundle_queue import FIFOBundleQueue


def create_bundle_queue() -> BundleQueue:
    """Build the bundle queue implementation used by default.

    Callers depend only on the ``BundleQueue`` interface; the concrete class
    is chosen here so it can be swapped in one place.

    Returns:
        A new, empty ``FIFOBundleQueue``.
    """
    queue: BundleQueue = FIFOBundleQueue()
    return queue


__all__ = ["BundleQueue", "create_bundle_queue"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import abc
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
from ray.data._internal.execution.interfaces import RefBundle


class BundleQueue(abc.ABC):
    """Abstract interface for a queue of ``RefBundle`` objects.

    Besides the usual queue operations (``add``, ``pop``, ``peek``), the
    interface supports removing an arbitrary bundle and reporting an estimate
    of the total size in bytes of the queued bundles. Ordering policy is left
    to the concrete subclass.
    """

    @abc.abstractmethod
    def __len__(self) -> int:
        """Return the number of bundles in the queue."""
        ...

    @abc.abstractmethod
    def __contains__(self, bundle: "RefBundle") -> bool:
        """Return whether the bundle is in the queue."""
        ...

    @abc.abstractmethod
    def add(self, bundle: "RefBundle") -> None:
        """Add a bundle to the queue."""
        ...

    @abc.abstractmethod
    def pop(self) -> "RefBundle":
        """Remove and return the head of the queue.

        Raises:
            IndexError: If the queue is empty.
        """
        ...

    @abc.abstractmethod
    def peek(self) -> Optional["RefBundle"]:
        """Return the head of the queue without removing it.

        If the queue is empty, return `None`.
        """
        ...

    @abc.abstractmethod
    def remove(self, bundle: "RefBundle"):
        """Remove a bundle from the queue.

        The return value is left unspecified by this interface;
        ``FIFOBundleQueue`` returns the removed bundle.

        Raises:
            ValueError: If the bundle is not in the queue (this is what
                ``FIFOBundleQueue`` does).
        """
        ...

    @abc.abstractmethod
    def clear(self) -> None:
        """Remove all bundles from the queue."""
        ...

    @abc.abstractmethod
    def estimate_size_bytes(self) -> int:
        """Return an estimate of the total size of objects in the queue."""
        ...

    @abc.abstractmethod
    def is_empty(self) -> bool:
        """Return whether this queue and all of its internal data structures are empty.

        This method is used for testing.
        """
        ...
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import TYPE_CHECKING, Dict, List, Optional

from .bundle_queue import BundleQueue

if TYPE_CHECKING:
from ray.data._internal.execution.interfaces import RefBundle


@dataclass
class _Node:
value: "RefBundle"
next: Optional["_Node"] = None
prev: Optional["_Node"] = None


class FIFOBundleQueue(BundleQueue):
    """A bundle queue that follows a first-in-first-out policy.

    Bundles are stored in insertion order in a doubly linked list. A side
    index maps each bundle to its nodes so that ``remove`` can find the node
    for an arbitrary bundle without scanning the list. The same bundle may be
    queued more than once.
    """

    def __init__(self):
        # We manually implement a linked list because we need to remove elements
        # efficiently, and Python's built-in data structures have O(n) removal time.
        self._head: Optional[_Node] = None
        self._tail: Optional[_Node] = None
        # We use a dictionary to keep track of the nodes corresponding to each bundle.
        # This allows us to remove a bundle from the queue in O(1) time. We need a
        # deque per bundle because a bundle can be added to the queue multiple times
        # and `remove` takes the oldest node from the left. Nodes in each deque are
        # insertion-ordered.
        self._bundle_to_nodes: Dict["RefBundle", "deque[_Node]"] = defaultdict(deque)

        # Running total of `size_bytes()` over all queued bundles.
        self._nbytes = 0
        # Number of queued bundles, counting duplicates.
        self._num_bundles = 0

    def __len__(self) -> int:
        """Return the number of bundles in the queue, counting duplicates."""
        return self._num_bundles

    def __contains__(self, bundle: "RefBundle") -> bool:
        """Return whether at least one instance of the bundle is queued."""
        # A membership test on a defaultdict does not create a missing entry.
        return bundle in self._bundle_to_nodes

    def add(self, bundle: "RefBundle") -> None:
        """Add a bundle to the end (right) of the queue."""
        new_node = _Node(value=bundle, next=None, prev=self._tail)
        # Case 1: The queue is empty.
        if self._head is None:
            assert self._tail is None
            self._head = new_node
            self._tail = new_node
        # Case 2: The queue has at least one element.
        else:
            self._tail.next = new_node
            self._tail = new_node

        self._bundle_to_nodes[bundle].append(new_node)

        self._nbytes += bundle.size_bytes()
        self._num_bundles += 1

    def pop(self) -> "RefBundle":
        """Remove and return the first (left) bundle in the queue.

        Raises:
            IndexError: If the queue is empty.
        """
        if self._head is None:
            raise IndexError("You can't pop from an empty queue")

        # The head is the oldest node overall, so it is also the oldest node of
        # its bundle, which is exactly the one `remove` takes out.
        bundle = self._head.value
        self.remove(bundle)

        return bundle

    def peek(self) -> Optional["RefBundle"]:
        """Return the first (left) bundle in the queue without removing it.

        Returns `None` if the queue is empty.
        """
        if self._head is None:
            return None

        return self._head.value

    def remove(self, bundle: "RefBundle") -> "RefBundle":
        """Remove a bundle from the queue and return it.

        If there are multiple instances of the bundle in the queue, this method only
        removes the first one.

        Raises:
            ValueError: If the bundle is not in the queue.
        """
        # The bundle isn't queued (this includes the empty-queue case).
        if bundle not in self._bundle_to_nodes:
            raise ValueError(f"The bundle {bundle} is not in the queue.")

        nodes = self._bundle_to_nodes[bundle]
        node = nodes.popleft()
        if not nodes:
            # Drop the empty deque so `__contains__` and `is_empty` stay accurate.
            del self._bundle_to_nodes[bundle]

        self._unlink(node)

        self._nbytes -= bundle.size_bytes()
        assert self._nbytes >= 0, (
            "Expected the total size of objects in the queue to be non-negative, but "
            f"got {self._nbytes} bytes instead."
        )

        self._num_bundles -= 1

        return node.value

    def _unlink(self, node: _Node) -> None:
        """Detach `node` from the linked list, updating head/tail as needed."""
        prev_node, next_node = node.prev, node.next

        # No predecessor means the node was the head.
        if prev_node is None:
            self._head = next_node
        else:
            prev_node.next = next_node

        # No successor means the node was the tail.
        if next_node is None:
            self._tail = prev_node
        else:
            next_node.prev = prev_node

        # Don't let the removed node keep references into the live list.
        node.prev = None
        node.next = None

    def clear(self) -> None:
        """Remove all bundles from the queue and reset the size counters."""
        self._head = None
        self._tail = None
        self._bundle_to_nodes.clear()
        self._nbytes = 0
        self._num_bundles = 0

    def estimate_size_bytes(self) -> int:
        """Return the summed `size_bytes()` of the bundles currently queued."""
        return self._nbytes

    def is_empty(self) -> bool:
        """Return whether the queue and all of its internal structures are empty.

        This method is used for testing, so it checks the node index, both ends
        of the linked list, and the bundle counter.
        """
        return (
            not self._bundle_to_nodes
            and self._head is None
            and self._tail is None
            and self._num_bundles == 0
        )
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional

import ray
from ray.data._internal.execution.bundle_queue import create_bundle_queue
from ray.data._internal.execution.interfaces.ref_bundle import RefBundle
from ray.data._internal.memory_tracing import trace_allocation

Expand Down Expand Up @@ -267,31 +268,11 @@ class OpRuntimeMetrics(metaclass=OpRuntimesMetricsMeta):
description="Number of blocks in operator's internal input queue.",
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
)
obj_store_mem_internal_inqueue: int = metric_field(
default=0,
description=(
"Byte size of input blocks in the operator's internal input queue."
),
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
)
obj_store_mem_internal_outqueue_blocks: int = metric_field(
default=0,
description="Number of blocks in the operator's internal output queue.",
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
)
obj_store_mem_internal_outqueue: int = metric_field(
default=0,
description=(
"Byte size of output blocks in the operator's internal output queue."
),
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
)
obj_store_mem_pending_task_inputs: int = metric_field(
default=0,
description="Byte size of input blocks used by pending tasks.",
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
map_only=True,
)
obj_store_mem_freed: int = metric_field(
default=0,
description="Byte size of freed memory in object store.",
Expand Down Expand Up @@ -323,6 +304,10 @@ def __init__(self, op: "PhysicalOperator"):
# Start time of current pause due to task submission backpressure
self._task_submission_backpressure_start_time = -1

self._internal_inqueue = create_bundle_queue()
self._internal_outqueue = create_bundle_queue()
self._pending_task_inputs = create_bundle_queue()

@property
def extra_metrics(self) -> Dict[str, Any]:
"""Return a dict of extra metrics."""
Expand Down Expand Up @@ -377,6 +362,30 @@ def average_bytes_per_output(self) -> Optional[float]:
else:
return self.bytes_task_outputs_generated / self.num_task_outputs_generated

# Computed on read: returns the byte-size estimate reported by the
# `_internal_inqueue` bundle queue.
@metric_property(
description="Byte size of input blocks in the operator's internal input queue.",
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
)
def obj_store_mem_internal_inqueue(self) -> int:
return self._internal_inqueue.estimate_size_bytes()

# Computed on read: returns the byte-size estimate reported by the
# `_internal_outqueue` bundle queue.
@metric_property(
description=(
"Byte size of output blocks in the operator's internal output queue."
),
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
)
def obj_store_mem_internal_outqueue(self) -> int:
return self._internal_outqueue.estimate_size_bytes()

# Computed on read: returns the byte-size estimate reported by the
# `_pending_task_inputs` bundle queue.
@metric_property(
description="Byte size of input blocks used by pending tasks.",
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
map_only=True,
)
def obj_store_mem_pending_task_inputs(self) -> int:
return self._pending_task_inputs.estimate_size_bytes()

@property
def obj_store_mem_pending_task_outputs(self) -> Optional[float]:
"""Estimated size in bytes of output blocks in Ray generator buffers.
Expand Down Expand Up @@ -454,13 +463,13 @@ def on_input_received(self, input: RefBundle):
def on_input_queued(self, input: RefBundle):
"""Callback when the operator queues an input."""
self.obj_store_mem_internal_inqueue_blocks += len(input.blocks)
self.obj_store_mem_internal_inqueue += input.size_bytes()
self._internal_inqueue.add(input)

def on_input_dequeued(self, input: RefBundle):
"""Callback when the operator dequeues an input."""
self.obj_store_mem_internal_inqueue_blocks -= len(input.blocks)
input_size = input.size_bytes()
self.obj_store_mem_internal_inqueue -= input_size
self._internal_inqueue.remove(input)
assert self.obj_store_mem_internal_inqueue >= 0, (
self._op,
self.obj_store_mem_internal_inqueue,
Expand All @@ -470,13 +479,13 @@ def on_input_dequeued(self, input: RefBundle):
def on_output_queued(self, output: RefBundle):
"""Callback when an output is queued by the operator."""
self.obj_store_mem_internal_outqueue_blocks += len(output.blocks)
self.obj_store_mem_internal_outqueue += output.size_bytes()
self._internal_outqueue.add(output)

def on_output_dequeued(self, output: RefBundle):
"""Callback when an output is dequeued by the operator."""
self.obj_store_mem_internal_outqueue_blocks -= len(output.blocks)
output_size = output.size_bytes()
self.obj_store_mem_internal_outqueue -= output_size
self._internal_outqueue.remove(output)
assert self.obj_store_mem_internal_outqueue >= 0, (
self._op,
self.obj_store_mem_internal_outqueue,
Expand Down Expand Up @@ -504,7 +513,7 @@ def on_task_submitted(self, task_index: int, inputs: RefBundle):
self.num_tasks_submitted += 1
self.num_tasks_running += 1
self.bytes_inputs_of_submitted_tasks += inputs.size_bytes()
self.obj_store_mem_pending_task_inputs += inputs.size_bytes()
self._pending_task_inputs.add(inputs)
self._running_tasks[task_index] = RunningTaskInfo(inputs, 0, 0)

def on_task_output_generated(self, task_index: int, output: RefBundle):
Expand Down Expand Up @@ -544,7 +553,7 @@ def on_task_finished(self, task_index: int, exception: Optional[Exception]):
total_input_size = inputs.size_bytes()
self.bytes_task_inputs_processed += total_input_size
input_size = inputs.size_bytes()
self.obj_store_mem_pending_task_inputs -= input_size
self._pending_task_inputs.remove(inputs)
assert self.obj_store_mem_pending_task_inputs >= 0, (
self._op,
self.obj_store_mem_pending_task_inputs,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import collections
import logging
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union
Expand All @@ -8,6 +7,7 @@
from ray.core.generated import gcs_pb2
from ray.data._internal.compute import ActorPoolStrategy
from ray.data._internal.execution.autoscaler import AutoscalingActorPool
from ray.data._internal.execution.bundle_queue import create_bundle_queue
from ray.data._internal.execution.interfaces import (
ExecutionOptions,
ExecutionResources,
Expand Down Expand Up @@ -109,7 +109,7 @@ def __init__(

self._actor_pool = _ActorPool(compute_strategy, self._start_actor)
# A queue of bundles awaiting dispatch to actors.
self._bundle_queue = collections.deque()
self._bundle_queue = create_bundle_queue()
# Cached actor class.
self._cls = None
# Whether no more submittable bundles will be added.
Expand Down Expand Up @@ -175,7 +175,7 @@ def _task_done_callback(res_ref):
return actor, res_ref

def _add_bundled_input(self, bundle: RefBundle):
self._bundle_queue.append(bundle)
self._bundle_queue.add(bundle)
self._metrics.on_input_queued(bundle)
# Try to dispatch all bundles in the queue, including this new bundle.
self._dispatch_tasks()
Expand All @@ -191,14 +191,14 @@ def _dispatch_tasks(self):
while self._bundle_queue:
# Pick an actor from the pool.
if self._actor_locality_enabled:
actor = self._actor_pool.pick_actor(self._bundle_queue[0])
actor = self._actor_pool.pick_actor(self._bundle_queue.peek())
else:
actor = self._actor_pool.pick_actor()
if actor is None:
# No actors available for executing the next task.
break
# Submit the map task.
bundle = self._bundle_queue.popleft()
bundle = self._bundle_queue.pop()
self._metrics.on_input_dequeued(bundle)
input_blocks = [block for block, _ in bundle.blocks]
ctx = TaskContext(
Expand Down
Loading
Loading