-
Notifications
You must be signed in to change notification settings - Fork 839
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
POC: Add ParquetMetaDataReader #6392
Conversation
Thank you @etseidl this looks amazing. I hope to give it a look over the next few days |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This generally looks great to me! The new API with the sync / async method is perfect. Amazing work!
The TODO with the ability to load from a byte range that only has the metadata is the one piece missing for this to satisfy our use case, but we can easily work around that by passing in a ChunkReader
that lies about the offsets it's loading from and errors if you try to access an offset outside of it's range.
Thanks @adriangb. I've added the functionality from the TODO ( |
Amazing speed, yes that's perfect! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you very much for working on this. Here are some suggestions from my side.
parquet/src/file/metadata/reader.rs
Outdated
prefetch_hint: Option<usize>, | ||
} | ||
|
||
impl Default for ParquetMetaDataReader { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems we can derive(Default)
and use Self::default()
in fn new()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Of course. Done.
parquet/src/file/metadata/reader.rs
Outdated
if self.metadata.is_none() { | ||
return Err(general_err!("could not parse parquet metadata")); | ||
} | ||
Ok(self.metadata.take().unwrap()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, I know the unwrap()
here is safe, but what about replacing it with
self.metadata
.take()
.ok_or_else(|| general_err!("could not parse parquet metadata"))
I believe it can eliminate an extra check and improve readability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, still learning Rust idioms :)
parquet/src/file/metadata/reader.rs
Outdated
|
||
// Get bounds needed for page indexes (if any are present in the file). | ||
let range = self.range_for_page_index(); | ||
let range = match range { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about using:
let Some(range) = self.range_for_page_index() else {
return Ok(());
};
parquet/src/file/metadata/reader.rs
Outdated
/// least two fetches, regardless of the value of `prefetch_hint`, if the page indexes are | ||
/// requested. | ||
#[cfg(feature = "async")] | ||
pub async fn try_load_from_tail<R: AsyncFileReader + AsyncRead + AsyncSeek + Unpin + Send>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, requiring an additional bound of AsyncRead + AsyncSeek
is a bit confusing. Could you provide more context?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm echoing the bounds from here. AsyncFileReader
isn't really necessary, though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if this design is correct. How about removing this API until we resolve #6157? We can bring this API back while we have native suffix read support.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, removed. TBH I don't fully understand the issue in #6157 and thought the approach in AsyncFileReader::get_metadata
could be an alternative solution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you very much @etseidl -- I basically agree with @adriangb and @Xuanwo that this is wonderful work
This is an attempt to consolidate Parquet footer/page index reading/parsing into a single place.
It is amazing
This implementation is very rough, with not enough safety checking and documentation. At this point I'm hoping for feedback on the approach.
My feedback is it is great -- thank you to @adriangb and @Xuanwo for their earlier feedback.
If this seems at all useful, then a path forward would be to first add ParquetMetaDataReader alone, and then in subsequent PRs begin to use it as a replacement for other functions which could then be deprecated. The idea is to get as much in without breaking changes, and then introduce the breaking changes once 54.0.0 is open.
I think this plan makes a lot of sense to me (and I think we can avoid most breaking changes -- deprecating is not a breaking change in my perspective)
@@ -61,6 +66,8 @@ impl std::fmt::Display for ParquetError { | |||
write!(fmt, "Index {index} out of bound: {bound}") | |||
} | |||
ParquetError::External(e) => write!(fmt, "External: {e}"), | |||
ParquetError::NeedMoreData(needed) => write!(fmt, "NeedMoreData: {needed}"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A nitpick is that these seems pretty similar . I wonder if it would make sense to combine them somehow 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wasn't intending two, it just turned out that way. I could make the second usize
optional with the understanding that a range is being requested.
Also, does adding to the enum make this a breaking change? If so, I could go back to my tortured use of IndexOutOfBound
until it's open season on breaking changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, does adding to the enum make this a breaking change? If so, I could go back to my tortured use of IndexOutOfBound until it's open season on breaking changes.
Yes, unfortunately, it does make it a breaking change
https://github.com/apache/arrow-rs/blob/master/parquet/src/errors.rs#L29
We should probably mark the error type as "non exhaustive" which would make it a non breaking change in the future
@@ -237,8 +236,10 @@ where | |||
Fut: Future<Output = Result<Bytes>> + Send, | |||
{ | |||
let fetch = MetadataFetchFn(fetch); | |||
let loader = MetadataLoader::load(fetch, file_size, prefetch).await?; | |||
Ok(loader.finish()) | |||
// TODO(ets): should add option to read page index to this function |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An alternative perhaps would be to deprecate fetch_parquet_metadata
entirely and suggest people use ParquetMetaDataReader
which s more complete and full featured -- I think we could deprecate this function in a minor release (we can't remover it until a major release)
parquet/src/file/metadata/reader.rs
Outdated
/// arguments). | ||
/// | ||
/// [Page Index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md | ||
pub fn parquet_metadata_from_file<R: ChunkReader>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A nitpick is maybe we can call this "parquet_metadata_from_reader`
Also I wonder if instead of a new API it would make sense to always directly use ParquetMetaDataReader
directly. That would certainly be more verbose, but it also might be more explicit.
For the common case that the wrapping code won't retry (aka all the callsites of parquet_metadata_from_file
, we could also add some sort of consuming API too that combines try_parse
and finish
to make it less verbose. Something like
let metadata = ParquetMetaDataReader::new()
.with_column_indexes(column_index)
.with_offset_indexes(offset_index)
.parse(file)?;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that seems reasonable. And yes, I struggle with naming things 😄.
parquet/src/file/metadata/reader.rs
Outdated
} | ||
|
||
/// Same as [`Self::try_parse()`], but only `file_range` bytes of the original file are | ||
/// available. `file_range.end` must point to the end of the file. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was a little confused about how file_range
works in this case (given that it seems to me that ChunkReader
would in theory allow reading arbitrary ranges)
Is the idea that try_parse_range
limits the requests to the reader
so they are only within file_range
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's for the case of impl ChunkReader for Bytes
. Say you've speculatively read the last 1000 bytes of a file into a buffer, and let's say that's actually sufficient for the page indexes. But the locations of the page indexes are absolute (in this case they're in the range 1100..1500), so you can't actually find the indexes in the buffer unless you know the file offset of the beginning of the buffer (trying to seek to 1100 in a 1000 byte buffer clearly won't work...you need to subtract 1000 from the absolute offsets and read 100..500 from the buffer).
I also wanted different error behavior for the File
vs Bytes
use cases. If a File
is passed, there's no sense in asking for a larger file; if the metadata can't be read there's either some I/O error going on or a corrupted file.
Perhaps we could instead just pass in the file size, while still mandating that if Bytes
are passed they must include the footer. We could then infer the range as file_size-reader.len()..file_size
, and then errors could simply return the number of bytes needed from the tail of the file. This would perhaps solve the above issue with two new and very similar errors types.
parquet/src/file/metadata/reader.rs
Outdated
} | ||
|
||
/// Attempts to (asynchronously) parse the footer metadata (and optionally page indexes) | ||
/// given a [`MetadataFetch`]. The file size must be known to use this function. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might also be good to note here that try_load
will attempt to minimize the number of calls to fetch
by prefetching but may make potentially multiple requests depending on how the data is laid out.
As an aside (and not changed in this PR), I found the use of MetadataFetch
as basically an async version of ChunkReader
confusing when trying to understand this API
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might also be good to note here that try_load will attempt to minimize the number of calls to fetch by prefetching but may make potentially multiple requests depending on how the data is laid out.
I can add a reference back to with_prefetch_hint
where this is explained already.
As an aside (and not changed in this PR), I found the use of MetadataFetch as basically an async version of ChunkReader confusing when trying to understand this API
I'll admit to not being well versed in the subtleties of async code. And I am trying for a drop-in replacement for MetadataLoader
initially. Do you think using AsyncFileReader
would be cleaner/clearer?
@adriangb I also found the use of "offsets" confusing as they parquet metadata has and uses offsets that are always "absolute" offsets within the overall file. Maybe we can make / update the example in #6081. Speaking of which, I will go do that now. |
replace try_parse_range() with try_parse_sized()
Which issue does this PR close?
Relates to #6002
Rationale for this change
This is an attempt to consolidate Parquet footer/page index reading/parsing into a single place.
What changes are included in this PR?
The new
ParquetMetaDataReader
basically takes the code inparquet/src/file/footer.rs
andparquet/src/arrow/async_reader/metadata.rs
and mashes them together into a single API. Using this, theread_metadata_from_file
call from #6081 would become:Also included are two async functions
try_load()
andtry_load_from_tail()
. The former is a combination ofMetadataLoader::load()
andMetadataLoader::load_page_index
. The latter is an attempt at addressing the issue of loading the footer when the file size is not known, so it requires being able to seek from the end of the file.This implementation is very rough, with not enough safety checking and documentation. At this point I'm hoping for feedback on the approach. If this seems at all useful, then a path forward would be to first add
ParquetMetaDataReader
alone, and then in subsequent PRs begin to use it as a replacement for other functions which could then be deprecated. The idea is to get as much in without breaking changes, and then introduce the breaking changes once 54.0.0 is open.Are there any user-facing changes?
Eventually, yes.