Use object_store::BufWriter to replace put_multipart (#9648)
* feat: use BufWriter to replace put_multipart

* feat: remove AbortableWrite

* fix clippy

* fix: add doc comment
yyy1000 authored Mar 20, 2024
1 parent 7a0dd6f commit dbfb153
Showing 8 changed files with 37 additions and 134 deletions.
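For context, here is a minimal standalone sketch of the migration this commit applies across the writers: the old flow drove `ObjectStore::put_multipart` directly and had to keep the `MultipartId` around to abort the upload on failure, while the new flow simply wraps the store in `object_store::buffered::BufWriter`. This is illustrative code, not part of the diff; the in-memory store and the `out/data.csv` path are placeholders, and it assumes object_store 0.9.1 and a Tokio runtime.

```rust
use std::sync::Arc;

use object_store::buffered::BufWriter;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = Path::from("out/data.csv");

    // Old flow: drive the multipart upload by hand and keep the MultipartId
    // so the upload can be aborted if anything goes wrong.
    // let (_id, mut writer) = store.put_multipart(&path).await?;

    // New flow: BufWriter buffers small writes in memory and only falls back
    // to a multipart upload for larger payloads; shutdown() finishes the write.
    let mut writer = BufWriter::new(store, path);
    writer.write_all(b"a,b\n1,2\n").await?;
    writer.shutdown().await?;
    Ok(())
}
```

Because `BufWriter` owns the upload lifecycle, the `MultipartId` bookkeeping and the `AbortableWrite` wrapper below become unnecessary, which is where most of the 134 deleted lines come from.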
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -93,7 +93,7 @@ indexmap = "2.0.0"
itertools = "0.12"
log = "^0.4"
num_cpus = "1.13.0"
object_store = { version = "0.9.0", default-features = false }
object_store = { version = "0.9.1", default-features = false }
parking_lot = "0.12"
parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
rand = "0.8"
7 changes: 4 additions & 3 deletions datafusion/core/src/datasource/file_format/file_compression_type.rs
@@ -43,6 +43,7 @@ use futures::stream::BoxStream;
use futures::StreamExt;
#[cfg(feature = "compression")]
use futures::TryStreamExt;
use object_store::buffered::BufWriter;
use tokio::io::AsyncWrite;
#[cfg(feature = "compression")]
use tokio_util::io::{ReaderStream, StreamReader};
@@ -148,11 +149,11 @@ impl FileCompressionType {
})
}

/// Wrap the given `AsyncWrite` so that it performs compressed writes
/// Wrap the given `BufWriter` so that it performs compressed writes
/// according to this `FileCompressionType`.
pub fn convert_async_writer(
&self,
w: Box<dyn AsyncWrite + Send + Unpin>,
w: BufWriter,
) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
Ok(match self.variant {
#[cfg(feature = "compression")]
@@ -169,7 +170,7 @@
"Compression feature is not enabled".to_owned(),
))
}
UNCOMPRESSED => w,
UNCOMPRESSED => Box::new(w),
})
}

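Because `convert_async_writer` now takes the concrete `BufWriter` by value and returns a boxed `AsyncWrite`, callers build the buffered writer first and then wrap it for compression. A rough usage sketch under a few assumptions: the import path and the `GZIP` constant are taken from how DataFusion exposes `FileCompressionType`, compressed variants need the crate's `compression` feature, and the store and path are placeholders.

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use object_store::buffered::BufWriter;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use tokio::io::AsyncWriteExt;

async fn write_gzip_csv() -> Result<(), Box<dyn std::error::Error>> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let buf_writer = BufWriter::new(store, Path::from("out/data.csv.gz"));

    // convert_async_writer consumes the BufWriter and returns a boxed
    // AsyncWrite; the UNCOMPRESSED arm just boxes it, compressed arms wrap it.
    let mut writer = FileCompressionType::GZIP.convert_async_writer(buf_writer)?;
    writer.write_all(b"a,b\n1,2\n").await?;
    writer.shutdown().await?;
    Ok(())
}
```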
19 changes: 7 additions & 12 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -23,7 +23,7 @@ use std::fmt::Debug;
use std::sync::Arc;

use super::write::demux::start_demuxer_task;
use super::write::{create_writer, AbortableWrite, SharedBuffer};
use super::write::{create_writer, SharedBuffer};
use super::{FileFormat, FileScanConfig};
use crate::arrow::array::{
BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch,
@@ -56,6 +56,7 @@ use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use object_store::buffered::BufWriter;
use parquet::arrow::arrow_writer::{
compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
ArrowLeafColumn,
@@ -613,19 +614,13 @@ impl ParquetSink {
location: &Path,
object_store: Arc<dyn ObjectStore>,
parquet_props: WriterProperties,
) -> Result<
AsyncArrowWriter<Box<dyn tokio::io::AsyncWrite + std::marker::Send + Unpin>>,
> {
let (_, multipart_writer) = object_store
.put_multipart(location)
.await
.map_err(DataFusionError::ObjectStore)?;
) -> Result<AsyncArrowWriter<BufWriter>> {
let buf_writer = BufWriter::new(object_store, location.clone());
let writer = AsyncArrowWriter::try_new(
multipart_writer,
buf_writer,
self.get_writer_schema(),
Some(parquet_props),
)?;

Ok(writer)
}

@@ -943,7 +938,7 @@ async fn concatenate_parallel_row_groups(
mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>,
schema: Arc<Schema>,
writer_props: Arc<WriterProperties>,
mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
) -> Result<FileMetaData> {
let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);

@@ -985,7 +980,7 @@
/// task then stitches these independent RowGroups together and streams this large
/// single parquet file to an ObjectStore in multiple parts.
async fn output_single_parquet_file_parallelized(
object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
data: Receiver<RecordBatch>,
output_schema: Arc<Schema>,
parquet_props: &WriterProperties,
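The same `BufWriter` also slots directly under parquet's `AsyncArrowWriter`, which is what the reworked `create_async_arrow_writer` above returns. A self-contained sketch of that pairing outside DataFusion, assuming object_store 0.9.1 and parquet 51; the in-memory store, output path, and single-column batch are placeholders.

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use object_store::buffered::BufWriter;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use parquet::arrow::AsyncArrowWriter;

async fn write_parquet() -> Result<(), Box<dyn std::error::Error>> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let batch = RecordBatch::try_from_iter([(
        "a",
        Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
    )])?;

    // Encoded row groups stream through BufWriter, which picks a single PUT
    // or a multipart upload based on how much data is written.
    let buf_writer = BufWriter::new(store, Path::from("out/data.parquet"));
    let mut writer = AsyncArrowWriter::try_new(buf_writer, batch.schema(), None)?;
    writer.write(&batch).await?;
    writer.close().await?; // writes the parquet footer and completes the upload
    Ok(())
}
```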
100 changes: 10 additions & 90 deletions datafusion/core/src/datasource/file_format/write/mod.rs
@@ -18,21 +18,18 @@
//! Module containing helper methods/traits related to enabling
//! write support for the various file formats

use std::io::{Error, Write};
use std::pin::Pin;
use std::io::Write;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::error::Result;

use arrow_array::RecordBatch;
use datafusion_common::DataFusionError;

use bytes::Bytes;
use futures::future::BoxFuture;
use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::{MultipartId, ObjectStore};
use object_store::ObjectStore;
use tokio::io::AsyncWrite;

pub(crate) mod demux;
@@ -69,79 +66,6 @@ impl Write for SharedBuffer {
}
}

/// Stores data needed during abortion of MultiPart writers
#[derive(Clone)]
pub(crate) struct MultiPart {
/// A shared reference to the object store
store: Arc<dyn ObjectStore>,
multipart_id: MultipartId,
location: Path,
}

impl MultiPart {
/// Create a new `MultiPart`
pub fn new(
store: Arc<dyn ObjectStore>,
multipart_id: MultipartId,
location: Path,
) -> Self {
Self {
store,
multipart_id,
location,
}
}
}

/// A wrapper struct with abort method and writer
pub(crate) struct AbortableWrite<W: AsyncWrite + Unpin + Send> {
writer: W,
multipart: MultiPart,
}

impl<W: AsyncWrite + Unpin + Send> AbortableWrite<W> {
/// Create a new `AbortableWrite` instance with the given writer, and write mode.
pub(crate) fn new(writer: W, multipart: MultiPart) -> Self {
Self { writer, multipart }
}

/// handling of abort for different write modes
pub(crate) fn abort_writer(&self) -> Result<BoxFuture<'static, Result<()>>> {
let multi = self.multipart.clone();
Ok(Box::pin(async move {
multi
.store
.abort_multipart(&multi.location, &multi.multipart_id)
.await
.map_err(DataFusionError::ObjectStore)
}))
}
}

impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AbortableWrite<W> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::result::Result<usize, Error>> {
Pin::new(&mut self.get_mut().writer).poll_write(cx, buf)
}

fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Error>> {
Pin::new(&mut self.get_mut().writer).poll_flush(cx)
}

fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Error>> {
Pin::new(&mut self.get_mut().writer).poll_shutdown(cx)
}
}

/// A trait that defines the methods required for a RecordBatch serializer.
pub trait BatchSerializer: Sync + Send {
/// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.
@@ -150,19 +74,15 @@ pub trait BatchSerializer: Sync + Send {
fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
}

/// Returns an [`AbortableWrite`] which writes to the given object store location
/// with the specified compression
/// Returns an [`AsyncWrite`] which writes to the given object store location
/// with the specified compression.
/// We drop the `AbortableWrite` struct and the writer will not try to cleanup on failure.
/// Users can configure automatic cleanup with their cloud provider.
pub(crate) async fn create_writer(
file_compression_type: FileCompressionType,
location: &Path,
object_store: Arc<dyn ObjectStore>,
) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
let (multipart_id, writer) = object_store
.put_multipart(location)
.await
.map_err(DataFusionError::ObjectStore)?;
Ok(AbortableWrite::new(
file_compression_type.convert_async_writer(writer)?,
MultiPart::new(object_store, multipart_id, location.clone()),
))
) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
let buf_writer = BufWriter::new(object_store, location.clone());
file_compression_type.convert_async_writer(buf_writer)
}
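The new doc comment above is the behavioral change worth calling out: with `AbortableWrite` gone there is no abort path, so a failed write simply drops the `BufWriter` and the upload is never completed. A hedged sketch of what that means for a caller; `write_or_drop`, the store, and the payload are illustrative only.

```rust
use std::sync::Arc;

use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::ObjectStore;
use tokio::io::AsyncWriteExt;

// Hypothetical caller mirroring the new create_writer contract: on error the
// writer is dropped instead of the multipart upload being aborted explicitly.
async fn write_or_drop(store: Arc<dyn ObjectStore>, path: Path, payload: &[u8]) {
    let mut writer = BufWriter::new(store, path);

    if writer.write_all(payload).await.is_err() {
        // Dropping the BufWriter abandons any in-progress multipart upload;
        // leftover parts are left to the object store's own cleanup policy.
        return;
    }

    // Only a successful shutdown() completes the upload and makes the object
    // visible at the target path.
    let _ = writer.shutdown().await;
}
```

Since nothing aborts the upload explicitly, parts already uploaded for a large write linger until the store expires them, e.g. via an S3 or GCS lifecycle rule for incomplete multipart uploads, which is the "automatic cleanup with their cloud provider" the comment refers to.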
18 changes: 4 additions & 14 deletions datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -22,7 +22,7 @@
use std::sync::Arc;

use super::demux::start_demuxer_task;
use super::{create_writer, AbortableWrite, BatchSerializer};
use super::{create_writer, BatchSerializer};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::physical_plan::FileSinkConfig;
use crate::error::Result;
@@ -39,7 +39,7 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver};
use tokio::task::JoinSet;

type WriterType = AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>;
type WriterType = Box<dyn AsyncWrite + Send + Unpin>;
type SerializerType = Arc<dyn BatchSerializer>;

/// Serializes a single data stream in parallel and writes to an ObjectStore
@@ -49,7 +49,7 @@ type SerializerType = Arc<dyn BatchSerializer>;
pub(crate) async fn serialize_rb_stream_to_object_store(
mut data_rx: Receiver<RecordBatch>,
serializer: Arc<dyn BatchSerializer>,
mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
mut writer: WriterType,
) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> {
let (tx, mut rx) =
mpsc::channel::<SpawnedTask<Result<(usize, Bytes), DataFusionError>>>(100);
@@ -173,19 +173,9 @@ pub(crate) async fn stateless_serialize_and_write_files(

// Finalize or abort writers as appropriate
for mut writer in finished_writers.into_iter() {
match any_errors {
true => {
let abort_result = writer.abort_writer();
if abort_result.is_err() {
any_abort_errors = true;
}
}
false => {
writer.shutdown()
writer.shutdown()
.await
.map_err(|_| internal_datafusion_err!("Error encountered while finalizing writes! Partial results may have been written to ObjectStore!"))?;
}
}
}

if any_errors {
10 changes: 4 additions & 6 deletions datafusion/core/src/datasource/physical_plan/csv.rs
@@ -44,6 +44,7 @@ use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

use bytes::{Buf, Bytes};
use futures::{ready, StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;
use tokio::task::JoinSet;
@@ -471,7 +472,7 @@ pub async fn plan_to_csv(

let mut stream = plan.execute(i, task_ctx.clone())?;
join_set.spawn(async move {
let (_, mut multipart_writer) = storeref.put_multipart(&file).await?;
let mut buf_writer = BufWriter::new(storeref, file.clone());
let mut buffer = Vec::with_capacity(1024);
//only write headers on first iteration
let mut write_headers = true;
@@ -481,15 +482,12 @@
.build(buffer);
writer.write(&batch)?;
buffer = writer.into_inner();
multipart_writer.write_all(&buffer).await?;
buf_writer.write_all(&buffer).await?;
buffer.clear();
//prevent writing headers more than once
write_headers = false;
}
multipart_writer
.shutdown()
.await
.map_err(DataFusionError::from)
buf_writer.shutdown().await.map_err(DataFusionError::from)
});
}

10 changes: 4 additions & 6 deletions datafusion/core/src/datasource/physical_plan/json.rs
@@ -43,6 +43,7 @@ use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

use bytes::{Buf, Bytes};
use futures::{ready, StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{self, GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;
use tokio::task::JoinSet;
@@ -338,21 +339,18 @@ pub async fn plan_to_json(

let mut stream = plan.execute(i, task_ctx.clone())?;
join_set.spawn(async move {
let (_, mut multipart_writer) = storeref.put_multipart(&file).await?;
let mut buf_writer = BufWriter::new(storeref, file.clone());

let mut buffer = Vec::with_capacity(1024);
while let Some(batch) = stream.next().await.transpose()? {
let mut writer = json::LineDelimitedWriter::new(buffer);
writer.write(&batch)?;
buffer = writer.into_inner();
multipart_writer.write_all(&buffer).await?;
buf_writer.write_all(&buffer).await?;
buffer.clear();
}

multipart_writer
.shutdown()
.await
.map_err(DataFusionError::from)
buf_writer.shutdown().await.map_err(DataFusionError::from)
});
}

5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -52,6 +52,7 @@ use futures::future::BoxFuture;
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use log::debug;
use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
@@ -698,11 +699,11 @@ pub async fn plan_to_parquet(
let propclone = writer_properties.clone();

let storeref = store.clone();
let (_, multipart_writer) = storeref.put_multipart(&file).await?;
let buf_writer = BufWriter::new(storeref, file.clone());
let mut stream = plan.execute(i, task_ctx.clone())?;
join_set.spawn(async move {
let mut writer =
AsyncArrowWriter::try_new(multipart_writer, plan.schema(), propclone)?;
AsyncArrowWriter::try_new(buf_writer, plan.schema(), propclone)?;
while let Some(next_batch) = stream.next().await {
let batch = next_batch?;
writer.write(&batch).await?;