Use object_store::BufWriter to replace put_multipart (apache#9648)
* feat: use BufWriter to replace put_multipart

* feat: remove AbortableWrite

* fix clippy

* fix: add doc comment
yyy1000 authored and tillrohrmann committed Jul 25, 2024
1 parent e58446b commit 9d292b0
Showing 8 changed files with 70 additions and 166 deletions.
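
For orientation before the per-file diffs: the commit swaps DataFusion's hand-rolled `put_multipart` + `AbortableWrite` plumbing for `object_store::buffered::BufWriter`, which buffers writes and only switches to a multipart upload once enough data accumulates. A minimal, self-contained sketch of that `BufWriter` API (not code from this diff), assuming object_store 0.9.1, tokio, and an in-memory store with a made-up path:

```rust
use std::sync::Arc;

use object_store::buffered::BufWriter;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = Path::from("data/example.csv");

    // Small writes are buffered in memory; BufWriter only starts a multipart
    // upload if the buffered data grows past its internal capacity.
    let mut writer = BufWriter::new(Arc::clone(&store), path.clone());
    writer.write_all(b"a,b,c\n1,2,3\n").await?;
    // shutdown() flushes the buffer and completes the (possibly multipart) upload.
    writer.shutdown().await?;

    let bytes = store.get(&path).await?.bytes().await?;
    assert_eq!(bytes.as_ref(), b"a,b,c\n1,2,3\n");
    Ok(())
}
```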
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -64,7 +64,7 @@ indexmap = "2.0.0"
itertools = "0.12"
log = "^0.4"
num_cpus = "1.13.0"
object_store = { version = "0.9.0", default-features = false }
object_store = { version = "0.9.1", default-features = false }
parking_lot = "0.12"
parquet = { version = "50.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
rand = "0.8"
datafusion/core/src/datasource/file_format/file_compression_type.rs
@@ -40,6 +40,7 @@ use futures::StreamExt;
#[cfg(feature = "compression")]
use futures::TryStreamExt;
use std::str::FromStr;
use object_store::buffered::BufWriter;
use tokio::io::AsyncWrite;
#[cfg(feature = "compression")]
use tokio_util::io::{ReaderStream, StreamReader};
@@ -140,11 +141,11 @@ impl FileCompressionType {
})
}

/// Wrap the given `AsyncWrite` so that it performs compressed writes
/// Wrap the given `BufWriter` so that it performs compressed writes
/// according to this `FileCompressionType`.
pub fn convert_async_writer(
&self,
w: Box<dyn AsyncWrite + Send + Unpin>,
w: BufWriter,
) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
Ok(match self.variant {
#[cfg(feature = "compression")]
@@ -161,7 +162,7 @@
"Compression feature is not enabled".to_owned(),
))
}
UNCOMPRESSED => w,
UNCOMPRESSED => Box::new(w),
})
}

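With the new signature, `convert_async_writer` takes the concrete `BufWriter` and still returns a boxed `AsyncWrite`, so compressed and uncompressed outputs share one construction path. Below is a hedged sketch of a caller, assuming a DataFusion build that includes this change; the helper name and the in-memory store are illustrative, and the compressed variants additionally require DataFusion's `compression` feature:

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::error::Result;
use object_store::buffered::BufWriter;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use tokio::io::{AsyncWrite, AsyncWriteExt};

// Hypothetical helper mirroring how create_writer (later in this diff) wires a
// BufWriter into the compression layer.
fn writer_with_compression(
    compression: FileCompressionType,
    location: &Path,
    object_store: Arc<dyn ObjectStore>,
) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
    // BufWriter now owns the put / put_multipart decision.
    let buf_writer = BufWriter::new(object_store, location.clone());
    compression.convert_async_writer(buf_writer)
}

#[tokio::main]
async fn main() -> Result<()> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let mut w = writer_with_compression(
        FileCompressionType::UNCOMPRESSED, // GZIP/ZSTD/... need the compression feature
        &Path::from("out/plain.txt"),
        store,
    )?;
    w.write_all(b"hello").await?;
    w.shutdown().await?;
    Ok(())
}
```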
75 changes: 38 additions & 37 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -18,44 +18,18 @@
//! [`ParquetFormat`]: Parquet [`FileFormat`] abstractions

use arrow_array::RecordBatch;
use async_trait::async_trait;
use datafusion_common::stats::Precision;
use datafusion_physical_plan::metrics::MetricsSet;
use parquet::arrow::arrow_writer::{
compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
ArrowLeafColumn,
};
use parquet::file::writer::SerializedFileWriter;
use std::any::Any;
use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::{JoinHandle, JoinSet};

use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
use arrow::datatypes::SchemaRef;
use arrow::datatypes::{Fields, Schema};
use bytes::{BufMut, BytesMut};
use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use futures::{StreamExt, TryStreamExt};
use hashbrown::HashMap;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::{
arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter,
};
use parquet::file::footer::{decode_footer, decode_metadata};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use parquet::file::statistics::Statistics as ParquetStatistics;

use super::write::demux::start_demuxer_task;
use super::write::{create_writer, AbortableWrite, SharedBuffer};
use super::write::{create_writer, SharedBuffer};
use super::{FileFormat, FileScanConfig};
use crate::arrow::array::{
BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array,
@@ -77,6 +51,38 @@ use crate::physical_plan::{

/// Size of the buffer for [`AsyncArrowWriter`].
const PARQUET_WRITER_BUFFER_SIZE: usize = 10485760;
use datafusion_common::stats::Precision;
use datafusion_common::{
exec_err, not_impl_err, DataFusionError, FileType,
};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use object_store::buffered::BufWriter;
use parquet::arrow::arrow_writer::{
compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
ArrowLeafColumn,
};
use parquet::arrow::{
arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter,
};
use parquet::file::footer::{decode_footer, decode_metadata};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::file::writer::SerializedFileWriter;
use parquet::format::FileMetaData;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::{JoinHandle, JoinSet};

use futures::{StreamExt, TryStreamExt};
use hashbrown::HashMap;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};

/// Initial writing buffer size. Note this is just a size hint for efficiency. It
/// will grow beyond the set value if needed.
@@ -686,15 +692,10 @@ impl ParquetSink {
location: &Path,
object_store: Arc<dyn ObjectStore>,
parquet_props: WriterProperties,
) -> Result<
AsyncArrowWriter<Box<dyn tokio::io::AsyncWrite + std::marker::Send + Unpin>>,
> {
let (_, multipart_writer) = object_store
.put_multipart(location)
.await
.map_err(DataFusionError::ObjectStore)?;
) -> Result<AsyncArrowWriter<BufWriter>> {
let buf_writer = BufWriter::new(object_store, location.clone());
let writer = AsyncArrowWriter::try_new(
multipart_writer,
buf_writer,
self.get_writer_schema(),
PARQUET_WRITER_BUFFER_SIZE,
Some(parquet_props),
@@ -1018,7 +1019,7 @@ async fn concatenate_parallel_row_groups(
mut serialize_rx: Receiver<JoinHandle<RBStreamSerializeResult>>,
schema: Arc<Schema>,
writer_props: Arc<WriterProperties>,
mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
) -> Result<usize> {
let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);

@@ -1074,7 +1075,7 @@
/// task then stitches these independent RowGroups together and streams this large
/// single parquet file to an ObjectStore in multiple parts.
async fn output_single_parquet_file_parallelized(
object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
data: Receiver<RecordBatch>,
output_schema: Arc<Schema>,
parquet_props: &WriterProperties,
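`ParquetSink::create_async_arrow_writer` now hands `AsyncArrowWriter` a `BufWriter` instead of the stream returned by `put_multipart`, and the downstream helpers lose their `AbortableWrite` wrappers. A standalone sketch of that writer construction (not DataFusion's sink code), assuming parquet 50 with the arrow/async features and an illustrative schema, batch, and in-memory store:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use object_store::buffered::BufWriter;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use parquet::arrow::AsyncArrowWriter;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let location = Path::from("out/example.parquet");

    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // No put_multipart / AbortableWrite plumbing: BufWriter decides internally
    // whether a plain put or a multipart upload is needed.
    let buf_writer = BufWriter::new(Arc::clone(&store), location.clone());
    let mut writer =
        AsyncArrowWriter::try_new(buf_writer, schema, 10 * 1024 * 1024, None)?;
    writer.write(&batch).await?;
    writer.close().await?; // writes the parquet footer and finalizes the upload

    assert!(store.head(&location).await.is_ok());
    Ok(())
}
```

The 10 MB buffer size passed to `try_new` mirrors the `PARQUET_WRITER_BUFFER_SIZE` constant defined in this file.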
100 changes: 10 additions & 90 deletions datafusion/core/src/datasource/file_format/write/mod.rs
@@ -18,21 +18,18 @@
//! Module containing helper methods/traits related to enabling
//! write support for the various file formats

use std::io::{Error, Write};
use std::pin::Pin;
use std::io::Write;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::error::Result;

use arrow_array::RecordBatch;
use datafusion_common::DataFusionError;

use bytes::Bytes;
use futures::future::BoxFuture;
use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::{MultipartId, ObjectStore};
use object_store::ObjectStore;
use tokio::io::AsyncWrite;

pub(crate) mod demux;
@@ -69,79 +66,6 @@ impl Write for SharedBuffer {
}
}

/// Stores data needed during abortion of MultiPart writers
#[derive(Clone)]
pub(crate) struct MultiPart {
/// A shared reference to the object store
store: Arc<dyn ObjectStore>,
multipart_id: MultipartId,
location: Path,
}

impl MultiPart {
/// Create a new `MultiPart`
pub fn new(
store: Arc<dyn ObjectStore>,
multipart_id: MultipartId,
location: Path,
) -> Self {
Self {
store,
multipart_id,
location,
}
}
}

/// A wrapper struct with abort method and writer
pub(crate) struct AbortableWrite<W: AsyncWrite + Unpin + Send> {
writer: W,
multipart: MultiPart,
}

impl<W: AsyncWrite + Unpin + Send> AbortableWrite<W> {
/// Create a new `AbortableWrite` instance with the given writer, and write mode.
pub(crate) fn new(writer: W, multipart: MultiPart) -> Self {
Self { writer, multipart }
}

/// handling of abort for different write modes
pub(crate) fn abort_writer(&self) -> Result<BoxFuture<'static, Result<()>>> {
let multi = self.multipart.clone();
Ok(Box::pin(async move {
multi
.store
.abort_multipart(&multi.location, &multi.multipart_id)
.await
.map_err(DataFusionError::ObjectStore)
}))
}
}

impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AbortableWrite<W> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::result::Result<usize, Error>> {
Pin::new(&mut self.get_mut().writer).poll_write(cx, buf)
}

fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Error>> {
Pin::new(&mut self.get_mut().writer).poll_flush(cx)
}

fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Error>> {
Pin::new(&mut self.get_mut().writer).poll_shutdown(cx)
}
}

/// A trait that defines the methods required for a RecordBatch serializer.
pub trait BatchSerializer: Sync + Send {
/// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.
@@ -150,19 +74,15 @@ pub trait BatchSerializer: Sync + Send {
fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
}

/// Returns an [`AbortableWrite`] which writes to the given object store location
/// with the specified compression
/// Returns an [`AsyncWrite`] which writes to the given object store location
/// with the specified compression.
/// We drop the `AbortableWrite` struct and the writer will not try to cleanup on failure.
/// Users can configure automatic cleanup with their cloud provider.
pub(crate) async fn create_writer(
file_compression_type: FileCompressionType,
location: &Path,
object_store: Arc<dyn ObjectStore>,
) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
let (multipart_id, writer) = object_store
.put_multipart(location)
.await
.map_err(DataFusionError::ObjectStore)?;
Ok(AbortableWrite::new(
file_compression_type.convert_async_writer(writer)?,
MultiPart::new(object_store, multipart_id, location.clone()),
))
) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
let buf_writer = BufWriter::new(object_store, location.clone());
file_compression_type.convert_async_writer(buf_writer)
}
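
The reworded doc comment above states that the writer no longer tries to clean up after a failed write. A small sketch of what that means in practice, under the assumption of an in-memory store: dropping a `BufWriter` without calling `shutdown()` simply never completes the upload, and any stray multipart parts on a real cloud store are left for provider-side lifecycle rules rather than an explicit abort:

```rust
use std::sync::Arc;

use object_store::buffered::BufWriter;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let location = Path::from("out/abandoned.csv");

    {
        let mut writer = BufWriter::new(Arc::clone(&store), location.clone());
        writer.write_all(b"partial data").await?;
        // Dropped here without shutdown(): the upload is never completed.
    }

    // No object was committed at the target location.
    assert!(store.head(&location).await.is_err());
    Ok(())
}
```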
18 changes: 4 additions & 14 deletions datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -22,7 +22,7 @@
use std::sync::Arc;

use super::demux::start_demuxer_task;
use super::{create_writer, AbortableWrite, BatchSerializer};
use super::{create_writer, BatchSerializer};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::physical_plan::FileSinkConfig;
use crate::error::Result;
@@ -38,7 +38,7 @@ use tokio::sync::mpsc::{self, Receiver};
use tokio::task::{JoinHandle, JoinSet};
use tokio::try_join;

type WriterType = AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>;
type WriterType = Box<dyn AsyncWrite + Send + Unpin>;
type SerializerType = Arc<dyn BatchSerializer>;

/// Serializes a single data stream in parallel and writes to an ObjectStore
@@ -48,7 +48,7 @@ type SerializerType = Arc<dyn BatchSerializer>;
pub(crate) async fn serialize_rb_stream_to_object_store(
mut data_rx: Receiver<RecordBatch>,
serializer: Arc<dyn BatchSerializer>,
mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
mut writer: WriterType,
) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> {
let (tx, mut rx) =
mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100);
@@ -172,19 +172,9 @@ pub(crate) async fn stateless_serialize_and_write_files(

// Finalize or abort writers as appropriate
for mut writer in finished_writers.into_iter() {
match any_errors {
true => {
let abort_result = writer.abort_writer();
if abort_result.is_err() {
any_abort_errors = true;
}
}
false => {
writer.shutdown()
writer.shutdown()
.await
.map_err(|_| internal_datafusion_err!("Error encountered while finalizing writes! Partial results may have been written to ObjectStore!"))?;
}
}
}

if any_errors {
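After this change the orchestration layer only sees `Box<dyn AsyncWrite + Send + Unpin>` (the new `WriterType`), whether the bytes pass through a compression encoder or go straight into a `BufWriter`, and the error path simply stops writing instead of aborting a multipart upload. A minimal sketch of that shape, with a made-up chunk list and a helper function standing in for the serializer plumbing:

```rust
use std::sync::Arc;

use bytes::Bytes;
use object_store::buffered::BufWriter;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use tokio::io::{AsyncWrite, AsyncWriteExt};

type WriterType = Box<dyn AsyncWrite + Send + Unpin>;

// Writes pre-serialized chunks and hands the writer back, mirroring how
// serialize_rb_stream_to_object_store returns its writer for later shutdown.
async fn write_all_chunks(
    chunks: Vec<Bytes>,
    mut writer: WriterType,
) -> std::io::Result<WriterType> {
    for chunk in chunks {
        writer.write_all(&chunk).await?;
    }
    Ok(writer)
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let location = Path::from("out/part-0.jsonl");

    let writer: WriterType =
        Box::new(BufWriter::new(Arc::clone(&store), location.clone()));
    let chunks = vec![
        Bytes::from_static(b"{\"a\":1}\n"),
        Bytes::from_static(b"{\"a\":2}\n"),
    ];

    let mut writer = write_all_chunks(chunks, writer).await?;
    writer.shutdown().await?; // finalize: completes the (possibly multipart) upload
    Ok(())
}
```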
10 changes: 4 additions & 6 deletions datafusion/core/src/datasource/physical_plan/csv.rs
@@ -45,6 +45,7 @@ use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use bytes::{Buf, Bytes};
use datafusion_common::config::ConfigOptions;
use futures::{ready, StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;
use tokio::task::JoinSet;
@@ -460,7 +461,7 @@ pub async fn plan_to_csv(

let mut stream = plan.execute(i, task_ctx.clone())?;
join_set.spawn(async move {
let (_, mut multipart_writer) = storeref.put_multipart(&file).await?;
let mut buf_writer = BufWriter::new(storeref, file.clone());
let mut buffer = Vec::with_capacity(1024);
//only write headers on first iteration
let mut write_headers = true;
@@ -470,15 +471,12 @@
.build(buffer);
writer.write(&batch)?;
buffer = writer.into_inner();
multipart_writer.write_all(&buffer).await?;
buf_writer.write_all(&buffer).await?;
buffer.clear();
//prevent writing headers more than once
write_headers = false;
}
multipart_writer
.shutdown()
.await
.map_err(DataFusionError::from)
buf_writer.shutdown().await.map_err(DataFusionError::from)
});
}

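A standalone sketch of the per-partition loop that `plan_to_csv` runs after this change, with illustrative batches and an in-memory store; the `WriterBuilder` configuration shown here (including `with_header`) stands in for the builder lines elided from the hunk above:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_csv::WriterBuilder;
use arrow_schema::{DataType, Field, Schema};
use object_store::buffered::BufWriter;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let file = Path::from("out/part-0.csv");

    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
    let batches = vec![
        RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef],
        )?,
        RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![3, 4])) as ArrayRef],
        )?,
    ];

    let mut buf_writer = BufWriter::new(Arc::clone(&store), file.clone());
    let mut buffer = Vec::with_capacity(1024);
    let mut write_headers = true;
    for batch in batches {
        // Encode each batch to CSV in memory, then stream it to the BufWriter.
        let mut writer = WriterBuilder::new()
            .with_header(write_headers)
            .build(buffer);
        writer.write(&batch)?;
        buffer = writer.into_inner();
        buf_writer.write_all(&buffer).await?;
        buffer.clear();
        write_headers = false; // only the first batch carries the header row
    }
    buf_writer.shutdown().await?; // completes the upload

    let bytes = store.get(&file).await?.bytes().await?;
    assert!(bytes.starts_with(b"x\n1\n2\n"));
    Ok(())
}
```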
