[Data] Sample finalized partitions randomly to avoid lensing finalization on a single node #58456
Conversation
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Code Review
This pull request refactors the finalization logic in the hash shuffle operator to randomly sample partitions instead of processing them sequentially. This is a valuable change that should help distribute the finalization load more evenly across the cluster and avoid potential node hotspots. My review includes one suggestion to optimize the new sampling logic for better performance.
    target_partition_ids = random.sample(
        list(self._pending_finalization_partition_ids), next_batch_size
    )
The random.sample function can operate directly on sets, so converting self._pending_finalization_partition_ids to a list is unnecessary. Removing the list() conversion will improve performance by avoiding the creation of a new list in each call, which can be expensive if the number of pending partitions is large.
Suggested change:
-    target_partition_ids = random.sample(
-        list(self._pending_finalization_partition_ids), next_batch_size
-    )
+    target_partition_ids = random.sample(
+        self._pending_finalization_partition_ids, next_batch_size
+    )
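One caveat worth verifying before applying the suggestion: `random.sample` accepted sets only with a `DeprecationWarning` in Python 3.9–3.10, and raises `TypeError` for sets from Python 3.11 onward, so the explicit conversion is actually required on newer interpreters. A minimal sketch (the `pending` set below is a stand-in for `_pending_finalization_partition_ids`):

```python
import random

pending = {3, 7, 11, 19, 42}  # stand-in for _pending_finalization_partition_ids
next_batch_size = 3

# Since Python 3.11, random.sample() requires a sequence; passing a set
# raises TypeError. Converting explicitly keeps the call portable.
batch = random.sample(list(pending), next_batch_size)

assert len(batch) == next_batch_size
assert set(batch) <= pending
```

If deterministic iteration order matters for reproducibility, `sorted(pending)` would be a safer conversion than `list()`, at the cost of an O(n log n) sort per call.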
    # and avoid the "sliding lens" effect where we finalize a batch of
    # N *adjacent* partitions that may be co-located on the same node:
    #
    # - Adjacent partitions i and i+1 are handled by adjacent
Wait, is this true? If modulo N = num actors, then partitions i and i+1 must necessarily be on different actors. Oh wait, nvm, I see what you're saying.
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
    # - Adjacent aggregators have high likelihood of running on the
    #   same node (when num aggregators > num nodes)
Is this necessarily true? Your default strategy is SPREAD, and each aggregator is scheduled with the same amount of resources, so aggregators i and i+1 have as much of a chance of landing on the same node as aggregators i and j. Please correct my assumptions if I'm wrong.
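To make the co-location concern concrete, here is a toy model of the discussion above (all names are illustrative, not the operator's actual code). It combines the round-robin membership `j = i % num_aggregators` from the PR description with a pessimistic placement assumption in which aggregators are packed onto nodes in launch order; under that assumption, a window of adjacent partitions concentrates on one node:

```python
num_aggregators = 8
num_nodes = 2

def aggregator_for(partition_id: int) -> int:
    # Round-robin membership, as described in the PR: j = i % num_aggregators
    return partition_id % num_aggregators

def node_for(aggregator_id: int) -> int:
    # Pessimistic placement assumed for illustration: aggregators packed onto
    # nodes in launch order, so aggregators 0..3 share node 0 and 4..7 node 1.
    # (With a SPREAD strategy, as the comment notes, this need not hold.)
    return aggregator_id * num_nodes // num_aggregators

# A sliding window of 4 *adjacent* partitions...
window = [0, 1, 2, 3]
nodes = {node_for(aggregator_for(p)) for p in window}
print(nodes)  # -> {0}: all four finalizations land on a single node
```

Whether the packed-placement assumption holds in practice is exactly the question raised in this thread; the sketch only shows why adjacency is harmful *if* it does.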
    #
    # NOTE: This doesn't affect determinism, since this only impacts order
    # of finalization (hence not required to be seeded)
    target_partition_ids = random.sample(
So wouldn't a better strategy be to check how much each aggregator actor is currently consuming relative to its node's capacity, and schedule the finalization only where there's remaining capacity?
I just find the randomization strategy harder to reason about in this case.
Also, it's a function of partition size, so ideally, if we could get metadata about the partition before scheduling finalize(), that would be even better.
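The capacity-aware alternative proposed in this comment could look roughly like the sketch below. This is entirely hypothetical (the helper names and the greedy policy are not part of the PR); it just illustrates preferring partitions whose target node currently has the most headroom:

```python
def pick_finalization_batch(pending, batch_size, node_load, node_capacity, node_of):
    """Prefer partitions whose target node has the most spare capacity.

    Hypothetical sketch of the capacity-aware strategy suggested in review;
    `node_of` maps a partition id to the node its aggregator runs on.
    """
    def spare(partition_id):
        node = node_of(partition_id)
        return node_capacity[node] - node_load[node]

    # Greedily take the partitions with the most headroom on their node.
    return sorted(pending, key=spare, reverse=True)[:batch_size]

# Node 0 is nearly full (3 of 4 slots used), node 1 is idle, so the
# batch prefers partitions 2 and 3, which finalize on node 1.
batch = pick_finalization_batch(
    pending=[0, 1, 2, 3],
    batch_size=2,
    node_load={0: 3, 1: 0},
    node_capacity={0: 4, 1: 4},
    node_of=lambda p: 0 if p < 2 else 1,
)
print(batch)  # -> [2, 3]
```

A real implementation would need live per-node metrics (and, per the follow-up comment, partition-size metadata), which is likely why the simpler uniform sampling was chosen here.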
[Data] Sample finalized partitions randomly to avoid lensing finalization on a single node (ray-project#58456)

Currently, finalization is scheduled in batches sequentially, i.e. a batch of N adjacent partitions is finalized at once (in a sliding window). This creates a lensing effect since:
1. Adjacent partitions i and i+1 get scheduled onto adjacent aggregators j and j+1 (since membership is determined as j = i % num_aggregators)
2. Adjacent aggregators have a high likelihood of getting scheduled on the same node (due to being launched at about the same time, in sequence)

To address that, this change applies random sampling when choosing the next partitions to finalize, so that partitions are chosen uniformly, reducing concurrent finalization of adjacent partitions.

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Aydin Abiar <aydin@anyscale.com>
Signed-off-by: YK <1811651+ykdojo@users.noreply.github.com>
Description
Currently, finalization is scheduled in batches sequentially, i.e. a batch of N adjacent partitions is finalized at once (in a sliding window).
This creates a lensing effect since:
1. Adjacent partitions i and i+1 get scheduled onto adjacent aggregators j and j+1 (since membership is determined as j = i % num_aggregators)
2. Adjacent aggregators have a high likelihood of getting scheduled on the same node (due to being launched at about the same time, in sequence)

To address that, this change applies random sampling when choosing the next partitions to finalize, so that partitions are chosen uniformly, reducing concurrent finalization of adjacent partitions.
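The before/after behavior described here can be sketched as follows (simplified; the function names are illustrative, not the actual operator code):

```python
import random

def next_batch_sequential(pending: list, batch_size: int) -> list:
    # Old behavior: a sliding window of N adjacent partition ids.
    return pending[:batch_size]

def next_batch_random(pending: list, batch_size: int) -> list:
    # New behavior: sample uniformly, so adjacent partitions (which map to
    # adjacent, possibly co-located aggregators) rarely finalize together.
    return random.sample(pending, batch_size)

pending = list(range(100))
print(next_batch_sequential(pending, 4))  # [0, 1, 2, 3] -- adjacent
print(next_batch_random(pending, 4))      # e.g. [17, 83, 4, 56] -- spread out
```

Note that, as the in-code comment in the diff states, this only changes the *order* of finalization, not the result, so determinism of the shuffle output is preserved and the sampler does not need to be seeded.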