diff --git a/Cargo.toml b/Cargo.toml
index cd88e18fe17c..ce02f176cd95 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -64,7 +64,7 @@ indexmap = "2.0.0"
 itertools = "0.12"
 log = "^0.4"
 num_cpus = "1.13.0"
-object_store = { version = "0.9.0", default-features = false }
+object_store = { version = "0.9.1", default-features = false }
 parking_lot = "0.12"
 parquet = { version = "50.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
 rand = "0.8"
diff --git a/datafusion/core/src/datasource/file_format/file_compression_type.rs b/datafusion/core/src/datasource/file_format/file_compression_type.rs
index 3dac7c293050..d05c35258477 100644
--- a/datafusion/core/src/datasource/file_format/file_compression_type.rs
+++ b/datafusion/core/src/datasource/file_format/file_compression_type.rs
@@ -40,6 +40,7 @@ use futures::StreamExt;
 #[cfg(feature = "compression")]
 use futures::TryStreamExt;
 use std::str::FromStr;
+use object_store::buffered::BufWriter;
 use tokio::io::AsyncWrite;
 #[cfg(feature = "compression")]
 use tokio_util::io::{ReaderStream, StreamReader};
@@ -140,11 +141,11 @@ impl FileCompressionType {
         })
     }

-    /// Wrap the given `AsyncWrite` so that it performs compressed writes
+    /// Wrap the given `BufWriter` so that it performs compressed writes
     /// according to this `FileCompressionType`.
     pub fn convert_async_writer(
         &self,
-        w: Box<dyn AsyncWrite + Send + Unpin>,
+        w: BufWriter,
     ) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
         Ok(match self.variant {
             #[cfg(feature = "compression")]
@@ -161,7 +162,7 @@ impl FileCompressionType {
                     "Compression feature is not enabled".to_owned(),
                 ))
             }
-            UNCOMPRESSED => w,
+            UNCOMPRESSED => Box::new(w),
         })
     }

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 9729bfa163af..72dd57e14ccd 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -18,44 +18,18 @@
 //! [`ParquetFormat`]: Parquet [`FileFormat`] abstractions

 use arrow_array::RecordBatch;
-use async_trait::async_trait;
-use datafusion_common::stats::Precision;
-use datafusion_physical_plan::metrics::MetricsSet;
-use parquet::arrow::arrow_writer::{
-    compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
-    ArrowLeafColumn,
-};
-use parquet::file::writer::SerializedFileWriter;
 use std::any::Any;
 use std::fmt;
 use std::fmt::Debug;
 use std::sync::Arc;
-use tokio::io::{AsyncWrite, AsyncWriteExt};
-use tokio::sync::mpsc::{self, Receiver, Sender};
-use tokio::task::{JoinHandle, JoinSet};

 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
 use arrow::datatypes::SchemaRef;
 use arrow::datatypes::{Fields, Schema};
-use bytes::{BufMut, BytesMut};
-use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
-use datafusion_execution::TaskContext;
-use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
-use futures::{StreamExt, TryStreamExt};
-use hashbrown::HashMap;
-use object_store::path::Path;
-use object_store::{ObjectMeta, ObjectStore};
-use parquet::arrow::{
-    arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter,
-};
-use parquet::file::footer::{decode_footer, decode_metadata};
-use parquet::file::metadata::ParquetMetaData;
-use parquet::file::properties::WriterProperties;
-use parquet::file::statistics::Statistics as ParquetStatistics;

 use super::write::demux::start_demuxer_task;
-use super::write::{create_writer, AbortableWrite, SharedBuffer};
+use super::write::{create_writer, SharedBuffer};
 use super::{FileFormat, FileScanConfig};
 use crate::arrow::array::{
     BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array,
@@ -77,6 +51,38 @@ use crate::physical_plan::{
 /// Size of the buffer for [`AsyncArrowWriter`].
 const PARQUET_WRITER_BUFFER_SIZE: usize = 10485760;

+use datafusion_common::stats::Precision;
+use datafusion_common::{
+    exec_err, not_impl_err, DataFusionError, FileType,
+};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
+use datafusion_physical_plan::metrics::MetricsSet;
+
+use async_trait::async_trait;
+use bytes::{BufMut, BytesMut};
+use object_store::buffered::BufWriter;
+use parquet::arrow::arrow_writer::{
+    compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
+    ArrowLeafColumn,
+};
+use parquet::arrow::{
+    arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter,
+};
+use parquet::file::footer::{decode_footer, decode_metadata};
+use parquet::file::metadata::ParquetMetaData;
+use parquet::file::properties::WriterProperties;
+use parquet::file::statistics::Statistics as ParquetStatistics;
+use parquet::file::writer::SerializedFileWriter;
+use parquet::format::FileMetaData;
+use tokio::io::{AsyncWrite, AsyncWriteExt};
+use tokio::sync::mpsc::{self, Receiver, Sender};
+use tokio::task::{JoinHandle, JoinSet};
+
+use futures::{StreamExt, TryStreamExt};
+use hashbrown::HashMap;
+use object_store::path::Path;
+use object_store::{ObjectMeta, ObjectStore};

 /// Initial writing buffer size. Note this is just a size hint for efficiency. It
 /// will grow beyond the set value if needed.
@@ -686,15 +692,10 @@ impl ParquetSink {
         location: &Path,
         object_store: Arc<dyn ObjectStore>,
         parquet_props: WriterProperties,
-    ) -> Result<
-        AsyncArrowWriter<Box<dyn tokio::io::AsyncWrite + Send + Unpin>>,
-    > {
-        let (_, multipart_writer) = object_store
-            .put_multipart(location)
-            .await
-            .map_err(DataFusionError::ObjectStore)?;
+    ) -> Result<AsyncArrowWriter<BufWriter>> {
+        let buf_writer = BufWriter::new(object_store, location.clone());
         let writer = AsyncArrowWriter::try_new(
-            multipart_writer,
+            buf_writer,
             self.get_writer_schema(),
             PARQUET_WRITER_BUFFER_SIZE,
             Some(parquet_props),
@@ -1018,7 +1019,7 @@ async fn concatenate_parallel_row_groups(
     mut serialize_rx: Receiver<JoinHandle<RBStreamSerializeResult>>,
     schema: Arc<Schema>,
     writer_props: Arc<WriterProperties>,
-    mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
+    mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
 ) -> Result<FileMetaData> {
     let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);

@@ -1074,7 +1075,7 @@
 /// task then stitches these independent RowGroups together and streams this large
 /// single parquet file to an ObjectStore in multiple parts.
 async fn output_single_parquet_file_parallelized(
-    object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
+    object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
     data: Receiver<RecordBatch>,
     output_schema: Arc<Schema>,
     parquet_props: &WriterProperties,
diff --git a/datafusion/core/src/datasource/file_format/write/mod.rs b/datafusion/core/src/datasource/file_format/write/mod.rs
index 410a32a19cc1..42115fc7b93f 100644
--- a/datafusion/core/src/datasource/file_format/write/mod.rs
+++ b/datafusion/core/src/datasource/file_format/write/mod.rs
@@ -18,21 +18,18 @@
 //! Module containing helper methods/traits related to enabling
 //! write support for the various file formats

-use std::io::{Error, Write};
-use std::pin::Pin;
+use std::io::Write;
 use std::sync::Arc;
-use std::task::{Context, Poll};

 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::error::Result;

 use arrow_array::RecordBatch;
-use datafusion_common::DataFusionError;

 use bytes::Bytes;
-use futures::future::BoxFuture;
+use object_store::buffered::BufWriter;
 use object_store::path::Path;
-use object_store::{MultipartId, ObjectStore};
+use object_store::ObjectStore;
 use tokio::io::AsyncWrite;

 pub(crate) mod demux;
@@ -69,79 +66,6 @@ impl Write for SharedBuffer {
     }
 }

-/// Stores data needed during abortion of MultiPart writers
-#[derive(Clone)]
-pub(crate) struct MultiPart {
-    /// A shared reference to the object store
-    store: Arc<dyn ObjectStore>,
-    multipart_id: MultipartId,
-    location: Path,
-}
-
-impl MultiPart {
-    /// Create a new `MultiPart`
-    pub fn new(
-        store: Arc<dyn ObjectStore>,
-        multipart_id: MultipartId,
-        location: Path,
-    ) -> Self {
-        Self {
-            store,
-            multipart_id,
-            location,
-        }
-    }
-}
-
-/// A wrapper struct with abort method and writer
-pub(crate) struct AbortableWrite<W: AsyncWrite + Unpin + Send> {
-    writer: W,
-    multipart: MultiPart,
-}
-
-impl<W: AsyncWrite + Unpin + Send> AbortableWrite<W> {
-    /// Create a new `AbortableWrite` instance with the given writer, and write mode.
-    pub(crate) fn new(writer: W, multipart: MultiPart) -> Self {
-        Self { writer, multipart }
-    }
-
-    /// handling of abort for different write modes
-    pub(crate) fn abort_writer(&self) -> Result<BoxFuture<'static, Result<()>>> {
-        let multi = self.multipart.clone();
-        Ok(Box::pin(async move {
-            multi
-                .store
-                .abort_multipart(&multi.location, &multi.multipart_id)
-                .await
-                .map_err(DataFusionError::ObjectStore)
-        }))
-    }
-}
-
-impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AbortableWrite<W> {
-    fn poll_write(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-        buf: &[u8],
-    ) -> Poll<std::result::Result<usize, Error>> {
-        Pin::new(&mut self.get_mut().writer).poll_write(cx, buf)
-    }
-
-    fn poll_flush(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<std::result::Result<(), Error>> {
-        Pin::new(&mut self.get_mut().writer).poll_flush(cx)
-    }
-
-    fn poll_shutdown(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<std::result::Result<(), Error>> {
-        Pin::new(&mut self.get_mut().writer).poll_shutdown(cx)
-    }
-}
-
 /// A trait that defines the methods required for a RecordBatch serializer.
 pub trait BatchSerializer: Sync + Send {
     /// Asynchronously serializes a `RecordBatch` and returns the serialized bytes.
@@ -150,19 +74,15 @@ pub trait BatchSerializer: Sync + Send {
     fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
 }

-/// Returns an [`AbortableWrite`] which writes to the given object store location
-/// with the specified compression
+/// Returns an [`AsyncWrite`] which writes to the given object store location
+/// with the specified compression.
+/// We drop the `AbortableWrite` struct and the writer will not try to cleanup on failure.
+/// Users can configure automatic cleanup with their cloud provider.
 pub(crate) async fn create_writer(
     file_compression_type: FileCompressionType,
     location: &Path,
     object_store: Arc<dyn ObjectStore>,
-) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
-    let (multipart_id, writer) = object_store
-        .put_multipart(location)
-        .await
-        .map_err(DataFusionError::ObjectStore)?;
-    Ok(AbortableWrite::new(
-        file_compression_type.convert_async_writer(writer)?,
-        MultiPart::new(object_store, multipart_id, location.clone()),
-    ))
+) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
+    let buf_writer = BufWriter::new(object_store, location.clone());
+    file_compression_type.convert_async_writer(buf_writer)
 }
diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs
index 106b4e0d50e5..5937b6694ebb 100644
--- a/datafusion/core/src/datasource/file_format/write/orchestration.rs
+++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;

 use super::demux::start_demuxer_task;
-use super::{create_writer, AbortableWrite, BatchSerializer};
+use super::{create_writer, BatchSerializer};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::physical_plan::FileSinkConfig;
 use crate::error::Result;

@@ -38,7 +38,7 @@ use tokio::sync::mpsc::{self, Receiver};
 use tokio::task::{JoinHandle, JoinSet};
 use tokio::try_join;

-type WriterType = AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>;
+type WriterType = Box<dyn AsyncWrite + Send + Unpin>;
 type SerializerType = Arc<dyn BatchSerializer>;

 /// Serializes a single data stream in parallel and writes to an ObjectStore
@@ -48,7 +48,7 @@ type SerializerType = Arc<dyn BatchSerializer>;
 pub(crate) async fn serialize_rb_stream_to_object_store(
     mut data_rx: Receiver<RecordBatch>,
     serializer: Arc<dyn BatchSerializer>,
-    mut writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
+    mut writer: WriterType,
 ) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> {
     let (tx, mut rx) = mpsc::channel::<JoinHandle<Result<(usize, Bytes), DataFusionError>>>(100);

@@ -172,19 +172,9 @@ pub(crate) async fn stateless_serialize_and_write_files(

     // Finalize or abort writers as appropriate
     for mut writer in finished_writers.into_iter() {
-        match any_errors {
-            true => {
-                let abort_result = writer.abort_writer();
-                if abort_result.is_err() {
-                    any_abort_errors = true;
-                }
-            }
-            false => {
-                writer.shutdown()
+        writer.shutdown()
            .await
            .map_err(|_| internal_datafusion_err!("Error encountered while finalizing writes! Partial results may have been written to ObjectStore!"))?;
-            }
-        }
     }

     if any_errors {
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index a818c572f7f5..9911e9e86853 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -45,6 +45,7 @@ use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
 use bytes::{Buf, Bytes};
 use datafusion_common::config::ConfigOptions;
 use futures::{ready, StreamExt, TryStreamExt};
+use object_store::buffered::BufWriter;
 use object_store::{GetOptions, GetResultPayload, ObjectStore};
 use tokio::io::AsyncWriteExt;
 use tokio::task::JoinSet;
@@ -460,7 +461,7 @@ pub async fn plan_to_csv(

         let mut stream = plan.execute(i, task_ctx.clone())?;
         join_set.spawn(async move {
-            let (_, mut multipart_writer) = storeref.put_multipart(&file).await?;
+            let mut buf_writer = BufWriter::new(storeref, file.clone());
             let mut buffer = Vec::with_capacity(1024);
             //only write headers on first iteration
             let mut write_headers = true;
@@ -470,15 +471,12 @@ pub async fn plan_to_csv(
                     .build(buffer);
                 writer.write(&batch)?;
                 buffer = writer.into_inner();
-                multipart_writer.write_all(&buffer).await?;
+                buf_writer.write_all(&buffer).await?;
                 buffer.clear();
                 //prevent writing headers more than once
                 write_headers = false;
             }
-            multipart_writer
-                .shutdown()
-                .await
-                .map_err(DataFusionError::from)
+            buf_writer.shutdown().await.map_err(DataFusionError::from)
         });
     }

diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index a8a371fed91e..d7b571c3cba1 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -44,8 +44,8 @@ use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

 use bytes::{Buf, Bytes};
 use futures::{ready, StreamExt, TryStreamExt};
-use object_store::{self, GetOptions};
-use object_store::{GetResultPayload, ObjectStore};
+use object_store::buffered::BufWriter;
+use object_store::{self, GetOptions, GetResultPayload, ObjectStore};
 use tokio::io::AsyncWriteExt;
 use tokio::task::JoinSet;

@@ -329,21 +329,18 @@ pub async fn plan_to_json(

         let mut stream = plan.execute(i, task_ctx.clone())?;
         join_set.spawn(async move {
-            let (_, mut multipart_writer) = storeref.put_multipart(&file).await?;
+            let mut buf_writer = BufWriter::new(storeref, file.clone());

             let mut buffer = Vec::with_capacity(1024);
             while let Some(batch) = stream.next().await.transpose()? {
                 let mut writer = json::LineDelimitedWriter::new(buffer);
                 writer.write(&batch)?;
                 buffer = writer.into_inner();
-                multipart_writer.write_all(&buffer).await?;
+                buf_writer.write_all(&buffer).await?;
                 buffer.clear();
             }

-            multipart_writer
-                .shutdown()
-                .await
-                .map_err(DataFusionError::from)
+            buf_writer.shutdown().await.map_err(DataFusionError::from)
         });
     }

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index c2689cfb10a6..bf1509b390d4 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -53,6 +53,7 @@ use futures::future::BoxFuture;
 use futures::{StreamExt, TryStreamExt};
 use itertools::Itertools;
 use log::debug;
+use object_store::buffered::BufWriter;
 use object_store::path::Path;
 use object_store::ObjectStore;
 use parquet::arrow::arrow_reader::ArrowReaderOptions;
@@ -698,15 +699,11 @@ pub async fn plan_to_parquet(
             let propclone = writer_properties.clone();

             let storeref = store.clone();
-            let (_, multipart_writer) = storeref.put_multipart(&file).await?;
+            let buf_writer = BufWriter::new(storeref, file.clone());
             let mut stream = plan.execute(i, task_ctx.clone())?;
             join_set.spawn(async move {
-                let mut writer = AsyncArrowWriter::try_new(
-                    multipart_writer,
-                    plan.schema(),
-                    10485760,
-                    propclone,
-                )?;
+                let mut writer =
+                    AsyncArrowWriter::try_new(buf_writer, plan.schema(), 10485760, propclone)?;
                 while let Some(next_batch) = stream.next().await {
                     let batch = next_batch?;
                     writer.write(&batch).await?;
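
Note (reviewer sketch, not part of the diff): the change above swaps `ObjectStore::put_multipart` plus the `AbortableWrite` wrapper for `object_store::buffered::BufWriter` at every write site. The standalone example below shows that `BufWriter` pattern in isolation, assuming object_store 0.9+ and tokio; the in-memory store and the output path are made-up stand-ins, not code from this PR.

```rust
use std::sync::Arc;

use object_store::buffered::BufWriter;
use object_store::memory::InMemory; // illustrative backend; any ObjectStore works
use object_store::path::Path;
use object_store::ObjectStore;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let location = Path::from("data/part-0.csv"); // hypothetical output path

    // BufWriter implements tokio::io::AsyncWrite, so it can be handed to
    // anything that previously consumed the put_multipart writer.
    let mut writer = BufWriter::new(Arc::clone(&store), location.clone());
    writer.write_all(b"a,b\n1,2\n").await?;

    // shutdown() flushes the buffer and completes the upload; small payloads
    // go out as a single put, larger ones as a multipart upload.
    writer.shutdown().await?;

    let bytes = store.get(&location).await?.bytes().await?;
    assert_eq!(bytes.as_ref(), b"a,b\n1,2\n".as_slice());
    Ok(())
}
```

As the new `create_writer` doc comment notes, there is no explicit abort path anymore: on failure the writer does not clean up partial multipart state, and users are expected to rely on the store's own cleanup (for example, bucket lifecycle rules).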