Add support for appending data to external tables - CSV #6526

Merged: 44 commits, Jun 6, 2023
Changes from 43 commits

Commits (44)
f40b186
MemoryExec insert into refactor
metesynnada Apr 10, 2023
1e45a2c
Merge branch 'main' into enhance/insert_into_as_exec
metesynnada Apr 14, 2023
7bb7757
Merge leftovers
metesynnada Apr 14, 2023
741abdb
Set target partition
metesynnada Apr 14, 2023
242eca0
Comment and formatting improvements
ozankabak Apr 18, 2023
558a468
Comments on state.
metesynnada Apr 18, 2023
223b1e4
Merge branch 'main' into enhance/insert_into_as_exec
metesynnada Apr 21, 2023
e0d4dff
ListingTable INSERT INTO support
metesynnada Mar 14, 2023
4d0fe4d
Merge branch 'main' into feature/listing_table_insert_into_support
metesynnada Apr 21, 2023
4e84bbc
Removing unnecessary code
metesynnada Apr 21, 2023
6d117c1
Some comments are leftover.
metesynnada Apr 21, 2023
cd4ab10
Compression import error
metesynnada Apr 21, 2023
41e4bb7
Minor resolutions on cargo docs
metesynnada Apr 21, 2023
4b11606
Merge branch 'main' into feature/listing_table_insert_into_support
metesynnada May 3, 2023
03d5d97
Corrections after merge
metesynnada May 3, 2023
53d9ca7
Make FileWriterExt available
metesynnada May 5, 2023
31de586
Merge remote-tracking branch 'upstream/main' into feature/listing_tab…
metesynnada May 5, 2023
997c19a
Single file support
metesynnada May 5, 2023
962a03d
Resolve linter errors
mustafasrepo May 11, 2023
3e0b14f
Minor changes, simplifications
mustafasrepo May 12, 2023
d8c7d38
Merge branch 'main' into feature/listing_table_insert_into_support
mustafasrepo May 16, 2023
d6fc57d
Fix failing tests because of api change
mustafasrepo May 16, 2023
dd0d47a
Simplifications
ozankabak May 26, 2023
bbc535f
Replace block nesting with drop
ozankabak May 26, 2023
ea6fc47
Merge branch 'main' into feature/listing_table_insert_into_support
mustafasrepo May 26, 2023
e717343
remove unnecessary code
mustafasrepo May 26, 2023
5a443e9
Convert to new approach
mustafasrepo Jun 1, 2023
83f9ded
Merge branch 'main' into feature/listing_table_insert_into_support2
mustafasrepo Jun 1, 2023
192d5c0
simplify display
mustafasrepo Jun 1, 2023
1a60f49
Update debug display
mustafasrepo Jun 1, 2023
1cd54d0
use handle err macro
mustafasrepo Jun 1, 2023
de82bda
Make handle err close all writer in case of error.
mustafasrepo Jun 1, 2023
f14ebae
Merge branch 'main' into feature/listing_table_insert_into_support2
mustafasrepo Jun 1, 2023
0318ffd
Final review, stylistic changes
ozankabak Jun 2, 2023
9db8d93
Merge branch 'main' into feature/listing_table_insert_into_support2
mustafasrepo Jun 2, 2023
8e591a1
Improve comments
ozankabak Jun 5, 2023
22b9833
Move insert into test to the explain.slt
mustafasrepo Jun 5, 2023
f36ada4
convert macro to function
mustafasrepo Jun 5, 2023
19fa6d9
return error for abort in append mode.
mustafasrepo Jun 5, 2023
31d99cb
Simplify condition of has header
mustafasrepo Jun 5, 2023
8b3eb9c
Update comments
mustafasrepo Jun 5, 2023
39c80e6
Remove file writer factory
mustafasrepo Jun 6, 2023
5ce2303
use AbortableWrite struct instead of trait
mustafasrepo Jun 6, 2023
72f8e36
Merge branch 'main' into feature/listing_table_insert_into_support2
mustafasrepo Jun 6, 2023
330 changes: 322 additions & 8 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -18,30 +18,41 @@
//! CSV format abstractions

use std::any::Any;

use std::collections::HashSet;
use std::fmt;
use std::fmt::{Debug, Display};
use std::sync::Arc;

use arrow::csv::WriterBuilder;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow::{self, datatypes::SchemaRef};
use async_trait::async_trait;
use bytes::{Buf, Bytes};

use arrow_array::RecordBatch;
use datafusion_common::DataFusionError;

use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;

use async_trait::async_trait;
use bytes::{Buf, Bytes};
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
use tokio::io::{AsyncWrite, AsyncWriteExt};

use super::FileFormat;
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::file_format::FileWriterMode;
use crate::datasource::file_format::{
AbortMode, AbortableWrite, AsyncPutWriter, BatchSerializer, MultiPart,
DEFAULT_SCHEMA_INFER_MAX_RECORD,
};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::file_format::{
CsvExec, FileGroupDisplay, FileMeta, FileScanConfig, FileSinkConfig,
};
use crate::physical_plan::insert::{DataSink, InsertExec};
use crate::physical_plan::Statistics;
use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};

/// The default file extension of csv files
pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
@@ -220,6 +231,22 @@ impl FileFormat for CsvFormat {
);
Ok(Arc::new(exec))
}

async fn create_writer_physical_plan(
&self,
input: Arc<dyn ExecutionPlan>,
_state: &SessionState,
conf: FileSinkConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
let sink = Arc::new(CsvSink::new(
conf,
self.has_header,
self.delimiter,
self.file_compression_type.clone(),
));

Ok(Arc::new(InsertExec::new(input, sink)) as _)
}
}

impl CsvFormat {
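For context, here is a hypothetical end-to-end sketch (not part of this diff) of the path the new create_writer_physical_plan method enables: an INSERT INTO against an external CSV table is planned as an InsertExec wrapping a CsvSink. The table name, schema, and file path below are illustrative only, and appending may require the target to already exist as a single CSV file.

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Register an external, single-file CSV table (path and schema are made up).
    ctx.sql(
        "CREATE EXTERNAL TABLE sink_table (c1 INT, c2 VARCHAR) \
         STORED AS CSV LOCATION 'data/out.csv'",
    )
    .await?
    .collect()
    .await?;
    // The append goes through CsvFormat::create_writer_physical_plan, which
    // wraps a CsvSink in an InsertExec.
    ctx.sql("INSERT INTO sink_table VALUES (1, 'a'), (2, 'b')")
        .await?
        .collect()
        .await?;
    Ok(())
}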
@@ -324,6 +351,244 @@ fn build_schema_helper(names: Vec<String>, types: &[HashSet<DataType>]) -> Schema
Schema::new(fields)
}

impl Default for CsvSerializer {
fn default() -> Self {
Self::new()
}
}

/// Define a struct for serializing CSV records to a stream
pub struct CsvSerializer {
// CSV writer builder
builder: WriterBuilder,
// Inner buffer for avoiding reallocation
buffer: Vec<u8>,
// Flag to indicate whether there will be a header
header: bool,
}

impl CsvSerializer {
/// Constructor for the CsvSerializer object
pub fn new() -> Self {
Self {
builder: WriterBuilder::new(),
header: true,
buffer: Vec::with_capacity(4096),
}
}

/// Method for setting the CSV writer builder
pub fn with_builder(mut self, builder: WriterBuilder) -> Self {
self.builder = builder;
self
}

/// Method for setting the CSV writer header status
pub fn with_header(mut self, header: bool) -> Self {
self.header = header;
self
}
}

#[async_trait]
impl BatchSerializer for CsvSerializer {
async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes> {
let builder = self.builder.clone();
let mut writer = builder.has_headers(self.header).build(&mut self.buffer);
writer.write(&batch)?;
drop(writer);
self.header = false;
Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
}
}
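A minimal sketch (not in this diff) of the behavior encoded by self.header = false above: the serializer writes the CSV header only for the first batch, so later batches can be appended to the same file without repeating it. It assumes CsvSerializer and the BatchSerializer trait are publicly exported from datafusion::datasource::file_format; the column name and values are made up.

use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::file_format::csv::CsvSerializer;
use datafusion::datasource::file_format::BatchSerializer;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2]))])?;

    let mut serializer = CsvSerializer::new().with_header(true);
    let first = serializer.serialize(batch.clone()).await?; // "c1\n1\n2\n"
    let second = serializer.serialize(batch).await?; // "1\n2\n", header written only once
    assert!(first.starts_with(b"c1"));
    assert!(!second.starts_with(b"c1"));
    Ok(())
}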

async fn check_for_errors<T, W: AsyncWrite + Unpin + Send>(
result: Result<T>,
writers: &mut [AbortableWrite<W>],
) -> Result<T> {
match result {
Ok(value) => Ok(value),
Err(e) => {
// Abort all writers before returning the error:
for writer in writers {
let mut abort_future = writer.abort_writer();
if let Ok(abort_future) = &mut abort_future {
let _ = abort_future.await;
}
// Ignore any error that occurs while aborting; we still try to
// abort every writer before returning the original error.
}
// After aborting the writers, return the original error.
Err(e)
}
}
}

/// Implements [`DataSink`] for writing to a CSV file.
struct CsvSink {
/// Config options for writing data
config: FileSinkConfig,
has_header: bool,
delimiter: u8,
file_compression_type: FileCompressionType,
}

impl Debug for CsvSink {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("CsvSink")
.field("has_header", &self.has_header)
.field("delimiter", &self.delimiter)
.field("file_compression_type", &self.file_compression_type)
.finish()
}
}

impl Display for CsvSink {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"CsvSink(writer_mode={:?}, file_groups={})",
self.config.writer_mode,
FileGroupDisplay(&self.config.file_groups),
)
}
}

impl CsvSink {
fn new(
config: FileSinkConfig,
has_header: bool,
delimiter: u8,
file_compression_type: FileCompressionType,
) -> Self {
Self {
config,
has_header,
delimiter,
file_compression_type,
}
}

// Create a writer for CSV files
async fn create_writer(
&self,
file_meta: FileMeta,
object_store: Arc<dyn ObjectStore>,
) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
let object = &file_meta.object_meta;
match self.config.writer_mode {
// If the mode is append, call the store's append method and return the
// writer wrapped in a boxed trait object.
FileWriterMode::Append => {
let writer = object_store
.append(&object.location)
.await
.map_err(DataFusionError::ObjectStore)?;
let writer = AbortableWrite::new(
self.file_compression_type.convert_async_writer(writer)?,
AbortMode::Append,
);
Ok(writer)
}
// If the mode is put, create a new AsyncPut writer and return it wrapped in
// a boxed trait object
FileWriterMode::Put => {
let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store));
let writer = AbortableWrite::new(
self.file_compression_type.convert_async_writer(writer)?,
AbortMode::Put,
);
Ok(writer)
}
// If the mode is put multipart, call the store's put_multipart method and
// return the writer wrapped in a boxed trait object.
FileWriterMode::PutMultipart => {
let (multipart_id, writer) = object_store
.put_multipart(&object.location)
.await
.map_err(DataFusionError::ObjectStore)?;
Ok(AbortableWrite::new(
self.file_compression_type.convert_async_writer(writer)?,
AbortMode::MultiPart(MultiPart::new(
object_store,
multipart_id,
object.location.clone(),
)),
))
}
}
}
}
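For reference, here is a minimal sketch of the object_store calls behind the PutMultipart arm above, using an in-memory store and a made-up path: put_multipart hands back the MultipartId that AbortMode::MultiPart keeps, so a partially uploaded object can be cancelled with abort_multipart when a write fails.

use object_store::{memory::InMemory, path::Path, ObjectStore};
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store = InMemory::new();
    let location = Path::from("hypothetical/output.csv");

    // Start a multipart upload; the returned id is what makes the write abortable.
    let (multipart_id, mut writer) = store.put_multipart(&location).await?;
    writer.write_all(b"c1,c2\n1,2\n").await?;

    // On error, the sink's abort path would do the equivalent of this:
    store.abort_multipart(&location, &multipart_id).await?;
    Ok(())
}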

#[async_trait]
impl DataSink for CsvSink {
async fn write_all(
&self,
mut data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
let num_partitions = self.config.file_groups.len();

let object_store = context
.runtime_env()
.object_store(&self.config.object_store_url)?;

// Construct serializer and writer for each file group
let mut serializers = vec![];
let mut writers = vec![];
for file_group in &self.config.file_groups {
// In append mode, emit a header only if has_header is set and the file is still empty.
let header = if matches!(&self.config.writer_mode, FileWriterMode::Append) {
self.has_header && file_group.object_meta.size == 0
} else {
self.has_header
};
let builder = WriterBuilder::new().with_delimiter(self.delimiter);
let serializer = CsvSerializer::new()
.with_builder(builder)
.with_header(header);
serializers.push(serializer);

let file = file_group.clone();
let writer = self
.create_writer(file.object_meta.clone().into(), object_store.clone())
.await?;
writers.push(writer);
}

let mut idx = 0;
let mut row_count = 0;
// Map errors to DataFusionError.
let err_converter =
|_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
while let Some(maybe_batch) = data.next().await {
// Write data to files in a round robin fashion:
idx = (idx + 1) % num_partitions;
let serializer = &mut serializers[idx];
let batch = check_for_errors(maybe_batch, &mut writers).await?;
row_count += batch.num_rows();
let bytes =
check_for_errors(serializer.serialize(batch).await, &mut writers).await?;
let writer = &mut writers[idx];
check_for_errors(
writer.write_all(&bytes).await.map_err(err_converter),
&mut writers,
)
.await?;
}
// Perform cleanup:
let n_writers = writers.len();
for idx in 0..n_writers {
check_for_errors(
writers[idx].shutdown().await.map_err(err_converter),
&mut writers,
)
.await?;
}
Ok(row_count as u64)
}
}

#[cfg(test)]
mod tests {
use super::super::test_util::scan_format;
@@ -333,6 +598,7 @@ mod tests {
use crate::physical_plan::collect;
use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext};
use crate::test_util::arrow_test_data;
use arrow::compute::concat_batches;
use bytes::Bytes;
use chrono::DateTime;
use datafusion_common::cast::as_string_array;
@@ -605,4 +871,52 @@
let format = CsvFormat::default();
scan_format(state, &format, &root, file_name, projection, limit).await
}

#[tokio::test]
async fn test_csv_serializer() -> Result<()> {
let ctx = SessionContext::new();
let df = ctx
.read_csv(
&format!("{}/csv/aggregate_test_100.csv", arrow_test_data()),
CsvReadOptions::default().has_header(true),
)
.await?;
let batches = df
.select_columns(&["c2", "c3"])?
.limit(0, Some(10))?
.collect()
.await?;
let batch = concat_batches(&batches[0].schema(), &batches)?;
let mut serializer = CsvSerializer::new();
let bytes = serializer.serialize(batch).await?;
assert_eq!(
"c2,c3\n2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n",
String::from_utf8(bytes.into()).unwrap()
);
Ok(())
}

#[tokio::test]
async fn test_csv_serializer_no_header() -> Result<()> {
let ctx = SessionContext::new();
let df = ctx
.read_csv(
&format!("{}/csv/aggregate_test_100.csv", arrow_test_data()),
CsvReadOptions::default().has_header(true),
)
.await?;
let batches = df
.select_columns(&["c2", "c3"])?
.limit(0, Some(10))?
.collect()
.await?;
let batch = concat_batches(&batches[0].schema(), &batches)?;
let mut serializer = CsvSerializer::new().with_header(false);
let bytes = serializer.serialize(batch).await?;
assert_eq!(
"2,1\n5,-40\n1,29\n1,-85\n5,-82\n4,-111\n3,104\n3,13\n1,38\n4,-38\n",
String::from_utf8(bytes.into()).unwrap()
);
Ok(())
}
}