

@srinathk10 srinathk10 commented Oct 22, 2025


Description


[Data] ConcurrencyCapBackpressurePolicy - Handle internal output queue buildup

Issue

  • When the internal output queue builds up, specifically when
    preserve_order is set, the streaming executor does not limit task
    concurrency; it only honors the static concurrency cap.
  • When the concurrency cap is unlimited, we keep queuing more blocks
    into the internal output queue, leading to spilling with a steep
    spill curve.

Solution

In ConcurrencyCapBackpressurePolicy, detect internal output queue
buildup and limit task concurrency accordingly.

  • Keep a history of the internal output queue and detect trends in
    both percentage and size (GB). Based on these trends, increase or
    decrease the concurrency cap.
  • Since queue-based buffering is required for preserve_order, allow
    an adaptive queuing threshold. Spilling can still occur, but the
    spill curve flattens out instead of the buffering queue growing
    without bound.
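The trend-detection idea in the bullets above can be sketched as follows. This is an illustrative sketch only, not Ray's actual implementation: the class name `QueueTrendTracker`, the EWMA smoothing factor, and the growth threshold are all assumptions made for the example.

```python
class QueueTrendTracker:
    """Illustrative sketch: track the internal output queue size over time
    and detect growth trends with an exponentially weighted moving average
    (EWMA). All names and thresholds here are hypothetical."""

    def __init__(self, alpha=0.3, grow_pct=0.10):
        self.alpha = alpha        # EWMA smoothing factor
        self.grow_pct = grow_pct  # relative change treated as a trend
        self.ewma = None          # smoothed queue size, initialized lazily

    def update(self, queue_bytes):
        """Feed the current queue size in bytes. Returns +1 when the queue
        is trending up (cap should shrink), -1 when it is draining (cap
        can grow), and 0 to hold steady."""
        if self.ewma is None:
            self.ewma = float(queue_bytes)
            return 0
        prev = self.ewma
        self.ewma = self.alpha * queue_bytes + (1 - self.alpha) * prev
        rel_change = (self.ewma - prev) / max(prev, 1.0)
        if rel_change > self.grow_pct:
            return 1   # sustained buildup: throttle concurrency
        if rel_change < -self.grow_pct:
            return -1  # queue draining: allow more concurrency
        return 0
```

Using an EWMA rather than the raw queue size filters out single-block fluctuations, so the policy reacts to sustained buildup rather than noise.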



Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
@srinathk10 srinathk10 requested a review from a team as a code owner October 22, 2025 17:00

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a sophisticated adaptive backpressure policy, ConcurrencyCapBackpressurePolicy, to dynamically manage operator concurrency based on queue pressure. This is a significant enhancement, moving from a static concurrency cap to a dynamic one using EWMA for queue level and deviation, an adaptive threshold, and a quantized step controller. The changes also include refactoring ResourceManager to centralize operator eligibility and downstream traversal logic, and adding a new context flag to enable this feature.

The new backpressure logic is complex but well-documented with extensive comments and examples. The accompanying unit tests are very thorough, which is excellent.

I've found one critical bug in the controller logic that makes one of the control branches unreachable, and another minor issue with what appears to be dead code in the threshold update logic. After addressing these points, this will be a solid improvement to Ray Data's execution engine.
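The review above mentions a "quantized step controller" for adjusting the cap. A minimal sketch of that idea, under the assumption that the controller moves the cap by a fixed step in the direction of the backpressure signal and clamps it to bounds (the function name, parameters, and default values are hypothetical, not Ray's API):

```python
def step_cap(current_cap, signal, min_cap=1, max_cap=64, step=2):
    """Hypothetical quantized-step controller sketch.

    signal: +1 means the queue is building up (shrink the cap),
            -1 means it is draining (grow the cap), 0 means hold.
    The result is clamped to [min_cap, max_cap]."""
    if signal > 0:
        new_cap = current_cap - step   # shrink cap under pressure
    elif signal < 0:
        new_cap = current_cap + step   # grow cap when the queue drains
    else:
        new_cap = current_cap
    return max(min_cap, min(max_cap, new_cap))
```

Quantized steps avoid oscillation: the cap changes by a bounded amount per control interval instead of jumping directly to a computed target.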

cursor[bot]

This comment was marked as outdated.

@srinathk10 srinathk10 added the go add ONLY when ready to merge, run all tests label Oct 22, 2025
@ray-gardener ray-gardener bot added the data Ray Data-related issues label Oct 22, 2025
Comment on lines 400 to 403
return self.get_mem_op_internal(op) + self.get_op_outputs_usage_with_downstream(
op
)


Instead expose _get_op_internal_object_store_usage

else:
yield from self.get_downstream_eligible_ops(next_op)

def get_op_outputs_usage_with_downstream(self, op: PhysicalOperator) -> float:

Suggested change
def get_op_outputs_usage_with_downstream(self, op: PhysicalOperator) -> float:
def _get_op_outputs_object_store_usage_with_downstream(self, op: PhysicalOperator) -> float:

@srinathk10

Training Benchmark results

Baseline vs After

+-------------------------------------------------------+----------+--------+
| Benchmark                                             | Baseline | After  |
+-------------------------------------------------------+----------+--------+
| skip_training.jpeg.local_fs_multi_gpus                |     2649 |   2617 |
| skip_training.jpeg.local_fs_multi_gpus.preserve_order |     2600 |   2589 |
| skip_training.jpeg.local_fs                           |     2040 |   2063 |
| skip_training.jpeg.local_fs.preserve_order            |     2025 |   2024 |
| skip_training.jpeg                                    |     3297 |   3212 |
| skip_training.jpeg.preserve_order                     |     1641 |   1461 |
| skip_training.parquet                                 |    11645 |  10359 |
| skip_training.parquet.preserve_order                  |     9831 |  11330 |
+-------------------------------------------------------+----------+--------+

@alexeykudinkin alexeykudinkin merged commit c650eaf into master Oct 23, 2025
6 checks passed
@alexeykudinkin alexeykudinkin deleted the srinathk10/concurrency_cap_backpressure_policy branch October 23, 2025 00:12
xinyuangui2 pushed a commit to xinyuangui2/ray that referenced this pull request Oct 27, 2025
…e buildup (ray-project#57996)

Signed-off-by: xgui <xgui@anyscale.com>
landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
…e buildup (ray-project#57996)

Aydin-ab pushed a commit to Aydin-ab/ray-aydin that referenced this pull request Nov 19, 2025
…e buildup (ray-project#57996)

Signed-off-by: Aydin Abiar <aydin@anyscale.com>


Development

Successfully merging this pull request may close these issues.

Ray fails to serialize self-reference objects

3 participants