diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 3e8fdf1f3ed7e..2e6ce429fd05d 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -260,6 +260,9 @@ struct ExternalSorter {
 }
 
 impl ExternalSorter {
+    fn is_large_batch(&self, batch: &RecordBatch) -> bool {
+        batch.num_rows() > self.batch_size
+    }
     // TODO: make a builder or some other nicer API to avoid the
     // clippy warning
     #[expect(clippy::too_many_arguments)]
@@ -308,9 +311,47 @@ impl ExternalSorter {
         })
     }
 
+    /// Sorts a single oversized `RecordBatch` and spills it incrementally in
+    /// `batch_size`-sized chunks.
+    ///
+    /// This is used as a fallback under severe memory pressure when a single
+    /// input batch cannot be safely sorted in memory and there are no buffered
+    /// batches available to spill first.
+    async fn sort_and_spill_large_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        debug!("Sorting and spilling large batch chunk-by-chunk");
+
+        // Lazily create spill file
+        if self.in_progress_spill_file.is_none() {
+            self.in_progress_spill_file =
+                Some((self.spill_manager.create_in_progress_file("Sorting")?, 0));
+        }
+
+        // Sort the batch into batch_size-sized chunks
+        let sorted_chunks = sort_batch_chunked(&batch, &self.expr, self.batch_size)?;
+
+        // Drop the original large batch early to free memory
+        drop(batch);
+
+        let (spill_file, max_batch_size) = self
+            .in_progress_spill_file
+            .as_mut()
+            .expect("spill file must exist");
+
+        // Append each sorted chunk to the spill file
+        for chunk in sorted_chunks {
+            let chunk_size = chunk.get_sliced_size()?;
+            spill_file.append_batch(&chunk)?;
+            *max_batch_size = (*max_batch_size).max(chunk_size);
+            // chunk dropped here
+        }
+
+        Ok(())
+    }
+
     /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
     ///
     /// Updates memory usage metrics, and possibly triggers spilling to disk
+    /// Buffers an input batch, spilling to disk if memory is insufficient.
     async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
         if input.num_rows() == 0 {
             return Ok(());
@@ -320,6 +361,7 @@ impl ExternalSorter {
         self.reserve_memory_for_batch_and_maybe_spill(&input)
             .await?;
 
+        // Safe to buffer after successful memory reservation or spill handling
         self.in_mem_batches.push(input);
         Ok(())
     }
@@ -795,20 +837,45 @@ impl ExternalSorter {
         &mut self,
         input: &RecordBatch,
     ) -> Result<()> {
-        let size = get_reserved_bytes_for_record_batch(input)?;
+        let full_sort_size = get_reserved_bytes_for_record_batch(input)?;
 
-        match self.reservation.try_grow(size) {
+        match self.reservation.try_grow(full_sort_size) {
             Ok(_) => Ok(()),
+
             Err(e) => {
-                if self.in_mem_batches.is_empty() {
-                    return Err(Self::err_with_oom_context(e));
+                // CASE 1: we can spill existing batches
+                if !self.in_mem_batches.is_empty() {
+                    self.sort_and_spill_in_mem_batches().await?;
+                    self.reservation
+                        .try_grow(full_sort_size)
+                        .map_err(Self::err_with_oom_context)?;
+                    return Ok(());
                 }
 
-                // Spill and try again.
-                self.sort_and_spill_in_mem_batches().await?;
-                self.reservation
-                    .try_grow(size)
-                    .map_err(Self::err_with_oom_context)
+                // CASE 2: single oversized batch under memory pressure
+                //
+                // If we cannot reserve enough memory and there are no buffered batches
+                // to spill first, fall back to chunked sorting and spilling.
+                //
+                // This avoids creating a single large sorted batch in memory while
+                // preserving correct ordering and output batch sizing.
+                if self.is_large_batch(input) {
+                    debug!("Chunked spilling oversized batch");
+
+                    // Reserve minimal memory for the input batch
+                    let batch_mem = get_record_batch_memory_size(input);
+                    self.reservation
+                        .try_grow(batch_mem)
+                        .map_err(Self::err_with_oom_context)?;
+
+                    // Spill immediately in sorted chunks
+                    self.sort_and_spill_large_batch(input.clone()).await?;
+
+                    return Ok(());
+                }
+
+                // CASE 3: true OOM
+                Err(Self::err_with_oom_context(e))
             }
         }
     }
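
Note: `sort_batch_chunked` is called in the hunk above but defined elsewhere in the PR, so its body is not shown here. Purely as an illustrative sketch (not the PR's actual implementation), a chunked sort helper built on arrow's `lexsort_to_indices` and `take` kernels could look like the code below; the function name, the `&[SortColumn]` parameter, and the per-chunk gather strategy are assumptions for the sketch (the real helper may instead slice a sorted batch, given the `get_sliced_size` call above).

// Hypothetical sketch only; not the PR's `sort_batch_chunked`. It computes the
// sort permutation once, then gathers the payload `chunk_size` rows at a time,
// so no output batch exceeds `chunk_size` rows.
use arrow::array::{Array, ArrayRef, UInt32Array};
use arrow::compute::{lexsort_to_indices, take, SortColumn};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn sort_batch_chunked_sketch(
    batch: &RecordBatch,
    sort_columns: &[SortColumn],
    chunk_size: usize,
) -> Result<Vec<RecordBatch>, ArrowError> {
    // Full sort permutation over the key columns; only u32 indices are
    // materialized here, not the sorted payload columns.
    let indices = lexsort_to_indices(sort_columns, None)?;

    let mut chunks = Vec::new();
    let mut offset = 0;
    while offset < indices.len() {
        let len = chunk_size.min(indices.len() - offset);
        // Indices of the rows that belong to this chunk, in sorted order.
        let chunk_indices =
            UInt32Array::from(indices.values()[offset..offset + len].to_vec());
        // Gather each column for just this chunk.
        let columns = batch
            .columns()
            .iter()
            .map(|c| take(c.as_ref(), &chunk_indices, None))
            .collect::<Result<Vec<ArrayRef>, _>>()?;
        chunks.push(RecordBatch::try_new(batch.schema(), columns)?);
        offset += len;
    }
    Ok(chunks)
}

Each returned chunk could then be appended to the in-progress spill file one at a time, as `sort_and_spill_large_batch` does in the hunk above.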