From 4a9c424deadaecbd880fe4ddd4343dca364447c2 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Wed, 6 Nov 2024 17:10:28 -0800 Subject: [PATCH] Revert "[Data] Add `BundleQueue` abstraction (#48503)" (#48612) Signed-off-by: Balaji Veeramani --- python/ray/data/BUILD | 8 -- .../execution/bundle_queue/__init__.py | 9 -- .../execution/bundle_queue/bundle_queue.py | 62 --------- .../bundle_queue/fifo_bundle_queue.py | 129 ----------------- .../interfaces/op_runtime_metrics.py | 61 ++++---- .../operators/actor_pool_map_operator.py | 10 +- .../execution/operators/map_operator.py | 45 ++---- .../execution/streaming_executor_state.py | 22 +-- python/ray/data/tests/test_bundle_queue.py | 130 ------------------ python/ray/data/tests/test_operators.py | 10 +- python/ray/data/tests/test_stats.py | 28 ++-- 11 files changed, 71 insertions(+), 443 deletions(-) delete mode 100644 python/ray/data/_internal/execution/bundle_queue/__init__.py delete mode 100644 python/ray/data/_internal/execution/bundle_queue/bundle_queue.py delete mode 100644 python/ray/data/_internal/execution/bundle_queue/fifo_bundle_queue.py delete mode 100644 python/ray/data/tests/test_bundle_queue.py diff --git a/python/ray/data/BUILD b/python/ray/data/BUILD index d232ab352ba0..9a193e369a26 100644 --- a/python/ray/data/BUILD +++ b/python/ray/data/BUILD @@ -587,14 +587,6 @@ py_test( deps = ["//:ray_lib", ":conftest"], ) -py_test( - name = "test_bundle_queue", - size = "small", - srcs = ["tests/test_bundle_queue.py"], - tags = ["team:data", "exclusive"], - deps = ["//:ray_lib", ":conftest"], -) - py_test( name = "test_autoscaler", size = "small", diff --git a/python/ray/data/_internal/execution/bundle_queue/__init__.py b/python/ray/data/_internal/execution/bundle_queue/__init__.py deleted file mode 100644 index aa51465258e2..000000000000 --- a/python/ray/data/_internal/execution/bundle_queue/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -from .bundle_queue import BundleQueue -from .fifo_bundle_queue import FIFOBundleQueue - - -def create_bundle_queue() -> BundleQueue: - return FIFOBundleQueue() - - -__all__ = ["BundleQueue", "create_bundle_queue"] diff --git a/python/ray/data/_internal/execution/bundle_queue/bundle_queue.py b/python/ray/data/_internal/execution/bundle_queue/bundle_queue.py deleted file mode 100644 index f11bacf14c33..000000000000 --- a/python/ray/data/_internal/execution/bundle_queue/bundle_queue.py +++ /dev/null @@ -1,62 +0,0 @@ -import abc -from typing import TYPE_CHECKING, Optional - -if TYPE_CHECKING: - from ray.data._internal.execution.interfaces import RefBundle - - -class BundleQueue(abc.ABC): - @abc.abstractmethod - def __len__(self) -> int: - """Return the number of bundles in the queue.""" - ... - - @abc.abstractmethod - def __contains__(self, bundle: "RefBundle") -> bool: - """Return whether the bundle is in the queue.""" - ... - - @abc.abstractmethod - def add(self, bundle: "RefBundle") -> None: - """Add a bundle to the queue.""" - ... - - @abc.abstractmethod - def pop(self) -> "RefBundle": - """Remove and return the head of the queue. - - Raises: - IndexError: If the queue is empty. - """ - ... - - @abc.abstractmethod - def peek(self) -> Optional["RefBundle"]: - """Return the head of the queue without removing it. - - If the queue is empty, return `None`. - """ - ... - - @abc.abstractmethod - def remove(self, bundle: "RefBundle"): - """Remove a bundle from the queue.""" - ... - - @abc.abstractmethod - def clear(self): - """Remove all bundles from the queue.""" - ... - - @abc.abstractmethod - def estimate_size_bytes(self) -> int: - """Return an estimate of the total size of objects in the queue.""" - ... - - @abc.abstractmethod - def is_empty(self): - """Return whether this queue and all of its internal data structures are empty. - - This method is used for testing. - """ - ... diff --git a/python/ray/data/_internal/execution/bundle_queue/fifo_bundle_queue.py b/python/ray/data/_internal/execution/bundle_queue/fifo_bundle_queue.py deleted file mode 100644 index 4422c8798eac..000000000000 --- a/python/ray/data/_internal/execution/bundle_queue/fifo_bundle_queue.py +++ /dev/null @@ -1,129 +0,0 @@ -from collections import defaultdict, deque -from dataclasses import dataclass -from typing import TYPE_CHECKING, Dict, List, Optional - -from .bundle_queue import BundleQueue - -if TYPE_CHECKING: - from ray.data._internal.execution.interfaces import RefBundle - - -@dataclass -class _Node: - value: "RefBundle" - next: Optional["_Node"] = None - prev: Optional["_Node"] = None - - -class FIFOBundleQueue(BundleQueue): - """A bundle queue that follows a first-in-first-out policy.""" - - def __init__(self): - # We manually implement a linked list because we need to remove elements - # efficiently, and Python's built-in data structures have O(n) removal time. - self._head: Optional[_Node] = None - self._tail: Optional[_Node] = None - # We use a dictionary to keep track of the nodes corresponding to each bundle. - # This allows us to remove a bundle from the queue in O(1) time. We need a list - # because a bundle can be added to the queue multiple times. Nodes in each list - # are insertion-ordered. - self._bundle_to_nodes: Dict["RefBundle", List[_Node]] = defaultdict(deque) - - self._nbytes = 0 - self._num_bundles = 0 - - def __len__(self) -> int: - return self._num_bundles - - def __contains__(self, bundle: "RefBundle") -> bool: - return bundle in self._bundle_to_nodes - - def add(self, bundle: "RefBundle") -> None: - """Add a bundle to the end (right) of the queue.""" - new_node = _Node(value=bundle, next=None, prev=self._tail) - # Case 1: The queue is empty. - if self._head is None: - assert self._tail is None - self._head = new_node - self._tail = new_node - # Case 2: The queue has at least one element. - else: - self._tail.next = new_node - self._tail = new_node - - self._bundle_to_nodes[bundle].append(new_node) - - self._nbytes += bundle.size_bytes() - self._num_bundles += 1 - - def pop(self) -> "RefBundle": - """Return the first (left) bundle in the queue.""" - # Case 1: The queue is empty. - if not self._head: - raise IndexError("You can't pop from an empty queue") - - bundle = self._head.value - self.remove(bundle) - - return bundle - - def peek(self) -> Optional["RefBundle"]: - """Return the first (left) bundle in the queue without removing it.""" - if self._head is None: - return None - - return self._head.value - - def remove(self, bundle: "RefBundle"): - """Remove a bundle from the queue. - - If there are multiple instances of the bundle in the queue, this method only - removes the first one. - """ - # Case 1: The queue is empty. - if bundle not in self._bundle_to_nodes: - raise ValueError(f"The bundle {bundle} is not in the queue.") - - node = self._bundle_to_nodes[bundle].popleft() - if not self._bundle_to_nodes[bundle]: - del self._bundle_to_nodes[bundle] - - # Case 2: The bundle is the only element in the queue. - if self._head is self._tail: - self._head = None - self._tail = None - # Case 3: The bundle is the first element in the queue. - elif node is self._head: - self._head = node.next - self._head.prev = None - # Case 4: The bundle is the last element in the queue. - elif node is self._tail: - self._tail = node.prev - self._tail.next = None - # Case 5: The bundle is in the middle of the queue. - else: - node.prev.next = node.next - node.next.prev = node.prev - - self._nbytes -= bundle.size_bytes() - assert self._nbytes >= 0, ( - "Expected the total size of objects in the queue to be non-negative, but " - f"got {self._nbytes} bytes instead." - ) - - self._num_bundles -= 1 - - return node.value - - def clear(self): - self._head = None - self._tail = None - self._bundle_to_nodes.clear() - self._nbytes = 0 - self._num_bundles = 0 - - def estimate_size_bytes(self) -> int: - return self._nbytes - - def is_empty(self): - return not self._bundle_to_nodes and self._head is None and self._tail is None diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py index cea2645102ae..c42a02aab7d6 100644 --- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py +++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py @@ -4,7 +4,6 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional import ray -from ray.data._internal.execution.bundle_queue import create_bundle_queue from ray.data._internal.execution.interfaces.ref_bundle import RefBundle from ray.data._internal.memory_tracing import trace_allocation @@ -268,11 +267,31 @@ class OpRuntimeMetrics(metaclass=OpRuntimesMetricsMeta): description="Number of blocks in operator's internal input queue.", metrics_group=MetricsGroup.OBJECT_STORE_MEMORY, ) + obj_store_mem_internal_inqueue: int = metric_field( + default=0, + description=( + "Byte size of input blocks in the operator's internal input queue." + ), + metrics_group=MetricsGroup.OBJECT_STORE_MEMORY, + ) obj_store_mem_internal_outqueue_blocks: int = metric_field( default=0, description="Number of blocks in the operator's internal output queue.", metrics_group=MetricsGroup.OBJECT_STORE_MEMORY, ) + obj_store_mem_internal_outqueue: int = metric_field( + default=0, + description=( + "Byte size of output blocks in the operator's internal output queue." + ), + metrics_group=MetricsGroup.OBJECT_STORE_MEMORY, + ) + obj_store_mem_pending_task_inputs: int = metric_field( + default=0, + description="Byte size of input blocks used by pending tasks.", + metrics_group=MetricsGroup.OBJECT_STORE_MEMORY, + map_only=True, + ) obj_store_mem_freed: int = metric_field( default=0, description="Byte size of freed memory in object store.", @@ -304,10 +323,6 @@ def __init__(self, op: "PhysicalOperator"): # Start time of current pause due to task submission backpressure self._task_submission_backpressure_start_time = -1 - self._internal_inqueue = create_bundle_queue() - self._internal_outqueue = create_bundle_queue() - self._pending_task_inputs = create_bundle_queue() - @property def extra_metrics(self) -> Dict[str, Any]: """Return a dict of extra metrics.""" @@ -362,30 +377,6 @@ def average_bytes_per_output(self) -> Optional[float]: else: return self.bytes_task_outputs_generated / self.num_task_outputs_generated - @metric_property( - description="Byte size of input blocks in the operator's internal input queue.", - metrics_group=MetricsGroup.OBJECT_STORE_MEMORY, - ) - def obj_store_mem_internal_inqueue(self) -> int: - return self._internal_inqueue.estimate_size_bytes() - - @metric_property( - description=( - "Byte size of output blocks in the operator's internal output queue." - ), - metrics_group=MetricsGroup.OBJECT_STORE_MEMORY, - ) - def obj_store_mem_internal_outqueue(self) -> int: - return self._internal_outqueue.estimate_size_bytes() - - @metric_property( - description="Byte size of input blocks used by pending tasks.", - metrics_group=MetricsGroup.OBJECT_STORE_MEMORY, - map_only=True, - ) - def obj_store_mem_pending_task_inputs(self) -> int: - return self._pending_task_inputs.estimate_size_bytes() - @property def obj_store_mem_pending_task_outputs(self) -> Optional[float]: """Estimated size in bytes of output blocks in Ray generator buffers. @@ -463,13 +454,13 @@ def on_input_received(self, input: RefBundle): def on_input_queued(self, input: RefBundle): """Callback when the operator queues an input.""" self.obj_store_mem_internal_inqueue_blocks += len(input.blocks) - self._internal_inqueue.add(input) + self.obj_store_mem_internal_inqueue += input.size_bytes() def on_input_dequeued(self, input: RefBundle): """Callback when the operator dequeues an input.""" self.obj_store_mem_internal_inqueue_blocks -= len(input.blocks) input_size = input.size_bytes() - self._internal_inqueue.remove(input) + self.obj_store_mem_internal_inqueue -= input_size assert self.obj_store_mem_internal_inqueue >= 0, ( self._op, self.obj_store_mem_internal_inqueue, @@ -479,13 +470,13 @@ def on_input_dequeued(self, input: RefBundle): def on_output_queued(self, output: RefBundle): """Callback when an output is queued by the operator.""" self.obj_store_mem_internal_outqueue_blocks += len(output.blocks) - self._internal_outqueue.add(output) + self.obj_store_mem_internal_outqueue += output.size_bytes() def on_output_dequeued(self, output: RefBundle): """Callback when an output is dequeued by the operator.""" self.obj_store_mem_internal_outqueue_blocks -= len(output.blocks) output_size = output.size_bytes() - self._internal_outqueue.remove(output) + self.obj_store_mem_internal_outqueue -= output_size assert self.obj_store_mem_internal_outqueue >= 0, ( self._op, self.obj_store_mem_internal_outqueue, @@ -513,7 +504,7 @@ def on_task_submitted(self, task_index: int, inputs: RefBundle): self.num_tasks_submitted += 1 self.num_tasks_running += 1 self.bytes_inputs_of_submitted_tasks += inputs.size_bytes() - self._pending_task_inputs.add(inputs) + self.obj_store_mem_pending_task_inputs += inputs.size_bytes() self._running_tasks[task_index] = RunningTaskInfo(inputs, 0, 0) def on_task_output_generated(self, task_index: int, output: RefBundle): @@ -553,7 +544,7 @@ def on_task_finished(self, task_index: int, exception: Optional[Exception]): total_input_size = inputs.size_bytes() self.bytes_task_inputs_processed += total_input_size input_size = inputs.size_bytes() - self._pending_task_inputs.remove(inputs) + self.obj_store_mem_pending_task_inputs -= input_size assert self.obj_store_mem_pending_task_inputs >= 0, ( self._op, self.obj_store_mem_pending_task_inputs, diff --git a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py index a0c4eb524916..1c46f768d2bd 100644 --- a/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py +++ b/python/ray/data/_internal/execution/operators/actor_pool_map_operator.py @@ -1,3 +1,4 @@ +import collections import logging from dataclasses import dataclass from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union @@ -7,7 +8,6 @@ from ray.core.generated import gcs_pb2 from ray.data._internal.compute import ActorPoolStrategy from ray.data._internal.execution.autoscaler import AutoscalingActorPool -from ray.data._internal.execution.bundle_queue import create_bundle_queue from ray.data._internal.execution.interfaces import ( ExecutionOptions, ExecutionResources, @@ -109,7 +109,7 @@ def __init__( self._actor_pool = _ActorPool(compute_strategy, self._start_actor) # A queue of bundles awaiting dispatch to actors. - self._bundle_queue = create_bundle_queue() + self._bundle_queue = collections.deque() # Cached actor class. self._cls = None # Whether no more submittable bundles will be added. @@ -175,7 +175,7 @@ def _task_done_callback(res_ref): return actor, res_ref def _add_bundled_input(self, bundle: RefBundle): - self._bundle_queue.add(bundle) + self._bundle_queue.append(bundle) self._metrics.on_input_queued(bundle) # Try to dispatch all bundles in the queue, including this new bundle. self._dispatch_tasks() @@ -191,14 +191,14 @@ def _dispatch_tasks(self): while self._bundle_queue: # Pick an actor from the pool. if self._actor_locality_enabled: - actor = self._actor_pool.pick_actor(self._bundle_queue.peek()) + actor = self._actor_pool.pick_actor(self._bundle_queue[0]) else: actor = self._actor_pool.pick_actor() if actor is None: # No actors available for executing the next task. break # Submit the map task. - bundle = self._bundle_queue.pop() + bundle = self._bundle_queue.popleft() self._metrics.on_input_dequeued(bundle) input_blocks = [block for block, _ in bundle.blocks] ctx = TaskContext( diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index f5018b2aeb5d..6a42e0c760af 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -4,18 +4,7 @@ import logging from abc import ABC, abstractmethod from collections import defaultdict, deque -from typing import ( - Any, - Callable, - Deque, - Dict, - Iterator, - List, - Optional, - Set, - Tuple, - Union, -) +from typing import Any, Callable, Deque, Dict, Iterator, List, Optional, Set, Union import ray from ray import ObjectRef @@ -244,22 +233,15 @@ def __call__(self, args): def _add_input_inner(self, refs: RefBundle, input_index: int): assert input_index == 0, input_index - # Add RefBundle to the bundler. self._block_ref_bundler.add_bundle(refs) self._metrics.on_input_queued(refs) - if self._block_ref_bundler.has_bundle(): - # The ref bundler combines one or more RefBundles into a new larger - # RefBundle. Rather than dequeuing the new RefBundle, which was never - # enqueued in the first place, we dequeue the original RefBundles. - input_refs, bundled_input = self._block_ref_bundler.get_next_bundle() - for bundle in input_refs: - self._metrics.on_input_dequeued(bundle) - # If the bundler has a full bundle, add it to the operator's task submission - # queue - self._add_bundled_input(bundled_input) + # queue. + bundle = self._block_ref_bundler.get_next_bundle() + self._metrics.on_input_dequeued(bundle) + self._add_bundled_input(bundle) def _get_runtime_ray_remote_args( self, input_bundle: Optional[RefBundle] = None @@ -393,8 +375,8 @@ def all_inputs_done(self): self._block_ref_bundler.done_adding_bundles() if self._block_ref_bundler.has_bundle(): # Handle any leftover bundles in the bundler. - _, bundled_input = self._block_ref_bundler.get_next_bundle() - self._add_bundled_input(bundled_input) + bundle = self._block_ref_bundler.get_next_bundle() + self._add_bundled_input(bundle) super().all_inputs_done() def has_next(self) -> bool: @@ -519,13 +501,8 @@ def has_bundle(self) -> bool: or (self._finalized and self._bundle_buffer_size > 0) ) - def get_next_bundle(self) -> Tuple[List[RefBundle], RefBundle]: - """Gets the next bundle. - - Returns: - A two-tuple. The first element is a list of bundles that were combined into - the output bundle. The second element is the output bundle. - """ + def get_next_bundle(self) -> RefBundle: + """Gets the next bundle.""" assert self.has_bundle() if self._min_rows_per_bundle is None: # Short-circuit if no bundle row target was defined. @@ -533,7 +510,7 @@ def get_next_bundle(self) -> Tuple[List[RefBundle], RefBundle]: bundle = self._bundle_buffer[0] self._bundle_buffer = [] self._bundle_buffer_size = 0 - return [bundle], bundle + return bundle leftover = [] output_buffer = [] output_buffer_size = 0 @@ -561,7 +538,7 @@ def get_next_bundle(self) -> Tuple[List[RefBundle], RefBundle]: self._bundle_buffer_size = sum( self._get_bundle_size(bundle) for bundle in leftover ) - return list(output_buffer), _merge_ref_bundles(*output_buffer) + return _merge_ref_bundles(*output_buffer) def done_adding_bundles(self): """Indicate that no more RefBundles will be added to this bundler.""" diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index 8ca0247dd393..9f44a0f6cc7a 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -14,7 +14,6 @@ import ray from ray.data._internal.execution.autoscaler import Autoscaler from ray.data._internal.execution.backpressure_policy import BackpressurePolicy -from ray.data._internal.execution.bundle_queue import create_bundle_queue from ray.data._internal.execution.interfaces import ( ExecutionOptions, ExecutionResources, @@ -48,8 +47,9 @@ class OpBufferQueue: """ def __init__(self): + self._memory_usage = 0 self._num_blocks = 0 - self._queue = create_bundle_queue() + self._queue = deque() self._num_per_split = defaultdict(int) self._lock = threading.Lock() # Used to buffer output RefBundles indexed by output splits. @@ -60,7 +60,7 @@ def __init__(self): def memory_usage(self) -> int: """The total memory usage of the queue in bytes.""" with self._lock: - return self._queue.estimate_size_bytes() + return self._memory_usage @property def num_blocks(self) -> int: @@ -69,8 +69,7 @@ def num_blocks(self) -> int: return self._num_blocks def __len__(self): - with self._lock: - return len(self._queue) + return len(self._queue) def has_next(self, output_split_idx: Optional[int] = None) -> bool: """Whether next RefBundle is available. @@ -80,16 +79,16 @@ def has_next(self, output_split_idx: Optional[int] = None) -> bool: given output split. """ if output_split_idx is None: - with self._lock: - return len(self._queue) > 0 + return len(self._queue) > 0 else: with self._lock: return self._num_per_split[output_split_idx] > 0 def append(self, ref: RefBundle): """Append a RefBundle to the queue.""" + self._queue.append(ref) with self._lock: - self._queue.add(ref) + self._memory_usage += ref.size_bytes() self._num_blocks += len(ref.blocks) if ref.output_split_idx is not None: self._num_per_split[ref.output_split_idx] += 1 @@ -105,8 +104,7 @@ def pop(self, output_split_idx: Optional[int] = None) -> Optional[RefBundle]: ret = None if output_split_idx is None: try: - with self._lock: - ret = self._queue.pop() + ret = self._queue.popleft() except IndexError: pass else: @@ -121,7 +119,7 @@ def pop(self, output_split_idx: Optional[int] = None) -> Optional[RefBundle]: # preserve the order of ref bundles with different output splits. with self._lock: while len(self._queue) > 0: - ref = self._queue.pop() + ref = self._queue.popleft() self._outputs_by_split[ref.output_split_idx].append(ref) try: ret = split_queue.popleft() @@ -130,6 +128,7 @@ def pop(self, output_split_idx: Optional[int] = None) -> Optional[RefBundle]: if ret is None: return None with self._lock: + self._memory_usage -= ret.size_bytes() self._num_blocks -= len(ret.blocks) if ret.output_split_idx is not None: self._num_per_split[ret.output_split_idx] -= 1 @@ -138,6 +137,7 @@ def pop(self, output_split_idx: Optional[int] = None) -> Optional[RefBundle]: def clear(self): with self._lock: self._queue.clear() + self._memory_usage = 0 self._num_blocks = 0 self._num_per_split.clear() diff --git a/python/ray/data/tests/test_bundle_queue.py b/python/ray/data/tests/test_bundle_queue.py deleted file mode 100644 index 4d06b74189f6..000000000000 --- a/python/ray/data/tests/test_bundle_queue.py +++ /dev/null @@ -1,130 +0,0 @@ -from typing import Any - -import pyarrow as pa -import pytest - -import ray -from ray.data._internal.execution.bundle_queue import create_bundle_queue -from ray.data._internal.execution.interfaces import RefBundle -from ray.data.block import BlockAccessor - - -def _create_bundle(data: Any) -> RefBundle: - """Create a RefBundle with a single row with the given data.""" - block = pa.Table.from_pydict({"data": [data]}) - block_ref = ray.put(block) - metadata = BlockAccessor.for_block(block).get_metadata() - return RefBundle([(block_ref, metadata)], owns_blocks=False) - - -# CVGA-start -def test_add_and_length(): - queue = create_bundle_queue() - queue.add(_create_bundle("test1")) - queue.add(_create_bundle("test2")) - assert len(queue) == 2 - - -def test_pop(): - queue = create_bundle_queue() - bundle1 = _create_bundle("test1") - queue.add(bundle1) - bundle2 = _create_bundle("test2") - queue.add(bundle2) - - popped_bundle = queue.pop() - assert popped_bundle is bundle1 - assert len(queue) == 1 - - -def test_peek(): - queue = create_bundle_queue() - bundle1 = _create_bundle("test1") - queue.add(bundle1) - bundle2 = _create_bundle("test2") - queue.add(bundle2) - - peeked_bundle = queue.peek() - assert peeked_bundle is bundle1 - assert len(queue) == 2 # Length should remain unchanged - - -def test_pop_empty_queue(): - queue = create_bundle_queue() - with pytest.raises(IndexError): - queue.pop() - - -def test_pop_does_not_leak_objects(): - queue = create_bundle_queue() - bundle1 = _create_bundle("test1") - queue.add(bundle1) - queue.pop() - assert queue.is_empty() - - -def test_peek_empty_queue(): - queue = create_bundle_queue() - assert queue.peek() is None - assert queue.is_empty() - - -def test_remove(): - queue = create_bundle_queue() - bundle1 = _create_bundle("test1") - bundle2 = _create_bundle("test2") - queue.add(bundle1) - queue.add(bundle2) - - queue.remove(bundle1) - assert len(queue) == 1 - assert queue.peek() is bundle2 - - -def test_remove_does_not_leak_objects(): - queue = create_bundle_queue() - bundle1 = _create_bundle("test1") - queue.add(bundle1) - queue.remove(bundle1) - assert queue.is_empty() - - -def test_add_and_remove_duplicates(): - queue = create_bundle_queue() - bundle1 = _create_bundle("test1") - bundle2 = _create_bundle("test2") - queue.add(bundle1) - queue.add(bundle2) - queue.add(bundle1) - - assert len(queue) == 3 - queue.remove(bundle1) - assert len(queue) == 2 - assert queue.peek() is bundle2 - - -def test_clear(): - queue = create_bundle_queue() - queue.add(_create_bundle("test1")) - queue.add(_create_bundle("test2")) - queue.clear() - assert len(queue) == 0 - assert queue.estimate_size_bytes() == 0 - assert queue.is_empty() - - -def test_estimate_size_bytes(): - queue = create_bundle_queue() - bundle1 = _create_bundle("test1") - bundle2 = _create_bundle("test2") - queue.add(bundle1) - queue.add(bundle2) - assert queue.estimate_size_bytes() == bundle1.size_bytes() + bundle2.size_bytes() - - -# CVGA-end - -if __name__ == "__main__": - import sys - - sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/data/tests/test_operators.py b/python/ray/data/tests/test_operators.py index 04656c7adfc3..153fc38fb7eb 100644 --- a/python/ray/data/tests/test_operators.py +++ b/python/ray/data/tests/test_operators.py @@ -788,11 +788,11 @@ def test_block_ref_bundler_basic(target, in_bundles, expected_bundles): for bundle in bundles: bundler.add_bundle(bundle) while bundler.has_bundle(): - out_bundle = _get_bundles(bundler.get_next_bundle()[1]) + out_bundle = _get_bundles(bundler.get_next_bundle()) out_bundles.append(out_bundle) bundler.done_adding_bundles() if bundler.has_bundle(): - out_bundle = _get_bundles(bundler.get_next_bundle()[1]) + out_bundle = _get_bundles(bundler.get_next_bundle()) out_bundles.append(out_bundle) assert len(out_bundles) == len(expected_bundles) for bundle, expected in zip(out_bundles, expected_bundles): @@ -820,12 +820,10 @@ def test_block_ref_bundler_uniform( for bundle in bundles: bundler.add_bundle(bundle) while bundler.has_bundle(): - _, out_bundle = bundler.get_next_bundle() - out_bundles.append(out_bundle) + out_bundles.append(bundler.get_next_bundle()) bundler.done_adding_bundles() if bundler.has_bundle(): - _, out_bundle = bundler.get_next_bundle() - out_bundles.append(out_bundle) + out_bundles.append(bundler.get_next_bundle()) assert len(out_bundles) == num_out_bundles for out_bundle in out_bundles: assert out_bundle.num_rows() == out_bundle_size diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index d8d85515092c..28caa6f6773d 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -43,9 +43,6 @@ def gen_expected_metrics( metrics = [ "'average_num_outputs_per_task': N", "'average_bytes_per_output': N", - "'obj_store_mem_internal_inqueue': Z", - "'obj_store_mem_internal_outqueue': Z", - "'obj_store_mem_pending_task_inputs': Z", "'average_bytes_inputs_per_task': N", "'average_bytes_outputs_per_task': N", "'num_inputs_received': N", @@ -71,7 +68,10 @@ def gen_expected_metrics( f"{'N' if task_backpressure else 'Z'}" ), "'obj_store_mem_internal_inqueue_blocks': Z", + "'obj_store_mem_internal_inqueue': Z", "'obj_store_mem_internal_outqueue_blocks': Z", + "'obj_store_mem_internal_outqueue': Z", + "'obj_store_mem_pending_task_inputs': Z", "'obj_store_mem_freed': N", f"""'obj_store_mem_spilled': {"N" if spilled else "Z"}""", "'obj_store_mem_used': A", @@ -80,8 +80,6 @@ def gen_expected_metrics( ] else: metrics = [ - "'obj_store_mem_internal_inqueue': Z", - "'obj_store_mem_internal_outqueue': Z", "'num_inputs_received': N", "'bytes_inputs_received': N", "'num_outputs_taken': N", @@ -91,7 +89,9 @@ def gen_expected_metrics( f"{'N' if task_backpressure else 'Z'}" ), "'obj_store_mem_internal_inqueue_blocks': Z", + "'obj_store_mem_internal_inqueue': Z", "'obj_store_mem_internal_outqueue_blocks': Z", + "'obj_store_mem_internal_outqueue': Z", "'obj_store_mem_used': A", "'cpu_usage': Z", "'gpu_usage': Z", @@ -537,9 +537,6 @@ def test_dataset__repr__(ray_start_regular_shared, restore_data_context): " extra_metrics={\n" " average_num_outputs_per_task: N,\n" " average_bytes_per_output: N,\n" - " obj_store_mem_internal_inqueue: Z,\n" - " obj_store_mem_internal_outqueue: Z,\n" - " obj_store_mem_pending_task_inputs: Z,\n" " average_bytes_inputs_per_task: N,\n" " average_bytes_outputs_per_task: N,\n" " num_inputs_received: N,\n" @@ -562,7 +559,10 @@ def test_dataset__repr__(ray_start_regular_shared, restore_data_context): " block_generation_time: N,\n" " task_submission_backpressure_time: N,\n" " obj_store_mem_internal_inqueue_blocks: Z,\n" + " obj_store_mem_internal_inqueue: Z,\n" " obj_store_mem_internal_outqueue_blocks: Z,\n" + " obj_store_mem_internal_outqueue: Z,\n" + " obj_store_mem_pending_task_inputs: Z,\n" " obj_store_mem_freed: N,\n" " obj_store_mem_spilled: Z,\n" " obj_store_mem_used: A,\n" @@ -651,9 +651,6 @@ def check_stats(): " extra_metrics={\n" " average_num_outputs_per_task: N,\n" " average_bytes_per_output: N,\n" - " obj_store_mem_internal_inqueue: Z,\n" - " obj_store_mem_internal_outqueue: Z,\n" - " obj_store_mem_pending_task_inputs: Z,\n" " average_bytes_inputs_per_task: N,\n" " average_bytes_outputs_per_task: N,\n" " num_inputs_received: N,\n" @@ -676,7 +673,10 @@ def check_stats(): " block_generation_time: N,\n" " task_submission_backpressure_time: N,\n" " obj_store_mem_internal_inqueue_blocks: Z,\n" + " obj_store_mem_internal_inqueue: Z,\n" " obj_store_mem_internal_outqueue_blocks: Z,\n" + " obj_store_mem_internal_outqueue: Z,\n" + " obj_store_mem_pending_task_inputs: Z,\n" " obj_store_mem_freed: N,\n" " obj_store_mem_spilled: Z,\n" " obj_store_mem_used: A,\n" @@ -720,9 +720,6 @@ def check_stats(): " extra_metrics={\n" " average_num_outputs_per_task: N,\n" " average_bytes_per_output: N,\n" - " obj_store_mem_internal_inqueue: Z,\n" - " obj_store_mem_internal_outqueue: Z,\n" - " obj_store_mem_pending_task_inputs: Z,\n" " average_bytes_inputs_per_task: N,\n" " average_bytes_outputs_per_task: N,\n" " num_inputs_received: N,\n" @@ -745,7 +742,10 @@ def check_stats(): " block_generation_time: N,\n" " task_submission_backpressure_time: N,\n" " obj_store_mem_internal_inqueue_blocks: Z,\n" + " obj_store_mem_internal_inqueue: Z,\n" " obj_store_mem_internal_outqueue_blocks: Z,\n" + " obj_store_mem_internal_outqueue: Z,\n" + " obj_store_mem_pending_task_inputs: Z,\n" " obj_store_mem_freed: N,\n" " obj_store_mem_spilled: Z,\n" " obj_store_mem_used: A,\n"