2020use super :: metrics:: {
2121 BaselineMetrics , ExecutionPlanMetricsSet , MetricsSet , RecordOutput ,
2222} ;
23- use super :: { RecordBatchStream , SendableRecordBatchStream , Statistics } ;
2423use crate :: error:: { DataFusionError , Result } ;
2524use crate :: execution:: disk_manager:: { DiskManager , PathFile } ;
2625use crate :: execution:: memory_management:: {
@@ -40,6 +39,7 @@ use crate::physical_plan::sort_preserving_merge::SortPreservingMergeStream;
4039use crate :: physical_plan:: sorts:: in_mem_sort:: InMemSortStream ;
4140use crate :: physical_plan:: sorts:: sort:: sort_batch;
4241use crate :: physical_plan:: sorts:: sort_preserving_merge:: SortPreservingMergeStream ;
42+ use crate :: physical_plan:: stream:: RecordBatchReceiverStream ;
4343use crate :: physical_plan:: {
4444 common, DisplayFormatType , Distribution , ExecutionPlan , Partitioning ,
4545 RecordBatchStream , SendableRecordBatchStream , Statistics ,
@@ -48,22 +48,28 @@ use arrow::compute::aggregate::estimated_bytes_size;
4848use arrow:: compute:: { sort:: lexsort_to_indices, take} ;
4949use arrow:: datatypes:: SchemaRef ;
5050use arrow:: error:: Result as ArrowResult ;
51+ use arrow:: io:: ipc:: read:: { read_file_metadata, FileReader } ;
5152use arrow:: record_batch:: RecordBatch ;
5253use arrow:: { array:: ArrayRef , error:: ArrowError } ;
5354use async_trait:: async_trait;
54- use futures:: channel:: mpsc;
5555use futures:: { Future , SinkExt , Stream , StreamExt } ;
56- use log:: { debug, info} ;
57- use parking_lot:: Mutex ;
56+ use log:: { debug, error, info} ;
5857use pin_project_lite:: pin_project;
5958use std:: any:: Any ;
59+ use std:: fs:: File ;
6060use std:: pin:: Pin ;
6161use std:: sync:: atomic:: { AtomicIsize , AtomicUsize , Ordering } ;
62- use std:: sync:: Arc ;
62+ use std:: sync:: { Arc , Mutex } ;
6363use std:: task:: { Context , Poll } ;
64- use tokio:: sync:: mpsc:: { Receiver , Sender } ;
64+ use tokio:: sync:: mpsc:: { Receiver as TKReceiver , Sender as TKSender } ;
6565use tokio:: task;
6666
/// Execution phase of an `ExternalSorter`.
///
/// A sorter is constructed in `Insert`; `output_with_mem` moves it from
/// `Insert` to `OutputWithMem`, and `spill_during_output` moves it from
/// `OutputWithMem` to `OutputAllDisk`.
///
/// `Debug` and `PartialEq` are derived so that the state can be compared and
/// printed inside assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExternalSortingState {
    /// Initial state, set by the constructor.
    Insert,
    /// State entered through `output_with_mem`.
    OutputWithMem,
    /// State entered through `spill_during_output`.
    OutputAllDisk,
}
72+
6773struct ExternalSorter {
6874 id : MemoryConsumerId ,
6975 schema : SchemaRef ,
@@ -74,6 +80,7 @@ struct ExternalSorter {
7480 spilled_count : AtomicUsize ,
7581 /// Sort expressions
7682 expr : Vec < PhysicalSortExpr > ,
83+ exec_state : ExternalSortingState ,
7784 runtime : RuntimeEnv ,
7885}
7986
@@ -93,46 +100,26 @@ impl ExternalSorter {
93100 spilled_bytes : AtomicUsize :: new ( 0 ) ,
94101 spilled_count : AtomicUsize :: new ( 0 ) ,
95102 expr,
103+ exec_state : ExternalSortingState :: Insert ,
96104 runtime,
97105 }
98106 }
99107
100- fn insert_batch (
101- & mut self ,
102- input : RecordBatch ,
103- schema : SchemaRef ,
104- expr : & [ PhysicalSortExpr ] ,
105- ) -> Result < ( ) > {
106- let size = batch_memory_size ( & input) ;
107- self . allocate ( size) ?;
108- // sort each batch as it's inserted, more probably to be cache-resident
109- let sorted_batch = sort_batch ( input, schema, expr) ?;
110- let mut in_mem_batches = self . in_mem_batches . lock ( ) ;
111- in_mem_batches. push ( sorted_batch) ;
112- }
113-
114- fn sort ( & self ) { }
115- }
116-
117- impl MemoryConsumer for ExternalSorter {
118- fn name ( & self ) -> String {
119- "ExternalSorter" . to_owned ( )
120- }
121-
122- fn id ( & self ) -> & MemoryConsumerId {
123- & self . id
108+ fn output_with_mem ( & mut self ) {
109+ assert_eq ! ( self . exec_state, ExternalSortingState :: Insert ) ;
110+ self . exec_state = ExternalSortingState :: OutputWithMem
124111 }
125112
126- fn memory_manager ( & self ) -> Arc < MemoryManager > {
127- self . runtime . memory_manager . clone ( )
113+ fn spill_during_output ( & mut self ) {
114+ assert_eq ! ( self . exec_state, ExternalSortingState :: OutputWithMem ) ;
115+ self . exec_state = ExternalSortingState :: OutputAllDisk
128116 }
129117
130- async fn spill ( & self , _size : usize , _trigger : & dyn MemoryConsumer ) -> Result < usize > {
131- let in_mem_batches = self . in_mem_batches . lock ( ) ;
132-
118+ async fn spill_while_inserting ( & self ) -> usize {
119+ let mut in_mem_batches = self . in_mem_batches . lock ( ) . unwrap ( ) ;
133120 // we could always get a chance to free some memory as long as we are holding some
134121 if in_mem_batches. len ( ) == 0 {
135- return Ok ( 0 ) ;
122+ return 0 ;
136123 }
137124
138125 info ! (
@@ -156,13 +143,50 @@ impl MemoryConsumer for ExternalSorter {
156143 . await ;
157144
158145 spill ( stream, path, self . schema . clone ( ) ) ?;
146+ * in_mem_batches = vec ! [ ] ;
159147
160148 {
161- let mut spills = self . spills . lock ( ) ;
149+ let mut spills = self . spills . lock ( ) . unwrap ( ) ;
162150 self . spilled_count . fetch_add ( 1 , Ordering :: SeqCst ) ;
163151 self . spilled_bytes . fetch_add ( total_size, Ordering :: SeqCst ) ;
164152 spills. push ( path) ;
165153 }
154+ total_size
155+ }
156+
157+ fn insert_batch (
158+ & mut self ,
159+ input : RecordBatch ,
160+ schema : SchemaRef ,
161+ expr : & [ PhysicalSortExpr ] ,
162+ ) -> Result < ( ) > {
163+ let size = batch_memory_size ( & input) ;
164+ self . allocate ( size) ?;
165+ // sort each batch as it's inserted, more probably to be cache-resident
166+ let sorted_batch = sort_batch ( input, schema, expr) ?;
167+ let mut in_mem_batches = self . in_mem_batches . lock ( ) . unwrap ( ) ;
168+ in_mem_batches. push ( sorted_batch) ;
169+ }
170+
// Stub: the body is empty, so calling this method currently has no effect.
171+ fn sort ( & self ) { }
172+ }
173+
174+ impl MemoryConsumer for ExternalSorter {
175+ fn name ( & self ) -> String {
176+ "ExternalSorter" . to_owned ( )
177+ }
178+
// Borrows this sorter's `MemoryConsumerId`, i.e. the `id` field.
179+ fn id ( & self ) -> & MemoryConsumerId {
180+ & self . id
181+ }
182+
183+ fn memory_manager ( & self ) -> Arc < MemoryManager > {
184+ self . runtime . memory_manager . clone ( )
185+ }
186+
187+ async fn spill ( & self , _size : usize , _trigger : & dyn MemoryConsumer ) -> Result < usize > {
188+ let total_size = self . spill_while_inserting ( ) . await ;
189+
166190 Ok ( total_size)
167191 }
168192
@@ -211,17 +235,39 @@ async fn spill(
211235 path : String ,
212236 schema : SchemaRef ,
213237) -> Result < ( ) > {
214- let ( mut sender, receiver) : ( Sender < RecordBatch > , Receiver < RecordBatch > ) =
238+ let ( mut sender, receiver) : ( TKSender < RecordBatch > , TKReceiver < RecordBatch > ) =
215239 tokio:: sync:: mpsc:: channel ( 2 ) ;
216240 while let Some ( item) = sorted_stream. next ( ) . await {
217241 sender. send ( item) . await . ok ( ) ;
218242 }
219- task:: spawn_blocking ( move || write_sorted ( receiver, path, schema) ) ;
243+ let path_clone = path. clone ( ) ;
244+ task:: spawn_blocking ( move || {
245+ if let Err ( e) = write_sorted ( receiver, path_clone, schema) {
246+ error ! ( "Failure while spilling to path {}. Error: {}" , path, e) ;
247+ }
248+ } ) ;
220249 Ok ( ( ) )
221250}
222251
252+ async fn read_spill_as_stream (
253+ path : String ,
254+ schema : SchemaRef ,
255+ ) -> Result < SendableRecordBatchStream > {
256+ let ( mut sender, receiver) : (
257+ TKSender < ArrowResult < RecordBatch > > ,
258+ TKReceiver < ArrowResult < RecordBatch > > ,
259+ ) = tokio:: sync:: mpsc:: channel ( 2 ) ;
260+ let path_clone = path. clone ( ) ;
261+ task:: spawn_blocking ( move || {
262+ if let Err ( e) = read_spill ( sender, path_clone) {
263+ error ! ( "Failure while reading spill file: {}. Error: {}" , path, e) ;
264+ }
265+ } ) ;
266+ Ok ( RecordBatchReceiverStream :: create ( & schema, receiver) )
267+ }
268+
223269fn write_sorted (
224- mut receiver : Receiver < RecordBatch > ,
270+ mut receiver : TKReceiver < RecordBatch > ,
225271 path : String ,
226272 schema : SchemaRef ,
227273) -> Result < ( ) > {
@@ -237,25 +283,17 @@ fn write_sorted(
237283 Ok ( ( ) )
238284}
239285
240- struct SpillableSortedStream {
241- id : MemoryConsumerId ,
242- schema : SchemaRef ,
243- in_mem_batches : Mutex < Vec < RecordBatch > > ,
244- /// Sort expressions
245- expr : Vec < PhysicalSortExpr > ,
246- runtime : RuntimeEnv ,
247- }
248-
249- impl SpillableSortedStream {
250- fn new ( ) -> Self {
251- Self { }
286+ fn read_spill (
287+ mut sender : TKSender < ArrowResult < RecordBatch > > ,
288+ path : String ,
289+ ) -> Result < ( ) > {
290+ let mut file = File :: open ( & path) . map_err ( |e| e. into ( ) ) ?;
291+ let file_meta = read_file_metadata ( & mut file) . map_err ( |e| from_arrow_err ( & e) ) ?;
292+ let reader = FileReader :: new ( & mut file, file_meta, None ) ;
293+ for batch in reader {
294+ sender. blocking_send ( batch) ?;
252295 }
253-
254- fn memory_used ( & self ) -> usize { }
255-
256- fn get_sorted_stream ( & self ) { }
257-
258- fn spill_remaining ( & self ) { }
296+ Ok ( ( ) )
259297}
260298
261299/// Sort execution plan
@@ -402,14 +440,9 @@ impl ExecutionPlan for ExternalSortExec {
402440 }
403441}
404442
405- pin_project ! {
406- /// stream for sort plan
407- struct ExternalSortStream {
408- #[ pin]
409- output: futures:: channel:: oneshot:: Receiver <ArrowResult <Option <RecordBatch >>>,
410- finished: bool ,
411- schema: SchemaRef ,
412- }
443+ /// Stream for the sort plan. As declared here it holds only a schema.
444+ struct ExternalSortStream {
// NOTE(review): presumably the schema of the batches this stream yields; confirm against its stream impl.
445+ schema : SchemaRef ,
413446 }
414447
415448impl ExternalSortStream {
@@ -492,6 +525,7 @@ mod tests {
492525 use crate :: physical_plan:: coalesce_partitions:: CoalescePartitionsExec ;
493526 use crate :: physical_plan:: expressions:: col;
494527 use crate :: physical_plan:: memory:: MemoryExec ;
528+ use crate :: physical_plan:: sorts:: SortOptions ;
495529 use crate :: physical_plan:: {
496530 collect,
497531 csv:: { CsvExec , CsvReadOptions } ,
@@ -661,3 +695,5 @@ mod tests {
661695 Ok ( ( ) )
662696 }
663697}
698+
// Empty inherent impl: it declares no items.
699+ impl ExternalSorter { }
0 commit comments