From 4cd8b7c1c5bc4a6521f3746420767a2264de7a9b Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Mon, 4 Nov 2024 14:52:11 -0800 Subject: [PATCH] [Data] Add `BundleQueue` abstraction (#48503) Two reasons for adding this abstraction: 1. It allows us to customize how we pop bundles from OpState and internal queues. Currently, we pop the first-in bundle, but you might want a less naive strategy. 2. It allows us to keep track of bundles in OpRuntimeMetrics so that we can refresh stale bundle sizes. --------- Signed-off-by: Balaji Veeramani --- python/ray/data/BUILD | 8 ++ .../execution/bundle_queue/__init__.py | 9 ++ .../execution/bundle_queue/bundle_queue.py | 59 ++++++++ .../bundle_queue/fifo_bundle_queue.py | 129 +++++++++++++++++ .../interfaces/op_runtime_metrics.py | 61 ++++---- .../operators/actor_pool_map_operator.py | 10 +- .../execution/operators/map_operator.py | 45 ++++-- .../execution/streaming_executor_state.py | 22 +-- python/ray/data/tests/test_bundle_queue.py | 130 ++++++++++++++++++ python/ray/data/tests/test_operators.py | 10 +- python/ray/data/tests/test_stats.py | 28 ++-- 11 files changed, 440 insertions(+), 71 deletions(-) create mode 100644 python/ray/data/_internal/execution/bundle_queue/__init__.py create mode 100644 python/ray/data/_internal/execution/bundle_queue/bundle_queue.py create mode 100644 python/ray/data/_internal/execution/bundle_queue/fifo_bundle_queue.py create mode 100644 python/ray/data/tests/test_bundle_queue.py diff --git a/python/ray/data/BUILD b/python/ray/data/BUILD index 9a193e369a26..d232ab352ba0 100644 --- a/python/ray/data/BUILD +++ b/python/ray/data/BUILD @@ -587,6 +587,14 @@ py_test( deps = ["//:ray_lib", ":conftest"], ) +py_test( + name = "test_bundle_queue", + size = "small", + srcs = ["tests/test_bundle_queue.py"], + tags = ["team:data", "exclusive"], + deps = ["//:ray_lib", ":conftest"], +) + py_test( name = "test_autoscaler", size = "small", diff --git a/python/ray/data/_internal/execution/bundle_queue/__init__.py b/python/ray/data/_internal/execution/bundle_queue/__init__.py new file mode 100644 index 000000000000..aa51465258e2 --- /dev/null +++ b/python/ray/data/_internal/execution/bundle_queue/__init__.py @@ -0,0 +1,9 @@ +from .bundle_queue import BundleQueue +from .fifo_bundle_queue import FIFOBundleQueue + + +def create_bundle_queue() -> BundleQueue: + return FIFOBundleQueue() + + +__all__ = ["BundleQueue", "create_bundle_queue"] diff --git a/python/ray/data/_internal/execution/bundle_queue/bundle_queue.py b/python/ray/data/_internal/execution/bundle_queue/bundle_queue.py new file mode 100644 index 000000000000..59fa5c16f737 --- /dev/null +++ b/python/ray/data/_internal/execution/bundle_queue/bundle_queue.py @@ -0,0 +1,59 @@ +import abc +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from ray.data._internal.execution.interfaces import RefBundle + + +class BundleQueue(abc.ABC): + @abc.abstractmethod + def __len__(self) -> int: + """Return the number of bundles in the queue.""" + ... + + @abc.abstractmethod + def __contains__(self, bundle: "RefBundle") -> bool: + """Return whether the bundle is in the queue.""" + ... + + @abc.abstractmethod + def add(self, bundle: "RefBundle") -> None: + """Add a bundle to the queue.""" + ... + + @abc.abstractmethod + def pop(self) -> "RefBundle": + """Remove and return the head of the queue. + Raises: + IndexError: If the queue is empty. + """ + ... + + @abc.abstractmethod + def peek(self) -> Optional["RefBundle"]: + """Return the head of the queue without removing it. + If the queue is empty, return `None`. + """ + ... + + @abc.abstractmethod + def remove(self, bundle: "RefBundle"): + """Remove a bundle from the queue.""" + ... + + @abc.abstractmethod + def clear(self): + """Remove all bundles from the queue.""" + ... + + @abc.abstractmethod + def estimate_size_bytes(self) -> int: + """Return an estimate of the total size of objects in the queue.""" + ... + + @abc.abstractmethod + def is_empty(self): + """Return whether this queue and all of its internal data structures are empty. + This method is used for testing. + """ + ... diff --git a/python/ray/data/_internal/execution/bundle_queue/fifo_bundle_queue.py b/python/ray/data/_internal/execution/bundle_queue/fifo_bundle_queue.py new file mode 100644 index 000000000000..4422c8798eac --- /dev/null +++ b/python/ray/data/_internal/execution/bundle_queue/fifo_bundle_queue.py @@ -0,0 +1,129 @@ +from collections import defaultdict, deque +from dataclasses import dataclass +from typing import TYPE_CHECKING, Dict, List, Optional + +from .bundle_queue import BundleQueue + +if TYPE_CHECKING: + from ray.data._internal.execution.interfaces import RefBundle + + +@dataclass +class _Node: + value: "RefBundle" + next: Optional["_Node"] = None + prev: Optional["_Node"] = None + + +class FIFOBundleQueue(BundleQueue): + """A bundle queue that follows a first-in-first-out policy.""" + + def __init__(self): + # We manually implement a linked list because we need to remove elements + # efficiently, and Python's built-in data structures have O(n) removal time. + self._head: Optional[_Node] = None + self._tail: Optional[_Node] = None + # We use a dictionary to keep track of the nodes corresponding to each bundle. + # This allows us to remove a bundle from the queue in O(1) time. We need a list + # because a bundle can be added to the queue multiple times. Nodes in each list + # are insertion-ordered. + self._bundle_to_nodes: Dict["RefBundle", List[_Node]] = defaultdict(deque) + + self._nbytes = 0 + self._num_bundles = 0 + + def __len__(self) -> int: + return self._num_bundles + + def __contains__(self, bundle: "RefBundle") -> bool: + return bundle in self._bundle_to_nodes + + def add(self, bundle: "RefBundle") -> None: + """Add a bundle to the end (right) of the queue.""" + new_node = _Node(value=bundle, next=None, prev=self._tail) + # Case 1: The queue is empty. + if self._head is None: + assert self._tail is None + self._head = new_node + self._tail = new_node + # Case 2: The queue has at least one element. + else: + self._tail.next = new_node + self._tail = new_node + + self._bundle_to_nodes[bundle].append(new_node) + + self._nbytes += bundle.size_bytes() + self._num_bundles += 1 + + def pop(self) -> "RefBundle": + """Return the first (left) bundle in the queue.""" + # Case 1: The queue is empty. + if not self._head: + raise IndexError("You can't pop from an empty queue") + + bundle = self._head.value + self.remove(bundle) + + return bundle + + def peek(self) -> Optional["RefBundle"]: + """Return the first (left) bundle in the queue without removing it.""" + if self._head is None: + return None + + return self._head.value + + def remove(self, bundle: "RefBundle"): + """Remove a bundle from the queue. + + If there are multiple instances of the bundle in the queue, this method only + removes the first one. + """ + # Case 1: The queue is empty. + if bundle not in self._bundle_to_nodes: + raise ValueError(f"The bundle {bundle} is not in the queue.") + + node = self._bundle_to_nodes[bundle].popleft() + if not self._bundle_to_nodes[bundle]: + del self._bundle_to_nodes[bundle] + + # Case 2: The bundle is the only element in the queue. + if self._head is self._tail: + self._head = None + self._tail = None + # Case 3: The bundle is the first element in the queue. + elif node is self._head: + self._head = node.next + self._head.prev = None + # Case 4: The bundle is the last element in the queue. + elif node is self._tail: + self._tail = node.prev + self._tail.next = None + # Case 5: The bundle is in the middle of the queue. + else: + node.prev.next = node.next + node.next.prev = node.prev + + self._nbytes -= bundle.size_bytes() + assert self._nbytes >= 0, ( + "Expected the total size of objects in the queue to be non-negative, but " + f"got {self._nbytes} bytes instead." + ) + + self._num_bundles -= 1 + + return node.value + + def clear(self): + self._head = None + self._tail = None + self._bundle_to_nodes.clear() + self._nbytes = 0 + self._num_bundles = 0 + + def estimate_size_bytes(self) -> int: + return self._nbytes + + def is_empty(self): + return not self._bundle_to_nodes and self._head is None and self._tail is None diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py index c42a02aab7d6..cea2645102ae 100644 --- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py +++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py @@ -4,6 +4,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional import ray +from ray.data._internal.execution.bundle_queue import create_bundle_queue from ray.data._internal.execution.interfaces.ref_bundle import RefBundle from ray.data._internal.memory_tracing import trace_allocation @@ -267,31 +268,11 @@ class OpRuntimeMetrics(metaclass=OpRuntimesMetricsMeta): description="Number of blocks in operator's internal input queue.", metrics_group=MetricsGroup.OBJECT_STORE_MEMORY, ) - obj_store_mem_internal_inqueue: int = metric_field( - default=0, - description=( - "Byte size of input blocks in the operator's internal input queue." - ), - metrics_group=MetricsGroup.OBJECT_STORE_MEMORY, - ) obj_store_mem_internal_outqueue_blocks: int = metric_field( default=0, description="Number of blocks in the operator's internal output queue.", metrics_group=MetricsGroup.OBJECT_STORE_MEMORY, ) - obj_store_mem_internal_outqueue: int = metric_field( - default=0, - description=( - "Byte size of output blocks in the operator's internal output queue." - ), - metrics_group=MetricsGroup.OBJECT_STORE_MEMORY, - ) - obj_store_mem_pending_task_inputs: int = metric_field( - default=0, - description="Byte size of input blocks used by pending tasks.", - metrics_group=MetricsGroup.OBJECT_STORE_MEMORY, - map_only=True, - ) obj_store_mem_freed: int = metric_field( default=0, description="Byte size of freed memory in object store.", @@ -323,6 +304,10 @@ def __init__(self, op: "PhysicalOperator"): # Start time of current pause due to task submission backpressure self._task_submission_backpressure_start_time = -1 + self._internal_inqueue = create_bundle_queue() + self._internal_outqueue = create_bundle_queue() + self._pending_task_inputs = create_bundle_queue() + @property def extra_metrics(self) -> Dict[str, Any]: """Return a dict of extra metrics.""" @@ -377,6 +362,30 @@ def average_bytes_per_output(self) -> Optional[float]: else: return self.bytes_task_outputs_generated / self.num_task_outputs_generated + @metric_property( + description="Byte size of input blocks in the operator's internal input queue.", + metrics_group=MetricsGroup.OBJECT_STORE_MEMORY, + ) + def obj_store_mem_internal_inqueue(self) -> int: + return self._internal_inqueue.estimate_size_bytes() + + @metric_property( + description=( + "Byte size of output blocks in the operator's internal output queue." + ), + metrics_group=MetricsGroup.OBJECT_STORE_MEMORY, + ) + def obj_store_mem_internal_outqueue(self) -> int: + return self._internal_outqueue.estimate_size_bytes() + + @metric_property( + description="Byte size of input blocks used by pending tasks.", + metrics_group=MetricsGroup.OBJECT_STORE_MEMORY, + map_only=True, + ) + def obj_store_mem_pending_task_inputs(self) -> int: + return self._pending_task_inputs.estimate_size_bytes() + @property def obj_store_mem_pending_task_outputs(self) -> Optional[float]: """Estimated size in bytes of output blocks in Ray generator buffers. @@ -454,13 +463,13 @@ def on_input_received(self, input: RefBundle): def on_input_queued(self, input: RefBundle): """Callback when the operator queues an input.""" self.obj_store_mem_internal_inqueue_blocks += len(input.blocks) - self.obj_store_mem_internal_inqueue += input.size_bytes() + self._internal_inqueue.add(input) def on_input_dequeued(self, input: RefBundle): """Callback when the operator dequeues an input.""" self.obj_store_mem_internal_inqueue_blocks -= len(input.blocks) input_size = input.size_bytes() - self.obj_store_mem_internal_inqueue -= input_size + self._internal_inqueue.remove(input) assert self.obj_store_mem_internal_inqueue >= 0, ( self._op, self.obj_store_mem_internal_inqueue, @@ -470,13 +479,13 @@ def on_input_dequeued(self, input: RefBundle): def on_output_queued(self, output: RefBundle): """Callback when an output is queued by the operator.""" self.obj_store_mem_internal_outqueue_blocks += len(output.blocks) - self.obj_store_mem_internal_outqueue += output.size_bytes() + self._internal_outqueue.add(output) def on_output_dequeued(self, output: RefBundle): """Callback when an output is dequeued by the operator.""" self.obj_store_mem_internal_outqueue_blocks -= len(output.blocks) output_size = output.size_bytes() - self.obj_store_mem_internal_outqueue -= output_size + self._internal_outqueue.remove(output) assert self.obj_store_mem_internal_outqueue >= 0, ( self._op, self.obj_store_mem_internal_outqueue, @@ -504,7 +513,7 @@ def on_task_submitted(self, task_index: int, inputs: RefBundle): self.num_tasks_submitted += 1 self.num_tasks_running += 1 self.bytes_inputs_of_submitted_tasks += inputs.size_bytes() - self.obj_store_mem_pending_task_inputs += inputs.size_bytes() + self._pending_task_inputs.add(inputs) self._running_tasks[task_index] = RunningTaskInfo(inputs, 0, 0) def on_task_output_generated(self, task_index: int, output: RefBundle): @@ -544,7 +553,7 @@ def on_task_finished(self, task_index: int, exception: Optional[Exception]): total_input_size = inputs.size_bytes() self.bytes_task_inputs_processed += total_input_size input_size = inputs.size_bytes() - self.obj_store_mem_pending_task_inputs -= input_size + self._pending_task_inputs.remove(inputs) assert self.obj_store_mem_pending_task_inputs >= 0, ( self._op, self.obj_store_mem_pending_task_inputs, diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py index 1c46f768d2bd..a0c4eb524916 100644 --- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py @@ -1,4 +1,3 @@ -import collections import logging from dataclasses import dataclass from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union @@ -8,6 +7,7 @@ from ray.core.generated import gcs_pb2 from ray.data._internal.compute import ActorPoolStrategy from ray.data._internal.execution.autoscaler import AutoscalingActorPool +from ray.data._internal.execution.bundle_queue import create_bundle_queue from ray.data._internal.execution.interfaces import ( ExecutionOptions, ExecutionResources, @@ -109,7 +109,7 @@ def __init__( self._actor_pool = _ActorPool(compute_strategy, self._start_actor) # A queue of bundles awaiting dispatch to actors. - self._bundle_queue = collections.deque() + self._bundle_queue = create_bundle_queue() # Cached actor class. self._cls = None # Whether no more submittable bundles will be added. @@ -175,7 +175,7 @@ def _task_done_callback(res_ref): return actor, res_ref def _add_bundled_input(self, bundle: RefBundle): - self._bundle_queue.append(bundle) + self._bundle_queue.add(bundle) self._metrics.on_input_queued(bundle) # Try to dispatch all bundles in the queue, including this new bundle. self._dispatch_tasks() @@ -191,14 +191,14 @@ def _dispatch_tasks(self): while self._bundle_queue: # Pick an actor from the pool. if self._actor_locality_enabled: - actor = self._actor_pool.pick_actor(self._bundle_queue[0]) + actor = self._actor_pool.pick_actor(self._bundle_queue.peek()) else: actor = self._actor_pool.pick_actor() if actor is None: # No actors available for executing the next task. break # Submit the map task. - bundle = self._bundle_queue.popleft() + bundle = self._bundle_queue.pop() self._metrics.on_input_dequeued(bundle) input_blocks = [block for block, _ in bundle.blocks] ctx = TaskContext( diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index 6a42e0c760af..f5018b2aeb5d 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -4,7 +4,18 @@ import logging from abc import ABC, abstractmethod from collections import defaultdict, deque -from typing import Any, Callable, Deque, Dict, Iterator, List, Optional, Set, Union +from typing import ( + Any, + Callable, + Deque, + Dict, + Iterator, + List, + Optional, + Set, + Tuple, + Union, +) import ray from ray import ObjectRef @@ -233,15 +244,22 @@ def __call__(self, args): def _add_input_inner(self, refs: RefBundle, input_index: int): assert input_index == 0, input_index + # Add RefBundle to the bundler. self._block_ref_bundler.add_bundle(refs) self._metrics.on_input_queued(refs) + if self._block_ref_bundler.has_bundle(): + # The ref bundler combines one or more RefBundles into a new larger + # RefBundle. Rather than dequeuing the new RefBundle, which was never + # enqueued in the first place, we dequeue the original RefBundles. + input_refs, bundled_input = self._block_ref_bundler.get_next_bundle() + for bundle in input_refs: + self._metrics.on_input_dequeued(bundle) + # If the bundler has a full bundle, add it to the operator's task submission - # queue. - bundle = self._block_ref_bundler.get_next_bundle() - self._metrics.on_input_dequeued(bundle) - self._add_bundled_input(bundle) + # queue + self._add_bundled_input(bundled_input) def _get_runtime_ray_remote_args( self, input_bundle: Optional[RefBundle] = None @@ -375,8 +393,8 @@ def all_inputs_done(self): self._block_ref_bundler.done_adding_bundles() if self._block_ref_bundler.has_bundle(): # Handle any leftover bundles in the bundler. - bundle = self._block_ref_bundler.get_next_bundle() - self._add_bundled_input(bundle) + _, bundled_input = self._block_ref_bundler.get_next_bundle() + self._add_bundled_input(bundled_input) super().all_inputs_done() def has_next(self) -> bool: @@ -501,8 +519,13 @@ def has_bundle(self) -> bool: or (self._finalized and self._bundle_buffer_size > 0) ) - def get_next_bundle(self) -> RefBundle: - """Gets the next bundle.""" + def get_next_bundle(self) -> Tuple[List[RefBundle], RefBundle]: + """Gets the next bundle. + + Returns: + A two-tuple. The first element is a list of bundles that were combined into + the output bundle. The second element is the output bundle. + """ assert self.has_bundle() if self._min_rows_per_bundle is None: # Short-circuit if no bundle row target was defined. @@ -510,7 +533,7 @@ def get_next_bundle(self) -> RefBundle: bundle = self._bundle_buffer[0] self._bundle_buffer = [] self._bundle_buffer_size = 0 - return bundle + return [bundle], bundle leftover = [] output_buffer = [] output_buffer_size = 0 @@ -538,7 +561,7 @@ def get_next_bundle(self) -> RefBundle: self._bundle_buffer_size = sum( self._get_bundle_size(bundle) for bundle in leftover ) - return _merge_ref_bundles(*output_buffer) + return list(output_buffer), _merge_ref_bundles(*output_buffer) def done_adding_bundles(self): """Indicate that no more RefBundles will be added to this bundler.""" diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index 9f44a0f6cc7a..8ca0247dd393 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -14,6 +14,7 @@ import ray from ray.data._internal.execution.autoscaler import Autoscaler from ray.data._internal.execution.backpressure_policy import BackpressurePolicy +from ray.data._internal.execution.bundle_queue import create_bundle_queue from ray.data._internal.execution.interfaces import ( ExecutionOptions, ExecutionResources, @@ -47,9 +48,8 @@ class OpBufferQueue: """ def __init__(self): - self._memory_usage = 0 self._num_blocks = 0 - self._queue = deque() + self._queue = create_bundle_queue() self._num_per_split = defaultdict(int) self._lock = threading.Lock() # Used to buffer output RefBundles indexed by output splits. @@ -60,7 +60,7 @@ def __init__(self): def memory_usage(self) -> int: """The total memory usage of the queue in bytes.""" with self._lock: - return self._memory_usage + return self._queue.estimate_size_bytes() @property def num_blocks(self) -> int: @@ -69,7 +69,8 @@ def num_blocks(self) -> int: return self._num_blocks def __len__(self): - return len(self._queue) + with self._lock: + return len(self._queue) def has_next(self, output_split_idx: Optional[int] = None) -> bool: """Whether next RefBundle is available. @@ -79,16 +80,16 @@ def has_next(self, output_split_idx: Optional[int] = None) -> bool: given output split. """ if output_split_idx is None: - return len(self._queue) > 0 + with self._lock: + return len(self._queue) > 0 else: with self._lock: return self._num_per_split[output_split_idx] > 0 def append(self, ref: RefBundle): """Append a RefBundle to the queue.""" - self._queue.append(ref) with self._lock: - self._memory_usage += ref.size_bytes() + self._queue.add(ref) self._num_blocks += len(ref.blocks) if ref.output_split_idx is not None: self._num_per_split[ref.output_split_idx] += 1 @@ -104,7 +105,8 @@ def pop(self, output_split_idx: Optional[int] = None) -> Optional[RefBundle]: ret = None if output_split_idx is None: try: - ret = self._queue.popleft() + with self._lock: + ret = self._queue.pop() except IndexError: pass else: @@ -119,7 +121,7 @@ def pop(self, output_split_idx: Optional[int] = None) -> Optional[RefBundle]: # preserve the order of ref bundles with different output splits. with self._lock: while len(self._queue) > 0: - ref = self._queue.popleft() + ref = self._queue.pop() self._outputs_by_split[ref.output_split_idx].append(ref) try: ret = split_queue.popleft() @@ -128,7 +130,6 @@ def pop(self, output_split_idx: Optional[int] = None) -> Optional[RefBundle]: if ret is None: return None with self._lock: - self._memory_usage -= ret.size_bytes() self._num_blocks -= len(ret.blocks) if ret.output_split_idx is not None: self._num_per_split[ret.output_split_idx] -= 1 @@ -137,7 +138,6 @@ def pop(self, output_split_idx: Optional[int] = None) -> Optional[RefBundle]: def clear(self): with self._lock: self._queue.clear() - self._memory_usage = 0 self._num_blocks = 0 self._num_per_split.clear() diff --git a/python/ray/data/tests/test_bundle_queue.py b/python/ray/data/tests/test_bundle_queue.py new file mode 100644 index 000000000000..4d06b74189f6 --- /dev/null +++ b/python/ray/data/tests/test_bundle_queue.py @@ -0,0 +1,130 @@ +from typing import Any + +import pyarrow as pa +import pytest + +import ray +from ray.data._internal.execution.bundle_queue import create_bundle_queue +from ray.data._internal.execution.interfaces import RefBundle +from ray.data.block import BlockAccessor + + +def _create_bundle(data: Any) -> RefBundle: + """Create a RefBundle with a single row with the given data.""" + block = pa.Table.from_pydict({"data": [data]}) + block_ref = ray.put(block) + metadata = BlockAccessor.for_block(block).get_metadata() + return RefBundle([(block_ref, metadata)], owns_blocks=False) + + +# CVGA-start +def test_add_and_length(): + queue = create_bundle_queue() + queue.add(_create_bundle("test1")) + queue.add(_create_bundle("test2")) + assert len(queue) == 2 + + +def test_pop(): + queue = create_bundle_queue() + bundle1 = _create_bundle("test1") + queue.add(bundle1) + bundle2 = _create_bundle("test2") + queue.add(bundle2) + + popped_bundle = queue.pop() + assert popped_bundle is bundle1 + assert len(queue) == 1 + + +def test_peek(): + queue = create_bundle_queue() + bundle1 = _create_bundle("test1") + queue.add(bundle1) + bundle2 = _create_bundle("test2") + queue.add(bundle2) + + peeked_bundle = queue.peek() + assert peeked_bundle is bundle1 + assert len(queue) == 2 # Length should remain unchanged + + +def test_pop_empty_queue(): + queue = create_bundle_queue() + with pytest.raises(IndexError): + queue.pop() + + +def test_pop_does_not_leak_objects(): + queue = create_bundle_queue() + bundle1 = _create_bundle("test1") + queue.add(bundle1) + queue.pop() + assert queue.is_empty() + + +def test_peek_empty_queue(): + queue = create_bundle_queue() + assert queue.peek() is None + assert queue.is_empty() + + +def test_remove(): + queue = create_bundle_queue() + bundle1 = _create_bundle("test1") + bundle2 = _create_bundle("test2") + queue.add(bundle1) + queue.add(bundle2) + + queue.remove(bundle1) + assert len(queue) == 1 + assert queue.peek() is bundle2 + + +def test_remove_does_not_leak_objects(): + queue = create_bundle_queue() + bundle1 = _create_bundle("test1") + queue.add(bundle1) + queue.remove(bundle1) + assert queue.is_empty() + + +def test_add_and_remove_duplicates(): + queue = create_bundle_queue() + bundle1 = _create_bundle("test1") + bundle2 = _create_bundle("test2") + queue.add(bundle1) + queue.add(bundle2) + queue.add(bundle1) + + assert len(queue) == 3 + queue.remove(bundle1) + assert len(queue) == 2 + assert queue.peek() is bundle2 + + +def test_clear(): + queue = create_bundle_queue() + queue.add(_create_bundle("test1")) + queue.add(_create_bundle("test2")) + queue.clear() + assert len(queue) == 0 + assert queue.estimate_size_bytes() == 0 + assert queue.is_empty() + + +def test_estimate_size_bytes(): + queue = create_bundle_queue() + bundle1 = _create_bundle("test1") + bundle2 = _create_bundle("test2") + queue.add(bundle1) + queue.add(bundle2) + assert queue.estimate_size_bytes() == bundle1.size_bytes() + bundle2.size_bytes() + + +# CVGA-end + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/data/tests/test_operators.py b/python/ray/data/tests/test_operators.py index 153fc38fb7eb..04656c7adfc3 100644 --- a/python/ray/data/tests/test_operators.py +++ b/python/ray/data/tests/test_operators.py @@ -788,11 +788,11 @@ def test_block_ref_bundler_basic(target, in_bundles, expected_bundles): for bundle in bundles: bundler.add_bundle(bundle) while bundler.has_bundle(): - out_bundle = _get_bundles(bundler.get_next_bundle()) + out_bundle = _get_bundles(bundler.get_next_bundle()[1]) out_bundles.append(out_bundle) bundler.done_adding_bundles() if bundler.has_bundle(): - out_bundle = _get_bundles(bundler.get_next_bundle()) + out_bundle = _get_bundles(bundler.get_next_bundle()[1]) out_bundles.append(out_bundle) assert len(out_bundles) == len(expected_bundles) for bundle, expected in zip(out_bundles, expected_bundles): @@ -820,10 +820,12 @@ def test_block_ref_bundler_uniform( for bundle in bundles: bundler.add_bundle(bundle) while bundler.has_bundle(): - out_bundles.append(bundler.get_next_bundle()) + _, out_bundle = bundler.get_next_bundle() + out_bundles.append(out_bundle) bundler.done_adding_bundles() if bundler.has_bundle(): - out_bundles.append(bundler.get_next_bundle()) + _, out_bundle = bundler.get_next_bundle() + out_bundles.append(out_bundle) assert len(out_bundles) == num_out_bundles for out_bundle in out_bundles: assert out_bundle.num_rows() == out_bundle_size diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 28caa6f6773d..d8d85515092c 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -43,6 +43,9 @@ def gen_expected_metrics( metrics = [ "'average_num_outputs_per_task': N", "'average_bytes_per_output': N", + "'obj_store_mem_internal_inqueue': Z", + "'obj_store_mem_internal_outqueue': Z", + "'obj_store_mem_pending_task_inputs': Z", "'average_bytes_inputs_per_task': N", "'average_bytes_outputs_per_task': N", "'num_inputs_received': N", @@ -68,10 +71,7 @@ def gen_expected_metrics( f"{'N' if task_backpressure else 'Z'}" ), "'obj_store_mem_internal_inqueue_blocks': Z", - "'obj_store_mem_internal_inqueue': Z", "'obj_store_mem_internal_outqueue_blocks': Z", - "'obj_store_mem_internal_outqueue': Z", - "'obj_store_mem_pending_task_inputs': Z", "'obj_store_mem_freed': N", f"""'obj_store_mem_spilled': {"N" if spilled else "Z"}""", "'obj_store_mem_used': A", @@ -80,6 +80,8 @@ def gen_expected_metrics( ] else: metrics = [ + "'obj_store_mem_internal_inqueue': Z", + "'obj_store_mem_internal_outqueue': Z", "'num_inputs_received': N", "'bytes_inputs_received': N", "'num_outputs_taken': N", @@ -89,9 +91,7 @@ def gen_expected_metrics( f"{'N' if task_backpressure else 'Z'}" ), "'obj_store_mem_internal_inqueue_blocks': Z", - "'obj_store_mem_internal_inqueue': Z", "'obj_store_mem_internal_outqueue_blocks': Z", - "'obj_store_mem_internal_outqueue': Z", "'obj_store_mem_used': A", "'cpu_usage': Z", "'gpu_usage': Z", @@ -537,6 +537,9 @@ def test_dataset__repr__(ray_start_regular_shared, restore_data_context): " extra_metrics={\n" " average_num_outputs_per_task: N,\n" " average_bytes_per_output: N,\n" + " obj_store_mem_internal_inqueue: Z,\n" + " obj_store_mem_internal_outqueue: Z,\n" + " obj_store_mem_pending_task_inputs: Z,\n" " average_bytes_inputs_per_task: N,\n" " average_bytes_outputs_per_task: N,\n" " num_inputs_received: N,\n" @@ -559,10 +562,7 @@ def test_dataset__repr__(ray_start_regular_shared, restore_data_context): " block_generation_time: N,\n" " task_submission_backpressure_time: N,\n" " obj_store_mem_internal_inqueue_blocks: Z,\n" - " obj_store_mem_internal_inqueue: Z,\n" " obj_store_mem_internal_outqueue_blocks: Z,\n" - " obj_store_mem_internal_outqueue: Z,\n" - " obj_store_mem_pending_task_inputs: Z,\n" " obj_store_mem_freed: N,\n" " obj_store_mem_spilled: Z,\n" " obj_store_mem_used: A,\n" @@ -651,6 +651,9 @@ def check_stats(): " extra_metrics={\n" " average_num_outputs_per_task: N,\n" " average_bytes_per_output: N,\n" + " obj_store_mem_internal_inqueue: Z,\n" + " obj_store_mem_internal_outqueue: Z,\n" + " obj_store_mem_pending_task_inputs: Z,\n" " average_bytes_inputs_per_task: N,\n" " average_bytes_outputs_per_task: N,\n" " num_inputs_received: N,\n" @@ -673,10 +676,7 @@ def check_stats(): " block_generation_time: N,\n" " task_submission_backpressure_time: N,\n" " obj_store_mem_internal_inqueue_blocks: Z,\n" - " obj_store_mem_internal_inqueue: Z,\n" " obj_store_mem_internal_outqueue_blocks: Z,\n" - " obj_store_mem_internal_outqueue: Z,\n" - " obj_store_mem_pending_task_inputs: Z,\n" " obj_store_mem_freed: N,\n" " obj_store_mem_spilled: Z,\n" " obj_store_mem_used: A,\n" @@ -720,6 +720,9 @@ def check_stats(): " extra_metrics={\n" " average_num_outputs_per_task: N,\n" " average_bytes_per_output: N,\n" + " obj_store_mem_internal_inqueue: Z,\n" + " obj_store_mem_internal_outqueue: Z,\n" + " obj_store_mem_pending_task_inputs: Z,\n" " average_bytes_inputs_per_task: N,\n" " average_bytes_outputs_per_task: N,\n" " num_inputs_received: N,\n" @@ -742,10 +745,7 @@ def check_stats(): " block_generation_time: N,\n" " task_submission_backpressure_time: N,\n" " obj_store_mem_internal_inqueue_blocks: Z,\n" - " obj_store_mem_internal_inqueue: Z,\n" " obj_store_mem_internal_outqueue_blocks: Z,\n" - " obj_store_mem_internal_outqueue: Z,\n" - " obj_store_mem_pending_task_inputs: Z,\n" " obj_store_mem_freed: N,\n" " obj_store_mem_spilled: Z,\n" " obj_store_mem_used: A,\n"