FileStream: Open next file in parallel while decoding #5161
Conversation
Looks good to me @thinkharderdev -- thank you. It would be great to figure out some way to test this PR (mostly to ensure we don't break this behavior in the future). However, I don't have any clever ideas on how to do so.
I went through the logic in detail.
I left some suggestions for comments to clarify the intent, which I think would be valuable but are not necessary.
cc @tustvold
        partition_values,
    }
}
None => return Poll::Ready(None),
Suggested change:
- None => return Poll::Ready(None),
+ // No more input files
+ None => return Poll::Ready(None),
@@ -237,13 +249,34 @@ impl<F: FileOpener> FileStream<F> {
    partition_values,
} => match ready!(future.poll_unpin(cx)) {
    Ok(reader) => {
        let partition_values = mem::take(partition_values);

        let next = self.next_file().transpose();
Suggested change:
- let next = self.next_file().transpose();
+ // begin opening next file
+ let next = self.next_file().transpose();
@@ -98,6 +99,8 @@ enum FileStreamState {
    partition_values: Vec<ScalarValue>,
    /// The reader instance
    reader: BoxStream<'static, Result<RecordBatch, ArrowError>>,
    /// A [`FileOpenFuture`] for the next file to be processed
    next: Option<(FileOpenFuture, Vec<ScalarValue>)>,
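To make the shape of this change concrete, here is a minimal sketch of the state machine (simplified stand-in types, not the actual DataFusion source): the `Scan` state carries an optional in-flight open for the next file alongside the reader currently being decoded, which is what lets IO and decoding overlap.

```rust
// Illustrative only: `Reader` and `OpenFuture` are simplified stand-ins
// for BoxStream<'static, Result<RecordBatch, ArrowError>> and
// FileOpenFuture in the real code.
use futures::future::BoxFuture;
use futures::stream::BoxStream;

type Reader = BoxStream<'static, Result<Vec<u8>, std::io::Error>>;
type OpenFuture = BoxFuture<'static, Result<Reader, std::io::Error>>;

enum FileStreamState {
    /// Opening the current file; nothing is being decoded yet
    Open { future: OpenFuture },
    /// Decoding `reader`; `next`, if present, is already opening
    /// the following file in parallel
    Scan {
        reader: Reader,
        next: Option<OpenFuture>,
    },
    /// All input files have been exhausted
    Done,
}

fn main() {
    // FileStream drives these states inside Stream::poll_next;
    // this sketch only shows the data layout.
    let _ = FileStreamState::Done;
}
```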
I wonder if we could make it future-proof by potentially prefetching n files instead of 1? I guess in cases where file opening is slower than scanning / processing, this could make a difference (e.g. small files).
Perhaps a follow-on PR could turn this into a stream and use StreamExt::buffered or something
yeah, that seems like a good idea
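As a rough illustration of that follow-on idea (not DataFusion code; `open_file` is a hypothetical stand-in for the real opening logic), `StreamExt::buffered` from the futures crate keeps up to N open-futures in flight while still yielding results in the original order:

```rust
// Sketch: model the file list as a stream of open-futures and let
// buffered(N) overlap up to N opens with downstream consumption.
use futures::stream::{self, StreamExt};

async fn open_file(path: &'static str) -> std::io::Result<Vec<u8>> {
    // Stand-in for the real IO-heavy open; tokio::fs::read runs the
    // blocking read on a background thread pool.
    tokio::fs::read(path).await
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let files = ["a.parquet", "b.parquet", "c.parquet"];

    // buffered(2): at most two opens in flight at once, but results
    // are still delivered in file order, matching FileStream's semantics.
    let mut readers = stream::iter(files).map(open_file).buffered(2);

    while let Some(bytes) = readers.next().await {
        let bytes = bytes?;
        // Decode here; later opens continue on the runtime meanwhile.
        println!("opened {} bytes", bytes.len());
    }
    Ok(())
}
```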
Let's file a ticket for the "buffer N items at a time" idea and work on it as a follow-on PR
Thanks again @thinkharderdev
Benchmark runs are scheduled for baseline = 48732b4 and contender = 816a0f8. 816a0f8 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Added #5209
Which issue does this PR close?
Closes #5129
Rationale for this change
File opening is mostly IO (and may involve a fair amount of sequential IO), so it can likely be parallelized well with decoding. We should therefore open the next file in parallel while decoding the current file in FileStream.
What changes are included in this PR?
Are these changes tested?
I think this should be covered by existing tests
Are there any user-facing changes?
FileStreamMetrics.time_opening now has slightly different semantics: it no longer captures all time spent opening files, only opening time that does not overlap with concurrent decoding.