-
Notifications
You must be signed in to change notification settings - Fork 838
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add AsyncFileReader trait #1803
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like a nice improvement to me. Thanks @tustvold
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>; | ||
|
||
/// Retrieve the [`ParquetMetaData`] for this file | ||
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find it a little strange that the get_metadata
is part of AsyncChunkReader
as I would have expected the "read bytes" and "logically read and decode parquet data" more separated
Would it make sense to consider two separate traits? Something like the following perhaps 🤔
/// A reader that can asynchronously read a range of bytes
pub trait AsyncChunkReader: Send + Unpin + 'static {
/// Retrieve the bytes in `range`
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
}
/// Returns parquet metadata, possibly asynchronously
pub trait AsyncParquetReader: Send + Unpin + 'static {
/// Retrieve the [`ParquetMetaData`] for this file
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
}
Or maybe call it AsyncChunkedParquetReader
? (though I admit that is getting to be a mouthful)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AsyncParquetFile
?
I agree it is a little odd, but I want to give flexibility to how this metadata is sourced to allow for caching, pre-fetching, etc...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ParquetReaderAccess
?
The usecase of cached / catalog'd metadata is a good one -- perhaps we can just add a comment explaining the rationale.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've split the traits up, but it feels very odd to me to have two traits implemented on the same type that is then passed to ParquetRecordBatchStreamBuilder::new
...
I'll revisit in the morning, I feel this has just made it more confusing tbh...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I agree it is now more confusing -- hoping a good night sleep will make it clearer. I am happy with whatever you decide
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went with AsyncFileReader
and added a load of doc comments
|
||
let mut buffer = vec![0_u8; (end - start) as usize]; | ||
input.read_exact(buffer.as_mut_slice()).await?; | ||
let data = input |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this code is just factored into AsyncChunkReader::get_bytes
correct?
let (offset_1, length_1) = metadata.row_group(0).column(1).byte_range(); | ||
let (offset_2, length_2) = metadata.row_group(0).column(2).byte_range(); | ||
|
||
assert_eq!( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
async move { | ||
self.seek(SeekFrom::Start(range.start as u64)).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, as get_bytes
and get_metadata
both async, and they both call seek
. Is any chance there will be race condition between then? For example, calling get_bytes
first but the file pos is changed by call get_metadata
next?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method takes a mutable reference, and return a future with the same lifetime, so they can't race
Codecov Report
@@ Coverage Diff @@
## master #1803 +/- ##
==========================================
- Coverage 83.42% 83.42% -0.01%
==========================================
Files 198 199 +1
Lines 56327 56632 +305
==========================================
+ Hits 46990 47243 +253
- Misses 9337 9389 +52
Continue to review full report at Codecov.
|
Which issue does this PR close?
Part of #1605
Rationale for this change
I originally wanted to introduce an API that would afford clients greater control of this, but particularly with the in-flight work to support column indexes (#1749) and (#1191), it is unclear what exactly this interface should look like. Rather than potentially paint us into a corner, lets keep the interface high-level and we can introduce a lower-level API as and when desired.
What changes are included in this PR?
This extracts an
AsyncChunkReader
which exposes an API to fetch byte ranges. For backwards compatibility, an implementation is provided forAsyncRead + AsyncSeek
. This provides a fairly straightforward location for DataFusion to plug in an object store that supports byte range fetches.Are there any user-facing changes?
No