@@ -29,13 +29,11 @@ use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
2929use crate :: expressions:: PhysicalSortExpr ;
3030use crate :: limit:: LimitStream ;
3131use crate :: metrics:: {
32- BaselineMetrics , Count , ExecutionPlanMetricsSet , MetricBuilder , MetricsSet ,
32+ BaselineMetrics , ExecutionPlanMetricsSet , MetricsSet , SpillMetrics ,
3333} ;
3434use crate :: projection:: { make_with_child, update_expr, ProjectionExec } ;
3535use crate :: sorts:: streaming_merge:: StreamingMergeBuilder ;
36- use crate :: spill:: {
37- get_record_batch_memory_size, read_spill_as_stream, spill_record_batches,
38- } ;
36+ use crate :: spill:: { get_record_batch_memory_size, InProgressSpillFile , SpillManager } ;
3937use crate :: stream:: RecordBatchStreamAdapter ;
4038use crate :: topk:: TopK ;
4139use crate :: {
@@ -50,7 +48,7 @@ use arrow::array::{
5048use arrow:: compute:: { concat_batches, lexsort_to_indices, take_arrays, SortColumn } ;
5149use arrow:: datatypes:: { DataType , SchemaRef } ;
5250use arrow:: row:: { RowConverter , SortField } ;
53- use datafusion_common:: { internal_err, Result } ;
51+ use datafusion_common:: { internal_datafusion_err , internal_err, Result } ;
5452use datafusion_execution:: disk_manager:: RefCountedTempFile ;
5553use datafusion_execution:: memory_pool:: { MemoryConsumer , MemoryReservation } ;
5654use datafusion_execution:: runtime_env:: RuntimeEnv ;
@@ -65,23 +63,14 @@ struct ExternalSorterMetrics {
6563 /// metrics
6664 baseline : BaselineMetrics ,
6765
68- /// count of spills during the execution of the operator
69- spill_count : Count ,
70-
71- /// total spilled bytes during the execution of the operator
72- spilled_bytes : Count ,
73-
74- /// total spilled rows during the execution of the operator
75- spilled_rows : Count ,
66+ spill_metrics : SpillMetrics ,
7667}
7768
7869impl ExternalSorterMetrics {
7970 fn new ( metrics : & ExecutionPlanMetricsSet , partition : usize ) -> Self {
8071 Self {
8172 baseline : BaselineMetrics :: new ( metrics, partition) ,
82- spill_count : MetricBuilder :: new ( metrics) . spill_count ( partition) ,
83- spilled_bytes : MetricBuilder :: new ( metrics) . spilled_bytes ( partition) ,
84- spilled_rows : MetricBuilder :: new ( metrics) . spilled_rows ( partition) ,
73+ spill_metrics : SpillMetrics :: new ( metrics, partition) ,
8574 }
8675 }
8776}
@@ -230,9 +219,14 @@ struct ExternalSorter {
230219 /// if `Self::in_mem_batches` are sorted
231220 in_mem_batches_sorted : bool ,
232221
233- /// If data has previously been spilled, the locations of the
234- /// spill files (in Arrow IPC format)
235- spills : Vec < RefCountedTempFile > ,
222+ /// During external sorting, in-memory intermediate data will be appended to
223+ /// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`].
224+ in_progress_spill_file : Option < InProgressSpillFile > ,
225+ /// If data has previously been spilled, the locations of the spill files (in
226+ /// Arrow IPC format)
227+ /// Within the same spill file, the data might be chunked into multiple batches,
228+ /// and ordered by sort keys.
229+ finished_spill_files : Vec < RefCountedTempFile > ,
236230
237231 // ========================================================================
238232 // EXECUTION RESOURCES:
@@ -244,6 +238,7 @@ struct ExternalSorter {
244238 runtime : Arc < RuntimeEnv > ,
245239 /// Reservation for in_mem_batches
246240 reservation : MemoryReservation ,
241+ spill_manager : SpillManager ,
247242
248243 /// Reservation for the merging of in-memory batches. If the sort
249244 /// might spill, `sort_spill_reservation_bytes` will be
@@ -278,15 +273,23 @@ impl ExternalSorter {
278273 MemoryConsumer :: new ( format ! ( "ExternalSorterMerge[{partition_id}]" ) )
279274 . register ( & runtime. memory_pool ) ;
280275
276+ let spill_manager = SpillManager :: new (
277+ Arc :: clone ( & runtime) ,
278+ metrics. spill_metrics . clone ( ) ,
279+ Arc :: clone ( & schema) ,
280+ ) ;
281+
281282 Self {
282283 schema,
283284 in_mem_batches : vec ! [ ] ,
284285 in_mem_batches_sorted : false ,
285- spills : vec ! [ ] ,
286+ in_progress_spill_file : None ,
287+ finished_spill_files : vec ! [ ] ,
286288 expr : expr. into ( ) ,
287289 metrics,
288290 fetch,
289291 reservation,
292+ spill_manager,
290293 merge_reservation,
291294 runtime,
292295 batch_size,
@@ -320,7 +323,7 @@ impl ExternalSorter {
320323 }
321324
322325 fn spilled_before ( & self ) -> bool {
323- !self . spills . is_empty ( )
326+ !self . finished_spill_files . is_empty ( )
324327 }
325328
326329 /// Returns the final sorted output of all batches inserted via
@@ -348,11 +351,11 @@ impl ExternalSorter {
348351 self . sort_or_spill_in_mem_batches ( true ) . await ?;
349352 }
350353
351- for spill in self . spills . drain ( ..) {
354+ for spill in self . finished_spill_files . drain ( ..) {
352355 if !spill. path ( ) . exists ( ) {
353356 return internal_err ! ( "Spill file {:?} does not exist" , spill. path( ) ) ;
354357 }
355- let stream = read_spill_as_stream ( spill, Arc :: clone ( & self . schema ) , 2 ) ?;
358+ let stream = self . spill_manager . read_spill_as_stream ( spill) ?;
356359 streams. push ( stream) ;
357360 }
358361
@@ -379,46 +382,69 @@ impl ExternalSorter {
379382
380383 /// How many bytes have been spilled to disk?
381384 fn spilled_bytes ( & self ) -> usize {
382- self . metrics . spilled_bytes . value ( )
385+ self . metrics . spill_metrics . spilled_bytes . value ( )
383386 }
384387
385388 /// How many rows have been spilled to disk?
386389 fn spilled_rows ( & self ) -> usize {
387- self . metrics . spilled_rows . value ( )
390+ self . metrics . spill_metrics . spilled_rows . value ( )
388391 }
389392
390393 /// How many spill files have been created?
391394 fn spill_count ( & self ) -> usize {
392- self . metrics . spill_count . value ( )
395+ self . metrics . spill_metrics . spill_file_count . value ( )
393396 }
394397
395- /// Writes any `in_memory_batches` to a spill file and clears
396- /// the batches. The contents of the spill file are sorted .
398+ /// When calling, all `in_mem_batches` must be sorted (*), and then all of them will
399+ /// be appended to the in-progress spill file.
397400 ///
398- /// Returns the amount of memory freed.
399- async fn spill ( & mut self ) -> Result < usize > {
401+ /// (*) 'Sorted' here means globally sorted for all buffered batches when the
402+ /// memory limit is reached, instead of partially sorted within the batch.
403+ async fn spill_append ( & mut self ) -> Result < ( ) > {
404+ assert ! ( self . in_mem_batches_sorted) ;
405+
400406 // we could always get a chance to free some memory as long as we are holding some
401407 if self . in_mem_batches . is_empty ( ) {
402- return Ok ( 0 ) ;
408+ return Ok ( ( ) ) ;
409+ }
410+
411+ // Lazily initialize the in-progress spill file
412+ if self . in_progress_spill_file . is_none ( ) {
413+ self . in_progress_spill_file =
414+ Some ( self . spill_manager . create_in_progress_file ( "Sorting" ) ?) ;
403415 }
404416
405417 self . organize_stringview_arrays ( ) ?;
406418
407419 debug ! ( "Spilling sort data of ExternalSorter to disk whilst inserting" ) ;
408420
409- let spill_file = self . runtime . disk_manager . create_tmp_file ( "Sorting" ) ?;
410421 let batches = std:: mem:: take ( & mut self . in_mem_batches ) ;
411- let ( spilled_rows, spilled_bytes) = spill_record_batches (
412- & batches,
413- spill_file. path ( ) . into ( ) ,
414- Arc :: clone ( & self . schema ) ,
415- ) ?;
416- let used = self . reservation . free ( ) ;
417- self . metrics . spill_count . add ( 1 ) ;
418- self . metrics . spilled_bytes . add ( spilled_bytes) ;
419- self . metrics . spilled_rows . add ( spilled_rows) ;
420- self . spills . push ( spill_file) ;
421- Ok ( used)
422+ self . reservation . free ( ) ;
423+
424+ let in_progress_file = self . in_progress_spill_file . as_mut ( ) . ok_or_else ( || {
425+ internal_datafusion_err ! ( "In-progress spill file should be initialized" )
426+ } ) ?;
427+
428+ for batch in batches {
429+ in_progress_file. append_batch ( & batch) ?;
430+ }
431+
432+ Ok ( ( ) )
433+ }
434+
435+ /// Finishes the in-progress spill file and moves it to the finished spill files.
436+ async fn spill_finish ( & mut self ) -> Result < ( ) > {
437+ let mut in_progress_file =
438+ self . in_progress_spill_file . take ( ) . ok_or_else ( || {
439+ internal_datafusion_err ! ( "Should be called after `spill_append`" )
440+ } ) ?;
441+ let spill_file = in_progress_file. finish ( ) ?;
442+
443+ if let Some ( spill_file) = spill_file {
444+ self . finished_spill_files . push ( spill_file) ;
445+ }
446+
447+ Ok ( ( ) )
422448 }
423449
424450 /// Reconstruct `self.in_mem_batches` to organize the payload buffers of each
@@ -515,6 +541,7 @@ impl ExternalSorter {
515541 // `self.in_mem_batches` is already taken away by the sort_stream, now it is empty.
516542 // We'll gradually collect the sorted stream into self.in_mem_batches, or directly
517543 // write sorted batches to disk when the memory is insufficient.
544+ let mut spilled = false ;
518545 while let Some ( batch) = sorted_stream. next ( ) . await {
519546 let batch = batch?;
520547 let sorted_size = get_reserved_byte_for_record_batch ( & batch) ;
@@ -523,7 +550,8 @@ impl ExternalSorter {
523550 // already in memory, so it's okay to combine it with previously
524551 // sorted batches, and spill together.
525552 self . in_mem_batches . push ( batch) ;
526- self . spill ( ) . await ?; // reservation is freed in spill()
553+ self . spill_append ( ) . await ?; // reservation is freed in spill()
554+ spilled = true ;
527555 } else {
528556 self . in_mem_batches . push ( batch) ;
529557 self . in_mem_batches_sorted = true ;
@@ -540,7 +568,12 @@ impl ExternalSorter {
540568 if ( self . reservation . size ( ) > before / 2 ) || force_spill {
541569 // We have not freed more than 50% of the memory, so we have to spill to
542570 // free up more memory
543- self . spill ( ) . await ?;
571+ self . spill_append ( ) . await ?;
572+ spilled = true ;
573+ }
574+
575+ if spilled {
576+ self . spill_finish ( ) . await ?;
544577 }
545578
546579 // Reserve headroom for next sort/merge
@@ -1489,7 +1522,14 @@ mod tests {
14891522 // batches.
14901523 // The number of spills is roughly calculated as:
14911524 // `number_of_batches / (sort_spill_reservation_bytes / batch_size)`
1492- assert ! ( ( 12 ..=18 ) . contains( & spill_count) ) ;
1525+
1526+ // If this assertion fail with large spill count, make sure the following
1527+ // case does not happen:
1528+ // During external sorting, one sorted run should be spilled to disk in a
1529+ // single file, due to memory limit we might need to append to the file
1530+ // multiple times to spill all the data. Make sure we're not writing each
1531+ // appending as a separate file.
1532+ assert ! ( ( 4 ..=8 ) . contains( & spill_count) ) ;
14931533 assert ! ( ( 15000 ..=20000 ) . contains( & spilled_rows) ) ;
14941534 assert ! ( ( 900000 ..=1000000 ) . contains( & spilled_bytes) ) ;
14951535
0 commit comments