DataSink Dynamic Execution Time Demux #7791

Merged: 11 commits, Oct 18, 2023
Changes from all commits
18 changes: 18 additions & 0 deletions datafusion/common/src/config.rs
@@ -254,6 +254,24 @@ config_namespace! {

/// Number of files to read in parallel when inferring schema and statistics
pub meta_fetch_concurrency: usize, default = 32

/// Target number of rows in output files when writing multiple
/// output files. This is a soft maximum, so it can be exceeded
/// slightly. There will also be one file smaller than the limit
/// if the total number of rows written is not roughly divisible
/// by the soft maximum.
pub soft_max_rows_per_output_file: usize, default = 50000000
Contributor Author commented:
The ideal value here is very situational, so we definitely need to make this configurable at the statement and table levels.


/// This is the maximum number of output files being written
/// in parallel. Higher values can potentially give faster write
/// performance at the cost of higher peak memory consumption.
pub max_parallel_ouput_files: usize, default = 8

/// This is the maximum number of RecordBatches buffered
/// for each output file being written. Higher values can potentially
/// give faster write performance at the cost of higher peak
/// memory consumption.
pub max_buffered_batches_per_output_file: usize, default = 2

}
}
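
These three options are generated as fields by `config_namespace!`, so they end up directly settable on the session's execution options. As a rough tuning sketch (not part of this PR; it assumes the options sit under the `execution` namespace alongside `meta_fetch_concurrency` and uses the `SessionConfig` API of DataFusion around this release):

```rust
use datafusion::prelude::*;

// A minimal tuning sketch, assuming the new fields land on ExecutionOptions
// with the defaults shown in the diff above. With the 50_000_000 default,
// writing ~120M rows would produce two files near the soft max plus one
// smaller remainder file.
fn tuned_context() -> SessionContext {
    let mut config = SessionConfig::new();
    // Aim for ~10M rows per output file instead of the 50M default.
    config.options_mut().execution.soft_max_rows_per_output_file = 10_000_000;
    // Allow up to 16 files to be written concurrently (higher peak memory).
    // Field name spelled exactly as in this diff.
    config.options_mut().execution.max_parallel_ouput_files = 16;
    // Buffer up to 4 RecordBatches per open output file.
    config.options_mut().execution.max_buffered_batches_per_output_file = 4;
    SessionContext::with_config(config)
}
```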

192 changes: 86 additions & 106 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -23,11 +23,10 @@ use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;

use super::write::{stateless_append_all, stateless_multipart_put};
use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::file_format::write::{
create_writer, stateless_serialize_and_write_files, BatchSerializer, FileWriterMode,
};
use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
use crate::datasource::physical_plan::{
CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig,
};
@@ -51,7 +50,6 @@ use bytes::{Buf, Bytes};
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
use rand::distributions::{Alphanumeric, DistString};

/// Character Separated Value `FileFormat` implementation.
#[derive(Debug)]
@@ -481,6 +479,82 @@ impl CsvSink {
fn new(config: FileSinkConfig) -> Self {
Self { config }
}

async fn append_all(
Contributor commented:
this is a very nice cleanup / refactor

&self,
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
let writer_options = self.config.file_type_writer_options.try_into_csv()?;
let (builder, compression) =
(&writer_options.writer_options, &writer_options.compression);
let compression = FileCompressionType::from(*compression);

let object_store = context
.runtime_env()
.object_store(&self.config.object_store_url)?;
let file_groups = &self.config.file_groups;

let builder_clone = builder.clone();
let options_clone = writer_options.clone();
let get_serializer = move |file_size| {
let inner_clone = builder_clone.clone();
// In append mode, consider has_header flag only when file is empty (at the start).
// For other modes, use has_header flag as is.
let serializer: Box<dyn BatchSerializer> = Box::new(if file_size > 0 {
CsvSerializer::new()
.with_builder(inner_clone)
.with_header(false)
} else {
CsvSerializer::new()
.with_builder(inner_clone)
.with_header(options_clone.has_header)
});
serializer
};

stateless_append_all(
data,
context,
object_store,
file_groups,
self.config.unbounded_input,
compression,
Box::new(get_serializer),
)
.await
}

async fn multipartput_all(
&self,
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
let writer_options = self.config.file_type_writer_options.try_into_csv()?;
let builder = &writer_options.writer_options;

let builder_clone = builder.clone();
let options_clone = writer_options.clone();
let get_serializer = move || {
let inner_clone = builder_clone.clone();
let serializer: Box<dyn BatchSerializer> = Box::new(
CsvSerializer::new()
.with_builder(inner_clone)
.with_header(options_clone.has_header),
);
serializer
};

stateless_multipart_put(
data,
context,
"csv".into(),
Box::new(get_serializer),
&self.config,
writer_options.compression.into(),
)
.await
}
}
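
For orientation, a hypothetical end-to-end call that would drive these two helpers (not part of this diff; the table names are invented and it assumes the target is a CSV-backed external table that accepts `INSERT INTO`): the statement ends up in `CsvSink::write_all`, which dispatches to `append_all` or `multipartput_all` depending on the configured `FileWriterMode`.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

// Hypothetical driver: `csv_sink_table` and `source_table` are assumed to be
// registered already. Executing the INSERT pushes data through CsvSink.
async fn write_via_sql(ctx: &SessionContext) -> Result<()> {
    ctx.sql("INSERT INTO csv_sink_table SELECT * FROM source_table")
        .await?
        .collect()
        .await?;
    Ok(())
}
```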

#[async_trait]
@@ -495,116 +569,22 @@ impl DataSink for CsvSink {

async fn write_all(
&self,
data: Vec<SendableRecordBatchStream>,
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
let num_partitions = data.len();
let writer_options = self.config.file_type_writer_options.try_into_csv()?;
let (builder, compression) =
(&writer_options.writer_options, &writer_options.compression);
let mut has_header = writer_options.has_header;
let compression = FileCompressionType::from(*compression);

let object_store = context
.runtime_env()
.object_store(&self.config.object_store_url)?;
// Construct serializer and writer for each file group
let mut serializers: Vec<Box<dyn BatchSerializer>> = vec![];
let mut writers = vec![];
match self.config.writer_mode {
FileWriterMode::Append => {
for file_group in &self.config.file_groups {
let mut append_builder = builder.clone();
// In append mode, consider has_header flag only when file is empty (at the start).
// For other modes, use has_header flag as is.
if file_group.object_meta.size != 0 {
has_header = false;
append_builder = append_builder.has_headers(false);
}
let serializer = CsvSerializer::new()
.with_builder(append_builder)
.with_header(has_header);
serializers.push(Box::new(serializer));

let file = file_group.clone();
let writer = create_writer(
self.config.writer_mode,
compression,
file.object_meta.clone().into(),
object_store.clone(),
)
.await?;
writers.push(writer);
}
}
FileWriterMode::Put => {
return not_impl_err!("Put Mode is not implemented for CSV Sink yet")
let total_count = self.append_all(data, context).await?;
Ok(total_count)
}
FileWriterMode::PutMultipart => {
// Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column)
let base_path = &self.config.table_paths[0];
match self.config.single_file_output {
false => {
// Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
let write_id =
Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
for part_idx in 0..num_partitions {
let serializer = CsvSerializer::new()
.with_builder(builder.clone())
.with_header(has_header);
serializers.push(Box::new(serializer));
let file_path = base_path
.prefix()
.child(format!("{}_{}.csv", write_id, part_idx));
let object_meta = ObjectMeta {
location: file_path,
last_modified: chrono::offset::Utc::now(),
size: 0,
e_tag: None,
};
let writer = create_writer(
self.config.writer_mode,
compression,
object_meta.into(),
object_store.clone(),
)
.await?;
writers.push(writer);
}
}
true => {
let serializer = CsvSerializer::new()
.with_builder(builder.clone())
.with_header(has_header);
serializers.push(Box::new(serializer));
let file_path = base_path.prefix();
let object_meta = ObjectMeta {
location: file_path.clone(),
last_modified: chrono::offset::Utc::now(),
size: 0,
e_tag: None,
};
let writer = create_writer(
self.config.writer_mode,
compression,
object_meta.into(),
object_store.clone(),
)
.await?;
writers.push(writer);
}
}
let total_count = self.multipartput_all(data, context).await?;
Ok(total_count)
}
FileWriterMode::Put => {
return not_impl_err!("FileWriterMode::Put is not supported yet!")
}
}

stateless_serialize_and_write_files(
data,
serializers,
writers,
self.config.single_file_output,
self.config.unbounded_input,
)
.await
}
}
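
The core of the PR is visible in the changed signature above: `write_all` now takes a single merged `SendableRecordBatchStream` instead of one stream per partition, and the demultiplexing into output files happens inside the sink at execution time. A minimal sketch of that shape, using a made-up trait name rather than the real `DataSink` trait (import paths are approximate for this DataFusion version):

```rust
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream;

// Illustrative stand-in, not the real DataSink definition: one input stream
// in, total row count out; how the rows are split across files is decided
// dynamically while writing.
#[async_trait]
trait SingleStreamSink: Send + Sync {
    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64>;
}
```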
