diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index fc45acbc43d9..c7ae09bb2e34 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -76,27 +76,147 @@ impl ExternalSorterMetrics { } } -/// Sort arbitrary size of data to get a total order (may spill several times during sorting based on free memory available). +/// Sorts an arbitrary sized, unsorted, stream of [`RecordBatch`]es to +/// a total order. Depending on the input size and memory manager +/// configuration, writes intermediate results to disk ("spills") +/// using Arrow IPC format. +/// +/// # Algorithm /// -/// The basic architecture of the algorithm: /// 1. get a non-empty new batch from input -/// 2. check with the memory manager if we could buffer the batch in memory -/// 2.1 if memory sufficient, then buffer batch in memory, go to 1. -/// 2.2 if the memory threshold is reached, sort all buffered batches and spill to file. -/// buffer the batch in memory, go to 1. -/// 3. when input is exhausted, merge all in memory batches and spills to get a total order. +/// +/// 2. check with the memory manager if there is space to buffer the batch +/// +/// 2.1 if memory is sufficient, buffer the batch in memory, go to 1. +/// +/// 2.2 if no more memory is available, sort all buffered batches and +/// spill to file. buffer the next batch in memory, go to 1. +/// +/// 3. when input is exhausted, merge all in memory batches and spills +/// to get a total order. +/// +/// # When data fits in available memory +/// +/// If there is sufficient memory, data is sorted in memory to produce the output +/// +/// ```text +/// ┌─────┐ +/// │ 2 │ +/// │ 3 │ +/// │ 1 │─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ +/// │ 4 │ +/// │ 2 │ │ +/// └─────┘ ▼ +/// ┌─────┐ +/// │ 1 │ In memory +/// │ 4 │─ ─ ─ ─ ─ ─▶ sort/merge ─ ─ ─ ─ ─▶ total sorted output +/// │ 1 │ +/// └─────┘ ▲ +/// ... 
│ +/// +/// ┌─────┐ │ +/// │ 4 │ +/// │ 3 │─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ +/// └─────┘ +/// +/// in_mem_batches +/// +/// ``` +/// +/// # When data does not fit in available memory +/// +/// When memory is exhausted, data is first sorted and written to one +/// or more spill files on disk: +/// +/// ```text +/// ┌─────┐ .─────────────────. +/// │ 2 │ ( ) +/// │ 3 │ │`─────────────────'│ +/// │ 1 │─ ─ ─ ─ ─ ─ ─ │ ┌────┐ │ +/// │ 4 │ │ │ │ 1 │░ │ +/// │ 2 │ │ │... │░ │ +/// └─────┘ ▼ │ │ 4 │░ ┌ ─ ─ │ +/// ┌─────┐ │ └────┘░ 1 │░ │ +/// │ 1 │ In memory │ ░░░░░░ │ ░░ │ +/// │ 4 │─ ─ ▶ sort/merge ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─▶ ... │░ │ +/// │ 1 │ and write to file │ │ ░░ │ +/// └─────┘ │ 4 │░ │ +/// ... ▲ │ └░─░─░░ │ +/// │ │ ░░░░░░ │ +/// ┌─────┐ │.─────────────────.│ +/// │ 4 │ │ ( ) +/// │ 3 │─ ─ ─ ─ ─ ─ ─ `─────────────────' +/// └─────┘ +/// +/// in_mem_batches spills +/// (file on disk in Arrow +/// IPC format) +/// ``` +/// +/// Once the input is completely read, the spill files are read and +/// merged with any in memory batches to produce a single total sorted +/// output: +/// +/// ```text +/// .─────────────────. +/// ( ) +/// │`─────────────────'│ +/// │ ┌────┐ │ +/// │ │ 1 │░ │ +/// │ │... │─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ +/// │ │ 4 │░ ┌────┐ │ │ +/// │ └────┘░ │ 1 │░ │ ▼ +/// │ ░░░░░░ │ │░ │ +/// │ │... │─ ─│─ ─ ─ ▶ merge ─ ─ ─▶ total sorted output +/// │ │ │░ │ +/// │ │ 4 │░ │ ▲ +/// │ └────┘░ │ │ +/// │ ░░░░░░ │ +/// │.─────────────────.│ │ +/// ( ) +/// `─────────────────' │ +/// spills +/// │ +/// +/// │ +/// +/// ┌─────┐ │ +/// │ 1 │ +/// │ 4 │─ ─ ─ ─ │ +/// └─────┘ │ +/// ... 
In memory +/// └ ─ ─ ─▶ sort/merge +/// ┌─────┐ +/// │ 4 │ ▲ +/// │ 3 │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ +/// └─────┘ +/// +/// in_mem_batches +/// ``` struct ExternalSorter { + /// schema of the output (and the input) schema: SchemaRef, + /// Potentially unsorted in memory buffer in_mem_batches: Vec, + /// if `Self::in_mem_batches` are sorted in_mem_batches_sorted: bool, + /// If data has previously been spilled, the locations of the + /// spill files (in Arrow IPC format) spills: Vec, /// Sort expressions expr: Arc<[PhysicalSortExpr]>, + /// Runtime metrics metrics: ExternalSorterMetrics, + /// If Some, the maximum number of output rows that will be + /// produced. fetch: Option, + /// Memory usage tracking reservation: MemoryReservation, + /// The partition id that this Sort is handling (for identification) partition_id: usize, + /// A handle to the runtime to get Disk spill files runtime: Arc, + /// The target number of rows for output batches batch_size: usize, } @@ -142,7 +262,7 @@ impl ExternalSorter { if self.reservation.try_grow(size).is_err() { let before = self.reservation.size(); self.in_mem_sort().await?; - // Sorting may have freed memory, especially if fetch is not `None` + // Sorting may have freed memory, especially if fetch is `Some` // // As such we check again, and if the memory usage has dropped by // a factor of 2, and we can allocate the necessary capacity, @@ -168,7 +288,15 @@ impl ExternalSorter { !self.spills.is_empty() } - /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`. + /// Returns the final sorted output of all batches inserted via + /// [`Self::insert_batch`] as a stream of [`RecordBatch`]es. + /// + /// This process could either be: + /// + /// 1. An in-memory sort/merge (if the input fit in memory) + /// + /// 2. A combined streaming merge incorporating both in-memory + /// batches and data from spill files on disk. 
fn sort(&mut self) -> Result { if self.spilled_before() { let mut streams = vec![]; @@ -201,18 +329,25 @@ impl ExternalSorter { } } + /// How much memory is buffered in this `ExternalSorter`? fn used(&self) -> usize { self.reservation.size() } + /// How many bytes have been spilled to disk? fn spilled_bytes(&self) -> usize { self.metrics.spilled_bytes.value() } + /// How many spill files have been created? fn spill_count(&self) -> usize { self.metrics.spill_count.value() } + /// Writes any `in_mem_batches` to a spill file and clears + /// the batches. The contents of the spill file are sorted. + /// + /// Returns the amount of memory freed. async fn spill(&mut self) -> Result { // we could always get a chance to free some memory as long as we are holding some if self.in_mem_batches.is_empty() { @@ -255,7 +390,64 @@ impl ExternalSorter { Ok(()) } - /// Consumes in_mem_batches returning a sorted stream + /// Consumes in_mem_batches returning a sorted stream of + /// batches. This proceeds in one of two ways: + /// + /// # Small datasets + /// + /// For "smaller" datasets, the data is first concatenated into a + /// single batch and then sorted. This is often faster than + /// sorting and then merging. + /// + /// ```text + /// ┌─────┐ + /// │ 2 │ + /// │ 3 │ + /// │ 1 │─ ─ ─ ─ ┐ ┌─────┐ + /// │ 4 │ │ 2 │ + /// │ 2 │ │ │ 3 │ + /// └─────┘ │ 1 │ sorted output + /// ┌─────┐ ▼ │ 4 │ stream + /// │ 1 │ │ 2 │ + /// │ 4 │─ ─▶ concat ─ ─ ─ ─ ▶│ 1 │─ ─ ▶ sort ─ ─ ─ ─ ─▶ + /// │ 1 │ │ 4 │ + /// └─────┘ ▲ │ 1 │ + /// ... │ │ ... │ + /// │ 4 │ + /// ┌─────┐ │ │ 3 │ + /// │ 4 │ └─────┘ + /// │ 3 │─ ─ ─ ─ ┘ + /// └─────┘ + /// in_mem_batches + /// ``` + /// + /// # Larger datasets + /// + /// For larger datasets, the batches are first sorted individually + /// and then merged together. 
+ /// + /// ```text + /// ┌─────┐ ┌─────┐ + /// │ 2 │ │ 1 │ + /// │ 3 │ │ 2 │ + /// │ 1 │─ ─▶ sort ─ ─▶│ 2 │─ ─ ─ ─ ─ ┐ + /// │ 4 │ │ 3 │ + /// │ 2 │ │ 4 │ │ + /// └─────┘ └─────┘ sorted output + /// ┌─────┐ ┌─────┐ ▼ stream + /// │ 1 │ │ 1 │ + /// │ 4 │─ ▶ sort ─ ─ ▶│ 1 ├ ─ ─ ▶ merge ─ ─ ─ ─▶ + /// │ 1 │ │ 4 │ + /// └─────┘ └─────┘ ▲ + /// ... ... ... │ + /// + /// ┌─────┐ ┌─────┐ │ + /// │ 4 │ │ 3 │ + /// │ 3 │─ ▶ sort ─ ─ ▶│ 4 │─ ─ ─ ─ ─ ┘ + /// └─────┘ └─────┘ + /// + /// in_mem_batches + /// ``` fn in_mem_sort_stream( &mut self, metrics: BaselineMetrics, @@ -296,6 +488,7 @@ impl ExternalSorter { ) } + /// Sorts a single `RecordBatch` into a single stream fn sort_batch_stream( &self, batch: RecordBatch, @@ -417,8 +610,8 @@ fn read_spill(sender: Sender>, path: &Path) -> Result<()> { /// Sort execution plan. /// -/// This operator supports sorting datasets that are larger than the -/// memory allotted by the memory manager, by spilling to disk. +/// Supports sorting datasets that are larger than the memory allotted +/// by the memory manager, by spilling to disk. #[derive(Debug)] pub struct SortExec { /// Input schema