Use object_store::BufWriter to replace put_multipart #9648

Merged: 5 commits, Mar 20, 2024

Changes from 2 commits

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -93,7 +93,7 @@ indexmap = "2.0.0"
itertools = "0.12"
log = "^0.4"
num_cpus = "1.13.0"
object_store = { version = "0.9.0", default-features = false }
object_store = { version = "0.9.1", default-features = false }
parking_lot = "0.12"
parquet = { version = "50.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
rand = "0.8"
datafusion/core/src/datasource/file_format/file_compression_type.rs
@@ -43,6 +43,7 @@ use futures::stream::BoxStream;
use futures::StreamExt;
#[cfg(feature = "compression")]
use futures::TryStreamExt;
use object_store::buffered::BufWriter;
use tokio::io::AsyncWrite;
#[cfg(feature = "compression")]
use tokio_util::io::{ReaderStream, StreamReader};
@@ -152,7 +153,7 @@ impl FileCompressionType {
/// according to this `FileCompressionType`.
pub fn convert_async_writer(
&self,
w: Box<dyn AsyncWrite + Send + Unpin>,
w: BufWriter,
Contributor Author:

I'm thinking whether it's OK to change the param type here because it's public, but keeping Box<dyn AsyncWrite + Send + Unpin> makes the type incompatible. 🤔

Contributor:

I think it depends on whether this code is always used with object_store (i.e., whether DataFusion always writes output through the object_store API). If that is the case, then switching to BufWriter here makes sense to me.

BTW, I think we need to update the comments on this function to match the new implementation.

) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
Ok(match self.variant {
#[cfg(feature = "compression")]
@@ -169,7 +170,7 @@ impl FileCompressionType {
"Compression feature is not enabled".to_owned(),
))
}
UNCOMPRESSED => w,
UNCOMPRESSED => Box::new(w),
})
}
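For context, a minimal sketch of how the new signature is driven end to end. The in-memory store, the path, and the exact import paths are illustrative assumptions rather than code from this PR:

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use object_store::buffered::BufWriter;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let location = Path::from("output/data.csv.gz");

    // BufWriter decides internally between a single put and a multipart
    // upload, so callers no longer juggle a MultipartId.
    let buf_writer = BufWriter::new(store, location);

    // The compression wrapper now consumes the concrete BufWriter and hands
    // back a boxed AsyncWrite (here a GZIP encoder; needs DataFusion's
    // `compression` feature).
    let mut writer = FileCompressionType::GZIP.convert_async_writer(buf_writer)?;
    writer.write_all(b"a,b,c\n1,2,3\n").await?;
    writer.shutdown().await?; // finalizes the (possibly multipart) upload
    Ok(())
}
```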

19 changes: 7 additions & 12 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -23,7 +23,7 @@ use std::fmt::Debug;
use std::sync::Arc;

use super::write::demux::start_demuxer_task;
use super::write::{create_writer, AbortableWrite, SharedBuffer};
use super::write::{create_writer, SharedBuffer};
use super::{FileFormat, FileScanConfig};
use crate::arrow::array::{
BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch,
@@ -56,6 +56,7 @@ use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use object_store::buffered::BufWriter;
use parquet::arrow::arrow_writer::{
compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
ArrowLeafColumn,
@@ -616,20 +617,14 @@ impl ParquetSink {
location: &Path,
object_store: Arc<dyn ObjectStore>,
parquet_props: WriterProperties,
) -> Result<
AsyncArrowWriter<Box<dyn tokio::io::AsyncWrite + std::marker::Send + Unpin>>,
> {
let (_, multipart_writer) = object_store
.put_multipart(location)
.await
.map_err(DataFusionError::ObjectStore)?;
) -> Result<AsyncArrowWriter<BufWriter>> {
let buf_writer = BufWriter::new(object_store, location.clone());
let writer = AsyncArrowWriter::try_new(
multipart_writer,
buf_writer,
self.get_writer_schema(),
PARQUET_WRITER_BUFFER_SIZE,
Some(parquet_props),
)?;

Ok(writer)
}
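As a rough illustration of what ParquetSink now builds under the hood, here is a small self-contained sketch against an in-memory store. The schema, batch, and 10 MB buffer size are made up for the example; the four-argument try_new matches the parquet 50 API used in this file:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use object_store::buffered::BufWriter;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use parquet::arrow::AsyncArrowWriter;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let location = Path::from("out/file.parquet");

    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // BufWriter replaces the explicit put_multipart call; it buffers in
    // memory and only starts a multipart upload once enough data accumulates.
    let buf_writer = BufWriter::new(store, location);
    let mut writer = AsyncArrowWriter::try_new(buf_writer, schema, 10 * 1024 * 1024, None)?;

    writer.write(&batch).await?;
    // close() flushes the parquet footer and completes the upload.
    writer.close().await?;
    Ok(())
}
```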

@@ -947,7 +942,7 @@ async fn concatenate_parallel_row_groups(
mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>,
schema: Arc<Schema>,
writer_props: Arc<WriterProperties>,
mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
) -> Result<FileMetaData> {
let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);

@@ -989,7 +984,7 @@
/// task then stitches these independent RowGroups together and streams this large
/// single parquet file to an ObjectStore in multiple parts.
async fn output_single_parquet_file_parallelized(
object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
data: Receiver<RecordBatch>,
output_schema: Arc<Schema>,
parquet_props: &WriterProperties,
96 changes: 7 additions & 89 deletions datafusion/core/src/datasource/file_format/write/mod.rs
@@ -18,21 +18,18 @@
//! Module containing helper methods/traits related to enabling
//! write support for the various file formats

use std::io::{Error, Write};
use std::pin::Pin;
use std::io::Write;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::error::Result;

use arrow_array::RecordBatch;
use datafusion_common::DataFusionError;

use bytes::Bytes;
use futures::future::BoxFuture;
use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::{MultipartId, ObjectStore};
use object_store::ObjectStore;
use tokio::io::AsyncWrite;

pub(crate) mod demux;
@@ -69,79 +66,6 @@ impl Write for SharedBuffer {
}
}

/// Stores data needed during abortion of MultiPart writers
Contributor:

Is it correct to say that the implication of removing AbortableWrite is that if certain (larger) writes to the object store fail / abort for some reason, "garbage" (unreferenced partial uploads) may be left around indefinitely on the provider?

While I understand that some object stores (maybe all) can be configured to automatically clean up such parts, I think reverting the "try to cleanup on failure" behavior is worth reconsidering.

I think I could be convinced with an argument like "the software can't ensure clean up anyways (for example, if it is SIGKILLed) for some reason, and thus we don't even try to clean up in paths we could", but if we go that route I think we should explicitly document the behavior and rationale in comments somewhere

I think @metesynnada or @mustafasrepo originally added this code (though I may be wrong), so perhaps they have some perspective to share.

Contributor Author:

Yeah, I think according to #9648 (comment), if AbortableWrite is removed, 'garbage' cleanup will only happen on the cloud provider side 🤔. @devinjdangelo, is that right?

@devinjdangelo (Contributor, Mar 19, 2024):

> Is it correct to say that the implication of removing AbortableWrite is that if certain (larger) writes to the object store fail / abort for some reason, "garbage" (unreferenced partial uploads) may be left around indefinitely on the provider?

Yes. I have mixed feelings about removing any attempt to clean up on failure.

> I think I could be convinced with an argument like "the software can't ensure clean up anyways (for example, if it is SIGKILLed) for some reason, and thus we don't even try to clean up in paths we could", but if we go that route I think we should explicitly document the behavior and rationale in comments somewhere

This argument is valid. A hardware/network fault will prevent any cleanup code we write from working, so to truly protect against partial writes would require logic outside of DataFusion's process (e.g. on the cloud service provider side).

On the other hand, this change may be annoying for simple failures when writing to a local file system. Encountering any execution error will leave dangling files, whereas before they could often be cleaned up.

I think this is a case where one will draw different conclusions depending on whether they are imagining an individual user of something like datafusion-cli vs. a production database system implemented on top of DataFusion. The latter user will have little use for our attempts at cleanup (they will need much better anyway), but the former may appreciate it.

@tustvold (Contributor, Mar 19, 2024):

The local filesystem implementation automatically cleans up on drop, or at least makes a best effort to do so. FWIW, this same mechanism is used for ALL uploads, even the non-multipart ones, so as to provide atomicity. Given nobody has complained about this, I suspect it is doing a fairly good job.

I am not aware of a cloud provider that offers multipart uploads without some automated way to reap aborted uploads after a given time.

@alamb (Contributor, Mar 20, 2024):

Ok, so I vote we leave the code as is (no attempt to explicitly abort in write) and add a note in the documentation. If it turns out this is an important behavior, we can add it back in.

@yyy1000 can you handle adding the note in the documentation?

Contributor Author:

Sure, I added it around the create_writer function; would that be enough?

Contributor:

Looks reasonable to me -- thank you
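For anyone skimming later, the note that ended up near create_writer (see the write/mod.rs diff further down) might read roughly like this; a sketch of the idea, not the merged wording:

```rust
// (excerpt of write/mod.rs; imports as in the diff below)

/// Returns an [`AsyncWrite`] which writes to the given object store location
/// with the specified compression.
///
/// Note: no attempt is made to abort or clean up a partially uploaded object
/// if writing fails part-way through. Cloud object stores can typically be
/// configured to reap incomplete multipart uploads automatically, and the
/// LocalFileSystem implementation cleans up on drop.
pub(crate) async fn create_writer(
    file_compression_type: FileCompressionType,
    location: &Path,
    object_store: Arc<dyn ObjectStore>,
) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
    let buf_writer = BufWriter::new(object_store, location.clone());
    Ok(file_compression_type.convert_async_writer(buf_writer)?)
}
```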

#[derive(Clone)]
pub(crate) struct MultiPart {
/// A shared reference to the object store
store: Arc<dyn ObjectStore>,
multipart_id: MultipartId,
location: Path,
}

impl MultiPart {
/// Create a new `MultiPart`
pub fn new(
store: Arc<dyn ObjectStore>,
multipart_id: MultipartId,
location: Path,
) -> Self {
Self {
store,
multipart_id,
location,
}
}
}

/// A wrapper struct with abort method and writer
pub(crate) struct AbortableWrite<W: AsyncWrite + Unpin + Send> {
writer: W,
multipart: MultiPart,
}

impl<W: AsyncWrite + Unpin + Send> AbortableWrite<W> {
/// Create a new `AbortableWrite` instance with the given writer, and write mode.
pub(crate) fn new(writer: W, multipart: MultiPart) -> Self {
Self { writer, multipart }
}

/// handling of abort for different write modes
pub(crate) fn abort_writer(&self) -> Result<BoxFuture<'static, Result<()>>> {
let multi = self.multipart.clone();
Ok(Box::pin(async move {
multi
.store
.abort_multipart(&multi.location, &multi.multipart_id)
.await
.map_err(DataFusionError::ObjectStore)
}))
}
}

impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AbortableWrite<W> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::result::Result<usize, Error>> {
Pin::new(&mut self.get_mut().writer).poll_write(cx, buf)
}

fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Error>> {
Pin::new(&mut self.get_mut().writer).poll_flush(cx)
}

fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Error>> {
Pin::new(&mut self.get_mut().writer).poll_shutdown(cx)
}
}

Contributor Author:

I also removed struct MultiPart since it's not used anymore.

/// A trait that defines the methods required for a RecordBatch serializer.
pub trait BatchSerializer: Sync + Send {
/// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.
@@ -150,19 +74,13 @@ pub trait BatchSerializer: Sync + Send {
fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
}

/// Returns an [`AbortableWrite`] which writes to the given object store location
/// Returns an [`AsyncWrite`] which writes to the given object store location
/// with the specified compression
pub(crate) async fn create_writer(
file_compression_type: FileCompressionType,
location: &Path,
object_store: Arc<dyn ObjectStore>,
) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
let (multipart_id, writer) = object_store
Contributor:

For anyone else following along, the BufWriter internally does a multi-part put when appropriate

.put_multipart(location)
.await
.map_err(DataFusionError::ObjectStore)?;
Ok(AbortableWrite::new(
file_compression_type.convert_async_writer(writer)?,
MultiPart::new(object_store, multipart_id, location.clone()),
))
) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
let buf_writer = BufWriter::new(object_store, location.clone());
Ok(file_compression_type.convert_async_writer(buf_writer)?)
Contributor Author:

I think create_writer can still be kept, since it creates the writer with compression?

}
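To expand on the multipart comment above: as I understand the object_store 0.9 API (worth double-checking), BufWriter buffers up to a capacity in memory (10 MB by default, adjustable with with_capacity) and only starts a multipart upload once that capacity is exceeded; smaller writes become a single put on shutdown. A sketch against an in-memory store:

```rust
use std::sync::Arc;

use object_store::buffered::BufWriter;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

    // Small write: stays below the internal buffer capacity, so it is
    // uploaded with a single put on shutdown.
    let mut small = BufWriter::new(store.clone(), Path::from("data/small.bin"));
    small.write_all(&[0u8; 1024]).await?;
    small.shutdown().await?;

    // Large write: once the (here 1 MiB) capacity is exceeded, BufWriter
    // switches to a multipart upload internally; no MultipartId bookkeeping
    // is needed on the caller side.
    let mut large =
        BufWriter::with_capacity(store.clone(), Path::from("data/large.bin"), 1024 * 1024);
    large.write_all(&vec![0u8; 8 * 1024 * 1024]).await?;
    large.shutdown().await?;

    Ok(())
}
```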
18 changes: 4 additions & 14 deletions datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -22,7 +22,7 @@
use std::sync::Arc;

use super::demux::start_demuxer_task;
use super::{create_writer, AbortableWrite, BatchSerializer};
use super::{create_writer, BatchSerializer};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::physical_plan::FileSinkConfig;
use crate::error::Result;
@@ -39,7 +39,7 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver};
use tokio::task::JoinSet;

type WriterType = AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>;
type WriterType = Box<dyn AsyncWrite + Send + Unpin>;
type SerializerType = Arc<dyn BatchSerializer>;

/// Serializes a single data stream in parallel and writes to an ObjectStore
@@ -49,7 +49,7 @@ type SerializerType = Arc<dyn BatchSerializer>;
pub(crate) async fn serialize_rb_stream_to_object_store(
mut data_rx: Receiver<RecordBatch>,
serializer: Arc<dyn BatchSerializer>,
mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
mut writer: WriterType,
) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> {
let (tx, mut rx) =
mpsc::channel::<SpawnedTask<Result<(usize, Bytes), DataFusionError>>>(100);
@@ -173,19 +173,9 @@ pub(crate) async fn stateless_serialize_and_write_files(

// Finalize or abort writers as appropriate
for mut writer in finished_writers.into_iter() {
match any_errors {
true => {
let abort_result = writer.abort_writer();
if abort_result.is_err() {
any_abort_errors = true;
}
}
false => {
writer.shutdown()
writer.shutdown()
.await
.map_err(|_| internal_datafusion_err!("Error encountered while finalizing writes! Partial results may have been written to ObjectStore!"))?;
Contributor Author:

I don't know whether it's proper to just shut down all writers here, but I think according to #9648 (comment) it's OK? 👀

}
}
}

if any_errors {
10 changes: 4 additions & 6 deletions datafusion/core/src/datasource/physical_plan/csv.rs
@@ -44,6 +44,7 @@ use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

use bytes::{Buf, Bytes};
use futures::{ready, StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;
use tokio::task::JoinSet;
@@ -471,7 +472,7 @@

let mut stream = plan.execute(i, task_ctx.clone())?;
join_set.spawn(async move {
let (_, mut multipart_writer) = storeref.put_multipart(&file).await?;
let mut buf_writer = BufWriter::new(storeref, file.clone());
let mut buffer = Vec::with_capacity(1024);
//only write headers on first iteration
let mut write_headers = true;
@@ -481,15 +482,12 @@
.build(buffer);
writer.write(&batch)?;
buffer = writer.into_inner();
multipart_writer.write_all(&buffer).await?;
buf_writer.write_all(&buffer).await?;
buffer.clear();
//prevent writing headers more than once
write_headers = false;
}
multipart_writer
.shutdown()
.await
.map_err(DataFusionError::from)
buf_writer.shutdown().await.map_err(DataFusionError::from)
});
}

10 changes: 4 additions & 6 deletions datafusion/core/src/datasource/physical_plan/json.rs
@@ -43,6 +43,7 @@ use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

use bytes::{Buf, Bytes};
use futures::{ready, StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{self, GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;
use tokio::task::JoinSet;
@@ -338,21 +339,18 @@ pub async fn plan_to_json(

let mut stream = plan.execute(i, task_ctx.clone())?;
join_set.spawn(async move {
let (_, mut multipart_writer) = storeref.put_multipart(&file).await?;
let mut buf_writer = BufWriter::new(storeref, file.clone());

let mut buffer = Vec::with_capacity(1024);
while let Some(batch) = stream.next().await.transpose()? {
let mut writer = json::LineDelimitedWriter::new(buffer);
writer.write(&batch)?;
buffer = writer.into_inner();
multipart_writer.write_all(&buffer).await?;
buf_writer.write_all(&buffer).await?;
buffer.clear();
}

multipart_writer
.shutdown()
.await
.map_err(DataFusionError::from)
buf_writer.shutdown().await.map_err(DataFusionError::from)
});
}

5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -52,6 +52,7 @@ use futures::future::BoxFuture;
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use log::debug;
use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
@@ -698,11 +699,11 @@ pub async fn plan_to_parquet(
let propclone = writer_properties.clone();

let storeref = store.clone();
let (_, multipart_writer) = storeref.put_multipart(&file).await?;
let buf_writer = BufWriter::new(storeref, file.clone());
let mut stream = plan.execute(i, task_ctx.clone())?;
join_set.spawn(async move {
let mut writer = AsyncArrowWriter::try_new(
multipart_writer,
buf_writer,
plan.schema(),
10485760,
propclone,