Skip to content

Conversation

@iamjustinhsu
Copy link
Contributor

@iamjustinhsu iamjustinhsu commented Dec 1, 2025

Description

Currently we support many types of bundle queues inside operators:

  • BlockRefBundler
  • FIFOBundleQueue
  • UnorderedOutputQueue
  • OrderedOutputQueue
  • List[RefBundle]
  • Deque[RefBundle]
  • StreamingRepartitionRefBundler

All these independent classes makes it very awkward to record queue metrics within operators. In fact, operators themselves have to manage it themselves. This PR introduces a base class called BaseBundleQueue that hopes to unify all the independent classes together.

class BaseBundleQueue:
    """Base class for storing bundles."""

    @abc.abstractmethod
    def add(self, bundle: RefBundle, **kwargs: Any) -> None:
        ...

    @abc.abstractmethod
    def get_next(self) -> RefBundle:
        ...

    @abc.abstractmethod
    def peek_next(self) -> Optional[RefBundle]:
        ...

    @abc.abstractmethod
    def has_next(self) -> bool:
        ...

    @abc.abstractmethod
    def clear(self):
        ...

    @abc.abstractmethod
    def done_adding_bundles(self, **kwargs: Any):
        ...

With this approach, I hope to

  • Simplify design of future operators with InternalQueueOperatorMixin, that will automatically get size of queues, as well as clearing queues for free in the mixin
  • Dedicated queue metrics, so that operators themselves don't need to track
  • Lower cognitive overhead of trying to understand what type of queue is being used within each operator. You now know it's always a BaseBundleQueue that stores RefBundles.

Future Work

This PR is a stepping stone to completely refractor how metrics are collected, stored, and exported. I'd like to refractor OpRuntimeMetrics because it assumes that all operators are MapOperators, which isn't true. After this PR, I have a few proposals to refractor OpRuntimeMetrics

  1. Create a BaseOpMetrics class, so that individual operators can override BaseOpMetrics. For operators with internal queues, we can automatically hook in QueueOpMetrics(BaseOpMetrics) too
  2. Allow classes themselves to report to _StatsActor directly through a something like a MetricsRegistry interface. This will require a lot of work, and I will have to repurpose the _StatsActor to read from that Registry.
  3. I'm also thinking we can just completely remove _StatsActor and _StatsManager, but I'm not sure if that is very clean.

Update on 12/2

Here is a summary of extensions
bundle_queue

Tests

TBD

Related issues

None

Additional information

None

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu requested a review from a team as a code owner December 1, 2025 19:30
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a BaseBundleQueue abstraction to unify the various queue implementations within Ray Data operators. This is a significant and valuable refactoring that simplifies operator design and centralizes queue metrics. The overall approach is solid, and the new queue implementations in fifo.py, hash_link.py, and ordered.py are well-designed.

However, I've found several issues with the implementation that need to be addressed:

  • There are a few critical correctness bugs, including a missing import in bundle_queue/__init__.py and incorrect buffer handling in OutputSplitter.
  • The rebundling queues (RebundleQueue and ExactRebundleQueue) have issues: one violates the peek_next contract by having side effects, and the other has a metric leak in get_next_with_original.
  • The transition to queue-based metrics is incomplete in MapOperator, leading to redundant and confusing metric tracking.
  • There are also some minor issues with incorrect type hints and misleading docstrings.

I've left detailed comments on each of these points. Addressing them will ensure the new abstraction is robust and correctly implemented.

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@ray-gardener ray-gardener bot added the data Ray Data-related issues label Dec 2, 2025
Copy link
Contributor

@omatthew98 omatthew98 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly left comments surrounding the high level method names / APIs. I think solid start but some opportunities for tightening the naming up since we are already doing a big shake up. LMK if you need another review.

@@ -1,75 +0,0 @@
import abc
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to fully scrap BundleQueue in favor of BaseBundleQueue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, it's mostly a rename into the base.py (i'm gonna make clear what is renamed and what is not after this comment), the only difference is that the base also contains a method called done_adding_bundles. This method is used for BlockRefBundler, StreamingRepartitionRefBunlder, and OrderedOutputQueue

"""Protocol for storing bundles AND supporting remove(bundle)
and contains(bundle) operations quickly."""

def __contains__(self, bundle: RefBundle) -> bool:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is __contains__ not supported for the other queues? Should we maybe specify in the comment that this should both implement the method and be efficient or there might be degraded performance?

Copy link
Contributor Author

@iamjustinhsu iamjustinhsu Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea it can be supported for most queues, but I opted to not move it into the base implementation because

  • our existing code infrequently uses __contains__
  • BlockRefBundler and StreamingRefBundler have weird semantics for __contains__, because one moment the bundle is there, and the next it's gone due to "rebundling".
    On that note of bullet # 2, rather than have SupportsRebundling be a protocol, we can have it become SupportsRebundling(BaseBundleQueue), which only supports rebundling, and won't contain a __contains__ method. Haven't thought too deeply into this approach

Copy link
Contributor

@omatthew98 omatthew98 Dec 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok yeah the current approach seems reasonable to me then. I think fine to keep the protocols as is rather than creating a more piecemeal approach where we have very tiny protocols.

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu requested review from a team as code owners December 2, 2025 23:11
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu force-pushed the jhsu/unify-internal-queue-abstraction branch from 2ddb01b to 1a58f57 Compare December 2, 2025 23:32
from ray.data._internal.execution.interfaces import RefBundle


class RebundleQueue(BaseBundleQueue, SupportsRebundling):
Copy link
Contributor Author

@iamjustinhsu iamjustinhsu Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is moved from map_operator.py. However, I augmented this class with peek.*, get_next_with_original, and clear methods below. The previous done_adding_bundles has been renamed to finalized



class StreamingRepartitionRefBundler(BaseRefBundler):
class ExactRebundleQueue(BaseBundleQueue, SupportsRebundling):
Copy link
Contributor Author

@iamjustinhsu iamjustinhsu Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class was also moved from streaming_repartition.py. However, I augmented this class with get_next_with_original and clear methods below. The previous done_adding_bundles has been renamed to finalized

from ray.data._internal.execution.interfaces import RefBundle


class FIFOBundleQueue(BaseBundleQueue, SupportsDeque):
Copy link
Contributor Author

@iamjustinhsu iamjustinhsu Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is basically a wrapper around deque([]) of ref bundles. It is NOT the original FIFOBundleQueue. The original FIFOBundleQueue has been renamed to HashLinkedQueue


class FIFOBundleQueue(BundleQueue):
"""A bundle queue that follows a first-in-first-out policy."""
class HashLinkedQueue(BaseBundleQueue, SupportsIndexing, SupportsDeque):
Copy link
Contributor Author

@iamjustinhsu iamjustinhsu Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed from FIFOBundleQueue. I added peek_last, get_last, and add_to_front methods, as specified by the SupportsDeque protocol.

from ray.data._internal.execution.interfaces import RefBundle


class OrderedBundleQueue(BaseBundleQueue):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class was moved from map_operator.py

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu force-pushed the jhsu/unify-internal-queue-abstraction branch from 6c2d56b to 6c2e17b Compare December 3, 2025 19:36
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Copy link
Contributor

@omatthew98 omatthew98 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for churning those comments, one minor question but looks super solid.

that the # of blocks, rows, and bytes must be in remain unchanged
before and after this method call.
If queue.has_next() == False, return `None`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should raise a ValueError to differentiate between head of the queue is None and queue is empty like get_next? Not sure if this would be a problem but if we guard both get_next and peek_next with has_next that seems sane and consistent to me?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@omatthew98

between head of the queue is None and queue is empty like get_next?

Wait, am i correct in understanding that some bundles are flat out None? I think ideally I would only store non None bundles.

if we guard both get_next and peek_next with has_next that seems sane and consistent to me?

I'm not entirely following, are you referring to a specific sub class? These are the semantics in my head

  • has_next()==True => get_next() and peek_next() are RefBundle
  • has_next()==False => get_next() raise error, and peek_next() is None
  • it's not possible for get_next() to return None

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants