Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Account for memory usage in SortPreservingMerge (#5885) #7130

Merged
merged 5 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,23 @@ config_namespace! {
///
/// Defaults to the number of CPU cores on the system
pub planning_concurrency: usize, default = num_cpus::get()

/// How much memory is set aside, for each spillable sort, to
/// ensure an in-memory merge can occur. This setting has no
/// if the sort can not spill (there is no `DiskManager`
/// configured)
///
/// As part of spilling to disk, in memory data must be sorted
/// / merged before writing the file. This in-memory
/// sort/merge requires memory as well, so To avoid allocating
/// once memory is exhausted, DataFusion sets aside this
/// many bytes before.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe:

/// Specifies the reserved memory for each spillable sort operation to
/// facilitate an in-memory merge.
///
/// When a sort operation spills to disk, the in-memory data must be
/// sorted and merged before being written to a file. This setting reserves
/// a specific amount of memory for that in-memory sort/merge process.
///
/// Note: This setting is irrelevant if the sort operation cannot spill
/// (i.e., if there's no `DiskManager` configured).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a much better wording. Thank you @yjshen -- in f87705e

pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024

/// When sorting, below what size should data be concatenated
/// and sorted in a single RecordBatch rather than sorted in
/// batches and merged.
pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not a behavior change. This constant was hard coded in sort.rs -- I have just pulled it out into its own config setting so I can write tests

}
}

Expand Down
11 changes: 9 additions & 2 deletions datafusion/core/src/physical_plan/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,14 +576,21 @@ impl ExecutionPlan for RepartitionExec {

// Get existing ordering:
let sort_exprs = self.input.output_ordering().unwrap_or(&[]);
// Merge streams (while preserving ordering) coming from input partitions to this partition:

// Merge streams (while preserving ordering) coming from
// input partitions to this partition:
let fetch = None;
let merge_reservation =
MemoryConsumer::new(format!("{}[Merge {partition}]", self.name()))
.register(context.memory_pool());
streaming_merge(
input_streams,
self.schema(),
sort_exprs,
BaselineMetrics::new(&self.metrics, partition),
context.session_config().batch_size(),
None,
fetch,
merge_reservation,
)
} else {
Ok(Box::pin(RepartitionStream {
Expand Down
22 changes: 18 additions & 4 deletions datafusion/core/src/physical_plan/sorts/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use arrow::compute::interleave;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;

#[derive(Debug, Copy, Clone, Default)]
struct BatchCursor {
Expand All @@ -37,6 +38,9 @@ pub struct BatchBuilder {
/// Maintain a list of [`RecordBatch`] and their corresponding stream
batches: Vec<(usize, RecordBatch)>,

/// Accounts for memory used by buffered batches
reservation: MemoryReservation,

/// The current [`BatchCursor`] for each stream
cursors: Vec<BatchCursor>,

Expand All @@ -47,23 +51,31 @@ pub struct BatchBuilder {

impl BatchBuilder {
/// Create a new [`BatchBuilder`] with the provided `stream_count` and `batch_size`
pub fn new(schema: SchemaRef, stream_count: usize, batch_size: usize) -> Self {
pub fn new(
schema: SchemaRef,
stream_count: usize,
batch_size: usize,
reservation: MemoryReservation,
) -> Self {
Self {
schema,
batches: Vec::with_capacity(stream_count * 2),
cursors: vec![BatchCursor::default(); stream_count],
indices: Vec::with_capacity(batch_size),
reservation,
}
}

/// Append a new batch in `stream_idx`
pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) {
pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
self.reservation.try_grow(batch.get_array_memory_size())?;
let batch_idx = self.batches.len();
self.batches.push((stream_idx, batch));
self.cursors[stream_idx] = BatchCursor {
batch_idx,
row_idx: 0,
}
};
Ok(())
}

/// Append the next row from `stream_idx`
Expand Down Expand Up @@ -119,14 +131,16 @@ impl BatchBuilder {
// We can therefore drop all but the last batch for each stream
let mut batch_idx = 0;
let mut retained = 0;
self.batches.retain(|(stream_idx, _)| {
self.batches.retain(|(stream_idx, batch)| {
let stream_cursor = &mut self.cursors[*stream_idx];
let retain = stream_cursor.batch_idx == batch_idx;
batch_idx += 1;

if retain {
stream_cursor.batch_idx = retained;
retained += 1;
} else {
self.reservation.shrink(batch.get_array_memory_size());
}
retain
});
Expand Down
20 changes: 18 additions & 2 deletions datafusion/core/src/physical_plan/sorts/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use arrow::datatypes::ArrowNativeTypeOp;
use arrow::row::{Row, Rows};
use arrow_array::types::ByteArrayType;
use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray};
use datafusion_execution::memory_pool::MemoryReservation;
use std::cmp::Ordering;

/// A [`Cursor`] for [`Rows`]
Expand All @@ -29,6 +30,11 @@ pub struct RowCursor {
num_rows: usize,

rows: Rows,

/// Tracks for the memory used by in the `Rows` of this
/// cursor. Freed on drop
#[allow(dead_code)]
reservation: MemoryReservation,
}

impl std::fmt::Debug for RowCursor {
Expand All @@ -41,12 +47,22 @@ impl std::fmt::Debug for RowCursor {
}

impl RowCursor {
/// Create a new SortKeyCursor
pub fn new(rows: Rows) -> Self {
/// Create a new SortKeyCursor from `rows` and a `reservation`
/// that tracks its memory.
///
/// Panic's if the reservation is not for exactly `rows.size()`
/// bytes
pub fn new(rows: Rows, reservation: MemoryReservation) -> Self {
assert_eq!(
rows.size(),
reservation.size(),
"memory reservation mismatch"
);
Self {
cur_row: 0,
num_rows: rows.num_rows(),
rows,
reservation,
}
}

Expand Down
30 changes: 20 additions & 10 deletions datafusion/core/src/physical_plan/sorts/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow_array::*;
use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;
use futures::Stream;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
Expand All @@ -42,14 +43,15 @@ macro_rules! primitive_merge_helper {
}

macro_rules! merge_helper {
($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident) => {{
($t:ty, $sort:ident, $streams:ident, $schema:ident, $tracking_metrics:ident, $batch_size:ident, $fetch:ident, $reservation:ident) => {{
let streams = FieldCursorStream::<$t>::new($sort, $streams);
return Ok(Box::pin(SortPreservingMergeStream::new(
Box::new(streams),
$schema,
$tracking_metrics,
$batch_size,
$fetch,
$reservation,
)));
}};
}
Expand All @@ -63,28 +65,36 @@ pub fn streaming_merge(
metrics: BaselineMetrics,
batch_size: usize,
fetch: Option<usize>,
reservation: MemoryReservation,
) -> Result<SendableRecordBatchStream> {
// Special case single column comparisons with optimized cursor implementations
if expressions.len() == 1 {
let sort = expressions[0].clone();
let data_type = sort.expr.data_type(schema.as_ref())?;
downcast_primitive! {
data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch),
DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch)
DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch)
DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch)
DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch)
data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation),
DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation)
_ => {}
}
}

let streams = RowCursorStream::try_new(schema.as_ref(), expressions, streams)?;
let streams = RowCursorStream::try_new(
schema.as_ref(),
expressions,
streams,
reservation.new_empty(),
)?;

Ok(Box::pin(SortPreservingMergeStream::new(
Box::new(streams),
schema,
metrics,
batch_size,
fetch,
reservation,
)))
}

Expand Down Expand Up @@ -162,11 +172,12 @@ impl<C: Cursor> SortPreservingMergeStream<C> {
metrics: BaselineMetrics,
batch_size: usize,
fetch: Option<usize>,
reservation: MemoryReservation,
) -> Self {
let stream_count = streams.partitions();

Self {
in_progress: BatchBuilder::new(schema, stream_count, batch_size),
in_progress: BatchBuilder::new(schema, stream_count, batch_size, reservation),
streams,
metrics,
aborted: false,
Expand Down Expand Up @@ -197,8 +208,7 @@ impl<C: Cursor> SortPreservingMergeStream<C> {
Some(Err(e)) => Poll::Ready(Err(e)),
Some(Ok((cursor, batch))) => {
self.cursors[idx] = Some(cursor);
self.in_progress.push_batch(idx, batch);
Poll::Ready(Ok(()))
Poll::Ready(self.in_progress.push_batch(idx, batch))
}
}
}
Expand Down
Loading