Add support for appending data to external tables - CSV (#6526)
* MemoryExec insert into refactor

* Merge leftovers

* Set target partition

* Comment and formatting improvements

* Comments on state.

* ListingTable INSERT INTO support

* Removing unnecessary code

* Remove leftover comments.

* Compression import error

* Minor resolutions on cargo docs

* Corrections after merge

* Make FileWriterExt available

* Single file support

* Resolve linter errors

* Minor changes, simplifications

* Fix failing tests because of api change

* Simplifications

* Replace block nesting with drop

* remove unnecessary code

* Convert to new approach

* simplify display

* Update debug display

* use handle err macro

* Make handle_err close all writers in case of error.

* Final review, stylistic changes

* Improve comments

* Move insert into test to the explain.slt

* convert macro to function

* return error for abort in append mode.

* Simplify condition of has header

* Update comments

* Remove file writer factory

* use AbortableWrite struct instead of trait

---------

Co-authored-by: metesynnada <100111937+metesynnada@users.noreply.github.com>
Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
3 people authored Jun 6, 2023
1 parent 44b83a1 commit 36292f6
Showing 11 changed files with 1,076 additions and 67 deletions.
329 changes: 321 additions & 8 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -18,30 +18,41 @@
//! CSV format abstractions

use std::any::Any;

use std::collections::HashSet;
use std::fmt;
use std::fmt::{Debug, Display};
use std::sync::Arc;

use arrow::csv::WriterBuilder;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow::{self, datatypes::SchemaRef};

use arrow_array::RecordBatch;
use datafusion_common::DataFusionError;

use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;

use async_trait::async_trait;
use bytes::{Buf, Bytes};
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
use tokio::io::{AsyncWrite, AsyncWriteExt};

use super::FileFormat;
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::FileWriterMode;
use crate::datasource::file_format::{
AbortMode, AbortableWrite, AsyncPutWriter, BatchSerializer, MultiPart,
DEFAULT_SCHEMA_INFER_MAX_RECORD,
};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::file_format::{
CsvExec, FileGroupDisplay, FileMeta, FileScanConfig, FileSinkConfig,
};
use crate::physical_plan::insert::{DataSink, InsertExec};
use crate::physical_plan::Statistics;
use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};

/// The default file extension of csv files
pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
@@ -220,6 +231,22 @@ impl FileFormat for CsvFormat {
);
Ok(Arc::new(exec))
}

async fn create_writer_physical_plan(
&self,
input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
conf: FileSinkConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
let sink = Arc::new(CsvSink::new(
conf,
self.has_header,
self.delimiter,
self.file_compression_type.clone(),
));

Ok(Arc::new(InsertExec::new(input, sink)) as _)
}
}
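
// Illustrative sketch (not part of this commit): how the new write path is
// reached from SQL. Assumes a `SessionContext` that already has an external
// CSV table registered for appending; the table names are hypothetical.
async fn insert_into_csv_example(
    ctx: &crate::prelude::SessionContext,
) -> Result<()> {
    // Planning the INSERT calls `create_writer_physical_plan` above, which
    // wraps the input plan in an `InsertExec` driving the `CsvSink` below.
    let df = ctx
        .sql("INSERT INTO csv_sink_table SELECT * FROM source_table")
        .await?;
    df.collect().await?;
    Ok(())
}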

impl CsvFormat {
@@ -324,6 +351,243 @@ fn build_schema_helper(names: Vec<String>, types: &[HashSet<DataType>]) -> Schema {
Schema::new(fields)
}

impl Default for CsvSerializer {
fn default() -> Self {
Self::new()
}
}

/// Define a struct for serializing CSV records to a stream
pub struct CsvSerializer {
// CSV writer builder
builder: WriterBuilder,
// Inner buffer for avoiding reallocation
buffer: Vec<u8>,
// Flag to indicate whether there will be a header
header: bool,
}

impl CsvSerializer {
/// Constructor for the CsvSerializer object
pub fn new() -> Self {
Self {
builder: WriterBuilder::new(),
header: true,
buffer: Vec::with_capacity(4096),
}
}

/// Method for setting the CSV writer builder
pub fn with_builder(mut self, builder: WriterBuilder) -> Self {
self.builder = builder;
self
}

/// Method for setting the CSV writer header status
pub fn with_header(mut self, header: bool) -> Self {
self.header = header;
self
}
}

#[async_trait]
impl BatchSerializer for CsvSerializer {
async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes> {
let builder = self.builder.clone();
let mut writer = builder.has_headers(self.header).build(&mut self.buffer);
writer.write(&batch)?;
drop(writer);
self.header = false;
Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
}
}
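
// Illustrative sketch (not part of this commit): using `CsvSerializer`
// directly to turn a `RecordBatch` into CSV bytes. Only the first call
// emits the header line.
async fn csv_serializer_example() -> Result<()> {
    let array: arrow_array::ArrayRef =
        Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]));
    let schema = Arc::new(Schema::new(vec![Field::new(
        "c1",
        DataType::Int32,
        false,
    )]));
    let batch = RecordBatch::try_new(schema, vec![array])?;
    let mut serializer = CsvSerializer::new().with_header(true);
    let bytes = serializer.serialize(batch).await?;
    assert_eq!(String::from_utf8(bytes.into()).unwrap(), "c1\n1\n2\n3\n");
    Ok(())
}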

async fn check_for_errors<T, W: AsyncWrite + Unpin + Send>(
result: Result<T>,
writers: &mut [AbortableWrite<W>],
) -> Result<T> {
match result {
Ok(value) => Ok(value),
Err(e) => {
// Abort all writers before returning the error:
for writer in writers {
let mut abort_future = writer.abort_writer();
if let Ok(abort_future) = &mut abort_future {
let _ = abort_future.await;
}
// Ignore errors that occur while aborting; we try to
// abort all writers before returning the original error.
}
// After aborting the writers, return the original error:
Err(e)
}
}
}

/// Implements [`DataSink`] for writing to a CSV file.
struct CsvSink {
/// Config options for writing data
config: FileSinkConfig,
has_header: bool,
delimiter: u8,
file_compression_type: FileCompressionType,
}

impl Debug for CsvSink {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("CsvSink")
.field("has_header", &self.has_header)
.field("delimiter", &self.delimiter)
.field("file_compression_type", &self.file_compression_type)
.finish()
}
}

impl Display for CsvSink {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"CsvSink(writer_mode={:?}, file_groups={})",
self.config.writer_mode,
FileGroupDisplay(&self.config.file_groups),
)
}
}

impl CsvSink {
fn new(
config: FileSinkConfig,
has_header: bool,
delimiter: u8,
file_compression_type: FileCompressionType,
) -> Self {
Self {
config,
has_header,
delimiter,
file_compression_type,
}
}

// Create a writer for CSV files
async fn create_writer(
&self,
file_meta: FileMeta,
object_store: Arc<dyn ObjectStore>,
) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
let object = &file_meta.object_meta;
match self.config.writer_mode {
// If the mode is append, call the store's append method and return the
// writer wrapped in a boxed trait object.
FileWriterMode::Append => {
let writer = object_store
.append(&object.location)
.await
.map_err(DataFusionError::ObjectStore)?;
let writer = AbortableWrite::new(
self.file_compression_type.convert_async_writer(writer)?,
AbortMode::Append,
);
Ok(writer)
}
// If the mode is put, create a new AsyncPut writer and return it wrapped
// in a boxed trait object.
FileWriterMode::Put => {
let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store));
let writer = AbortableWrite::new(
self.file_compression_type.convert_async_writer(writer)?,
AbortMode::Put,
);
Ok(writer)
}
// If the mode is put multipart, call the store's put_multipart method and
// return the writer wrapped in a boxed trait object.
FileWriterMode::PutMultipart => {
let (multipart_id, writer) = object_store
.put_multipart(&object.location)
.await
.map_err(DataFusionError::ObjectStore)?;
Ok(AbortableWrite::new(
self.file_compression_type.convert_async_writer(writer)?,
AbortMode::MultiPart(MultiPart::new(
object_store,
multipart_id,
object.location.clone(),
)),
))
}
}
}
}
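
// Illustrative sketch (not part of this commit): the `object_store` calls
// behind the `FileWriterMode` variants above, shown against an in-memory
// store. Note that `append` is only supported by some stores (e.g.
// `LocalFileSystem`), which is why this sketch uses a multipart put.
async fn multipart_put_example() -> Result<()> {
    use object_store::{memory::InMemory, path::Path};
    let store = InMemory::new();
    let location = Path::from("example.csv");
    // put_multipart hands back a MultipartId that allows aborting the
    // upload if a write fails midway:
    let (multipart_id, mut writer) = store.put_multipart(&location).await?;
    writer.write_all(b"c1,c2\n1,2\n").await?;
    writer.shutdown().await?;
    // On failure, `AbortMode::MultiPart` would instead call:
    // store.abort_multipart(&location, &multipart_id).await?;
    drop(multipart_id);
    Ok(())
}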

#[async_trait]
impl DataSink for CsvSink {
async fn write_all(
&self,
mut data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
let num_partitions = self.config.file_groups.len();

let object_store = context
.runtime_env()
.object_store(&self.config.object_store_url)?;

// Construct serializer and writer for each file group
let mut serializers = vec![];
let mut writers = vec![];
for file_group in &self.config.file_groups {
// In append mode, consider the has_header flag only when the file is
// empty (i.e. at the start). For other modes, use the has_header flag as is.
let header = self.has_header
&& (!matches!(&self.config.writer_mode, FileWriterMode::Append)
|| file_group.object_meta.size == 0);
let builder = WriterBuilder::new().with_delimiter(self.delimiter);
let serializer = CsvSerializer::new()
.with_builder(builder)
.with_header(header);
serializers.push(serializer);

let file = file_group.clone();
let writer = self
.create_writer(file.object_meta.clone().into(), object_store.clone())
.await?;
writers.push(writer);
}

let mut idx = 0;
let mut row_count = 0;
// Map errors to DataFusionError.
let err_converter =
|_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
while let Some(maybe_batch) = data.next().await {
// Write data to files in a round-robin fashion:
idx = (idx + 1) % num_partitions;
let serializer = &mut serializers[idx];
let batch = check_for_errors(maybe_batch, &mut writers).await?;
row_count += batch.num_rows();
let bytes =
check_for_errors(serializer.serialize(batch).await, &mut writers).await?;
let writer = &mut writers[idx];
check_for_errors(
writer.write_all(&bytes).await.map_err(err_converter),
&mut writers,
)
.await?;
}
// Perform cleanup:
let n_writers = writers.len();
for idx in 0..n_writers {
check_for_errors(
writers[idx].shutdown().await.map_err(err_converter),
&mut writers,
)
.await?;
}
Ok(row_count as u64)
}
}
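
// Illustrative sketch (not part of this commit): the round-robin assignment
// used by `write_all` above, extracted as a pure function. Note that the
// index advances before each write, so with n writers the incoming batches
// go to writers 1, 2, ..., n - 1, 0, 1, ...
fn round_robin_targets(num_batches: usize, num_writers: usize) -> Vec<usize> {
    let mut idx = 0;
    (0..num_batches)
        .map(|_| {
            idx = (idx + 1) % num_writers;
            idx
        })
        .collect()
}
// For example, round_robin_targets(5, 3) == vec![1, 2, 0, 1, 2].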

#[cfg(test)]
mod tests {
use super::super::test_util::scan_format;
@@ -333,6 +597,7 @@ mod tests {
use crate::physical_plan::collect;
use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext};
use crate::test_util::arrow_test_data;
use arrow::compute::concat_batches;
use bytes::Bytes;
use chrono::DateTime;
use datafusion_common::cast::as_string_array;
@@ -606,4 +871,52 @@ mod tests {
let format = CsvFormat::default();
scan_format(state, &format, &root, file_name, projection, limit).await
}

#[tokio::test]
async fn test_csv_serializer() -> Result<()> {
let ctx = SessionContext::new();
let df = ctx
.read_csv(
&format!("{}/csv/aggregate_test_100.csv", arrow_test_data()),
CsvReadOptions::default().has_header(true),
)
.await?;
let batches = df
.select_columns(&["c2", "c3"])?
.limit(0, Some(10))?
.collect()
.await?;
let batch = concat_batches(&batches[0].schema(), &batches)?;
let mut serializer = CsvSerializer::new();
let bytes = serializer.serialize(batch).await?;
assert_eq!(
"c2,c3\n2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n",
String::from_utf8(bytes.into()).unwrap()
);
Ok(())
}

#[tokio::test]
async fn test_csv_serializer_no_header() -> Result<()> {
let ctx = SessionContext::new();
let df = ctx
.read_csv(
&format!("{}/csv/aggregate_test_100.csv", arrow_test_data()),
CsvReadOptions::default().has_header(true),
)
.await?;
let batches = df
.select_columns(&["c2", "c3"])?
.limit(0, Some(10))?
.collect()
.await?;
let batch = concat_batches(&batches[0].schema(), &batches)?;
let mut serializer = CsvSerializer::new().with_header(false);
let bytes = serializer.serialize(batch).await?;
assert_eq!(
"2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n",
String::from_utf8(bytes.into()).unwrap()
);
Ok(())
}
}