[Data] Revisiting make_async_gen to address issues with concurrency control for sequences of varying lengths #51661

Merged
merged 32 commits into from
Mar 27, 2025

@alexeykudinkin alexeykudinkin commented Mar 25, 2025

Why are these changes needed?

This change addresses potential deadlocks inside make_async_gen when it is used with functions producing sequences of wildly varying lengths.

Fundamentally, make_async_gen was trying to solve 2 problems whose respective solutions never actually overlapped:

  1. Implement parallel processing that transforms an input iterator into an output one while preserving back-pressure semantics, where the input iterator should not outpace consumption of the output iterator.

  2. Implement parallel processing that preserves the ordering of the input iterator.

These requirements, coupled with the fact that the transformation is expected to receive and produce iterators, are what led to the erroneous deduction that both could be satisfied by a single implementation:

  • Transforming iterators is very different from a bijective mapping: we don't know how many input elements will result in a single output element (i.e., the transformation is a black box that could be anything from 1-to-1 to many-to-many)
  • Preserving the ordering of an iterator transformation requires N input and N output queues (1 per worker), and requires both producer and consumer to fill/draw these queues in the same consistent order (without skipping!)
  • Because there can be no skipping (to preserve the order), some input AND output queues can become full at the same time, leaving both producer and consumer stuck and unable to make progress
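The stuck state described in the bullets above can be reproduced in a toy, single-threaded model. This is only an illustrative sketch under stated assumptions, not Ray's implementation: the `simulate` function, its parameters, and the transform that drops even items are all hypothetical, and end-of-stream handling is left out, so the model only checks whether the producer can enqueue every input.

```python
from collections import deque
from typing import Optional, Tuple


def simulate(num_items: int, num_workers: int = 2,
             in_cap: Optional[int] = 2, out_cap: int = 2) -> Tuple[int, bool]:
    """Toy model of order-preserving fan-out (hypothetical, not Ray's code).

    Returns (inputs enqueued by the producer, whether the producer got stuck).
    """
    in_qs = [deque() for _ in range(num_workers)]
    out_qs = [deque() for _ in range(num_workers)]
    next_item = 0    # next input the producer has to enqueue
    consume_idx = 0  # output queue the consumer has to read next

    progressed = True
    while progressed:
        progressed = False

        # Producer: item i must go to queue i % num_workers, no skipping.
        if next_item < num_items:
            q = in_qs[next_item % num_workers]
            if in_cap is None or len(q) < in_cap:
                q.append(next_item)
                next_item += 1
                progressed = True

        # Workers: an assumed transform that drops even items and passes
        # odd ones through, so output lengths differ wildly per worker.
        for w in range(num_workers):
            if not in_qs[w]:
                continue
            if in_qs[w][0] % 2 == 0:
                in_qs[w].popleft()
                progressed = True
            elif len(out_qs[w]) < out_cap:
                out_qs[w].append(in_qs[w].popleft())
                progressed = True

        # Consumer: strict round-robin over output queues, no skipping.
        if out_qs[consume_idx]:
            out_qs[consume_idx].popleft()
            consume_idx = (consume_idx + 1) % num_workers
            progressed = True

    return next_item, next_item < num_items


print(simulate(20))               # capped input queues
print(simulate(20, in_cap=None))  # uncapped input queues
```

With two workers, worker 0 receives only even items and never emits anything, so the consumer waits on its empty output queue while worker 1's output and input queues fill up and block the producer. In this model, lifting the cap on the input queues lets the producer run to completion.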

To resolve that problem fundamentally, we are decoupling these 2 use-cases:

  1. Preserving order: has N input and N output queues, with the input queues uncapped (while the output queues are still capped at queue_buffer_size), meaning that the incoming iterator will be unrolled eagerly by the producer (until exhaustion)

  2. Not preserving order: has 1 input queue and N output queues, with both input and output queues capped in size based on the queue_buffer_size configuration. This allows implementing back-pressure semantics, where consumption speed limits production speed (and the amount of buffered data)
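The non-order-preserving layout (1 capped input queue, N capped output queues) could look roughly like the sketch below. It is a minimal illustration, not the code in this PR: `unordered_async_gen`, the `_DONE` sentinel, and the polling consumer are assumptions made for the example, and it assumes `fn` fully exhausts its input iterator and does not raise.

```python
import queue
import threading
from typing import Callable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")

_DONE = object()  # hypothetical end-of-stream sentinel


def unordered_async_gen(
    base_iterator: Iterator[T],
    fn: Callable[[Iterator[T]], Iterator[U]],
    num_workers: int = 2,
    queue_buffer_size: int = 2,
) -> Iterator[U]:
    # One shared, capped input queue and one capped output queue per worker.
    in_q: queue.Queue = queue.Queue(maxsize=queue_buffer_size)
    out_qs = [queue.Queue(maxsize=queue_buffer_size) for _ in range(num_workers)]

    def produce() -> None:
        for item in base_iterator:
            in_q.put(item)  # blocks while workers are behind: back-pressure
        for _ in range(num_workers):
            in_q.put(_DONE)  # one sentinel per worker

    def drain_input() -> Iterator[T]:
        while True:
            item = in_q.get()
            if item is _DONE:
                return
            yield item

    def work(out_q: queue.Queue) -> None:
        for result in fn(drain_input()):
            out_q.put(result)  # blocks while the consumer is behind
        out_q.put(_DONE)

    threads = [threading.Thread(target=produce, daemon=True)]
    threads += [threading.Thread(target=work, args=(q,), daemon=True) for q in out_qs]
    for t in threads:
        t.start()

    pending = list(out_qs)
    while pending:
        for q in list(pending):
            try:
                result = q.get(timeout=0.01)
            except queue.Empty:
                continue  # skipping is fine here: ordering is not preserved
            if result is _DONE:
                pending.remove(q)
            else:
                yield result


doubled = sorted(unordered_async_gen(iter(range(10)), lambda it: (x * 2 for x in it)))
print(doubled)
```

Because the consumer is allowed to skip an empty output queue, no single slow or silent worker can stall it, and the bounded queues keep the producer from running ahead of consumption.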

Changes

  • Added stress-test successfully repro-ing deadlocks on the current impl
  • Added preserve_ordering param
  • Adjusted semantic to handle preserve_ordering=True/False scenarios separately
  • Beefed up existing tests
  • Tidying up

Related issue number

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
@alexeykudinkin alexeykudinkin requested a review from a team as a code owner March 25, 2025 03:53
@@ -915,49 +918,14 @@ def make_async_gen(
     base_iterator: Iterator[T],
     fn: Callable[[Iterator[T]], Iterator[U]],
     num_workers: int = 1,
-    queue_buffer_size: int = 2,
+    queue_buffer_size: Optional[int] = None,
Contributor Author

@raulchen i'd recommend reviewing in isolation as this was written from scratch

@alexeykudinkin alexeykudinkin changed the title [WIP][Data] Revisiting make_async_gen to address issues with concurrency control for sequences of varying lengths [Data] Revisiting make_async_gen to address issues with concurrency control for sequences of varying lengths Mar 25, 2025
@alexeykudinkin alexeykudinkin added the go add ONLY when ready to merge, run all tests label Mar 25, 2025
Fixed fixture

Contributor

@raulchen raulchen left a comment

LGTM

iter(range(3)),
_transform_b,
):
pass
Contributor

Nit: maybe use multiple workers and test that the transform fn is entered at most once per worker.
And for simplicity in the multi-threaded case, we can just use a counter instead of capturing the logs.
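A counter-based check along the lines suggested above might look like the following sketch. Everything here is hypothetical and for illustration only: `CallCounter` is an assumed helper, and `run_with_workers` is a stand-in for the code under test (one thread per worker, each calling the transform once), not make_async_gen itself.

```python
import threading
from typing import Callable, Iterable, Iterator, List


class CallCounter:
    """Wraps a transform and counts, thread-safely, how often it is invoked."""

    def __init__(self, fn: Callable[[Iterator[int]], Iterator[int]]) -> None:
        self._fn = fn
        self._lock = threading.Lock()
        self.calls = 0

    def __call__(self, it: Iterator[int]) -> Iterator[int]:
        with self._lock:
            self.calls += 1
        return self._fn(it)


def run_with_workers(fn: Callable[[Iterator[int]], Iterator[int]],
                     chunks: List[Iterable[int]]) -> List[List[int]]:
    """Hypothetical stand-in for the code under test: one thread per chunk."""
    results: List[List[int]] = [[] for _ in chunks]

    def work(i: int) -> None:
        # Each worker enters the transform exactly once.
        results[i] = list(fn(iter(chunks[i])))

    threads = [threading.Thread(target=work, args=(i,)) for i in range(len(chunks))]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


num_workers = 4
counted = CallCounter(lambda it: (x + 1 for x in it))
out = run_with_workers(counted, [range(3)] * num_workers)

# The transform must be entered at most once per worker.
assert counted.calls <= num_workers
```

The lock around the increment keeps the count exact under concurrent calls, which avoids parsing interleaved log output from several threads.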

@raulchen
Contributor

Can you update the PR description as the fix has changed?

… on whether ordering has to be preserved;

Updated docs

… queue instead of N

Tidying up

@raulchen raulchen merged commit b7dae2a into master Mar 27, 2025
5 checks passed
@raulchen raulchen deleted the ak/asnc-gen-ddlk-fix branch March 27, 2025 18:23