Contributor

@alexeykudinkin alexeykudinkin commented Oct 16, 2025

Description

This change primarily converts OpResourceAllocator APIs to make data flow explicit by exposing required params in the APIs.

Additionally:

  1. Abstracting common methods into the OpResourceAllocator base class.
  2. Adding allocations to the progress bar in verbose mode (logging budgets & allocations).
  3. Adding the byte size of all enqueued blocks to the progress bar.
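As a rough illustration of what "making data flow explicit" can mean, the sketch below contrasts an allocator that reads usage from its own hidden state with one that takes the usage as a parameter. The class names, the `remaining` method, and the integer budgets are all hypothetical and chosen for this example; they are not the actual OpResourceAllocator API.

```python
from typing import Dict


class ImplicitAllocator:
    """Hypothetical: usage lives in private state, so callers cannot
    tell from the signature what the result depends on."""

    def __init__(self, budgets: Dict[str, int], usages: Dict[str, int]):
        self._budgets = budgets
        self._usages = usages

    def remaining(self, op: str) -> int:
        return max(self._budgets[op] - self._usages.get(op, 0), 0)


class ExplicitAllocator:
    """Hypothetical: the usage snapshot is a required keyword argument,
    so the dependency is visible at every call site."""

    def __init__(self, budgets: Dict[str, int]):
        self._budgets = budgets

    def remaining(self, op: str, *, task_resource_usage: Dict[str, int]) -> int:
        return max(self._budgets[op] - task_resource_usage.get(op, 0), 0)
```

Both variants compute the same value; the difference is only in where the inputs come from, which is what makes the explicit form easier to test in isolation.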

Related issues

Types of change

  • Bug fix 🐛
  • New feature ✨
  • Enhancement 🚀
  • Code refactoring 🔧
  • Documentation update 📖
  • Chore 🧹
  • Style 🎨

Checklist

Does this PR introduce breaking changes?

  • Yes ⚠️
  • No

Testing:

  • Added/updated tests for my changes
  • Tested the changes manually
  • This PR is not tested ❌ (please explain why)

Code Quality:

  • Signed off every commit (git commit -s)
  • Ran pre-commit hooks (setup guide)

Documentation:

  • Updated documentation (if applicable) (contribution guide)
  • Added new APIs to doc/source/ (if applicable)

Additional context

@alexeykudinkin alexeykudinkin requested a review from a team as a code owner October 16, 2025 06:22
@alexeykudinkin alexeykudinkin added the go add ONLY when ready to merge, run all tests label Oct 16, 2025
@ray-gardener ray-gardener bot added the data Ray Data-related issues label Oct 16, 2025
@alexeykudinkin alexeykudinkin changed the title [WIP][Data] Cleaning up OpResourceAllocator APIs [Data] Cleaning up OpResourceAllocator APIs Oct 17, 2025
@alexeykudinkin alexeykudinkin changed the title [Data] Cleaning up OpResourceAllocator APIs [Data] Revisiting OpResourceAllocator to make data flow explicit Oct 17, 2025
    return self._actor_pool.get_actor_info()

def get_max_concurrency_limit(self) -> Optional[int]:
    return self._actor_pool.max_size() * self._actor_pool.max_actor_concurrency()
Member

Out of scope for this PR since this is an existing issue, but if self._actor_pool.max_size() is float("inf"), I think we'd probably want to return None rather than float("inf") for consistency with the return type

Contributor Author

Good call

  • Looked through the code, and we need to clean this up holistically (since we define max_size as int)
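A minimal sketch of the behavior suggested in this thread, assuming an unbounded pool reports its maximum size as float("inf"). The free function and its parameter names are hypothetical stand-ins for the method under review, not the merged code.

```python
import math
from typing import Optional, Union


def max_concurrency_limit(
    max_pool_size: Union[int, float], max_actor_concurrency: int
) -> Optional[int]:
    """Return the concurrency cap, or None when the pool is unbounded."""
    # Assumption: an unbounded pool is represented by float("inf").
    if math.isinf(max_pool_size):
        return None
    return int(max_pool_size) * max_actor_concurrency
```

Returning None for the unbounded case keeps the result consistent with the declared Optional[int] return type instead of leaking a float to callers.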

    5000.0,
]
- task_completion_time: float = metric_field(
+ task_completion_time_s: float = metric_field(
Member

Do we need to update test_stats.py and the dashboard code after renaming these metrics?

Contributor Author

Yep, will do

else 0
)

return self._pending_dispatch_input_bundles_count() + internal_queue_size
Member

If we change the internal queue size to represent blocks rather than bundles, then total_enqueued_input_bundles will return incorrect values, and DownstreamCapacityBackpressurePolicy will break.

I think even if we update total_enqueued_input_bundles to represent blocks, we'd still need to update the DownstreamCapacityBackpressurePolicy logic:

avg_inputs_per_task = (
    output_dependency.metrics.num_task_inputs_processed
    / max(output_dependency.metrics.num_tasks_finished, 1)
)

Contributor Author

Yeah, we need to fix that across the board

Comment on lines 19 to 21
- """Returns Operator's internal queue size"""
+ """Returns Operator's internal queue size (in blocks)"""
  ...
Member

What are we hoping to achieve by changing the unit of internal_queue_size from bundles to blocks?

Contributor Author

I just realized that we're assuming that every bundle holds just 1 block, which is not enforced
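To make the bundles-versus-blocks distinction in this thread concrete, here is a small sketch. The Bundle dataclass and the three helper functions are hypothetical simplifications, not Ray Data types; each bundle is modeled only as a list of block sizes in bytes.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Deque, List


@dataclass
class Bundle:
    """Hypothetical stand-in for a bundle: the byte size of each block it holds."""

    block_sizes: List[int] = field(default_factory=list)


def queue_size_in_bundles(queue: Deque[Bundle]) -> int:
    return len(queue)


def queue_size_in_blocks(queue: Deque[Bundle]) -> int:
    return sum(len(bundle.block_sizes) for bundle in queue)


def queue_size_in_bytes(queue: Deque[Bundle]) -> int:
    return sum(sum(bundle.block_sizes) for bundle in queue)


# One single-block bundle and one three-block bundle.
queue = deque([Bundle([100]), Bundle([50, 50, 25])])
```

The bundle count and the block count agree only if every bundle holds exactly one block, so any logic that mixes the two units (such as a per-task input average) needs to use one unit consistently.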

def __init__(self, topology: "Topology"):
    self._topology = topology
    self._idle_detector = self.IdleDetector()
    self._ticker = 0
Member

I know this is updated in update_budgets, but is it used anywhere else?

Are subclasses required to increment this? If so, I think this should be an explicit part of the interface

Contributor Author

Missed cleaning this up

Comment on lines +455 to +495
@abstractmethod
def can_submit_new_task(self, op: PhysicalOperator) -> bool:
    """Return whether the given operator can submit a new task."""
    ...
Member

What's the motivation for copying this from the backpressure policy interface to here? Would the implementation ever be non-trivial?

If the implementation of this method is always going to be like below, it might be better to remove the method to make the OpResourceAllocator interface deeper and simpler

def can_submit_new_task(self, op):
    return op.incremental_resource_usage().satisfies_limit(budget)

Contributor Author

The idea here is that the logic deciding whether a task can be scheduled should live with the resource allocator (it will be more complicated than the one you referred to above)
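The design discussed in this thread could be sketched roughly as follows: the scheduling decision sits behind an abstract allocator method, and the simplest implementation is the budget check the reviewer quoted. Everything here (Resources, Allocator, BudgetAllocator, string operator names, and the rule that an operator without a budget is unconstrained) is a hypothetical simplification, not the Ray Data implementation.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class Resources:
    """Hypothetical, simplified resource vector."""

    cpu: float = 0.0
    object_store_memory: int = 0

    def satisfies_limit(self, limit: "Resources") -> bool:
        return (
            self.cpu <= limit.cpu
            and self.object_store_memory <= limit.object_store_memory
        )


class Allocator(ABC):
    @abstractmethod
    def can_submit_new_task(self, op: str, incremental_usage: Resources) -> bool:
        """Return whether the given operator can submit a new task."""


class BudgetAllocator(Allocator):
    def __init__(self, budgets: Dict[str, Resources]):
        self._budgets = budgets

    def can_submit_new_task(self, op: str, incremental_usage: Resources) -> bool:
        budget = self._budgets.get(op)
        # Assumption: an operator without a budget is not constrained.
        if budget is None:
            return True
        return incremental_usage.satisfies_limit(budget)
```

Keeping the method on the allocator leaves room for richer policies later without changing callers, at the cost of a wider interface when the implementation stays this simple.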

@alexeykudinkin alexeykudinkin requested a review from a team as a code owner October 23, 2025 05:02
@alexeykudinkin alexeykudinkin changed the base branch from master to ak/bndl-blk-fix October 23, 2025 05:20

Member

@bveeramani bveeramani left a comment

Looks reasonable to me.

Let's merge #58030 first to minimize size of the diff, and then merge this one?

@ray.remote
def test_import():
    import file_module

Member

Unrelated?

@alexeykudinkin alexeykudinkin deleted the branch ray-project:master October 27, 2025 20:31
@alexeykudinkin alexeykudinkin changed the base branch from ak/bndl-blk-fix to master October 27, 2025 20:37
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>

# Conflicts:
#	python/ray/data/_internal/execution/streaming_executor_state.py

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>

# Conflicts:
#	python/ray/data/tests/test_autoscaler.py

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
op,
task_resource_usage=self._op_usages,
output_object_store_usage=self._mem_op_outputs,
)

Bug: Inconsistent Return Types in Resource Management

Type mismatch bug: ResourceManager.max_task_output_bytes_to_read() declares return type as int but calls self._op_resource_allocator.max_task_output_bytes_to_read() which returns Optional[int]. The abstract method in OpResourceAllocator and its implementation in ReservationOpResourceAllocator can return None, but the wrapper method signature promises to always return int. This will cause runtime type errors when None is returned but an int is expected by callers.
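One way to resolve the mismatch described above is to propagate Optional[int] through the wrapper and handle None at the call site. The sketch below uses hypothetical stand-in classes (StubAllocator, Manager) and a hypothetical caller (bytes_to_read); it also assumes that None means "no limit", which the report does not state. Note that Python does not enforce annotations at runtime, so a failure would only surface where a caller uses the value as an int.

```python
from typing import Optional


class StubAllocator:
    """Hypothetical allocator whose limit may be absent."""

    def __init__(self, limit: Optional[int]):
        self._limit = limit

    def max_task_output_bytes_to_read(self) -> Optional[int]:
        return self._limit


class Manager:
    """Hypothetical wrapper whose annotation matches what it can return."""

    def __init__(self, allocator: StubAllocator):
        self._allocator = allocator

    def max_task_output_bytes_to_read(self) -> Optional[int]:
        return self._allocator.max_task_output_bytes_to_read()


def bytes_to_read(manager: Manager, available: int) -> int:
    limit = manager.max_task_output_bytes_to_read()
    # Assumption: None means there is no limit on bytes to read.
    return available if limit is None else min(available, limit)
```

With the wrapper annotated as Optional[int], a static type checker can flag any caller that forgets the None branch.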

@alexeykudinkin alexeykudinkin merged commit 95b011f into ray-project:master Oct 27, 2025
6 checks passed
landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
…ay-project#57788)

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Aydin-ab pushed a commit to Aydin-ab/ray-aydin that referenced this pull request Nov 19, 2025
…ay-project#57788)

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Aydin Abiar <aydin@anyscale.com>

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

2 participants