fix: race condition in SpillPool caused by buffered stream #240

martin-augment wants to merge 7 commits into main from
Conversation
Signed-off-by: Ahmed hossam <ahmed.hossambahig@gmail.com>
Summary of Changes

Hello @martin-augment, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request addresses a race condition in SpillPool caused by buffered spill streams.

Highlights
Changelog
Walkthrough

This pull request modifies the spill file buffering strategy in DataFusion's physical execution layer. The SpillManager removes its internal batch read buffer capacity configuration, including the private field and public builder method. The read_spill_as_stream function no longer wraps streams with buffering. Additionally, the multi_level_merge path adjusts its buffer length from 2 to 1 when retrieving sorted spill files. A new concurrent stress test is added to validate writer/reader behavior under interleaved execution conditions.
PR Review: Fix Race Condition in SpillPool by Removing Buffered Streams

Summary

This PR addresses a race condition in SpillPool caused by buffered spill streams.

Code Quality ✅

Strengths:
Minor Issues:
Potential Issues
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@datafusion/physical-plan/src/spill/spill_pool.rs`:
- Around line 1447-1448: Fix the typos in the test comment that currently reads
"stress testing the concurncy in the reader and the reader to make sure there is
now race condtion going for 100 iterations with a 5 batches per iteration" by
replacing "concurncy" with "concurrency", "now" with "no", and "condtion" with
"condition" so the comment reads correctly about stress testing concurrency and
ensuring no race condition.
🧹 Nitpick comments (2)
datafusion/physical-plan/src/sorts/multi_level_merge.rs (1)

262-269: Change looks correct; the recursive buffer-reduction path in `get_sorted_spill_files_to_merge` is now dead code.

With `buffer_len` hardcoded to 1, the `if buffer_len > 1` branch (line 382) can never be reached. The function still works correctly; it'll return `Err` immediately if it can't reserve memory for the minimum streams, but the recursive retry logic is effectively dead. Consider removing it or adding a comment noting it's retained for future flexibility.

datafusion/physical-plan/src/spill/spill_manager.rs (1)

168-193: `read_spill_as_stream` and `read_spill_as_stream_unbuffered` are now identical.

After removing `spawn_buffered`, both methods produce the exact same `Box::pin(cooperative(SpillReaderStream::new(...)))`. The docstring on line 182 ("Same as `read_spill_as_stream`, but without buffering") is now misleading since neither method buffers. Consider removing `read_spill_as_stream_unbuffered` and updating callers to use `read_spill_as_stream`, or updating the doc to clarify they are the same.

```shell
#!/bin/bash
# Find all callers of read_spill_as_stream_unbuffered to assess impact of removal
rg -n 'read_spill_as_stream_unbuffered' --type rust
```
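The dead-branch observation for `get_sorted_spill_files_to_merge` above can be sketched with a minimal stdlib-only stand-in for the retry logic (all names and the memory model here are hypothetical, not the actual DataFusion code): once every caller passes a buffer length of 1, the reduction branch can never fire.

```rust
// Hypothetical stand-in for the buffer-reduction retry described in the review.
// With `buffer_len` fixed at 1 by the caller, the `buffer_len > 1` branch can
// never execute, so the recursive retry is dead code.
fn reserve_with_retry(buffer_len: usize, available: usize, per_stream: usize) -> Result<usize, String> {
    let needed = per_stream * buffer_len;
    if needed <= available {
        Ok(buffer_len)
    } else if buffer_len > 1 {
        // Unreachable once callers always start with buffer_len == 1.
        reserve_with_retry(buffer_len - 1, available, per_stream)
    } else {
        Err("cannot reserve memory for the minimum number of streams".to_string())
    }
}

fn main() {
    // With buffer_len == 1 the function either succeeds or errors immediately.
    assert_eq!(reserve_with_retry(1, 100, 50), Ok(1));
    assert!(reserve_with_retry(1, 10, 50).is_err());
    // The retry branch only matters when the initial buffer_len exceeds 1.
    assert_eq!(reserve_with_retry(3, 100, 40), Ok(2));
}
```

This is why the review suggests either deleting the branch or documenting that it is kept for future flexibility.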
```rust
// stress testing the concurncy in the reader and the reader to make sure there is now race condtion
// going for 100 iterations with a 5 batches per iteration
```
Typos in test comment.
Line 1447: "concurncy" → "concurrency", "now" → "no", "condtion" → "condition".
Proposed fix

```diff
- // stress testing the concurncy in the reader and the reader to make sure there is now race condtion
- // going for 100 iterations with a 5 batches per iteration
+ // Stress testing the concurrency between the writer and the reader to make sure there is no race condition.
+ // Going for 100 iterations with 5 batches per iteration.
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```rust
// Stress testing the concurrency between the writer and the reader to make sure there is no race condition.
// Going for 100 iterations with 5 batches per iteration.
```
value:good-to-have; category:documentation; feedback: The CodeRabbit AI reviewer is correct! There are a few minor typos in the comments that need to be fixed.
Code Review
This pull request addresses a race condition related to buffered streams when reading spilled files. The core change is the removal of read-ahead buffering in SpillManager::read_spill_as_stream, which makes it an unbuffered stream. This change is propagated to MultiLevelMergeBuilder which was using the buffered stream. A new stress test is also added to spill_pool.rs to verify the fix for concurrent writer/reader scenarios and prevent future regressions.
The changes look good and effectively remove the problematic buffering. I have a couple of suggestions for improvement: one is to refactor the now-redundant read_spill_as_stream_unbuffered function, and the other is to fix a comment in the new test.
```diff
     )));
 
-    Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
+    Ok(stream)
```
value:useful; category:bug; feedback: The Gemini AI reviewer is correct! After removing the buffering from `read_spill_as_stream()`, it is now exactly the same as `read_spill_as_stream_unbuffered()`. If the fix for the initial issue is to drop the pre-fetching, then one of these methods should be removed and all its callers should use the other. But maybe this is the wrong way to fix it; a better way might be to find the reason why a reader gets an invalid EOF/None message and does not wait for more messages.
```rust
// stress testing the concurncy in the reader and the reader to make sure there is now race condtion
// going for 100 iterations with a 5 batches per iteration
```
The comment contains a few typos and could be clarified. It seems to be testing concurrency between a writer and a reader, not two readers.
```diff
- // stress testing the concurncy in the reader and the reader to make sure there is now race condtion
- // going for 100 iterations with a 5 batches per iteration
+ // Stress test for concurrency between the writer and reader to ensure there is no race condition.
+ // It runs for 100 iterations, with 5 batches per iteration.
```
value:good-to-have; category:documentation; feedback: The Gemini AI reviewer is correct! There are a few minor typos in the comments that need to be fixed.
🤖 Augment PR Summary

Summary: This PR addresses a race condition in spill reading/merging by removing read-ahead buffering from spill streams.

Changes:

Technical Notes: The change reduces in-memory read-ahead (previously via buffered spawning) to avoid concurrency issues between spill file lifecycle and buffered reads.
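For context on the removed read-ahead, the pattern behaves roughly like this stdlib-only sketch of a spawned prefetcher feeding a bounded channel (purely illustrative; the real code used `spawn_buffered` on a tokio task, and `spawn_prefetch` is a hypothetical name). The point is the decoupling: the background task can race ahead of the consumer, which is the kind of interleaving the PR removes.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

// Illustrative prefetcher: a background thread eagerly pulls items from a
// source and parks them in a bounded channel, decoupling producer progress
// from consumer progress. Dropping the sender signals end-of-stream.
fn spawn_prefetch<I>(source: I, capacity: usize) -> Receiver<I::Item>
where
    I: Iterator + Send + 'static,
    I::Item: Send + 'static,
{
    let (tx, rx) = sync_channel(capacity);
    thread::spawn(move || {
        for item in source {
            if tx.send(item).is_err() {
                break; // consumer went away; stop prefetching
            }
        }
        // `tx` dropped here: receiver's iterator terminates.
    });
    rx
}

fn main() {
    let rx = spawn_prefetch(0..5, 2);
    let collected: Vec<i32> = rx.iter().collect();
    assert_eq!(collected, vec![0, 1, 2, 3, 4]);
}
```

Removing this layer makes reads happen only on demand, so the reader can no longer run ahead of the spill file's lifecycle.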
```rust
    Ok(stream)
}

/// Same as `read_spill_as_stream`, but without buffering.
```
After removing spawn_buffered, read_spill_as_stream is also unbuffered, so the doc comment on read_spill_as_stream_unbuffered is now misleading (there’s no behavioral difference). This could confuse callers/tests that expect a distinction between the two APIs.
Severity: low
value:useful; category:bug; feedback: The Augment AI reviewer is correct! After removing the buffering from `read_spill_as_stream()`, it is now exactly the same as `read_spill_as_stream_unbuffered()`. If the fix for the initial issue is to drop the pre-fetching, then one of these methods should be removed and all its callers should use the other. But maybe this is the wrong way to fix it; a better way might be to find the reason why a reader gets an invalid EOF/None message and does not wait for more messages.
```rust
let reader_handle = SpawnedTask::spawn(async move {
    let mut batches_read = 0;
    while let Some(result) = reader.next().await {
```
This `while let Some(...)` loop relies on the writer being dropped to terminate; if the stream fails to close for any reason, the test can hang indefinitely and stall CI. That makes failures show up as timeouts rather than a clear assertion failure.
Severity: medium
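If one did want to harden the test against hangs as this comment suggests, a bounded wait per receive turns a stall into an immediate, attributable failure. A minimal sketch with the stdlib (the actual test uses tokio streams, not `std::sync::mpsc`; `drain_with_timeout` is a hypothetical helper):

```rust
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

// Drain a channel, but bound every wait so a stalled writer surfaces as a
// panic instead of an indefinite hang (the CI-timeout concern raised above).
fn drain_with_timeout(rx: Receiver<i32>, per_recv: Duration) -> usize {
    let mut batches_read = 0;
    loop {
        match rx.recv_timeout(per_recv) {
            Ok(_batch) => batches_read += 1,
            // Writer dropped its sender: the normal end-of-stream signal.
            Err(RecvTimeoutError::Disconnected) => return batches_read,
            Err(RecvTimeoutError::Timeout) => panic!("reader stalled; failing fast"),
        }
    }
}

fn main() {
    let (tx, rx) = channel();
    let writer = thread::spawn(move || {
        for i in 0..5 {
            tx.send(i).unwrap();
        }
        // `tx` dropped here ends the stream for the reader.
    });
    let n = drain_with_timeout(rx, Duration::from_secs(5));
    writer.join().unwrap();
    assert_eq!(n, 5);
}
```

In tokio-based tests the analogous tool would be wrapping the receive in a timeout future, but whether that is worth the extra noise is a judgment call, as the maintainer response below argues.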
value:annoying; category:bug; feedback: The Augment AI reviewer is not correct! Depending on the writer being dropped is a standard way to stop waiting on a read operation for all kinds of channels in Rust. Receiving a `None` message is the first/main termination signal, and the last writer being dropped is the second.
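The maintainer's point, that sender drop is the idiomatic end-of-stream signal in Rust, can be shown with a minimal stdlib example; no explicit close call exists or is needed:

```rust
use std::sync::mpsc::channel;
use std::thread;

fn main() {
    let (tx, rx) = channel();
    thread::spawn(move || {
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        // `tx` is dropped when this closure returns; no explicit close is needed.
    });

    // Like `while let Some(batch) = reader.next().await`, this iterator ends
    // exactly when every sender handle has been dropped.
    let received: Vec<i32> = rx.iter().collect();
    assert_eq!(received, vec![1, 2]);
}
```

The same convention holds for tokio channels and for `Stream` implementations, where `next().await` returning `None` plays the role of the disconnected channel.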
value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct! After removing the buffering from `read_spill_as_stream()`, it is now exactly the same as `read_spill_as_stream_unbuffered()`. If the fix for the initial issue is to drop the pre-fetching, then one of these methods should be removed and all its callers should use the other.
value:good-to-have; category:documentation; feedback: The Claude AI reviewer is correct! There are a few minor typos in the comments that need to be fixed.