forked from ray-project/ray
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Data] Add
BundleQueue
abstraction (ray-project#48503)
Two reasons for adding this abstraction: 1. It allows us to customize how we pop bundles from OpState and internal queues. Currently, we pop the first-in bundle, but you might want a less naive strategy. 2. It allows us to keep track of bundles in OpRuntimeMetrics so that we can refresh stale bundle sizes. --------- Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
- Loading branch information
1 parent
5baf0a3
commit 4cd8b7c
Showing
11 changed files
with
440 additions
and
71 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
from .bundle_queue import BundleQueue | ||
from .fifo_bundle_queue import FIFOBundleQueue | ||
|
||
|
||
def create_bundle_queue() -> BundleQueue: | ||
return FIFOBundleQueue() | ||
|
||
|
||
__all__ = ["BundleQueue", "create_bundle_queue"] |
59 changes: 59 additions & 0 deletions
59
python/ray/data/_internal/execution/bundle_queue/bundle_queue.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
import abc | ||
from typing import TYPE_CHECKING, Optional | ||
|
||
if TYPE_CHECKING: | ||
from ray.data._internal.execution.interfaces import RefBundle | ||
|
||
|
||
class BundleQueue(abc.ABC): | ||
@abc.abstractmethod | ||
def __len__(self) -> int: | ||
"""Return the number of bundles in the queue.""" | ||
... | ||
|
||
@abc.abstractmethod | ||
def __contains__(self, bundle: "RefBundle") -> bool: | ||
"""Return whether the bundle is in the queue.""" | ||
... | ||
|
||
@abc.abstractmethod | ||
def add(self, bundle: "RefBundle") -> None: | ||
"""Add a bundle to the queue.""" | ||
... | ||
|
||
@abc.abstractmethod | ||
def pop(self) -> "RefBundle": | ||
"""Remove and return the head of the queue. | ||
Raises: | ||
IndexError: If the queue is empty. | ||
""" | ||
... | ||
|
||
@abc.abstractmethod | ||
def peek(self) -> Optional["RefBundle"]: | ||
"""Return the head of the queue without removing it. | ||
If the queue is empty, return `None`. | ||
""" | ||
... | ||
|
||
@abc.abstractmethod | ||
def remove(self, bundle: "RefBundle"): | ||
"""Remove a bundle from the queue.""" | ||
... | ||
|
||
@abc.abstractmethod | ||
def clear(self): | ||
"""Remove all bundles from the queue.""" | ||
... | ||
|
||
@abc.abstractmethod | ||
def estimate_size_bytes(self) -> int: | ||
"""Return an estimate of the total size of objects in the queue.""" | ||
... | ||
|
||
@abc.abstractmethod | ||
def is_empty(self): | ||
"""Return whether this queue and all of its internal data structures are empty. | ||
This method is used for testing. | ||
""" | ||
... |
129 changes: 129 additions & 0 deletions
129
python/ray/data/_internal/execution/bundle_queue/fifo_bundle_queue.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
from collections import defaultdict, deque | ||
from dataclasses import dataclass | ||
from typing import TYPE_CHECKING, Dict, List, Optional | ||
|
||
from .bundle_queue import BundleQueue | ||
|
||
if TYPE_CHECKING: | ||
from ray.data._internal.execution.interfaces import RefBundle | ||
|
||
|
||
@dataclass | ||
class _Node: | ||
value: "RefBundle" | ||
next: Optional["_Node"] = None | ||
prev: Optional["_Node"] = None | ||
|
||
|
||
class FIFOBundleQueue(BundleQueue): | ||
"""A bundle queue that follows a first-in-first-out policy.""" | ||
|
||
def __init__(self): | ||
# We manually implement a linked list because we need to remove elements | ||
# efficiently, and Python's built-in data structures have O(n) removal time. | ||
self._head: Optional[_Node] = None | ||
self._tail: Optional[_Node] = None | ||
# We use a dictionary to keep track of the nodes corresponding to each bundle. | ||
# This allows us to remove a bundle from the queue in O(1) time. We need a list | ||
# because a bundle can be added to the queue multiple times. Nodes in each list | ||
# are insertion-ordered. | ||
self._bundle_to_nodes: Dict["RefBundle", List[_Node]] = defaultdict(deque) | ||
|
||
self._nbytes = 0 | ||
self._num_bundles = 0 | ||
|
||
def __len__(self) -> int: | ||
return self._num_bundles | ||
|
||
def __contains__(self, bundle: "RefBundle") -> bool: | ||
return bundle in self._bundle_to_nodes | ||
|
||
def add(self, bundle: "RefBundle") -> None: | ||
"""Add a bundle to the end (right) of the queue.""" | ||
new_node = _Node(value=bundle, next=None, prev=self._tail) | ||
# Case 1: The queue is empty. | ||
if self._head is None: | ||
assert self._tail is None | ||
self._head = new_node | ||
self._tail = new_node | ||
# Case 2: The queue has at least one element. | ||
else: | ||
self._tail.next = new_node | ||
self._tail = new_node | ||
|
||
self._bundle_to_nodes[bundle].append(new_node) | ||
|
||
self._nbytes += bundle.size_bytes() | ||
self._num_bundles += 1 | ||
|
||
def pop(self) -> "RefBundle": | ||
"""Return the first (left) bundle in the queue.""" | ||
# Case 1: The queue is empty. | ||
if not self._head: | ||
raise IndexError("You can't pop from an empty queue") | ||
|
||
bundle = self._head.value | ||
self.remove(bundle) | ||
|
||
return bundle | ||
|
||
def peek(self) -> Optional["RefBundle"]: | ||
"""Return the first (left) bundle in the queue without removing it.""" | ||
if self._head is None: | ||
return None | ||
|
||
return self._head.value | ||
|
||
def remove(self, bundle: "RefBundle"): | ||
"""Remove a bundle from the queue. | ||
If there are multiple instances of the bundle in the queue, this method only | ||
removes the first one. | ||
""" | ||
# Case 1: The queue is empty. | ||
if bundle not in self._bundle_to_nodes: | ||
raise ValueError(f"The bundle {bundle} is not in the queue.") | ||
|
||
node = self._bundle_to_nodes[bundle].popleft() | ||
if not self._bundle_to_nodes[bundle]: | ||
del self._bundle_to_nodes[bundle] | ||
|
||
# Case 2: The bundle is the only element in the queue. | ||
if self._head is self._tail: | ||
self._head = None | ||
self._tail = None | ||
# Case 3: The bundle is the first element in the queue. | ||
elif node is self._head: | ||
self._head = node.next | ||
self._head.prev = None | ||
# Case 4: The bundle is the last element in the queue. | ||
elif node is self._tail: | ||
self._tail = node.prev | ||
self._tail.next = None | ||
# Case 5: The bundle is in the middle of the queue. | ||
else: | ||
node.prev.next = node.next | ||
node.next.prev = node.prev | ||
|
||
self._nbytes -= bundle.size_bytes() | ||
assert self._nbytes >= 0, ( | ||
"Expected the total size of objects in the queue to be non-negative, but " | ||
f"got {self._nbytes} bytes instead." | ||
) | ||
|
||
self._num_bundles -= 1 | ||
|
||
return node.value | ||
|
||
def clear(self): | ||
self._head = None | ||
self._tail = None | ||
self._bundle_to_nodes.clear() | ||
self._nbytes = 0 | ||
self._num_bundles = 0 | ||
|
||
def estimate_size_bytes(self) -> int: | ||
return self._nbytes | ||
|
||
def is_empty(self): | ||
return not self._bundle_to_nodes and self._head is None and self._tail is None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.