[train] Eager Data Resource Cleanup on Train Run Failures and Aborts #58325
Conversation
Signed-off-by: JasonLi1909 <jasli1909@gmail.com>
Code Review
This PR correctly renames DatasetsSetupCallback to DatasetsCallback. I've added a few suggestions to improve consistency in related parts of the code, such as a variable name, a test function name, and a docstring, to fully align with this change.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: Jason Li <57246540+JasonLi1909@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: Jason Li <57246540+JasonLi1909@users.noreply.github.com>
Signed-off-by: JasonLi1909 <jasli1909@gmail.com>
Signed-off-by: JasonLi1909 <jasli1909@gmail.com>
This pull request has been automatically marked as stale because it has not had recent activity. You can always ask for help on our discussion forum or Ray's public slack channel. If you'd like to keep this open, just leave any comment, and the stale label will be removed.
Signed-off-by: JasonLi1909 <jasli1909@gmail.com>
Signed-off-by: JasonLi1909 <jasli1909@gmail.com>
python/ray/train/v2/_internal/execution/worker_group/worker_group.py
…creation Signed-off-by: JasonLi1909 <jasli1909@gmail.com>
Signed-off-by: JasonLi1909 <jasli1909@gmail.com>
Co-authored-by: Justin Yu <justinvyu@anyscale.com> Signed-off-by: Jason Li <57246540+JasonLi1909@users.noreply.github.com>
python/ray/train/v2/_internal/execution/worker_group/worker_group.py
Co-authored-by: Justin Yu <justinvyu@anyscale.com> Signed-off-by: Jason Li <57246540+JasonLi1909@users.noreply.github.com>
Signed-off-by: JasonLi1909 <jasli1909@gmail.com>
justinvyu left a comment
Thanks! Should be good after this. Nice tests
```python
# Two coordinator actors, one for each sharded dataset
coordinator_actors = callback._coordinator_actors
assert len(coordinator_actors) == 2
```
let's also add a call to after_worker_group_shutdown/abort and check the liveness of the actors?
The callbacks don't kill the SplitCoordinator actors as of now; they only shut down their data executors. Ref counting should take care of the actors themselves. That said, I added two more tests for the after_worker_group_shutdown/abort hooks.
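For reference, a rough sketch of what such a hook test could look like. The `datasets_callback` and `worker_group` pytest fixtures and the exact hook signature are assumptions for illustration, not the PR's actual test code; only the hook name, `_coordinator_actors`, and `_shutdown_refs` come from this conversation.

```python
import ray


# Hypothetical test sketch; fixture names and the hook signature are assumptions.
def test_after_worker_group_shutdown_cleans_up_executors(datasets_callback, worker_group):
    # Invoke the new hook that fires when the worker group is torn down.
    datasets_callback.after_worker_group_shutdown(worker_group)

    # One shutdown ref should have been issued per SplitCoordinator actor.
    assert len(datasets_callback._shutdown_refs) == len(
        datasets_callback._coordinator_actors
    )

    # Block until the remote executor shutdowns complete.
    ray.get(datasets_callback._shutdown_refs)
```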
Co-authored-by: Justin Yu <justinvyu@anyscale.com> Signed-off-by: Jason Li <57246540+JasonLi1909@users.noreply.github.com>
Co-authored-by: Justin Yu <justinvyu@anyscale.com> Signed-off-by: Jason Li <57246540+JasonLi1909@users.noreply.github.com>
Signed-off-by: JasonLi1909 <jasli1909@gmail.com>
justinvyu left a comment
🚢
Signed-off-by: JasonLi1909 <jasli1909@gmail.com>
| """Eagerly shutdown the data executors of the split coordinator actors.""" | ||
| self._shutdown_refs = [ | ||
| coord.shutdown_executor.remote() for coord in self._coordinator_actors | ||
| ] |
Bug: Shutdown refs overwritten across worker group restarts
When a worker group is restarted due to failure or rescaling, _shutdown_data_executors is called for each worker group instance, but it overwrites self._shutdown_refs instead of appending to it. This means shutdown refs from previous worker group attempts are lost and never awaited in before_controller_shutdown, potentially causing incomplete resource cleanup from earlier failed attempts.
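One way to address this, sketched below under the assumption that `_shutdown_refs` is initialized to an empty list in the callback's constructor, is to accumulate the refs across worker group attempts instead of reassigning them:

```python
def _shutdown_data_executors(self) -> None:
    """Eagerly shut down the data executors of the split coordinator actors."""
    # Extend rather than overwrite, so refs from earlier worker group
    # attempts are still awaited in before_controller_shutdown.
    self._shutdown_refs.extend(
        coord.shutdown_executor.remote() for coord in self._coordinator_actors
    )
```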
Signed-off-by: JasonLi1909 <jasli1909@gmail.com>
```python
with self._lock:
    # Call shutdown on the executor
    if self._executor is not None:
        self._executor.shutdown(force=False)
```
is it safe to shut down multiple times?
Yes, it is. That said, we only ever call it once per SplitCoordinator.
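For illustration only, a toy sketch of one common way to make such a shutdown idempotent; this is not necessarily how SplitCoordinator implements it, and the class and attribute names below are assumptions:

```python
import threading


class ExecutorHolder:
    """Toy illustration only -- not SplitCoordinator's actual implementation."""

    def __init__(self, executor):
        self._lock = threading.Lock()
        self._executor = executor

    def shutdown_executor(self) -> None:
        with self._lock:
            if self._executor is not None:
                # force=False lets in-flight work wind down gracefully.
                self._executor.shutdown(force=False)
                # Drop the reference so repeated calls become no-ops.
                self._executor = None
```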
…ject#58325)
Following a worker failure or a user abort during a Train job, the execution of sharded datasets (provided through get_dataset_shard) is ungracefully shutdown. Consequently, any ongoing resource request made by a sharded dataset's SplitCoordinator to the AutoscalingRequester is not cancelled. This can result in resources being held for a preset timeout, leading to inefficient cluster utilization and slower train job turnarounds.
- Implements an eager shutdown path to cleanup resource requests made to the AutoscalingRequester (depicted below)
- Adds new WorkerGroupCallback hooks (`after_worker_group_abort` and `after_worker_group_shutdown`) to DatasetsSetupCallback for the new shutdown path
- Implements tests for the new cleanup path
---------
Signed-off-by: JasonLi1909 <jasli1909@gmail.com>
Signed-off-by: Jason Li <57246540+JasonLi1909@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: Justin Yu <justinvyu@anyscale.com>
…#59423) Since PR ray-project#58325 added shutdown and abort hooks that extended the resource-cleanup logic in DatasetsSetupCallback, the callback's responsibilities have expanded beyond initial setup. Accordingly, this PR renames it to DatasetsCallback for better alignment with its behavior. Signed-off-by: JasonLi1909 <jasli1909@gmail.com>
Following a worker failure or a user abort during a Train job, the execution of sharded datasets (provided through get_dataset_shard) is ungracefully shut down. Consequently, any ongoing resource request made by a sharded dataset's SplitCoordinator to the AutoscalingRequester is not cancelled. This can result in resources being held for a preset timeout, leading to inefficient cluster utilization and slower train job turnarounds.
To address the issue, this PR:
- Implements an eager shutdown path to clean up resource requests made to the AutoscalingRequester (depicted below)
- Adds new WorkerGroupCallback hooks (`after_worker_group_abort` and `after_worker_group_shutdown`) to DatasetsSetupCallback for the new shutdown path
- Implements tests for the new cleanup path

Note on new WorkerGroupCallback hooks:
The new WorkerGroupCallback hooks `after_worker_group_abort` and `after_worker_group_shutdown` were added to ensure that the StreamingExecutor shutdown logic ran prior to the shutdown of train workers. This helps to avoid any race conditions and additional complexity related to timing executor shutdown while train workers are still alive.

Diagram of the new cleanup path:
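As a rough sketch of the shape of the new cleanup path: the hook names, `_coordinator_actors`, `_shutdown_refs`, and the `shutdown_executor.remote()` call come from this PR's discussion, while the hook signatures, attribute initialization, and the simplified class body below are assumptions for illustration.

```python
from typing import List

import ray


class DatasetsSetupCallback:  # simplified sketch of the WorkerGroupCallback
    def __init__(self) -> None:
        self._coordinator_actors: List["ray.actor.ActorHandle"] = []
        self._shutdown_refs: List[ray.ObjectRef] = []

    def _shutdown_data_executors(self) -> None:
        # Ask each SplitCoordinator to shut down its StreamingExecutor, which
        # releases any outstanding AutoscalingRequester resource requests.
        self._shutdown_refs.extend(
            coord.shutdown_executor.remote() for coord in self._coordinator_actors
        )

    # New hooks introduced by this PR, wired into the worker group lifecycle.
    def after_worker_group_shutdown(self) -> None:
        self._shutdown_data_executors()

    def after_worker_group_abort(self) -> None:
        self._shutdown_data_executors()

    def before_controller_shutdown(self) -> None:
        # Wait for the eager executor shutdowns to finish before the
        # controller exits.
        ray.get(self._shutdown_refs)
```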