Skip to content

Commit 13208e6

Browse files
authored
Validate the memory consumption in SPM created by multi level merge (apache#17029)
* Use GreedyMemoryPool for the sanity check * Validate whether each batch read from spill exceeds max_record_batch_mem * Replace the error with a warning log
1 parent b293e2c commit 13208e6

File tree

4 files changed

+57
-15
lines changed

4 files changed

+57
-15
lines changed

datafusion/physical-plan/benches/spill_io.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,9 @@ fn bench_spill_io(c: &mut Criterion) {
115115
// - Wait for the consumer to finish processing
116116
|spill_file| {
117117
rt.block_on(async {
118-
let stream =
119-
spill_manager.read_spill_as_stream(spill_file).unwrap();
118+
let stream = spill_manager
119+
.read_spill_as_stream(spill_file, None)
120+
.unwrap();
120121
let _ = collect(stream).await.unwrap();
121122
})
122123
},
@@ -519,8 +520,9 @@ fn benchmark_spill_batches_for_all_codec(
519520
)
520521
.unwrap()
521522
.unwrap();
522-
let stream =
523-
spill_manager.read_spill_as_stream(spill_file).unwrap();
523+
let stream = spill_manager
524+
.read_spill_as_stream(spill_file, None)
525+
.unwrap();
524526
let _ = collect(stream).await.unwrap();
525527
})
526528
},
@@ -553,7 +555,9 @@ fn benchmark_spill_batches_for_all_codec(
553555
let rt = Runtime::new().unwrap();
554556
let start = Instant::now();
555557
rt.block_on(async {
556-
let stream = spill_manager.read_spill_as_stream(spill_file).unwrap();
558+
let stream = spill_manager
559+
.read_spill_as_stream(spill_file, None)
560+
.unwrap();
557561
let _ = collect(stream).await.unwrap();
558562
});
559563
let read_time = start.elapsed();

datafusion/physical-plan/src/sorts/multi_level_merge.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,8 @@ impl MultiLevelMergeBuilder {
237237
let spill_file = self.sorted_spill_files.remove(0);
238238

239239
// Not reserving any memory for this disk as we are not holding it in memory
240-
self.spill_manager.read_spill_as_stream(spill_file.file)
240+
self.spill_manager
241+
.read_spill_as_stream(spill_file.file, None)
241242
}
242243

243244
// Only in memory streams, so merge them all in a single pass
@@ -274,10 +275,12 @@ impl MultiLevelMergeBuilder {
274275
.spill_manager
275276
.clone()
276277
.with_batch_read_buffer_capacity(buffer_size)
277-
.read_spill_as_stream(spill.file)?;
278+
.read_spill_as_stream(
279+
spill.file,
280+
Some(spill.max_record_batch_memory),
281+
)?;
278282
sorted_streams.push(stream);
279283
}
280-
281284
let merge_sort_stream = self.create_new_merge_sort(
282285
sorted_streams,
283286
// If we have no sorted spill files left, this is the last run

datafusion/physical-plan/src/spill/mod.rs

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use datafusion_common_runtime::SpawnedTask;
4343
use datafusion_execution::disk_manager::RefCountedTempFile;
4444
use datafusion_execution::RecordBatchStream;
4545
use futures::{FutureExt as _, Stream};
46+
use log::warn;
4647

4748
/// Stream that reads spill files from disk where each batch is read in a spawned blocking task
4849
/// It will read one batch at a time and will not do any buffering, to buffer data use [`crate::common::spawn_buffered`]
@@ -54,8 +55,16 @@ use futures::{FutureExt as _, Stream};
5455
struct SpillReaderStream {
5556
schema: SchemaRef,
5657
state: SpillReaderStreamState,
58+
/// Maximum memory size observed among spilling sorted record batches.
59+
/// This is used for validation purposes during reading each RecordBatch from spill.
60+
/// For context on why this value is recorded and validated,
61+
/// see `physical-plan/src/sorts/multi_level_merge.rs`.
62+
max_record_batch_memory: Option<usize>,
5763
}
5864

65+
// Small margin allowed to accommodate slight memory accounting variation
66+
const SPILL_BATCH_MEMORY_MARGIN: usize = 4096;
67+
5968
/// When we poll for the next batch, we will get back both the batch and the reader,
6069
/// so we can call `next` again.
6170
type NextRecordBatchResult = Result<(StreamReader<BufReader<File>>, Option<RecordBatch>)>;
@@ -76,10 +85,15 @@ enum SpillReaderStreamState {
7685
}
7786

7887
impl SpillReaderStream {
79-
fn new(schema: SchemaRef, spill_file: RefCountedTempFile) -> Self {
88+
fn new(
89+
schema: SchemaRef,
90+
spill_file: RefCountedTempFile,
91+
max_record_batch_memory: Option<usize>,
92+
) -> Self {
8093
Self {
8194
schema,
8295
state: SpillReaderStreamState::Uninitialized(spill_file),
96+
max_record_batch_memory,
8397
}
8498
}
8599

@@ -125,6 +139,23 @@ impl SpillReaderStream {
125139
Ok((reader, batch)) => {
126140
match batch {
127141
Some(batch) => {
142+
if let Some(max_record_batch_memory) =
143+
self.max_record_batch_memory
144+
{
145+
let actual_size =
146+
get_record_batch_memory_size(&batch);
147+
if actual_size
148+
> max_record_batch_memory
149+
+ SPILL_BATCH_MEMORY_MARGIN
150+
{
151+
warn!(
152+
"Record batch memory usage ({actual_size} bytes) exceeds the expected limit ({max_record_batch_memory} bytes) \n\
153+
by more than the allowed tolerance ({SPILL_BATCH_MEMORY_MARGIN} bytes).\n\
154+
This likely indicates a bug in memory accounting during spilling.\n\
155+
Please report this issue in https://github.com/apache/datafusion/issues/17340."
156+
);
157+
}
158+
}
128159
self.state = SpillReaderStreamState::Waiting(reader);
129160

130161
Poll::Ready(Some(Ok(batch)))
@@ -417,7 +448,7 @@ mod tests {
417448
let spilled_rows = spill_manager.metrics.spilled_rows.value();
418449
assert_eq!(spilled_rows, num_rows);
419450

420-
let stream = spill_manager.read_spill_as_stream(spill_file)?;
451+
let stream = spill_manager.read_spill_as_stream(spill_file, None)?;
421452
assert_eq!(stream.schema(), schema);
422453

423454
let batches = collect(stream).await?;
@@ -481,7 +512,7 @@ mod tests {
481512
let spilled_rows = spill_manager.metrics.spilled_rows.value();
482513
assert_eq!(spilled_rows, num_rows);
483514

484-
let stream = spill_manager.read_spill_as_stream(spill_file)?;
515+
let stream = spill_manager.read_spill_as_stream(spill_file, None)?;
485516
assert_eq!(stream.schema(), dict_schema);
486517
let batches = collect(stream).await?;
487518
assert_eq!(batches.len(), 2);
@@ -512,7 +543,7 @@ mod tests {
512543
assert!(spill_file.path().exists());
513544
assert!(max_batch_mem > 0);
514545

515-
let stream = spill_manager.read_spill_as_stream(spill_file)?;
546+
let stream = spill_manager.read_spill_as_stream(spill_file, None)?;
516547
assert_eq!(stream.schema(), schema);
517548

518549
let batches = collect(stream).await?;
@@ -547,7 +578,7 @@ mod tests {
547578
let spilled_rows = spill_manager.metrics.spilled_rows.value();
548579
assert_eq!(spilled_rows, num_rows);
549580

550-
let stream = spill_manager.read_spill_as_stream(spill_file)?;
581+
let stream = spill_manager.read_spill_as_stream(spill_file, None)?;
551582
assert_eq!(stream.schema(), schema);
552583

553584
let batches = collect(stream).await?;
@@ -931,8 +962,10 @@ mod tests {
931962
.spill_record_batch_and_finish(&batches, "Test2")?
932963
.unwrap();
933964

934-
let mut stream_1 = spill_manager.read_spill_as_stream(spill_file_1)?;
935-
let mut stream_2 = spill_manager.read_spill_as_stream(spill_file_2)?;
965+
let mut stream_1 =
966+
spill_manager.read_spill_as_stream(spill_file_1, None)?;
967+
let mut stream_2 =
968+
spill_manager.read_spill_as_stream(spill_file_2, None)?;
936969
stream_1.next().await;
937970
stream_2.next().await;
938971

datafusion/physical-plan/src/spill/spill_manager.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,10 +174,12 @@ impl SpillManager {
174174
pub fn read_spill_as_stream(
175175
&self,
176176
spill_file_path: RefCountedTempFile,
177+
max_record_batch_memory: Option<usize>,
177178
) -> Result<SendableRecordBatchStream> {
178179
let stream = Box::pin(cooperative(SpillReaderStream::new(
179180
Arc::clone(&self.schema),
180181
spill_file_path,
182+
max_record_batch_memory,
181183
)));
182184

183185
Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))

0 commit comments

Comments
 (0)