From 393b4324c41f733997de288dc0b4212e9f1aa2be Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Wed, 15 Oct 2025 14:42:39 +0800 Subject: [PATCH 1/3] feat: Reuse existing file instead of reopening during shuffle write --- native/core/src/execution/shuffle/shuffle_writer.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index bf2d3017f5..96cef2a03b 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -780,8 +780,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { // if we wrote a spill file for this partition then copy the // contents into the shuffle file if let Some(spill_data) = self.partition_writers[i].spill_file.as_ref() { - let mut spill_file = - BufReader::new(File::open(spill_data.temp_file.path()).map_err(to_df_err)?); + let mut spill_file = BufReader::new(&spill_data.file); let mut write_timer = self.metrics.write_time.timer(); std::io::copy(&mut spill_file, &mut output_data).map_err(to_df_err)?; write_timer.stop(); From 97a398e6f83406f87b6b634860ca0af237fc2f45 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Wed, 15 Oct 2025 14:52:29 +0800 Subject: [PATCH 2/3] allow deadcode --- native/core/src/execution/shuffle/shuffle_writer.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 96cef2a03b..0de2b220c1 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -1112,6 +1112,7 @@ struct PartitionWriter { } struct SpillFile { + #[allow(dead_code)] temp_file: RefCountedTempFile, file: File, } From 3ec20471b76f983df341c87a1329fa55530bfec8 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Thu, 16 Oct 2025 10:50:39 +0800 Subject: [PATCH 3/3] fix --- native/core/src/execution/shuffle/shuffle_writer.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 0de2b220c1..8e8b4a9fee 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -779,8 +779,10 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { // if we wrote a spill file for this partition then copy the // contents into the shuffle file - if let Some(spill_data) = self.partition_writers[i].spill_file.as_ref() { - let mut spill_file = BufReader::new(&spill_data.file); + if let Some(spill_data) = self.partition_writers[i].spill_file.as_mut() { + let file = &mut spill_data.file; + file.seek(SeekFrom::Start(0))?; + let mut spill_file = BufReader::new(file); let mut write_timer = self.metrics.write_time.timer(); std::io::copy(&mut spill_file, &mut output_data).map_err(to_df_err)?; write_timer.stop(); @@ -1168,6 +1170,7 @@ impl PartitionWriter { let spill_data = OpenOptions::new() .write(true) .create(true) + .read(true) .truncate(true) .open(spill_file.path()) .map_err(|e| {