Minor: Add documentation + diagrams for ExternalSorter #7179

Merged — 4 commits, Aug 3, 2023
Changes from all commits
217 changes: 205 additions & 12 deletions datafusion/core/src/physical_plan/sorts/sort.rs
@@ -76,27 +76,147 @@ impl ExternalSorterMetrics {
}
}

/// Sorts an arbitrarily sized, unsorted stream of [`RecordBatch`]es into
/// a total order. Depending on the input size and memory manager
/// configuration, it may write intermediate results to disk ("spills")
/// using the Arrow IPC format.
///
/// # Algorithm
///
/// The basic architecture of the algorithm:
/// 1. get a non-empty new batch from input
///
/// 2. check with the memory manager that there is sufficient space to
///    buffer the batch in memory.
///
///    2.1 if memory is sufficient, buffer the batch in memory and go to 1.
///
///    2.2 if no more memory is available, sort all buffered batches,
///        spill them to a file, buffer the new batch in memory, and go to 1.
///
/// 3. when the input is exhausted, merge all in-memory batches and spills
///    to get a total order.
///
/// # When data fits in available memory
///
/// If there is sufficient memory, data is sorted in memory to produce the output
///
/// ```text
/// ┌─────┐
/// │ 2 │
/// │ 3 │
/// │ 1 │─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
/// │ 4 │
/// │ 2 │ │
/// └─────┘ ▼
/// ┌─────┐
/// │ 1 │ In memory
/// │ 4 │─ ─ ─ ─ ─ ─▶ sort/merge ─ ─ ─ ─ ─▶ total sorted output
/// │ 1 │
/// └─────┘ ▲
/// ... │
///
/// ┌─────┐ │
/// │ 4 │
/// │ 3 │─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
/// └─────┘
///
/// in_mem_batches
///
/// ```
///
/// # When data does not fit in available memory
///
/// When memory is exhausted, data is first sorted and written to one
/// or more spill files on disk:
///
/// ```text
/// ┌─────┐ .─────────────────.
/// │ 2 │ ( )
/// │ 3 │ │`─────────────────'│
/// │ 1 │─ ─ ─ ─ ─ ─ ─ │ ┌────┐ │
/// │ 4 │ │ │ │ 1 │░ │
/// │ 2 │ │ │... │░ │
/// └─────┘ ▼ │ │ 4 │░ ┌ ─ ─ │
/// ┌─────┐ │ └────┘░ 1 │░ │
/// │ 1 │ In memory │ ░░░░░░ │ ░░ │
/// │ 4 │─ ─ ▶ sort/merge ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─▶ ... │░ │
/// │ 1 │ and write to file │ │ ░░ │
/// └─────┘ │ 4 │░ │
/// ... ▲ │ └░─░─░░ │
/// │ │ ░░░░░░ │
/// ┌─────┐ │.─────────────────.│
/// │ 4 │ │ ( )
/// │ 3 │─ ─ ─ ─ ─ ─ ─ `─────────────────'
/// └─────┘
///
/// in_mem_batches spills
/// (file on disk in Arrow
/// IPC format)
/// ```
///
/// Once the input is completely read, the spill files are read and
/// merged with any in memory batches to produce a single total sorted
/// output:
///
/// ```text
/// .─────────────────.
/// ( )
/// │`─────────────────'│
/// │ ┌────┐ │
/// │ │ 1 │░ │
/// │ │... │─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─
/// │ │ 4 │░ ┌────┐ │ │
/// │ └────┘░ │ 1 │░ │ ▼
/// │ ░░░░░░ │ │░ │
/// │ │... │─ ─│─ ─ ─ ▶ merge ─ ─ ─▶ total sorted output
/// │ │ │░ │
/// │ │ 4 │░ │ ▲
/// │ └────┘░ │ │
/// │ ░░░░░░ │
/// │.─────────────────.│ │
/// ( )
/// `─────────────────' │
/// spills
/// │
///
/// │
///
/// ┌─────┐ │
/// │ 1 │
/// │ 4 │─ ─ ─ ─ │
/// └─────┘ │
/// ... In memory
/// └ ─ ─ ─▶ sort/merge
/// ┌─────┐
/// │ 4 │ ▲
/// │ 3 │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
/// └─────┘
///
/// in_mem_batches
/// ```
struct ExternalSorter {
/// schema of the output (and the input)
schema: SchemaRef,
/// Potentially unsorted in memory buffer
in_mem_batches: Vec<RecordBatch>,
/// Whether `Self::in_mem_batches` are sorted
in_mem_batches_sorted: bool,
/// If data has previously been spilled, the locations of the
/// spill files (in Arrow IPC format)
spills: Vec<NamedTempFile>,
/// Sort expressions
expr: Arc<[PhysicalSortExpr]>,
/// Runtime metrics
metrics: ExternalSorterMetrics,
/// If Some, the maximum number of output rows that will be
/// produced.
fetch: Option<usize>,
/// Memory usage tracking
reservation: MemoryReservation,
/// The partition id that this Sort is handling (for identification)
partition_id: usize,
/// A handle to the runtime, used to create spill files on disk
runtime: Arc<RuntimeEnv>,
/// The target number of rows for output batches
batch_size: usize,
}
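
The buffer/spill/merge loop described in the new `ExternalSorter` doc comment above can be modeled with a short, self-contained sketch. Everything below is illustrative only: `ToySorter`, its byte budget, and the `Vec<i64>` "batches" are stand-ins invented for this example, not DataFusion APIs; the real implementation buffers Arrow `RecordBatch`es, tracks memory through a `MemoryReservation`, and spills to Arrow IPC files.

```rust
// Minimal model of the buffer/spill/merge algorithm. Batches are Vec<i64>,
// the memory pool is a plain byte budget, and "spills" are kept in memory as
// sorted runs for simplicity.
struct ToySorter {
    budget_bytes: usize,
    used_bytes: usize,
    in_mem_batches: Vec<Vec<i64>>,
    spills: Vec<Vec<i64>>, // each spill is one fully sorted run
}

impl ToySorter {
    fn new(budget_bytes: usize) -> Self {
        Self { budget_bytes, used_bytes: 0, in_mem_batches: Vec::new(), spills: Vec::new() }
    }

    // Steps 1 and 2: try to buffer the batch, spilling first if the budget
    // would be exceeded.
    fn insert_batch(&mut self, batch: Vec<i64>) {
        let size = batch.len() * std::mem::size_of::<i64>();
        if self.used_bytes + size > self.budget_bytes && !self.in_mem_batches.is_empty() {
            self.spill();
        }
        self.used_bytes += size;
        self.in_mem_batches.push(batch);
    }

    // Sort everything currently buffered and move it into a "spill" run.
    fn spill(&mut self) {
        let mut run: Vec<i64> = self.in_mem_batches.drain(..).flatten().collect();
        run.sort();
        self.spills.push(run);
        self.used_bytes = 0;
    }

    // Step 3: produce a total order over in-memory data and all spilled runs.
    fn sort(mut self) -> Vec<i64> {
        if !self.in_mem_batches.is_empty() {
            self.spill(); // reuse spill() as the "in-memory sort" for brevity
        }
        let mut out: Vec<i64> = self.spills.into_iter().flatten().collect();
        out.sort(); // a real implementation streams a k-way merge instead
        out
    }
}

fn main() {
    let mut sorter = ToySorter::new(64); // tiny budget to force spilling
    for batch in [vec![2, 3, 1, 4, 2], vec![1, 4, 1], vec![4, 3]] {
        sorter.insert_batch(batch);
    }
    assert_eq!(sorter.sort(), vec![1, 1, 1, 2, 2, 3, 3, 4, 4, 4]);
    println!("spilling + merging produced a total order");
}
```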

@@ -142,7 +262,7 @@ impl ExternalSorter {
if self.reservation.try_grow(size).is_err() {
let before = self.reservation.size();
self.in_mem_sort().await?;
// Sorting may have freed memory, especially if fetch is `Some`
//
// As such we check again, and if the memory usage has dropped by
// a factor of 2, and we can allocate the necessary capacity,
@@ -168,7 +288,15 @@ impl ExternalSorter {
!self.spills.is_empty()
}

/// Returns the final sorted output of all batches inserted via
/// [`Self::insert_batch`] as a stream of [`RecordBatch`]es.
///
/// This process could either be:
///
/// 1. An in-memory sort/merge (if the input fit in memory)
///
/// 2. A combined streaming merge incorporating both in-memory
/// batches and data from spill files on disk.
fn sort(&mut self) -> Result<SendableRecordBatchStream> {
if self.spilled_before() {
let mut streams = vec![];
@@ -201,18 +329,25 @@
}
}
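
The second case in `sort`, combining spill files with in-memory batches, boils down to a k-way merge of sorted runs. Here is a minimal sketch of that idea using a binary min-heap; the runs are plain `Vec<i64>` and the function name is invented for illustration, whereas the actual code performs a streaming merge over `SendableRecordBatchStream`s.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Merge several already-sorted runs (spills plus sorted in-memory data in
// the real sorter) into one totally ordered output.
fn kway_merge(runs: Vec<Vec<i64>>) -> Vec<i64> {
    // Heap entries: (next value, run index, offset within that run).
    // Reverse turns the max-heap into a min-heap on the next value.
    let mut heap: BinaryHeap<Reverse<(i64, usize, usize)>> = BinaryHeap::new();
    for (i, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, i, 0)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, run_idx, offset))) = heap.pop() {
        out.push(v);
        if let Some(&next) = runs[run_idx].get(offset + 1) {
            heap.push(Reverse((next, run_idx, offset + 1)));
        }
    }
    out
}

fn main() {
    // One sorted run per "spill" plus one sorted in-memory run.
    let runs = vec![vec![1, 4, 7], vec![2, 3, 9], vec![5, 6, 8]];
    assert_eq!(kway_merge(runs), vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
}
```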

/// How much memory is buffered in this `ExternalSorter`?
fn used(&self) -> usize {
self.reservation.size()
}

/// How many bytes have been spilled to disk?
fn spilled_bytes(&self) -> usize {
self.metrics.spilled_bytes.value()
}

/// How many spill files have been created?
fn spill_count(&self) -> usize {
self.metrics.spill_count.value()
}

/// Writes any `in_mem_batches` to a spill file and clears
/// the batches. The contents of the spill file are sorted.
///
/// Returns the amount of memory freed.
async fn spill(&mut self) -> Result<usize> {
// there is memory to free only if we are currently buffering some batches
if self.in_mem_batches.is_empty() {
@@ -255,7 +390,64 @@ impl ExternalSorter {
Ok(())
}
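
Conceptually, `spill` sorts whatever is buffered, writes it out, clears the buffer, and reports the memory released. The sketch below makes loose, labeled assumptions: it writes plain-text lines to a temp file instead of Arrow IPC via the disk manager's `NamedTempFile`, and uses `Vec<i64>` instead of `RecordBatch`; the function and file names are invented for illustration.

```rust
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::Path;

// Sort the buffered data, persist it, clear the buffer, and return how many
// bytes of memory were released by doing so.
fn spill_to_file(in_mem_batches: &mut Vec<Vec<i64>>, path: &Path) -> std::io::Result<usize> {
    let mut rows: Vec<i64> = in_mem_batches.drain(..).flatten().collect();
    rows.sort();
    let freed = rows.len() * std::mem::size_of::<i64>();
    let mut writer = BufWriter::new(File::create(path)?);
    for v in &rows {
        writeln!(writer, "{v}")?;
    }
    writer.flush()?;
    Ok(freed)
}

fn main() -> std::io::Result<()> {
    let mut in_mem_batches = vec![vec![2, 3, 1], vec![4, 2]];
    let path = std::env::temp_dir().join("toy_sorter_spill_0.txt");
    let freed = spill_to_file(&mut in_mem_batches, &path)?;
    println!("spilled to {}, freed {freed} bytes", path.display());
    assert!(in_mem_batches.is_empty());
    Ok(())
}
```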

/// Consumes in_mem_batches returning a sorted stream of
/// batches. This proceeds in one of two ways:
///
/// # Small Datasets
///
/// For "smaller" datasets, the data is first concatenated into a
/// single batch and then sorted. This is often faster than
/// sorting and then merging.
///
/// ```text
/// ┌─────┐
/// │ 2 │
/// │ 3 │
/// │ 1 │─ ─ ─ ─ ┐ ┌─────┐
/// │ 4 │ │ 2 │
/// │ 2 │ │ │ 3 │
/// └─────┘ │ 1 │ sorted output
/// ┌─────┐ ▼ │ 4 │ stream
/// │ 1 │ │ 2 │
/// │ 4 │─ ─▶ concat ─ ─ ─ ─ ▶│ 1 │─ ─ ▶ sort ─ ─ ─ ─ ─▶
/// │ 1 │ │ 4 │
/// └─────┘ ▲ │ 1 │
/// ... │ │ ... │
/// │ 4 │
/// ┌─────┐ │ │ 3 │
/// │ 4 │ └─────┘
/// │ 3 │─ ─ ─ ─ ┘
/// └─────┘
/// in_mem_batches
/// ```
///
/// # Larger datasets
///
/// For larger datasets, the batches are first sorted individually
/// and then merged together.
///
/// ```text
/// ┌─────┐ ┌─────┐
/// │ 2 │ │ 1 │
/// │ 3 │ │ 2 │
/// │ 1 │─ ─▶ sort ─ ─▶│ 2 │─ ─ ─ ─ ─ ┐
/// │ 4 │ │ 3 │
/// │ 2 │ │ 4 │ │
/// └─────┘ └─────┘ sorted output
/// ┌─────┐ ┌─────┐ ▼ stream
/// │ 1 │ │ 1 │
/// │ 4 │─ ▶ sort ─ ─ ▶│ 1 ├ ─ ─ ▶ merge ─ ─ ─ ─▶
/// │ 1 │ │ 4 │
/// └─────┘ └─────┘ ▲
/// ... ... ... │
///
/// ┌─────┐ ┌─────┐ │
/// │ 4 │ │ 3 │
/// │ 3 │─ ▶ sort ─ ─ ▶│ 4 │─ ─ ─ ─ ─ ┘
/// └─────┘ └─────┘
///
/// in_mem_batches
/// ```
fn in_mem_sort_stream(
&mut self,
metrics: BaselineMetrics,
Expand Down Expand Up @@ -296,6 +488,7 @@ impl ExternalSorter {
)
}
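
The two strategies in the `in_mem_sort_stream` doc comment, concatenate-then-sort versus sort-each-then-merge, can be contrasted with a small sketch. The helper names and the two-at-a-time merge below are invented for this example; the real code sorts `RecordBatch`es and combines them with a streaming k-way merge.

```rust
// "Smaller" datasets: concatenate everything into one batch, then sort once.
fn concat_then_sort(batches: &[Vec<i64>]) -> Vec<i64> {
    let mut all: Vec<i64> = batches.iter().flatten().copied().collect();
    all.sort();
    all
}

// "Larger" datasets: sort each batch individually, then merge the sorted
// batches pairwise (the real implementation merges all of them in one pass).
fn sort_then_merge(batches: &[Vec<i64>]) -> Vec<i64> {
    let sorted: Vec<Vec<i64>> = batches
        .iter()
        .map(|b| {
            let mut b = b.clone();
            b.sort();
            b
        })
        .collect();
    sorted.into_iter().fold(Vec::new(), |acc, next| {
        let mut merged = Vec::with_capacity(acc.len() + next.len());
        let (mut i, mut j) = (0, 0);
        while i < acc.len() && j < next.len() {
            if acc[i] <= next[j] {
                merged.push(acc[i]);
                i += 1;
            } else {
                merged.push(next[j]);
                j += 1;
            }
        }
        merged.extend_from_slice(&acc[i..]);
        merged.extend_from_slice(&next[j..]);
        merged
    })
}

fn main() {
    let batches = vec![vec![2, 3, 1, 4, 2], vec![1, 4, 1], vec![4, 3]];
    assert_eq!(concat_then_sort(&batches), sort_then_merge(&batches));
    println!("both strategies produce the same total order");
}
```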

/// Sorts a single `RecordBatch` into a single stream
fn sort_batch_stream(
&self,
batch: RecordBatch,
@@ -417,8 +610,8 @@ fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {

/// Sort execution plan.
///
/// This operator supports sorting datasets that are larger than the
/// memory allotted by the memory manager, by spilling to disk.
#[derive(Debug)]
pub struct SortExec {
/// Input schema