Skip to content

Commit

Permalink
Merge branch 'main' into feature/11546-map-df-api
Browse files Browse the repository at this point in the history
  • Loading branch information
goldmedal committed Jul 21, 2024
2 parents 7a5a05e + 36660fe commit ce5b88b
Show file tree
Hide file tree
Showing 16 changed files with 399 additions and 232 deletions.
3 changes: 0 additions & 3 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,6 @@ impl ScalarValue {

/// Create a one value in the given type.
pub fn new_one(datatype: &DataType) -> Result<ScalarValue> {
assert!(datatype.is_primitive());
Ok(match datatype {
DataType::Int8 => ScalarValue::Int8(Some(1)),
DataType::Int16 => ScalarValue::Int16(Some(1)),
Expand All @@ -1086,7 +1085,6 @@ impl ScalarValue {

/// Create a negative one value in the given type.
pub fn new_negative_one(datatype: &DataType) -> Result<ScalarValue> {
assert!(datatype.is_primitive());
Ok(match datatype {
DataType::Int8 | DataType::UInt8 => ScalarValue::Int8(Some(-1)),
DataType::Int16 | DataType::UInt16 => ScalarValue::Int16(Some(-1)),
Expand All @@ -1104,7 +1102,6 @@ impl ScalarValue {
}

pub fn new_ten(datatype: &DataType) -> Result<ScalarValue> {
assert!(datatype.is_primitive());
Ok(match datatype {
DataType::Int8 => ScalarValue::Int8(Some(10)),
DataType::Int16 => ScalarValue::Int16(Some(10)),
Expand Down
62 changes: 48 additions & 14 deletions datafusion/core/src/datasource/file_format/write/orchestration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,37 @@ use tokio::task::JoinSet;
type WriterType = Box<dyn AsyncWrite + Send + Unpin>;
type SerializerType = Arc<dyn BatchSerializer>;

/// Result of calling [`serialize_rb_stream_to_object_store`].
///
/// This is a dedicated two-variant enum rather than a `Result` of tuples, so
/// each piece of data handed back to the caller has a named field.
pub(crate) enum SerializedRecordBatchResult {
/// The stream was serialized and written without error.
Success {
/// The writer, handed back to the caller.
writer: WriterType,

/// The number of rows successfully written.
row_count: usize,
},
/// Serialization or writing failed.
Failure {
/// As explained in [`serialize_rb_stream_to_object_store`]:
/// - If an IO error occurred that involved the ObjectStore writer, then the writer will not be returned to the caller
/// - Otherwise, the writer is returned to the caller
writer: Option<WriterType>,

/// The actual error that occurred.
err: DataFusionError,
},
}

impl SerializedRecordBatchResult {
/// Create the success variant
pub fn success(writer: WriterType, row_count: usize) -> Self {
Self::Success { writer, row_count }
}

pub fn failure(writer: Option<WriterType>, err: DataFusionError) -> Self {
Self::Failure { writer, err }
}
}

/// Serializes a single data stream in parallel and writes to an ObjectStore concurrently.
/// Data order is preserved.
///
Expand All @@ -55,7 +86,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
mut data_rx: Receiver<RecordBatch>,
serializer: Arc<dyn BatchSerializer>,
mut writer: WriterType,
) -> std::result::Result<(WriterType, u64), (Option<WriterType>, DataFusionError)> {
) -> SerializedRecordBatchResult {
let (tx, mut rx) =
mpsc::channel::<SpawnedTask<Result<(usize, Bytes), DataFusionError>>>(100);
let serialize_task = SpawnedTask::spawn(async move {
Expand Down Expand Up @@ -86,43 +117,43 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
match writer.write_all(&bytes).await {
Ok(_) => (),
Err(e) => {
return Err((
return SerializedRecordBatchResult::failure(
None,
DataFusionError::Execution(format!(
"Error writing to object store: {e}"
)),
))
)
}
};
row_count += cnt;
}
Ok(Err(e)) => {
// Return the writer along with the error
return Err((Some(writer), e));
return SerializedRecordBatchResult::failure(Some(writer), e);
}
Err(e) => {
// Handle task panic or cancellation
return Err((
return SerializedRecordBatchResult::failure(
Some(writer),
DataFusionError::Execution(format!(
"Serialization task panicked or was cancelled: {e}"
)),
));
);
}
}
}

match serialize_task.join().await {
Ok(Ok(_)) => (),
Ok(Err(e)) => return Err((Some(writer), e)),
Ok(Err(e)) => return SerializedRecordBatchResult::failure(Some(writer), e),
Err(_) => {
return Err((
return SerializedRecordBatchResult::failure(
Some(writer),
internal_datafusion_err!("Unknown error writing to object store"),
))
)
}
}
Ok((writer, row_count as u64))
SerializedRecordBatchResult::success(writer, row_count)
}

type FileWriteBundle = (Receiver<RecordBatch>, SerializerType, WriterType);
Expand Down Expand Up @@ -153,14 +184,17 @@ pub(crate) async fn stateless_serialize_and_write_files(
while let Some(result) = join_set.join_next().await {
match result {
Ok(res) => match res {
Ok((writer, cnt)) => {
SerializedRecordBatchResult::Success {
writer,
row_count: cnt,
} => {
finished_writers.push(writer);
row_count += cnt;
}
Err((writer, e)) => {
SerializedRecordBatchResult::Failure { writer, err } => {
finished_writers.extend(writer);
any_errors = true;
triggering_error = Some(e);
triggering_error = Some(err);
}
},
Err(e) => {
Expand Down Expand Up @@ -193,7 +227,7 @@ pub(crate) async fn stateless_serialize_and_write_files(
}
}

tx.send(row_count).map_err(|_| {
tx.send(row_count as u64).map_err(|_| {
internal_datafusion_err!(
"Error encountered while sending row count back to file sink!"
)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1038,8 +1038,8 @@ mod tests {
use crate::datasource::file_format::avro::AvroFormat;
use crate::datasource::file_format::csv::CsvFormat;
use crate::datasource::file_format::json::JsonFormat;
use crate::datasource::file_format::parquet::ParquetFormat;
#[cfg(feature = "parquet")]
use crate::datasource::file_format::parquet::ParquetFormat;
use crate::datasource::{provider_as_source, MemTable};
use crate::execution::options::ArrowReadOptions;
use crate::physical_plan::collect;
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/schema_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ mod tests {
use crate::datasource::schema_adapter::{
SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
};
#[cfg(feature = "parquet")]
use parquet::arrow::ArrowWriter;
use tempfile::TempDir;

Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
pub mod context;
pub mod session_state;
mod session_state_defaults;

pub use session_state_defaults::SessionStateDefaults;

// backwards compatibility
pub use crate::datasource::file_format::options;
Expand Down
Loading

0 comments on commit ce5b88b

Please sign in to comment.