feat: support bulk deletes in object_store #4060

Merged
merged 11 commits into from
May 27, 2023

Conversation

wjones127
Member

Which issue does this PR close?

Closes #2615.

Rationale for this change

Provides methods for quickly deleting large numbers of objects, such as when dropping a Parquet table.

What changes are included in this PR?

Introduces two new methods on ObjectStore, each with a default implementation. One performs the bulk deletion. The other reports the number of objects that can be deleted in one underlying call, which is useful if the user wants to control the parallelism themselves or implement progress tracking.
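As a rough illustration of the two-method shape described above, here is a minimal synchronous sketch. It is not the code from this PR: the real trait is async, and `Store`, `MemoryStore`, `delete_batch_size`, and `delete_with_progress` are hypothetical names chosen for the example (only `delete_all` appears in the diff under review).

```rust
use std::collections::BTreeSet;

/// Hypothetical, synchronous stand-in for the store trait (the real one is async).
trait Store {
    /// Delete a single object.
    fn delete(&mut self, location: &str) -> Result<(), String>;

    /// How many objects one underlying call can delete.
    /// The default of 1 means "no native bulk delete".
    fn delete_batch_size(&self) -> usize {
        1
    }

    /// Delete many objects. The default falls back to one `delete`
    /// per location and stops at the first error.
    fn delete_all(&mut self, locations: &[String]) -> Result<(), String> {
        for location in locations {
            self.delete(location)?;
        }
        Ok(())
    }
}

/// Toy in-memory store that only implements `delete` and inherits the defaults.
struct MemoryStore {
    objects: BTreeSet<String>,
}

impl Store for MemoryStore {
    fn delete(&mut self, location: &str) -> Result<(), String> {
        if self.objects.remove(location) {
            Ok(())
        } else {
            Err(format!("not found: {}", location))
        }
    }
}

/// Caller-side loop that uses the batch size to track progress.
/// Returns the running count of deleted objects after each batch.
fn delete_with_progress<S: Store>(
    store: &mut S,
    locations: &[String],
) -> Result<Vec<usize>, String> {
    let mut progress = Vec::new();
    let mut done = 0;
    // `chunks(0)` would panic, so clamp the batch size to at least 1.
    for batch in locations.chunks(store.delete_batch_size().max(1)) {
        store.delete_all(batch)?;
        done += batch.len();
        progress.push(done);
    }
    Ok(progress)
}
```

A store with a native bulk API would override both methods; callers that only want the simple path can call `delete_all` directly and ignore the batch size.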

Are there any user-facing changes?

Adds new APIs, with inline documentation.

@github-actions github-actions bot added the object-store Object Store Interface label Apr 12, 2023
/// this method to customize the parallelism or provide a progress indicator.
///
/// This method may create multiple threads to perform the deletions in parallel.
async fn delete_all(&self, locations: Vec<Path>) -> Result<()> {
Contributor

What do you think about making this instead something like

Suggested change
- async fn delete_all(&self, locations: Vec<Path>) -> Result<()> {
+ async fn delete_all(&self, locations: Vec<Path>) -> Result<BoxStream<'_, Result<()>>> {

This would allow for granular error and progress reporting? One could even go so far as to make the input also a BoxStream 🤔

@wjones127
Member Author

I think to implement Azure blob store and GCS, we'll need to do work upstream:

seanmonstar/reqwest#1241

They both use the multipart/mixed request type, and to do proper error handling we'll want robust implementations of it. So I'm going to wait on those two.

@wjones127 wjones127 marked this pull request as ready for review May 21, 2023 03:05
@wjones127
Member Author

✅ I've run the integration tests against AWS S3 and Cloudflare R2. Both pass 👍 (although Cloudflare fails the get_opts test right now).

@wjones127 wjones127 requested a review from tustvold May 21, 2023 03:23
Contributor

@tustvold tustvold left a comment

Thank you for this, I left some comments

@@ -129,6 +153,43 @@ struct MultipartPart {
part_number: usize,
}

#[derive(Deserialize)]
#[serde(rename_all = "PascalCase", rename = "DeleteResult")]
struct BatchDeleteResponse {
Contributor

Given we seem to have enabled a feature to get this to deserialize correctly, could we perhaps get a basic test of this deserialization logic, i.e. given a payload with mixed successes and failures, it deserializes correctly?

Member Author

Done.


let inner_body = paths
.iter()
.map(|path| format!("<Object><Key>{}</Key></Object>", path))
Contributor

I think this might run into weirdness due to XML escaping, could we use quick-xml to serialize this payload instead of string formatting?

I think Path allows '&' for example, a test of this would be superb...

Member Author

I've added a test with characters special to XML
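To make the escaping concern in this thread concrete, here is a small standard-library-only sketch. It hand-rolls the escaping rather than using quick-xml as the reviewer suggests, so it is a stand-in for illustration, not the approach the PR took. The `<Object><Key>` shape comes from the diff above; the `<Delete>` wrapper and the function names `escape_xml` and `delete_body` are assumptions.

```rust
/// Escape the five characters that are special in XML text and attributes.
fn escape_xml(raw: &str) -> String {
    let mut out = String::with_capacity(raw.len());
    for c in raw.chars() {
        match c {
            '&' => out.push_str("&amp;"),
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '"' => out.push_str("&quot;"),
            '\'' => out.push_str("&apos;"),
            other => out.push(other),
        }
    }
    out
}

/// Build a bulk-delete request body, escaping every key.
/// The `<Delete>` wrapper element is an assumption for this sketch.
fn delete_body(paths: &[&str]) -> String {
    let inner: String = paths
        .iter()
        .map(|path| format!("<Object><Key>{}</Key></Object>", escape_xml(path)))
        .collect();
    format!("<Delete>{}</Delete>", inner)
}
```

Without the escaping step, a key such as `x&y` would be emitted verbatim and produce a malformed document, which is the failure mode the review comment is pointing at.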

object_store/src/lib.rs (outdated, resolved)
object_store/src/util.rs (outdated, resolved)
@@ -170,6 +178,22 @@ fn merge_ranges(
ret
}

/// Common implementation for delete_all
#[allow(dead_code)]
pub(crate) fn delete_all_helper<'a>(
Contributor

I wonder if we could follow the pattern in #4220 of using extension traits for this

Member Author

TBH, I think the helper turned out not to be necessary, so I removed it entirely.

fn delete_stream<'a>(
&'a self,
locations: BoxStream<'a, Path>,
) -> BoxStream<'a, BoxFuture<'a, Result<Vec<Result<Path>>>>> {
Contributor

@tustvold tustvold May 22, 2023

This signature, whilst it does provide the maximum flexibility to the upstreams, is kind of obnoxious to use.

What do you think of returning BoxStream<'a, Result<Path>> and letting the individual stores control the concurrency, much like we do for coalesce_ranges? This would have the advantage of letting them choose an appropriate value

It would also avoid overheads for stores that don't support bulk deletes, FWIW

Member Author

I think this signature is fine, but how would I implement limit store with it?

Contributor

Neither signature would really let you do this meaningfully, as neither exposes the granularity of the requests. I personally think this is fine, I suspect we may add request limiting to ClientOptions eventually to handle this

object_store/src/limit.rs (outdated, resolved)
Comment on lines 1581 to 1583
let paths = flatten_list_stream(storage, None).await.unwrap();

for f in &paths {
storage.delete(f).await.unwrap();
}
storage
.delete_stream(futures::stream::iter(paths).boxed())
Contributor

@tustvold tustvold May 23, 2023

Thinking about this use-case, I wonder if the input should be a fallible stream, what do you think? You could then feed the output of the list operation directly into the bulk delete API

Member Author

It makes the API a little funky but that might be fine. In most of my downstream use cases, I'm probably passing in a Vec or iterator, so I'm already going to wrap in futures::stream::iter. Having to add a .map(Ok) seems fine.

Contributor

@tustvold tustvold May 23, 2023

I mean the alternative would be to just pass in Vec<Path>, but I think if we're going to go the path of providing a streaming interface we should at least make it usable. I would be happy with either tbh, Vec<Path> does have the advantage of being simpler... Up to you 😄
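The trade-off in this thread (fallible input that a listing can feed directly, versus a plain Vec that needs a `.map(Ok)` adapter) can be sketched with ordinary iterators standing in for streams. This is only an analogy under stated assumptions: `delete_iter` and its `protected` deny list are hypothetical, paths are plain `String`s, and nothing here is async.

```rust
/// Hypothetical stand-in for a bulk delete that accepts fallible input.
/// Input errors are passed through to the output unchanged; each Ok path is
/// "deleted" (here: merely checked against a deny list) and echoed back.
fn delete_iter<'a, I>(
    locations: I,
    protected: &'a [&'a str],
) -> impl Iterator<Item = Result<String, String>> + 'a
where
    I: IntoIterator<Item = Result<String, String>>,
    I::IntoIter: 'a,
{
    locations
        .into_iter()
        .map(move |location| -> Result<String, String> {
            // An upstream (listing) error surfaces as this item's result.
            let path = location?;
            if protected.iter().any(|p| *p == path.as_str()) {
                Err(format!("cannot delete {}", path))
            } else {
                Ok(path)
            }
        })
}
```

With this shape, a fallible listing can be passed in as-is, while an infallible `Vec<String>` needs one adapter: `delete_iter(paths.into_iter().map(Ok), &protected)`.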

Contributor

@tustvold tustvold left a comment

Looking really nice, thank you for sticking with this. I think this is good to go, but left one final thought on the signature that I'd be interested to hear your take on

wjones127 and others added 2 commits May 23, 2023 07:47
Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
) -> BoxStream<'a, Result<Path>> {
locations
.chunks(1_000)
.map(move |locations| async {
let locations: Vec<Path> =
locations.into_iter().collect::<Result<_>>()?;
Contributor

I think it would be better to use try_chunks, as it will short-circuit on the first error
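The short-circuit behavior the reviewer describes can be illustrated with a plain-iterator analogue. This is a sketch of the idea only, not the futures crate's `try_chunks` implementation; `next_try_chunk` is a hypothetical helper.

```rust
/// Pull up to `size` items from a fallible iterator into one chunk.
/// Returns the error as soon as one is seen, without reading any further
/// items, and returns `None` once the input is exhausted.
fn next_try_chunk<T, E, I>(iter: &mut I, size: usize) -> Option<Result<Vec<T>, E>>
where
    I: Iterator<Item = Result<T, E>>,
{
    let mut chunk = Vec::new();
    for item in iter.by_ref().take(size) {
        match item {
            Ok(value) => chunk.push(value),
            // Short-circuit: stop consuming input at the first error.
            Err(e) => return Some(Err(e)),
        }
    }
    if chunk.is_empty() {
        None
    } else {
        Some(Ok(chunk))
    }
}
```

By contrast, grouping first and collecting into a `Result` afterwards, as in the diff above, reads the whole chunk before the error is noticed.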

@@ -1578,9 +1578,15 @@ mod tests {
}

async fn delete_fixtures(storage: &DynObjectStore) {
let paths = flatten_list_stream(storage, None).await.unwrap();
// let paths = flatten_list_stream(storage, None).await.unwrap();
Contributor

Suggested change
// let paths = flatten_list_stream(storage, None).await.unwrap();

Contributor

@tustvold tustvold left a comment

Looking nice 😄

@alamb
Contributor

alamb commented May 25, 2023

Shall we merge it?

@wjones127
Member Author

I'll do some final clean ups later today, then it should be ready.

@wjones127 wjones127 requested a review from tustvold May 27, 2023 01:34
@tustvold tustvold merged commit 770e241 into apache:master May 27, 2023
Labels
object-store Object Store Interface
Development

Successfully merging this pull request may close these issues.

Bulk delete support for object-store
3 participants