
Parallelize stale blobs deletion during snapshot delete #2159

Closed
wants to merge 2 commits into from

Conversation

@piyushdaftary (Contributor) commented Feb 17, 2022

Signed-off-by: Piyush Daftary <piyush.besu@gmail.com>

Description

Currently during snapshot delete, deletion of unlinked shard-level blobs is single threaded, using the SNAPSHOT threadpool. Hence, if there is a huge number of unlinked shard-level blob files, it takes a considerable amount of time to clean them up.

I therefore propose to make unlinked shard-level blob deletion multi-threaded, the same way we do for cleaning up stale indices, to speed up the overall snapshot deletion process.

Issues Resolved

#2156

Check List

  • [Y] New functionality includes testing.
    • [Y] All tests pass
  • New functionality has been documented.
    • New functionality has javadoc added
  • [Y] Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Piyush Daftary <piyush.besu@gmail.com>
@piyushdaftary piyushdaftary requested a review from a team as a code owner February 17, 2022 21:01
@opensearch-ci-bot (Collaborator):

Can one of the admins verify this patch?

@opensearch-ci-bot (Collaborator):

✅   Gradle Check success 9a48605
Log 2519

Reports 2519

}));
if (partition.isEmpty() == false) {
    staleFilesToDeleteInBatch.add(new ArrayList<>(partition));
}
Collaborator:

We can simplify the above logic as below, avoiding the clear of the partition list, since we have to create a new list for each batch anyway:

List<String> partition = new ArrayList<>();
for (String key : filesToDelete) {
    partition.add(key);
    if (maxStaleBlobDeleteBatch == partition.size()) {
        staleFilesToDeleteInBatch.add(partition);
        partition = new ArrayList<>();
    }
}

if (partition.isEmpty() == false) {
    staleFilesToDeleteInBatch.add(partition);
}

Contributor (Author):

The above-mentioned code does a shallow copy of the partition list, which will result in erroneous contents of the staleFilesToDeleteInBatch BlockingQueue. The present code does a deep copy, which is why it is implemented this way.

);

// Start as many workers as fit into the snapshot pool at once at the most
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), filesToDelete.size());
Collaborator:

We should use staleFilesToDeleteInBatch instead of filesToDelete to compute the worker count. In cases where there are fewer than maxStaleBlobDeleteBatch files, only 1 batch is created, whereas the snapshot pool can have more than 1 worker available.
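
A minimal sketch of the fix being suggested here (variable names follow the diff above):

// Size the worker count by the number of batches rather than the number of
// files, so a single small batch does not spawn workers with nothing to steal.
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), staleFilesToDeleteInBatch.size());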

executeStaleShardDelete(staleFilesToDeleteInBatch, groupedListener);
}

} catch (Exception e) {
Collaborator:

It seems like all the exceptions are passed through the listener model, so I'm not really sure when this top-level exception would be thrown here. Either way, it seems like we should invoke the listener passed into this method for this top-level exception as well. Any reason not to do that?

Contributor:

The ActionRunnable.wrap catches the exception and invokes l.onFailure(), so this will not be reached.

Collaborator:

Right, that's what I thought, so we should remove this catch block.

Contributor (Author):

Ideally this catch statement should never be reached, as we have ActionRunnable.wrap. But to be on the safe side, I think it's better to blanket-catch the exception and call listener.onFailure from the catch block in case of any runtime exception.

Do you see any concern with the catch block? @AmiStrn @sohami

// Currently this catch exists as a stop gap solution to tackle unexpected runtime exceptions from implementations
// bubbling up and breaking the snapshot functionality.
assert false : e;
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale blobs", snapshotIds), e);
Collaborator:

Should the log message be "... during cleanup of unreferenced shard blobs"?

// Start as many workers as fit into the snapshot pool at once at the most
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), filesToDelete.size());
for (int i = 0; i < workers; ++i) {
executeStaleShardDelete(staleFilesToDeleteInBatch, groupedListener);
Collaborator:

Is there any reason we are submitting the batches to delete in this fashion? At a high level, it divides the files to delete into batches, and each of those batches needs to be deleted in parallel by the snapshot threadpool workers. We can achieve this by simply:

  1. creating the batches as done in staleFilesToDeleteInBatch (it can be a regular list instead of a BlockingQueue);
  2. having a for loop over the number of batches and submitting the work to the snapshot threadpool. The threadpool will handle spawning multiple threads and assigning work to each thread, up to the max number, as needed.

private void executeStaleShardDelete(List<List<String>> staleFilesToDeleteInBatch, GroupedActionListener<Void> listener) {
    for (List<String> batchToDelete : staleFilesToDeleteInBatch) {
        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
            try {
                deleteFromContainer(blobContainer(), batchToDelete);
                l.onResponse(null);
            } catch (Exception e) {
                logger.warn(() -> new ParameterizedMessage("{} Failed to delete blobs during snapshot delete", metadata.name()), e);
                throw e;
            }
        }));
    }
}

Contributor:

So I can chime in here: the method @piyushdaftary is using is a work-stealing method. There are only about 5 threads in the threadpool, so the for loop starts only as many workers as are available for this action; each thread then steals work from the queue. This is very efficient and is done the same way in the other methods in this class where parallelism was introduced. Therefore the queue must be blocking, and the loop makes sense as it is now.

This ensures that the threadpool queue itself is never used, which is important in order to avoid that queue being filled by this task when there is a huge number of shards to delete.
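
For readers following the thread, a minimal sketch of the work-stealing pattern being described, assuming the surrounding class context from the diff (illustrative, not the exact merged code):

private void executeStaleShardDelete(BlockingQueue<List<String>> staleFilesToDeleteInBatch, GroupedActionListener<Void> listener) throws InterruptedException {
    // Grab the next batch without waiting; if the queue is drained, this worker simply stops.
    final List<String> filesToDelete = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS);
    if (filesToDelete != null) {
        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
            deleteFromContainer(blobContainer(), filesToDelete);
            l.onResponse(null);
            // Work stealing: after finishing a batch, pull the next one, so at most
            // `workers` tasks ever sit in the threadpool at a time.
            executeStaleShardDelete(staleFilesToDeleteInBatch, listener);
        }));
    }
}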

Collaborator:

@AmiStrn: Thanks for sharing the context. Here, for each task, the batching is already done with 1k shards per batch. So if there are 50k shards to delete, there will be 50 tasks to execute overall instead of 50k. This behavior is the same in both approaches (implemented vs. suggested).

The benefit the current implementation has over the suggested one is that at any point in time it submits only 5 of these deletion tasks to the queue; once threads become available to execute those, it submits one more to the threadpool. This way we don't add all 50 tasks at once, and other snapshot-related tasks added to this common pool get a chance to execute as well. It seems to me we are trying to provide fairness/priority between different snapshot tasks using the same threadpool. If that is the goal, I think we should separate the threadpools for the different snapshot tasks. Thoughts?

Contributor:

That could be OK as well and would simplify some things, but I would suggest that be done separately, as part of another PR.

Contributor (Author):

@sohami: Here filesToDelete is the list of repository files to delete (shard blobs), not the shards themselves, so for 50K shards the number of actual files may be far higher.

One of the reasons for submitting the batches in this fashion is to not let snapshot restore wait because snapshot deletion is taking a long time to complete.

I don't see any immediate concern with the current implementation, and I agree with @AmiStrn that refactoring the blocking queue and the delete execution should be done as a separate PR, modifying the logic in all the places, i.e. snapshot restore, snapshot shard deletion, and stale indices deletion during snapshot delete.

*/
public static final Setting<Integer> MAX_STALE_BLOB_DELETE_BATCH_SIZE = Setting.intSetting(
"max_stale_blob_delete_batch_size",
1000,
Collaborator:

This batch size is used for deleting the unreferenced shard blobs left behind by snapshot deletion, so a better name would probably be max_shard_blob_delete_batch_size. Also, was the default of 1000 chosen based on some experiment?

@piyushdaftary (Contributor, Author) commented Jul 7, 2022:

Updated the variable name. The default of 1000 was chosen because for almost all remote storage, the maximum batch size of a bulk delete is 1000 objects.
References:
S3: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
Internally, GCP and Azure have delete limits of 1000 and 100 objects respectively.
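
To illustrate the batching this setting controls, a hedged sketch (the partitioning loop is illustrative; only the setting itself appears in the diff, and MAX_SHARD_BLOB_DELETE_BATCH_SIZE follows the rename discussed above):

// Read the configured batch size and split the flat file list into batches no
// larger than the remote store's bulk-delete limit (e.g. 1000 keys per S3
// DeleteObjects request).
final int maxShardBlobDeleteBatch = MAX_SHARD_BLOB_DELETE_BATCH_SIZE.get(metadata.settings());
final List<List<String>> staleFilesToDeleteInBatch = new ArrayList<>();
for (int i = 0; i < filesToDelete.size(); i += maxShardBlobDeleteBatch) {
    staleFilesToDeleteInBatch.add(new ArrayList<>(filesToDelete.subList(i, Math.min(i + maxShardBlobDeleteBatch, filesToDelete.size()))));
}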

@AmiStrn (Contributor) left a comment:

This could really help the efforts to reduce deletion time!
The GroupedActionListener needs to be handled carefully so that the countdown latch always reaches its goal.
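
To make that concern concrete, a brief sketch of how the grouped listener is typically sized (the sizing shown is an assumption based on the discussion, not a quote of the diff):

// The group size must equal the number of batches, and every batch must call
// onResponse or onFailure exactly once; otherwise the grouped listener's
// countdown never completes and the deletion appears to hang.
final GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(listener, staleFilesToDeleteInBatch.size());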



Comment on lines +963 to +965
}));

executeStaleShardDelete(staleFilesToDeleteInBatch, listener);
@AmiStrn (Contributor) commented Feb 24, 2022:

This is the part missing for the work-stealing method used throughout this file. Why did you choose to have the recursion outside the initial thread? This is a (minor) problem, since it doesn't correspond with the for loop initializing the threads.

Suggested change (move the recursive call inside the runnable):

-    }));
-    executeStaleShardDelete(staleFilesToDeleteInBatch, listener);
+    executeStaleShardDelete(staleFilesToDeleteInBatch, listener);
+    }));

Also, this means you would need to change the way you handle errors. The exception would terminate the thread, so you could potentially reach a situation where the GroupedActionListener waits forever and the process gets stuck. For example:

  • X threads start running
  • Eventually they all hit an exception and stop the recursion before the queue is empty!
  • The GroupedActionListener is stuck waiting for the tasks to be done...

I suggest not throwing the exception, but invoking l.onFailure(e) if there is an exception; this way the other shards may still be handled.
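
A minimal sketch of the error handling suggested here (illustrative, built from the snippets quoted earlier in the thread, not the merged code): report the failure through the listener but keep the recursion alive so the queue still drains:

threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
    try {
        deleteFromContainer(blobContainer(), filesToDelete);
        l.onResponse(null);
    } catch (Exception e) {
        logger.warn(() -> new ParameterizedMessage("{} Failed to delete blobs during snapshot delete", metadata.name()), e);
        // Report the failure, but don't rethrow: a throw would end this worker's recursion.
        l.onFailure(e);
    }
    // Continue stealing work regardless of whether this batch succeeded.
    executeStaleShardDelete(staleFilesToDeleteInBatch, listener);
}));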

Collaborator:

ActionRunnable, which is an AbstractRunnable, invokes l.onFailure(e) when the runnable throws any exception. Ref here

Contributor:

There is no reason not to simply invoke onFailure with the exception: the outcome is the same, except that throwing stops the current recursion and the process could hang.

@dblock (Member) commented Mar 1, 2022:

Thanks for the PR @piyushdaftary and for your thorough review @sohami @AmiStrn! Looking forward to the next iteration.

@piyushdaftary Do fix DCO when you amend (git commit -s), please.

@opensearch-ci-bot (Collaborator):

✅   Gradle Check success 9a48605
Log 5488

Reports 5488

@dreamer-89 (Member):

@piyushdaftary: Can you please fix the DCO check and address the review comments?

@piyushdaftary piyushdaftary requested a review from reta as a code owner June 30, 2022 08:40
@github-actions (Contributor):

Gradle Check (Jenkins) Run Completed with:

@github-actions (Contributor) commented Jul 7, 2022:

Gradle Check (Jenkins) Run Completed with:

@piyushdaftary piyushdaftary changed the base branch from main to 2.1 July 7, 2022 16:53
@piyushdaftary piyushdaftary changed the base branch from 2.1 to main July 7, 2022 16:54
@piyushdaftary (Contributor, Author):

Unable to update the PR. Closing this PR and creating a new one: #3796
