
Use object_store:BufWriter to replace put_multipart #9648

Merged (5 commits) on Mar 20, 2024

Conversation

@yyy1000 (Contributor) commented Mar 17, 2024

Which issue does this PR close?

Closes #9614 .

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added the core Core DataFusion crate label Mar 17, 2024
@yyy1000 (Contributor, Author) commented Mar 17, 2024

It seems that AbortableWrite needs MultiPart information. 🤔
https://github.com/apache/arrow-datafusion/blob/37253e57beb25f0f1a4412b75421a489c2cb3c6a/datafusion/core/src/datasource/file_format/write/mod.rs#L155-L167
I'm reading the code to see whether put_multipart here can be replaced.

@yyy1000 (Contributor, Author) commented Mar 17, 2024

Close and reopen to rerun CI

@yyy1000 yyy1000 closed this Mar 17, 2024
@yyy1000 yyy1000 reopened this Mar 17, 2024
@devinjdangelo (Contributor) commented Mar 17, 2024

> It seems that AbortableWrite needs MultiPart information. 🤔
>
> https://github.com/apache/arrow-datafusion/blob/37253e57beb25f0f1a4412b75421a489c2cb3c6a/datafusion/core/src/datasource/file_format/write/mod.rs#L155-L167
>
> I'm reading the code to see whether put_multipart here can be replaced.

We could drop the AbortableWrite construct entirely and no longer make a manual effort to abort on failure. The manual cleanup will not work in all cases anyway, and users can configure automatic cleanup with their cloud provider, e.g.: https://aws.amazon.com/blogs/aws-cloud-financial-management/discovering-and-deleting-incomplete-multipart-uploads-to-lower-amazon-s3-costs/.
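As a hedged illustration (not part of this PR), the provider-side cleanup mentioned above can be configured on S3 with a bucket lifecycle rule; the rule ID and the 7-day window below are placeholders:

```json
{
  "Rules": [
    {
      "ID": "reap-incomplete-multipart-uploads",
      "Status": "Enabled",
      "Filter": {},
      "AbortIncompleteMultipartUpload": { "DaysAfterInitiation": 7 }
    }
  ]
}
```

With a rule like this in place, the parts of any multipart upload that was started but never completed are deleted automatically after the configured number of days, so a failed writer that never aborts does not leave garbage around indefinitely.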

I believe this is in line with what @tustvold is thinking with ongoing improvements of ObjectStore apache/arrow-rs#5500 (comment).

@devinjdangelo (Contributor) commented Mar 17, 2024

> Close and reopen to rerun CI

Another option is to push an empty commit.

```shell
git commit --allow-empty -m 'retry ci'
```

@yyy1000 (Contributor, Author) commented Mar 17, 2024

Thanks for your review! @devinjdangelo
I will look into the code to see how to drop AbortableWrite. :)

@tustvold (Contributor) commented

> I will look into the code to see how to drop AbortableWrite. :)

FWIW the multipart ID is exposed, but it might take some more type plumbing. IMO removing AbortableWrite is probably a simpler path forward.

@yyy1000 (Contributor, Author) left a comment

Thanks for your review! @devinjdangelo @tustvold
I updated the PR to address your comment, let me know whether it needs further improvement :)

```rust
) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
    let buf_writer = BufWriter::new(object_store, location.clone());
    Ok(file_compression_type.convert_async_writer(buf_writer)?)
```
@yyy1000 (Contributor, Author):

I think create_writer could still be kept, since it creates the writer with compression?

```rust
}
false => {
    writer.shutdown()
        .await
        .map_err(|_| internal_datafusion_err!("Error encountered while finalizing writes! Partial results may have been written to ObjectStore!"))?;
```
@yyy1000 (Contributor, Author):
I'm not sure whether it's proper to just shut down everything here, but according to #9648 (comment) I think it's OK? 👀

```rust
        Pin::new(&mut self.get_mut().writer).poll_shutdown(cx)
    }
}
```

@yyy1000 (Contributor, Author):

I also removed the MultiPart struct since it's not used anymore.

```diff
@@ -152,7 +153,7 @@ impl FileCompressionType {
     /// according to this `FileCompressionType`.
     pub fn convert_async_writer(
         &self,
-        w: Box<dyn AsyncWrite + Send + Unpin>,
+        w: BufWriter,
```
@yyy1000 (Contributor, Author):

I'm wondering whether it's OK to change the parameter type here, since it's public, but keeping Box<dyn AsyncWrite + Send + Unpin> makes the types incompatible. 🤔

Reply from a Contributor:

I think it depends on if this code is always used with object_store (aka if DataFusion code always writes output using the object_store API). If this is the case, then switching to BufWriter here makes sense to me

BTW I think we need to update the comments on this function to match the new implementation

@alamb (Contributor) left a comment

Thank you very much @yyy1000 -- this looks like a good improvement to me

I would like a few more opinions about the change in abort behavior, but as I understand the implications it seems a reasonable change to me.

With a few more updates to doc comments I think this PR will be ready to go in my view

cc @wiedld @tustvold


```diff
@@ -69,79 +66,6 @@ impl Write for SharedBuffer {
     }
 }
 
-/// Stores data needed during abortion of MultiPart writers
```
Reply from a Contributor:

Is it correct to say that the implications of removing AbortableWrite is that if certain (larger) writes to object store fail / abort for some reason, "garbage" (unreferenced partial uploads) may be left around indefinitely on the provider?

While I understand that some object stores (maybe all) can be configured to automatically clean up such parts, I think removing the "try to clean up on failure" behavior is worth reconsidering.

I think I could be convinced with an argument like "the software can't ensure clean up anyways (for example, if it is SIGKILLed) for some reason, and thus we don't even try to clean up in paths we could", but if we go that route I think we should explicitly document the behavior and rationale in comments somewhere

I think @metesynnada or @mustafasrepo originally added this code (though I may be wrong) so perhaps they have some perspective to share

@yyy1000 (Contributor, Author) replied:

Yeah, according to #9648 (comment), I think 'garbage' cleanup will happen only on the cloud provider side if AbortableWrite is removed 🤔. @devinjdangelo, is that right?

@devinjdangelo (Contributor) commented Mar 19, 2024

> Is it correct to say that the implications of removing AbortableWrite is that if certain (larger) writes to object store fail / abort for some reason, "garbage" (unreferenced partial uploads) may be left around indefinitely on the provider?

Yes. I have mixed feelings about removing any attempt to clean up on failure.

> I think I could be convinced with an argument like "the software can't ensure clean up anyways (for example, if it is SIGKILLed) for some reason, and thus we don't even try to clean up in paths we could", but if we go that route I think we should explicitly document the behavior and rationale in comments somewhere

This argument is valid. A hardware/network fault will prevent any cleanup code we write from working, so truly protecting against partial writes would require logic outside of DataFusion's process (e.g. on the cloud service provider side).

On the other hand, this change may be annoying for simple failures when writing to a local file system. Encountering any execution error will leave dangling files that before often could be cleaned up.

I think this is a case where one will draw different conclusions depending on whether they are imagining an individual user of something like datafusion-cli vs. a production database system implemented on top of DataFusion. The latter user will have little use for our attempts at cleanup (they will need much better anyway), but the former may appreciate it.

@tustvold (Contributor) commented Mar 19, 2024

Local file system automatically cleans up on drop, or at least makes a best effort to do so. FWIW this same mechanism is used for ALL uploads, even the non-multipart ones so as to provide atomicity. Given nobody has complained about this, I suspect it is doing a fairly good job

I am not aware of a cloud provider that provides multipart uploads without some automated way to reap aborted uploads after a given time

@alamb (Contributor) commented Mar 20, 2024

Ok, so I vote we leave the code as is (no attempt to explicitly abort in write) and add a note in the documentation. If it turns out this is an important behavior, we can add it back in

@yyy1000 can you handle adding the note in the documentation?

@yyy1000 (Contributor, Author) replied:

Sure, I added it around the create_writer function. Would that be enough?

Reply from a Contributor:

Looks reasonable to me -- thank you

```rust
/// with the specified compression
pub(crate) async fn create_writer(
    file_compression_type: FileCompressionType,
    location: &Path,
    object_store: Arc<dyn ObjectStore>,
) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
    let (multipart_id, writer) = object_store
```
Reply from a Contributor:

For anyone else following along, the BufWriter internally does a multi-part put when appropriate
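As a hedged sketch of that behavior (not code from this PR), usage of the object_store crate's BufWriter looks roughly like the following; the InMemory store and the path are placeholders, and the buffering threshold at which BufWriter switches from a single put to a multipart upload is internal to the crate:

```rust
// Sketch only: assumes the `object_store` crate's `buffered::BufWriter`,
// which buffers small writes and transparently starts a multipart upload
// once the buffered data grows large enough.
use std::sync::Arc;

use object_store::{buffered::BufWriter, memory::InMemory, path::Path, ObjectStore};
use tokio::io::AsyncWriteExt; // for `write_all` / `shutdown`

async fn write_example() -> std::io::Result<()> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let location = Path::from("data/example.bin");

    let mut writer = BufWriter::new(Arc::clone(&store), location);
    writer.write_all(b"hello world").await?;
    // `shutdown` flushes the buffer and completes the multipart upload
    // if one was started; dropping the writer without shutting it down
    // abandons any in-progress upload instead.
    writer.shutdown().await?;
    Ok(())
}
```

This is what lets the PR delete the manual multipart plumbing: callers only see an AsyncWrite, and the single-put vs. multipart decision happens inside BufWriter.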

@metesynnada (Contributor) commented
I can check this PR as well tomorrow.

@tustvold tustvold merged commit dbfb153 into apache:main Mar 20, 2024
24 checks passed
@yyy1000 yyy1000 deleted the BufWriter branch March 20, 2024 22:13
tillrohrmann pushed a commit to restatedev/datafusion that referenced this pull request Jul 25, 2024
* feat: use BufWriter to replace put_multipart

* feat: remove AbortableWrite

* fix clippy

* fix: add doc comment
Labels
core Core DataFusion crate

Successfully merging this pull request may close these issues.

Use object_store:BufWriter instead of put_multipart
5 participants