diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 362a681e4f55..69d1c0089f1e 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -18,30 +18,41 @@ //! CSV format abstractions use std::any::Any; - use std::collections::HashSet; +use std::fmt; +use std::fmt::{Debug, Display}; use std::sync::Arc; +use arrow::csv::WriterBuilder; use arrow::datatypes::{DataType, Field, Fields, Schema}; use arrow::{self, datatypes::SchemaRef}; -use async_trait::async_trait; -use bytes::{Buf, Bytes}; - +use arrow_array::RecordBatch; use datafusion_common::DataFusionError; - +use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; + +use async_trait::async_trait; +use bytes::{Buf, Bytes}; use futures::stream::BoxStream; use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore}; +use tokio::io::{AsyncWrite, AsyncWriteExt}; use super::FileFormat; use crate::datasource::file_format::file_type::FileCompressionType; -use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; +use crate::datasource::file_format::FileWriterMode; +use crate::datasource::file_format::{ + AbortMode, AbortableWrite, AsyncPutWriter, BatchSerializer, MultiPart, + DEFAULT_SCHEMA_INFER_MAX_RECORD, +}; use crate::error::Result; use crate::execution::context::SessionState; -use crate::physical_plan::file_format::{CsvExec, FileScanConfig}; -use crate::physical_plan::ExecutionPlan; +use crate::physical_plan::file_format::{ + CsvExec, FileGroupDisplay, FileMeta, FileScanConfig, FileSinkConfig, +}; +use crate::physical_plan::insert::{DataSink, InsertExec}; use crate::physical_plan::Statistics; +use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; /// The default file extension of csv files pub const DEFAULT_CSV_EXTENSION: &str = ".csv"; @@ -220,6 +231,22 @@ impl FileFormat for CsvFormat { ); Ok(Arc::new(exec)) } + + async fn create_writer_physical_plan( + &self, + input: Arc, + _state: &SessionState, + conf: FileSinkConfig, + ) -> Result> { + let sink = Arc::new(CsvSink::new( + conf, + self.has_header, + self.delimiter, + self.file_compression_type.clone(), + )); + + Ok(Arc::new(InsertExec::new(input, sink)) as _) + } } impl CsvFormat { @@ -324,6 +351,243 @@ fn build_schema_helper(names: Vec, types: &[HashSet]) -> Schem Schema::new(fields) } +impl Default for CsvSerializer { + fn default() -> Self { + Self::new() + } +} + +/// Define a struct for serializing CSV records to a stream +pub struct CsvSerializer { + // CSV writer builder + builder: WriterBuilder, + // Inner buffer for avoiding reallocation + buffer: Vec, + // Flag to indicate whether there will be a header + header: bool, +} + +impl CsvSerializer { + /// Constructor for the CsvSerializer object + pub fn new() -> Self { + Self { + builder: WriterBuilder::new(), + header: true, + buffer: Vec::with_capacity(4096), + } + } + + /// Method for setting the CSV writer builder + pub fn with_builder(mut self, builder: WriterBuilder) -> Self { + self.builder = builder; + self + } + + /// Method for setting the CSV writer header status + pub fn with_header(mut self, header: bool) -> Self { + self.header = header; + self + } +} + +#[async_trait] +impl BatchSerializer for CsvSerializer { + async fn serialize(&mut self, batch: RecordBatch) -> Result { + let builder = self.builder.clone(); + let mut writer = 
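+            // The Arrow CSV writer borrows the internal buffer; the header row is emitted only for the first serialized batch.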
builder.has_headers(self.header).build(&mut self.buffer); + writer.write(&batch)?; + drop(writer); + self.header = false; + Ok(Bytes::from(self.buffer.drain(..).collect::>())) + } +} + +async fn check_for_errors( + result: Result, + writers: &mut [AbortableWrite], +) -> Result { + match result { + Ok(value) => Ok(value), + Err(e) => { + // Abort all writers before returning the error: + for writer in writers { + let mut abort_future = writer.abort_writer(); + if let Ok(abort_future) = &mut abort_future { + let _ = abort_future.await; + } + // Ignore errors that occur during abortion, + // We do try to abort all writers before returning error. + } + // After aborting writers return original error. + Err(e) + } + } +} + +/// Implements [`DataSink`] for writing to a CSV file. +struct CsvSink { + /// Config options for writing data + config: FileSinkConfig, + has_header: bool, + delimiter: u8, + file_compression_type: FileCompressionType, +} + +impl Debug for CsvSink { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("CsvSink") + .field("has_header", &self.has_header) + .field("delimiter", &self.delimiter) + .field("file_compression_type", &self.file_compression_type) + .finish() + } +} + +impl Display for CsvSink { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "CsvSink(writer_mode={:?}, file_groups={})", + self.config.writer_mode, + FileGroupDisplay(&self.config.file_groups), + ) + } +} + +impl CsvSink { + fn new( + config: FileSinkConfig, + has_header: bool, + delimiter: u8, + file_compression_type: FileCompressionType, + ) -> Self { + Self { + config, + has_header, + delimiter, + file_compression_type, + } + } + + // Create a write for Csv files + async fn create_writer( + &self, + file_meta: FileMeta, + object_store: Arc, + ) -> Result>> { + let object = &file_meta.object_meta; + match self.config.writer_mode { + // If the mode is append, call the store's append method and return wrapped in + // a boxed trait object. + FileWriterMode::Append => { + let writer = object_store + .append(&object.location) + .await + .map_err(DataFusionError::ObjectStore)?; + let writer = AbortableWrite::new( + self.file_compression_type.convert_async_writer(writer)?, + AbortMode::Append, + ); + Ok(writer) + } + // If the mode is put, create a new AsyncPut writer and return it wrapped in + // a boxed trait object + FileWriterMode::Put => { + let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store)); + let writer = AbortableWrite::new( + self.file_compression_type.convert_async_writer(writer)?, + AbortMode::Put, + ); + Ok(writer) + } + // If the mode is put multipart, call the store's put_multipart method and + // return the writer wrapped in a boxed trait object. 
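+            // The returned MultipartId is kept in the abort mode so a failed write can later abort the partial upload.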
+ FileWriterMode::PutMultipart => { + let (multipart_id, writer) = object_store + .put_multipart(&object.location) + .await + .map_err(DataFusionError::ObjectStore)?; + Ok(AbortableWrite::new( + self.file_compression_type.convert_async_writer(writer)?, + AbortMode::MultiPart(MultiPart::new( + object_store, + multipart_id, + object.location.clone(), + )), + )) + } + } + } +} + +#[async_trait] +impl DataSink for CsvSink { + async fn write_all( + &self, + mut data: SendableRecordBatchStream, + context: &Arc, + ) -> Result { + let num_partitions = self.config.file_groups.len(); + + let object_store = context + .runtime_env() + .object_store(&self.config.object_store_url)?; + + // Construct serializer and writer for each file group + let mut serializers = vec![]; + let mut writers = vec![]; + for file_group in &self.config.file_groups { + // In append mode, consider has_header flag only when file is empty (at the start). + // For other modes, use has_header flag as is. + let header = self.has_header + && (!matches!(&self.config.writer_mode, FileWriterMode::Append) + || file_group.object_meta.size == 0); + let builder = WriterBuilder::new().with_delimiter(self.delimiter); + let serializer = CsvSerializer::new() + .with_builder(builder) + .with_header(header); + serializers.push(serializer); + + let file = file_group.clone(); + let writer = self + .create_writer(file.object_meta.clone().into(), object_store.clone()) + .await?; + writers.push(writer); + } + + let mut idx = 0; + let mut row_count = 0; + // Map errors to DatafusionError. + let err_converter = + |_| DataFusionError::Internal("Unexpected FileSink Error".to_string()); + while let Some(maybe_batch) = data.next().await { + // Write data to files in a round robin fashion: + idx = (idx + 1) % num_partitions; + let serializer = &mut serializers[idx]; + let batch = check_for_errors(maybe_batch, &mut writers).await?; + row_count += batch.num_rows(); + let bytes = + check_for_errors(serializer.serialize(batch).await, &mut writers).await?; + let writer = &mut writers[idx]; + check_for_errors( + writer.write_all(&bytes).await.map_err(err_converter), + &mut writers, + ) + .await?; + } + // Perform cleanup: + let n_writers = writers.len(); + for idx in 0..n_writers { + check_for_errors( + writers[idx].shutdown().await.map_err(err_converter), + &mut writers, + ) + .await?; + } + Ok(row_count as u64) + } +} + #[cfg(test)] mod tests { use super::super::test_util::scan_format; @@ -333,6 +597,7 @@ mod tests { use crate::physical_plan::collect; use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext}; use crate::test_util::arrow_test_data; + use arrow::compute::concat_batches; use bytes::Bytes; use chrono::DateTime; use datafusion_common::cast::as_string_array; @@ -606,4 +871,52 @@ mod tests { let format = CsvFormat::default(); scan_format(state, &format, &root, file_name, projection, limit).await } + + #[tokio::test] + async fn test_csv_serializer() -> Result<()> { + let ctx = SessionContext::new(); + let df = ctx + .read_csv( + &format!("{}/csv/aggregate_test_100.csv", arrow_test_data()), + CsvReadOptions::default().has_header(true), + ) + .await?; + let batches = df + .select_columns(&["c2", "c3"])? + .limit(0, Some(10))? 
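+            // Keep only the first ten rows so the expected CSV literal below stays short.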
+ .collect() + .await?; + let batch = concat_batches(&batches[0].schema(), &batches)?; + let mut serializer = CsvSerializer::new(); + let bytes = serializer.serialize(batch).await?; + assert_eq!( + "c2,c3\n2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n", + String::from_utf8(bytes.into()).unwrap() + ); + Ok(()) + } + + #[tokio::test] + async fn test_csv_serializer_no_header() -> Result<()> { + let ctx = SessionContext::new(); + let df = ctx + .read_csv( + &format!("{}/csv/aggregate_test_100.csv", arrow_test_data()), + CsvReadOptions::default().has_header(true), + ) + .await?; + let batches = df + .select_columns(&["c2", "c3"])? + .limit(0, Some(10))? + .collect() + .await?; + let batch = concat_batches(&batches[0].schema(), &batches)?; + let mut serializer = CsvSerializer::new().with_header(false); + let bytes = serializer.serialize(batch).await?; + assert_eq!( + "2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n", + String::from_utf8(bytes.into()).unwrap() + ); + Ok(()) + } } diff --git a/datafusion/core/src/datasource/file_format/file_type.rs b/datafusion/core/src/datasource/file_format/file_type.rs index 58877c7a82e1..567fffb32367 100644 --- a/datafusion/core/src/datasource/file_format/file_type.rs +++ b/datafusion/core/src/datasource/file_format/file_type.rs @@ -19,6 +19,7 @@ use crate::error::{DataFusionError, Result}; +use crate::datasource::file_format::arrow::DEFAULT_ARROW_EXTENSION; use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION; use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION; use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION; @@ -31,6 +32,8 @@ use async_compression::tokio::bufread::{ ZstdDecoder as AsyncZstdDecoer, ZstdEncoder as AsyncZstdEncoder, }; +#[cfg(feature = "compression")] +use async_compression::tokio::write::{BzEncoder, GzipEncoder, XzEncoder, ZstdEncoder}; use bytes::Bytes; #[cfg(feature = "compression")] use bzip2::read::MultiBzDecoder; @@ -38,12 +41,12 @@ use datafusion_common::parsers::CompressionTypeVariant; #[cfg(feature = "compression")] use flate2::read::MultiGzDecoder; -use crate::datasource::file_format::arrow::DEFAULT_ARROW_EXTENSION; use futures::stream::BoxStream; use futures::StreamExt; #[cfg(feature = "compression")] use futures::TryStreamExt; use std::str::FromStr; +use tokio::io::AsyncWrite; #[cfg(feature = "compression")] use tokio_util::io::{ReaderStream, StreamReader}; #[cfg(feature = "compression")] @@ -149,6 +152,31 @@ impl FileCompressionType { }) } + /// Wrap the given `AsyncWrite` so that it performs compressed writes + /// according to this `FileCompressionType`. + pub fn convert_async_writer( + &self, + w: Box, + ) -> Result> { + Ok(match self.variant { + #[cfg(feature = "compression")] + GZIP => Box::new(GzipEncoder::new(w)), + #[cfg(feature = "compression")] + BZIP2 => Box::new(BzEncoder::new(w)), + #[cfg(feature = "compression")] + XZ => Box::new(XzEncoder::new(w)), + #[cfg(feature = "compression")] + ZSTD => Box::new(ZstdEncoder::new(w)), + #[cfg(not(feature = "compression"))] + GZIP | BZIP2 | XZ | ZSTD => { + return Err(DataFusionError::NotImplemented( + "Compression feature is not enabled".to_owned(), + )) + } + UNCOMPRESSED => w, + }) + } + /// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`. 
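+    /// This is the read-side counterpart of [`Self::convert_async_writer`], which compresses writes.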
pub fn convert_stream( &self, diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 90037d32f549..71bd8f1c07b8 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -29,19 +29,30 @@ pub mod options; pub mod parquet; use std::any::Any; -use std::fmt; +use std::io::Error; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; +use std::{fmt, mem}; use crate::arrow::datatypes::SchemaRef; use crate::error::Result; -use crate::physical_plan::file_format::FileScanConfig; +use crate::execution::context::SessionState; +use crate::physical_plan::file_format::{FileScanConfig, FileSinkConfig}; use crate::physical_plan::{ExecutionPlan, Statistics}; -use crate::execution::context::SessionState; -use async_trait::async_trait; +use arrow_array::RecordBatch; +use datafusion_common::DataFusionError; use datafusion_physical_expr::PhysicalExpr; -use object_store::{ObjectMeta, ObjectStore}; +use async_trait::async_trait; +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::ready; +use futures::FutureExt; +use object_store::path::Path; +use object_store::{MultipartId, ObjectMeta, ObjectStore}; +use tokio::io::AsyncWrite; /// This trait abstracts all the file format specific implementations /// from the [`TableProvider`]. This helps code re-utilization across /// providers that support the the same file formats. @@ -87,6 +98,219 @@ pub trait FileFormat: Send + Sync + fmt::Debug { conf: FileScanConfig, filters: Option<&Arc>, ) -> Result>; + + /// Take a list of files and the configuration to convert it to the + /// appropriate writer executor according to this file format. + async fn create_writer_physical_plan( + &self, + _input: Arc, + _state: &SessionState, + _conf: FileSinkConfig, + ) -> Result> { + let msg = "Writer not implemented for this format".to_owned(); + Err(DataFusionError::NotImplemented(msg)) + } +} + +/// `AsyncPutWriter` is an object that facilitates asynchronous writing to object stores. +/// It is specifically designed for the `object_store` crate's `put` method and sends +/// whole bytes at once when the buffer is flushed. 
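+/// A rough usage sketch (hypothetical setup; assumes `tokio::io::AsyncWriteExt` is in scope):
+/// ```ignore
+/// let mut writer = AsyncPutWriter::new(object_meta, store);
+/// writer.write_all(b"a,b\n1,2\n").await?; // buffered in memory
+/// writer.shutdown().await?;               // sends everything with a single `put`
+/// ```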
+pub struct AsyncPutWriter { + /// Object metadata + object_meta: ObjectMeta, + /// A shared reference to the object store + store: Arc, + /// A buffer that stores the bytes to be sent + current_buffer: Vec, + /// Used for async handling in flush method + inner_state: AsyncPutState, +} + +impl AsyncPutWriter { + /// Constructor for the `AsyncPutWriter` object + pub fn new(object_meta: ObjectMeta, store: Arc) -> Self { + Self { + object_meta, + store, + current_buffer: vec![], + // The writer starts out in buffering mode + inner_state: AsyncPutState::Buffer, + } + } + + /// Separate implementation function that unpins the [`AsyncPutWriter`] so + /// that partial borrows work correctly + fn poll_shutdown_inner( + &mut self, + cx: &mut Context<'_>, + ) -> Poll> { + loop { + match &mut self.inner_state { + AsyncPutState::Buffer => { + // Convert the current buffer to bytes and take ownership of it + let bytes = Bytes::from(mem::take(&mut self.current_buffer)); + // Set the inner state to Put variant with the bytes + self.inner_state = AsyncPutState::Put { bytes } + } + AsyncPutState::Put { bytes } => { + // Send the bytes to the object store's put method + return Poll::Ready( + ready!(self + .store + .put(&self.object_meta.location, bytes.clone()) + .poll_unpin(cx)) + .map_err(Error::from), + ); + } + } + } + } +} + +/// An enum that represents the inner state of AsyncPut +enum AsyncPutState { + /// Building Bytes struct in this state + Buffer, + /// Data in the buffer is being sent to the object store + Put { bytes: Bytes }, +} + +impl AsyncWrite for AsyncPutWriter { + // Define the implementation of the AsyncWrite trait for the `AsyncPutWriter` struct + fn poll_write( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + // Extend the current buffer with the incoming buffer + self.current_buffer.extend_from_slice(buf); + // Return a ready poll with the length of the incoming buffer + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + // Return a ready poll with an empty result + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + // Call the poll_shutdown_inner method to handle the actual sending of data to the object store + self.poll_shutdown_inner(cx) + } +} + +/// Stores data needed during abortion of MultiPart writers +pub(crate) struct MultiPart { + /// A shared reference to the object store + store: Arc, + multipart_id: MultipartId, + location: Path, +} + +impl MultiPart { + /// Create a new `MultiPart` + pub fn new( + store: Arc, + multipart_id: MultipartId, + location: Path, + ) -> Self { + Self { + store, + multipart_id, + location, + } + } +} + +pub(crate) enum AbortMode { + Put, + Append, + MultiPart(MultiPart), +} + +/// A wrapper struct with abort method and writer +struct AbortableWrite { + writer: W, + mode: AbortMode, +} + +impl AbortableWrite { + /// Create a new `AbortableWrite` instance with the given writer, and write mode. 
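+    /// e.g. `AbortableWrite::new(compressed_writer, AbortMode::Put)` (the names here are illustrative).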
+ fn new(writer: W, mode: AbortMode) -> Self { + Self { writer, mode } + } + + /// handling of abort for different write modes + fn abort_writer(&self) -> Result>> { + match &self.mode { + AbortMode::Put => Ok(async { Ok(()) }.boxed()), + AbortMode::Append => Err(DataFusionError::Execution( + "Cannot abort in append mode".to_string(), + )), + AbortMode::MultiPart(MultiPart { + store, + multipart_id, + location, + }) => { + let location = location.clone(); + let multipart_id = multipart_id.clone(); + let store = store.clone(); + Ok(Box::pin(async move { + store + .abort_multipart(&location, &multipart_id) + .await + .map_err(DataFusionError::ObjectStore) + })) + } + } + } +} + +impl AsyncWrite for AbortableWrite { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.get_mut().writer).poll_write(cx, buf) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.get_mut().writer).poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.get_mut().writer).poll_shutdown(cx) + } +} + +/// An enum that defines different file writer modes. +#[derive(Debug, Clone, Copy)] +pub enum FileWriterMode { + /// Data is appended to an existing file. + Append, + /// Data is written to a new file. + Put, + /// Data is written to a new file in multiple parts. + PutMultipart, +} +/// A trait that defines the methods required for a RecordBatch serializer. +#[async_trait] +pub trait BatchSerializer: Unpin + Send { + /// Asynchronously serializes a `RecordBatch` and returns the serialized bytes. + async fn serialize(&mut self, batch: RecordBatch) -> Result; } #[cfg(test)] diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 76711e6bfc72..fd316a74b282 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -44,7 +44,7 @@ use crate::datasource::{ }; use crate::logical_expr::TableProviderFilterPushDown; use crate::physical_plan; -use crate::physical_plan::file_format::FileScanConfig; +use crate::physical_plan::file_format::{FileScanConfig, FileSinkConfig}; use crate::{ error::{DataFusionError, Result}, execution::context::SessionState, @@ -759,6 +759,63 @@ impl TableProvider for ListingTable { fn get_table_definition(&self) -> Option<&str> { self.definition.as_deref() } + + async fn insert_into( + &self, + state: &SessionState, + input: Arc, + ) -> Result> { + // Check that the schema of the plan matches the schema of this table. + if !input.schema().eq(&self.schema()) { + return Err(DataFusionError::Plan( + // Return an error if schema of the input query does not match with the table schema. + "Inserting query must have the same schema with the table.".to_string(), + )); + } + + if self.table_paths().len() > 1 { + return Err(DataFusionError::Plan( + "Writing to a table backed by multiple files is not supported yet" + .to_owned(), + )); + } + + let table_path = &self.table_paths()[0]; + // Get the object store for the table path. 
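+        // The files already present under this path are listed below so the sink can append to them.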
+ let store = state.runtime_env().object_store(table_path)?; + + let file_list_stream = pruned_partition_list( + store.as_ref(), + table_path, + &[], + &self.options.file_extension, + &self.options.table_partition_cols, + ) + .await?; + + let file_groups = file_list_stream.try_collect::>().await?; + + if file_groups.len() > 1 { + return Err(DataFusionError::Plan( + "Datafusion currently supports tables from single partition and/or file." + .to_owned(), + )); + } + + // Sink related option, apart from format + let config = FileSinkConfig { + object_store_url: self.table_paths()[0].object_store(), + file_groups, + output_schema: input.schema(), + table_partition_cols: self.options.table_partition_cols.clone(), + writer_mode: crate::datasource::file_format::FileWriterMode::Append, + }; + + self.options() + .format + .create_writer_physical_plan(input, state, config) + .await + } } impl ListingTable { @@ -830,16 +887,24 @@ impl ListingTable { mod tests { use super::*; use crate::datasource::file_format::file_type::GetExt; + use crate::datasource::{provider_as_source, MemTable}; + use crate::physical_plan::collect; use crate::prelude::*; use crate::{ + assert_batches_eq, datasource::file_format::{avro::AvroFormat, parquet::ParquetFormat}, execution::options::ReadOptions, logical_expr::{col, lit}, test::{columns, object_store::register_test_store}, }; + use arrow::csv; use arrow::datatypes::{DataType, Schema}; + use arrow::error::Result as ArrowResult; + use arrow::record_batch::RecordBatch; use chrono::DateTime; use datafusion_common::assert_contains; + use datafusion_common::from_slice::FromSlice; + use datafusion_expr::LogicalPlanBuilder; use rstest::*; use std::fs::File; use tempfile::TempDir; @@ -1323,6 +1388,24 @@ mod tests { Ok(Arc::new(table)) } + fn load_empty_schema_csv_table( + schema: SchemaRef, + temp_path: &str, + ) -> Result> { + File::create(temp_path)?; + let table_path = ListingTableUrl::parse(temp_path).unwrap(); + + let file_format = CsvFormat::default(); + let listing_options = ListingOptions::new(Arc::new(file_format)); + + let config = ListingTableConfig::new(table_path) + .with_listing_options(listing_options) + .with_schema(schema); + + let table = ListingTable::try_new(config)?; + Ok(Arc::new(table)) + } + /// Check that the files listed by the table match the specified `output_partitioning` /// when the object store contains `files`. async fn assert_list_files_for_scan_grouping( @@ -1426,4 +1509,167 @@ mod tests { meta2.location = Path::from("test2"); assert!(cache.get(&meta2).is_none()); } + + #[tokio::test] + async fn test_append_plan_to_external_table_stored_as_csv() -> Result<()> { + let file_type = FileType::CSV; + let file_compression_type = FileCompressionType::UNCOMPRESSED; + + // Create the initial context, schema, and batch. + let session_ctx = SessionContext::new(); + // Create a new schema with one field called "a" of type Int32 + let schema = Arc::new(Schema::new(vec![Field::new( + "column1", + DataType::Int32, + false, + )])); + + // Create a new batch of data to insert into the table + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(arrow_array::Int32Array::from_slice([1, 2, 3]))], + )?; + + // Filename with extension + let filename = format!( + "path{}", + file_type + .to_owned() + .get_ext_with_compression(file_compression_type.clone()) + .unwrap() + ); + + // Define batch size for file reader + let batch_size = batch.num_rows(); + + // Create a temporary directory and a CSV file within it. 
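+        // The empty CSV file created here is registered as table `t`; each INSERT below appends the two 3-row source batches (6 rows) to it.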
+ let tmp_dir = TempDir::new()?; + let path = tmp_dir.path().join(filename); + + let initial_table = + load_empty_schema_csv_table(schema.clone(), path.to_str().unwrap())?; + session_ctx.register_table("t", initial_table)?; + // Create and register the source table with the provided schema and inserted data + let source_table = Arc::new(MemTable::try_new( + schema.clone(), + vec![vec![batch.clone(), batch.clone()]], + )?); + session_ctx.register_table("source", source_table.clone())?; + // Convert the source table into a provider so that it can be used in a query + let source = provider_as_source(source_table); + // Create a table scan logical plan to read from the source table + let scan_plan = LogicalPlanBuilder::scan("source", source, None)?.build()?; + // Create an insert plan to insert the source data into the initial table + let insert_into_table = + LogicalPlanBuilder::insert_into(scan_plan, "t", &schema)?.build()?; + // Create a physical plan from the insert plan + let plan = session_ctx + .state() + .create_physical_plan(&insert_into_table) + .await?; + + // Execute the physical plan and collect the results + let res = collect(plan, session_ctx.task_ctx()).await?; + // Insert returns the number of rows written, in our case this would be 6. + let expected = vec![ + "+-------+", + "| count |", + "+-------+", + "| 6 |", + "+-------+", + ]; + + // Assert that the batches read from the file match the expected result. + assert_batches_eq!(expected, &res); + // Open the CSV file, read its contents as a record batch, and collect the batches into a vector. + let file = File::open(path.clone())?; + let reader = csv::ReaderBuilder::new(schema.clone()) + .has_header(true) + .with_batch_size(batch_size) + .build(file) + .map_err(|e| DataFusionError::Internal(e.to_string()))?; + + let batches = reader + .collect::>>() + .into_iter() + .collect::>>() + .map_err(|e| DataFusionError::Internal(e.to_string()))?; + + // Define the expected result as a vector of strings. + let expected = vec![ + "+---------+", + "| column1 |", + "+---------+", + "| 1 |", + "| 2 |", + "| 3 |", + "| 1 |", + "| 2 |", + "| 3 |", + "+---------+", + ]; + + // Assert that the batches read from the file match the expected result. + assert_batches_eq!(expected, &batches); + + // Create a physical plan from the insert plan + let plan = session_ctx + .state() + .create_physical_plan(&insert_into_table) + .await?; + + // Again, execute the physical plan and collect the results + let res = collect(plan, session_ctx.task_ctx()).await?; + // Insert returns the number of rows written, in our case this would be 6. + let expected = vec![ + "+-------+", + "| count |", + "+-------+", + "| 6 |", + "+-------+", + ]; + + // Assert that the batches read from the file match the expected result. + assert_batches_eq!(expected, &res); + + // Open the CSV file, read its contents as a record batch, and collect the batches into a vector. + let file = File::open(path.clone())?; + let reader = csv::ReaderBuilder::new(schema.clone()) + .has_header(true) + .with_batch_size(batch_size) + .build(file) + .map_err(|e| DataFusionError::Internal(e.to_string()))?; + + let batches = reader + .collect::>>() + .into_iter() + .collect::>>() + .map_err(|e| DataFusionError::Internal(e.to_string())); + + // Define the expected result after the second append. 
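+        // After two appends of 6 rows each, the file holds 12 rows.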
+ let expected = vec![ + "+---------+", + "| column1 |", + "+---------+", + "| 1 |", + "| 2 |", + "| 3 |", + "| 1 |", + "| 2 |", + "| 3 |", + "| 1 |", + "| 2 |", + "| 3 |", + "| 1 |", + "| 2 |", + "| 3 |", + "+---------+", + ]; + + // Assert that the batches read from the file after the second append match the expected result. + assert_batches_eq!(expected, &batches?); + + // Return Ok if the function + Ok(()) + } } diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index 0b8afda333ec..97758031851d 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use async_trait::async_trait; +use datafusion_execution::TaskContext; use tokio::sync::RwLock; use crate::datasource::{TableProvider, TableType}; @@ -223,7 +224,11 @@ impl MemSink { #[async_trait] impl DataSink for MemSink { - async fn write_all(&self, mut data: SendableRecordBatchStream) -> Result { + async fn write_all( + &self, + mut data: SendableRecordBatchStream, + _context: &Arc, + ) -> Result { let num_partitions = self.batches.len(); // buffer up the data round robin style into num_partitions diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs index 2a68d988614d..cbdd626f0e55 100644 --- a/datafusion/core/src/physical_plan/file_format/csv.rs +++ b/datafusion/core/src/physical_plan/file_format/csv.rs @@ -35,6 +35,8 @@ use arrow::datatypes::SchemaRef; use datafusion_execution::TaskContext; use datafusion_physical_expr::{LexOrdering, OrderingEquivalenceProperties}; +use super::FileScanConfig; + use bytes::{Buf, Bytes}; use futures::ready; use futures::{StreamExt, TryStreamExt}; @@ -46,8 +48,6 @@ use std::sync::Arc; use std::task::Poll; use tokio::task::{self, JoinHandle}; -use super::FileScanConfig; - /// Execution plan for scanning a CSV file #[derive(Debug, Clone)] pub struct CsvExec { diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs index 4dd3ad60be22..0d1dc1bee225 100644 --- a/datafusion/core/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs @@ -27,14 +27,6 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Instant; -use arrow::datatypes::SchemaRef; -use arrow::error::ArrowError; -use arrow::record_batch::RecordBatch; -use datafusion_common::ScalarValue; -use futures::future::BoxFuture; -use futures::stream::BoxStream; -use futures::{ready, FutureExt, Stream, StreamExt}; - use crate::datasource::listing::PartitionedFile; use crate::error::Result; use crate::physical_plan::file_format::{ @@ -45,6 +37,15 @@ use crate::physical_plan::metrics::{ }; use crate::physical_plan::RecordBatchStream; +use arrow::datatypes::SchemaRef; +use arrow::error::ArrowError; +use arrow::record_batch::RecordBatch; +use datafusion_common::ScalarValue; + +use futures::future::BoxFuture; +use futures::stream::BoxStream; +use futures::{ready, FutureExt, Stream, StreamExt}; + /// A fallible future that resolves to a stream of [`RecordBatch`] pub type FileOpenFuture = BoxFuture<'static, Result>>>; @@ -138,19 +139,19 @@ enum FileStreamState { } /// A timer that can be started and stopped. 
-struct StartableTime { - metrics: Time, +pub struct StartableTime { + pub(crate) metrics: Time, // use for record each part cost time, will eventually add into 'metrics'. - start: Option, + pub(crate) start: Option, } impl StartableTime { - fn start(&mut self) { + pub(crate) fn start(&mut self) { assert!(self.start.is_none()); self.start = Some(Instant::now()); } - fn stop(&mut self) { + pub(crate) fn stop(&mut self) { if let Some(start) = self.start.take() { self.metrics.add_elapsed(start); } @@ -519,12 +520,11 @@ impl RecordBatchStream for FileStream { mod tests { use arrow_schema::Schema; use datafusion_common::DataFusionError; - use futures::StreamExt; - use std::sync::atomic::{AtomicUsize, Ordering}; - use std::sync::Arc; use super::*; + use crate::datasource::file_format::BatchSerializer; use crate::datasource::object_store::ObjectStoreUrl; + use crate::physical_plan::file_format::FileMeta; use crate::physical_plan::metrics::ExecutionPlanMetricsSet; use crate::prelude::SessionContext; use crate::{ @@ -532,6 +532,13 @@ mod tests { test::{make_partition, object_store::register_test_store}, }; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + + use async_trait::async_trait; + use bytes::Bytes; + use futures::StreamExt; + /// Test `FileOpener` which will simulate errors during file opening or scanning #[derive(Default)] struct TestOpener { @@ -981,4 +988,15 @@ mod tests { Ok(()) } + + struct TestSerializer { + bytes: Bytes, + } + + #[async_trait] + impl BatchSerializer for TestSerializer { + async fn serialize(&mut self, _batch: RecordBatch) -> Result { + Ok(self.bytes.clone()) + } + } } diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index 831f81e5997a..c78a39f7aee6 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -31,10 +31,11 @@ pub use self::csv::{CsvConfig, CsvExec, CsvOpener}; pub(crate) use self::parquet::plan_to_parquet; pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory}; use arrow::{ - array::{ArrayData, ArrayRef, BufferBuilder, DictionaryArray}, + array::{new_null_array, ArrayData, ArrayRef, BufferBuilder, DictionaryArray}, buffer::Buffer, + compute::can_cast_types, datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type}, - record_batch::RecordBatch, + record_batch::{RecordBatch, RecordBatchOptions}, }; pub use arrow_file::ArrowExec; pub use avro::AvroExec; @@ -43,6 +44,7 @@ pub use file_stream::{FileOpenFuture, FileOpener, FileStream, OnError}; pub(crate) use json::plan_to_json; pub use json::{JsonOpener, NdJsonExec}; +use crate::datasource::file_format::FileWriterMode; use crate::datasource::{ listing::{FileRange, PartitionedFile}, object_store::ObjectStoreUrl, @@ -52,19 +54,17 @@ use crate::{ error::{DataFusionError, Result}, scalar::ScalarValue, }; -use arrow::array::new_null_array; -use arrow::compute::can_cast_types; -use arrow::record_batch::RecordBatchOptions; + use datafusion_common::tree_node::{TreeNode, VisitRecursion}; use datafusion_physical_expr::expressions::Column; + use log::{debug, warn}; use object_store::path::Path; use object_store::ObjectMeta; -use std::fmt::Debug; use std::{ borrow::Cow, collections::HashMap, - fmt::{Display, Formatter, Result as FmtResult}, + fmt::{Debug, Display, Formatter, Result as FmtResult}, marker::PhantomData, sync::Arc, vec, @@ -238,6 +238,30 @@ impl FileScanConfig { } } +/// The base configurations to 
provide when creating a physical plan for +/// writing to any given file format. +#[derive(Debug, Clone)] +pub struct FileSinkConfig { + /// Object store URL, used to get an ObjectStore instance + pub object_store_url: ObjectStoreUrl, + /// A vector of [`PartitionedFile`] structs, each representing a file partition + pub file_groups: Vec, + /// The schema of the output file + pub output_schema: SchemaRef, + /// A vector of column names and their corresponding data types, + /// representing the partitioning columns for the file + pub table_partition_cols: Vec<(String, DataType)>, + /// A writer mode that determines how data is written to the file + pub writer_mode: FileWriterMode, +} + +impl FileSinkConfig { + /// Get output schema + pub fn output_schema(&self) -> &SchemaRef { + &self.output_schema + } +} + impl Debug for FileScanConfig { fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { write!(f, "object_store_url={:?}, ", self.object_store_url)?; @@ -287,34 +311,19 @@ struct FileGroupsDisplay<'a>(&'a [Vec]); impl<'a> Display for FileGroupsDisplay<'a> { fn fmt(&self, f: &mut Formatter) -> FmtResult { - let mut first_group = true; - let groups = if self.0.len() == 1 { "group" } else { "groups" }; - write!(f, "{{{} {}: [", self.0.len(), groups)?; + let n_group = self.0.len(); + let groups = if n_group == 1 { "group" } else { "groups" }; + write!(f, "{{{n_group} {groups}: [")?; // To avoid showing too many partitions let max_groups = 5; - for group in self.0.iter().take(max_groups) { - if !first_group { + for (idx, group) in self.0.iter().take(max_groups).enumerate() { + if idx > 0 { write!(f, ", ")?; } - first_group = false; - write!(f, "[")?; - - let mut first_file = true; - for pf in group { - if !first_file { - write!(f, ", ")?; - } - first_file = false; - - write!(f, "{}", pf.object_meta.location.as_ref())?; - - if let Some(range) = pf.range.as_ref() { - write!(f, ":{}..{}", range.start, range.end)?; - } - } - write!(f, "]")?; + write!(f, "{}", FileGroupDisplay(group))?; } - if self.0.len() > max_groups { + // Remaining elements are showed as `...` (to indicate there is more) + if n_group > max_groups { write!(f, ", ...")?; } write!(f, "]}}")?; @@ -322,6 +331,33 @@ impl<'a> Display for FileGroupsDisplay<'a> { } } +/// A wrapper to customize partitioned file display +/// +/// Prints in the format: +/// ```text +/// [file1, file2,...] 
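+/// [file1:0..100, file2]   (byte ranges are shown when present)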
+/// ``` +#[derive(Debug)] +pub(crate) struct FileGroupDisplay<'a>(pub &'a [PartitionedFile]); + +impl<'a> Display for FileGroupDisplay<'a> { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + let group = self.0; + write!(f, "[")?; + for (idx, pf) in group.iter().enumerate() { + if idx > 0 { + write!(f, ", ")?; + } + write!(f, "{}", pf.object_meta.location.as_ref())?; + if let Some(range) = pf.range.as_ref() { + write!(f, ":{}..{}", range.start, range.end)?; + } + } + write!(f, "]")?; + Ok(()) + } +} + /// A wrapper to customize partitioned file display #[derive(Debug)] struct ProjectSchemaDisplay<'a>(&'a SchemaRef); @@ -1306,6 +1342,14 @@ mod tests { assert_eq!(&FileGroupsDisplay(&files).to_string(), expected); } + #[test] + fn file_group_display_many() { + let files = vec![partitioned_file("foo"), partitioned_file("bar")]; + + let expected = "[foo, bar]"; + assert_eq!(&FileGroupDisplay(&files).to_string(), expected); + } + /// create a PartitionedFile for testing fn partitioned_file(path: &str) -> PartitionedFile { let object_meta = ObjectMeta { diff --git a/datafusion/core/src/physical_plan/insert.rs b/datafusion/core/src/physical_plan/insert.rs index e1252fbab728..c608b64e6f96 100644 --- a/datafusion/core/src/physical_plan/insert.rs +++ b/datafusion/core/src/physical_plan/insert.rs @@ -54,7 +54,11 @@ pub trait DataSink: Display + Debug + Send + Sync { /// This method will be called exactly once during each DML /// statement. Thus prior to return, the sink should do any commit /// or rollback required. - async fn write_all(&self, data: SendableRecordBatchStream) -> Result; + async fn write_all( + &self, + data: SendableRecordBatchStream, + context: &Arc, + ) -> Result; } /// Execution plan for writing record batches to a [`DataSink`] @@ -163,12 +167,12 @@ impl ExecutionPlan for InsertExec { ))); } - let data = self.input.execute(0, context)?; + let data = self.input.execute(0, context.clone())?; let schema = self.schema.clone(); let sink = self.sink.clone(); let stream = futures::stream::once(async move { - sink.write_all(data).await.map(make_count_batch) + sink.write_all(data, &context).await.map(make_count_batch) }) .boxed(); diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs index e972d39f4a4f..76bc487eabb9 100644 --- a/datafusion/core/tests/fifo.rs +++ b/datafusion/core/tests/fifo.rs @@ -21,6 +21,7 @@ #[cfg(test)] mod unix_test { use arrow::array::Array; + use arrow::csv::ReaderBuilder; use arrow::datatypes::{DataType, Field, Schema}; use datafusion::test_util::register_unbounded_file_with_ordering; use datafusion::{ @@ -332,4 +333,94 @@ mod unix_test { ); Ok(()) } + + /// It tests the INSERT INTO functionality. + #[tokio::test] + async fn test_sql_insert_into_fifo() -> Result<()> { + // To make unbounded deterministic + let waiting = Arc::new(AtomicBool::new(true)); + let waiting_thread = waiting.clone(); + // create local execution context + let config = SessionConfig::new().with_batch_size(TEST_BATCH_SIZE); + let ctx = SessionContext::with_config(config); + // Create a new temporary FIFO file + let tmp_dir = TempDir::new()?; + let source_fifo_path = create_fifo_file(&tmp_dir, "source.csv")?; + // Prevent move + let (source_fifo_path_thread, source_display_fifo_path) = + (source_fifo_path.clone(), source_fifo_path.display()); + // Tasks + let mut tasks: Vec> = vec![]; + // TEST_BATCH_SIZE + 1 rows will be provided. However, after processing precisely + // TEST_BATCH_SIZE rows, the program will pause and wait for a batch to be read in another + // thread. 
This approach ensures that the pipeline remains unbroken. + tasks.push(create_writing_thread( + source_fifo_path_thread, + "a1,a2\n".to_owned(), + (0..TEST_DATA_SIZE) + .map(|_| "a,1\n".to_string()) + .collect::>(), + waiting, + TEST_BATCH_SIZE, + )); + // Create a new temporary FIFO file + let sink_fifo_path = create_fifo_file(&tmp_dir, "sink.csv")?; + // Prevent move + let (sink_fifo_path_thread, sink_display_fifo_path) = + (sink_fifo_path.clone(), sink_fifo_path.display()); + // Spawn a new thread to read sink EXTERNAL TABLE. + tasks.push(thread::spawn(move || { + let file = File::open(sink_fifo_path_thread).unwrap(); + let schema = Arc::new(Schema::new(vec![ + Field::new("a1", DataType::Utf8, false), + Field::new("a2", DataType::UInt32, false), + ])); + + let mut reader = ReaderBuilder::new(schema) + .has_header(true) + .with_batch_size(TEST_BATCH_SIZE) + .build(file) + .map_err(|e| DataFusionError::Internal(e.to_string())) + .unwrap(); + + while let Some(Ok(_)) = reader.next() { + waiting_thread.store(false, Ordering::SeqCst); + } + })); + // register second csv file with the SQL (create an empty file if not found) + ctx.sql(&format!( + "CREATE EXTERNAL TABLE source_table ( + a1 VARCHAR NOT NULL, + a2 INT NOT NULL + ) + STORED AS CSV + WITH HEADER ROW + OPTIONS ('UNBOUNDED' 'TRUE') + LOCATION '{source_display_fifo_path}'" + )) + .await?; + + // register csv file with the SQL + ctx.sql(&format!( + "CREATE EXTERNAL TABLE sink_table ( + a1 VARCHAR NOT NULL, + a2 INT NOT NULL + ) + STORED AS CSV + WITH HEADER ROW + OPTIONS ('UNBOUNDED' 'TRUE') + LOCATION '{sink_display_fifo_path}'" + )) + .await?; + + let df = ctx + .sql( + "INSERT INTO sink_table + SELECT a1, a2 FROM source_table", + ) + .await?; + df.collect().await?; + tasks.into_iter().for_each(|jh| jh.join().unwrap()); + Ok(()) + } } diff --git a/datafusion/core/tests/sqllogictests/test_files/explain.slt b/datafusion/core/tests/sqllogictests/test_files/explain.slt index 6a9d07aba7ad..d230286adcb9 100644 --- a/datafusion/core/tests/sqllogictests/test_files/explain.slt +++ b/datafusion/core/tests/sqllogictests/test_files/explain.slt @@ -133,6 +133,42 @@ EXPLAIN SELECT a, b, c FROM simple_explain_test logical_plan TableScan: simple_explain_test projection=[a, b, c] physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true +# create a sink table, path is same with aggregate_test_100 table +# we do not overwrite this file, we only assert plan. 
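+# the EXPLAIN below should show an InsertExec with a CsvSink in Append mode over that same file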
+statement ok +CREATE EXTERNAL TABLE sink_table ( + c1 VARCHAR NOT NULL, + c2 TINYINT NOT NULL, + c3 SMALLINT NOT NULL, + c4 SMALLINT NOT NULL, + c5 INTEGER NOT NULL, + c6 BIGINT NOT NULL, + c7 SMALLINT NOT NULL, + c8 INT NOT NULL, + c9 INT UNSIGNED NOT NULL, + c10 BIGINT UNSIGNED NOT NULL, + c11 FLOAT NOT NULL, + c12 DOUBLE NOT NULL, + c13 VARCHAR NOT NULL + ) +STORED AS CSV +WITH HEADER ROW +LOCATION '../../testing/data/csv/aggregate_test_100.csv'; + +query TT +EXPLAIN INSERT INTO sink_table SELECT * FROM aggregate_test_100 ORDER by c1 +---- +logical_plan +Dml: op=[Insert] table=[sink_table] +--Projection: aggregate_test_100.c1 AS c1, aggregate_test_100.c2 AS c2, aggregate_test_100.c3 AS c3, aggregate_test_100.c4 AS c4, aggregate_test_100.c5 AS c5, aggregate_test_100.c6 AS c6, aggregate_test_100.c7 AS c7, aggregate_test_100.c8 AS c8, aggregate_test_100.c9 AS c9, aggregate_test_100.c10 AS c10, aggregate_test_100.c11 AS c11, aggregate_test_100.c12 AS c12, aggregate_test_100.c13 AS c13 +----Sort: aggregate_test_100.c1 ASC NULLS LAST +------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13] +physical_plan +InsertExec: sink=CsvSink(writer_mode=Append, file_groups=[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]) +--ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, c5@4 as c5, c6@5 as c6, c7@6 as c7, c8@7 as c8, c9@8 as c9, c10@9 as c10, c11@10 as c11, c12@11 as c12, c13@12 as c13] +----SortExec: expr=[c1@0 ASC NULLS LAST] +------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true + # test EXPLAIN VERBOSE query TT EXPLAIN VERBOSE SELECT a, b, c FROM simple_explain_test