-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Account for memory usage in SortPreservingMerge (#5885) #7130
Conversation
dc9c2b6
to
f5019c9
Compare
} | ||
|
||
#[tokio::test] | ||
async fn sort_spill_reservation() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test demonstrates why the sort_spill_reservation_bytes
is needed -- if it is insufficiently large, spilling may fail (because it runs out of memory when trying to write to the spill file). If someone hits this they can increase the value of the memory reserved for merge
use datafusion::physical_plan::SendableRecordBatchStream; | ||
use datafusion_common::assert_contains; | ||
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; | ||
use datafusion_common::{assert_contains, Result}; | ||
|
||
use datafusion::prelude::{SessionConfig, SessionContext}; | ||
use datafusion_execution::TaskContext; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tests in this file can be cleaned up significantly, but I will do so as a follow on PR to keep the size of this one down
} | ||
|
||
impl TestCase { | ||
// TODO remove expected errors and memory limits and query from constructor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will do this as a follow on PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
f17afb0
to
9dbee0b
Compare
/// When sorting, below what size should data be concatenated | ||
/// and sorted in a single RecordBatch rather than sorted in | ||
/// batches and merged. | ||
pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not a behavior change. This constant was hard coded in sort.rs -- I have just pulled it out into its own config setting so I can write tests
9dbee0b
to
ac0aea2
Compare
@@ -84,13 +85,16 @@ pub struct RowCursorStream { | |||
column_expressions: Vec<Arc<dyn PhysicalExpr>>, | |||
/// Input streams | |||
streams: FusedStreams, | |||
/// Tracks the memory used by `converter` | |||
reservation: MemoryReservation, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We observed this to be a key consumer of memory for large dictionary encoded data
ac0aea2
to
3505dba
Compare
// enough memory to sort if we don't try to merge it all at once | ||
(partition_size * 5) / 2, | ||
) | ||
// use a single partiton so only a sort is needed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test demonstrates the need for reserving memory up front for the spill -- and shows that if someone hits the error they can increased the memory set aside for the merge and it will work
I ran the sort benchmarks and they are basically the same. I think the 9% slower measure is due to a high variance in the benchmark (which I should look into if/when I have time). I saw similar variations when I compared
|
I have also tested this on our code upstream and it definitely helps account for some of the difference between tracked and actual memory. |
I'll review this carefully today. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great to me! Thanks @alamb!
use datafusion::physical_optimizer::PhysicalOptimizerRule; | ||
use datafusion::physical_plan::common::batch_byte_size; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We would probably remove this method and use RecordBatch::get_array_memory_size
in the repo.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is a good idea -- I will do so in a follow on PR
Update: #7245
/// A handle to the runtime to get Disk spill files | ||
/// Reservation for the merging of in-memory batches. If the sort | ||
/// might spill, `sort_spill_reservation_bytes` will be | ||
/// pre-reserved to ensure there is some space for this sort/merg. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// pre-reserved to ensure there is some space for this sort/merg. | |
/// pre-reserved to ensure there is some space for this sort/merge. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in f87705e
// Release the memory reserved for merge back to the pool so | ||
// there is some left when `in_memo_sort_stream` requests an | ||
// allocation. | ||
self.merge_reservation.free(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
datafusion/common/src/config.rs
Outdated
/// How much memory is set aside, for each spillable sort, to | ||
/// ensure an in-memory merge can occur. This setting has no | ||
/// if the sort can not spill (there is no `DiskManager` | ||
/// configured) | ||
/// | ||
/// As part of spilling to disk, in memory data must be sorted | ||
/// / merged before writing the file. This in-memory | ||
/// sort/merge requires memory as well, so To avoid allocating | ||
/// once memory is exhausted, DataFusion sets aside this | ||
/// many bytes before. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe:
/// Specifies the reserved memory for each spillable sort operation to
/// facilitate an in-memory merge.
///
/// When a sort operation spills to disk, the in-memory data must be
/// sorted and merged before being written to a file. This setting reserves
/// a specific amount of memory for that in-memory sort/merge process.
///
/// Note: This setting is irrelevant if the sort operation cannot spill
/// (i.e., if there's no `DiskManager` configured).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} | ||
|
||
let streams = std::mem::take(&mut self.in_mem_batches) | ||
.into_iter() | ||
.map(|batch| { | ||
let metrics = self.metrics.baseline.intermediate(); | ||
Ok(spawn_buffered(self.sort_batch_stream(batch, metrics)?, 1)) | ||
let reservation = self.reservation.split(batch.get_array_memory_size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me (modulo what @yjshen already said).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review @yjshen and @crepererum
use datafusion::physical_optimizer::PhysicalOptimizerRule; | ||
use datafusion::physical_plan::common::batch_byte_size; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is a good idea -- I will do so in a follow on PR
Update: #7245
datafusion/common/src/config.rs
Outdated
/// How much memory is set aside, for each spillable sort, to | ||
/// ensure an in-memory merge can occur. This setting has no | ||
/// if the sort can not spill (there is no `DiskManager` | ||
/// configured) | ||
/// | ||
/// As part of spilling to disk, in memory data must be sorted | ||
/// / merged before writing the file. This in-memory | ||
/// sort/merge requires memory as well, so To avoid allocating | ||
/// once memory is exhausted, DataFusion sets aside this | ||
/// many bytes before. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// A handle to the runtime to get Disk spill files | ||
/// Reservation for the merging of in-memory batches. If the sort | ||
/// might spill, `sort_spill_reservation_bytes` will be | ||
/// pre-reserved to ensure there is some space for this sort/merg. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in f87705e
Great, let's merge this! Thanks @alamb @crepererum @gruuya! |
Which issue does this PR close?
Closes #5885
Closes #6382 (an earlier version of this code)
Rationale for this change
Merging takes memory. It is most pronounced for dictionaries where the RowConverter actually interns the dictionary values and thus can take an appreciable amount of memory for high cardinality dictionaries. We have seen this be 10s of GB in certain IOx cases.
Thus it is important that the
streaming_merge
and things that use it, likeSort
andSortPreservingMerge
properly account for the memory used while merging.What changes are included in this PR?
This is based on the changes from @tustvold in #6382:
MemoryReservation
through tostreaming_merge
sort_spill_reservation_bytes
andsort_in_place_threshold_bytes
that control the level of spillingThere is some subtlety related to reserving memory for this merge up front when doing a spilling Sort, which I describe inline in comments
Are these changes tested?
Yes
Are there any user-facing changes?
If memory limits are configured, some plans will now error rather than exceed that limit.