From b4dcf1e656566f924519e38a7f3f169c1f5e8dc1 Mon Sep 17 00:00:00 2001 From: Georgi Krastev Date: Tue, 2 Jul 2024 11:08:17 +0300 Subject: [PATCH] Avoid calling shutdown after failed write of AsyncWrite (#249) in `serialize_rb_stream_to_object_store` --- .../datasource/file_format/write/orchestration.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs index 1ef571a2c61b..f340f2b7d853 100644 --- a/datafusion/core/src/datasource/file_format/write/orchestration.rs +++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs @@ -50,7 +50,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store( mut data_rx: Receiver, serializer: Arc, mut writer: WriterType, -) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> { +) -> std::result::Result<(WriterType, u64), (Option, DataFusionError)> { let (tx, mut rx) = mpsc::channel::>>(100); let serialize_task = SpawnedTask::spawn(async move { @@ -82,7 +82,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store( Ok(_) => (), Err(e) => { return Err(( - writer, + None, DataFusionError::Execution(format!( "Error writing to object store: {e}" )), @@ -93,12 +93,12 @@ pub(crate) async fn serialize_rb_stream_to_object_store( } Ok(Err(e)) => { // Return the writer along with the error - return Err((writer, e)); + return Err((Some(writer), e)); } Err(e) => { // Handle task panic or cancellation return Err(( - writer, + Some(writer), DataFusionError::Execution(format!( "Serialization task panicked or was cancelled: {e}" )), @@ -109,10 +109,10 @@ pub(crate) async fn serialize_rb_stream_to_object_store( match serialize_task.join().await { Ok(Ok(_)) => (), - Ok(Err(e)) => return Err((writer, e)), + Ok(Err(e)) => return Err((Some(writer), e)), Err(_) => { return Err(( - writer, + Some(writer), internal_datafusion_err!("Unknown error writing to object store"), )) } @@ -153,7 +153,7 @@ pub(crate) async fn stateless_serialize_and_write_files( row_count += cnt; } Err((writer, e)) => { - finished_writers.push(writer); + finished_writers.extend(writer); any_errors = true; triggering_error = Some(e); }