API for encoding/decoding ParquetMetadata with more control #6002
Comments
This sounds quite a lot like https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.MetadataLoader.html ? |
That is quite similar -- thank you. Some differences might also be the with a normal (non |
Ye I think the asyncness would be an important difference. Also that the existing APIs kind of want to load from an entire file. I suppose you could give it a "file" with just the footer and tell it to load just that range... but it feels a bit forced? Same with the asyncness. For my use case I could do some pointless async work (as in, make an async file like thing that just points to a |
Yes I agree this would be ideal. Having two things:
|
I took a crack at using My approach was to manually grab the footer based on the footer size declared in the penultimate 4 bytes of the file and save that. But the metadata size declared in the footer seems to not include the Page Index, and I'm not sure how I'd calculate the start location of the Page Index (and other stuff like bloom filters). My implementation looks somewhat like:
#[derive(Debug, Clone)]
struct AsyncBytes {
    file_size: usize,
    inner: Bytes,
}

impl AsyncBytes {
    fn new(file_size: usize, inner: Bytes) -> Self {
        Self {
            file_size,
            inner,
        }
    }
}

impl MetadataFetch for AsyncBytes {
    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
        // check that the range is within the metadata section
        let available_range = self.file_size - self.inner.len()..self.file_size;
        if !(available_range.start <= range.start && available_range.end >= range.end) {
            return async move {
                let err = format!("Attempted to fetch data from outside metadata section: range={:?}, available_range={:?}", range, available_range);
                Err(parquet::errors::ParquetError::General(err))
            }
            .boxed();
        }
        // adjust the range to be within the data section
        let range = range.start - available_range.start..range.end - available_range.start;
        let data = self.inner.slice(range.start..range.end);
        async move { Ok(data) }.boxed()
    }
}

/// Load parquet metadata, including the page index, from bytes.
/// This assumes the entire metadata (and no more) is in the provided bytes.
/// Although this method is async, no IO is performed.
pub async fn load_metadata(file_size: usize, serialized_parquet_metadata: Bytes) -> ParquetResult<Arc<ParquetMetaData>> {
    let loaded_metadata = decode_metadata(&serialized_parquet_metadata)?;
    let reader = AsyncBytes::new(file_size, serialized_parquet_metadata);
    let mut metadata = MetadataLoader::new(reader, loaded_metadata);
    metadata.load_page_index(true, true).await?;
    Ok(Arc::new(metadata.finish()))
}
Not sure what the right APIs would be for this sort of use case, or in general but it seems like |
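The footer lookup described in this comment can be sketched without the parquet crate. This is a minimal sketch, assuming the standard unencrypted layout in which a file ends with a 4-byte little-endian metadata length followed by the `PAR1` magic. `metadata_range` is a hypothetical helper name, not a library function. As the comment observes, the range it returns covers only the Thrift-encoded metadata, not the page index or bloom filters.

```rust
use std::ops::Range;

/// Size of the trailing footer: 4-byte length + 4-byte magic.
const FOOTER_LEN: usize = 8;

/// Hypothetical helper: returns the byte range of the serialized metadata
/// inside `file`, derived from the trailing 8 bytes.
///
/// Assumption: unencrypted file ending in `PAR1`.
fn metadata_range(file: &[u8]) -> Result<Range<usize>, String> {
    if file.len() < FOOTER_LEN {
        return Err(format!("file too small: {} bytes", file.len()));
    }
    if !file.ends_with(b"PAR1") {
        return Err("missing PAR1 magic at end of file".to_string());
    }
    // The metadata length sits directly before the magic.
    let len_pos = file.len() - FOOTER_LEN;
    let md_len = u32::from_le_bytes([
        file[len_pos],
        file[len_pos + 1],
        file[len_pos + 2],
        file[len_pos + 3],
    ]) as usize;
    // The metadata ends where the 8-byte footer begins.
    let end = len_pos;
    let start = end
        .checked_sub(md_len)
        .ok_or_else(|| format!("metadata length {md_len} exceeds file size"))?;
    Ok(start..end)
}
```

Anything the metadata points at outside this range (page index, bloom filters) has to be located through offsets stored in the decoded metadata itself, which is why the footer alone is not enough.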
I got my thing working, but it seems quite brittle. TLDR is that I'm just tracking what bytes DataFusion reads and then slicing to those. Which seems like it could be quite inefficient and might break if DataFusion changes internal details.
#[derive(Debug, Clone)]
struct AsyncBytes {
    file_size: usize,
    data_suffix: Bytes,
    min_offset: usize,
    max_offset: usize,
}

impl AsyncBytes {
    fn new(file_size: usize, data_suffix: Bytes) -> Self {
        Self {
            file_size,
            data_suffix,
            min_offset: file_size,
            max_offset: file_size,
        }
    }

    fn fetched_range(&self) -> Range<usize> {
        self.min_offset..self.max_offset
    }
}

impl MetadataFetch for &mut AsyncBytes {
    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
        self.min_offset = self.min_offset.min(range.start);
        self.max_offset = self.max_offset.max(range.end);
        let available_range = self.file_size - self.data_suffix.len()..self.file_size;
        if !(available_range.start <= range.start && available_range.end >= range.end) {
            return async move {
                let err = format!(
                    "Attempted to fetch data from outside metadata section: range={range:?}, available_range={available_range:?}"
                );
                Err(parquet::errors::ParquetError::General(err))
            }
            .boxed();
        }
        // adjust the range to be within the data section
        let range = range.start - available_range.start..range.end - available_range.start;
        let data = self.data_suffix.slice(range.start..range.end);
        async move { Ok(data) }.boxed()
    }
}

pub async fn load_metadata(
    file_size: usize,
    serialized_parquet_metadata: Bytes,
) -> ParquetResult<Arc<ParquetMetaData>> {
    let mut reader = AsyncBytes::new(file_size, serialized_parquet_metadata.clone());
    let loader = MetadataLoader::load(&mut reader, file_size, None).await?;
    let loaded_metadata = loader.finish();
    let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata);
    metadata.load_page_index(true, true).await?;
    Ok(Arc::new(metadata.finish()))
}

pub async fn extract_metadata_from_file(file_data: &Bytes) -> ParquetResult<Vec<u8>> {
    let loaded_metadata = parse_metadata(file_data)?;
    let mut reader = AsyncBytes::new(file_data.len(), file_data.clone());
    let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata);
    metadata.load_page_index(true, true).await?;
    metadata.finish();
    Ok(file_data[reader.fetched_range().start..].to_vec())
}
|
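The byte-tracking idea in this comment can be isolated from the async plumbing. This is a minimal sketch using only the standard library. `RangeTracker` is a hypothetical name that mirrors the `min_offset` / `max_offset` bookkeeping in the comment's `AsyncBytes`.

```rust
use std::ops::Range;

/// Hypothetical helper: remembers the smallest single range that covers
/// every byte range requested so far.
#[derive(Debug)]
struct RangeTracker {
    min_offset: usize,
    max_offset: usize,
}

impl RangeTracker {
    /// Start with an empty range positioned at the end of the file, so the
    /// first recorded request always lowers `min_offset`.
    fn new(file_size: usize) -> Self {
        Self {
            min_offset: file_size,
            max_offset: file_size,
        }
    }

    /// Widen the covered range to include `range`.
    fn record(&mut self, range: &Range<usize>) {
        self.min_offset = self.min_offset.min(range.start);
        self.max_offset = self.max_offset.max(range.end);
    }

    /// The covering range. Gaps between individual requests are included.
    fn fetched_range(&self) -> Range<usize> {
        self.min_offset..self.max_offset
    }
}
```

Because only the outer bounds are kept, any unrelated bytes lying between two requests end up in the saved suffix, which is one way the approach could be inefficient.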
Good to hear you got it working. Yes I agree getting a more flexible API worked out that is more efficient would be ideal As I think you are hinting at, Maybe a good place to start would be to write tests / examples of what you are trying to do. For example:
Also are you trying to support when you have bytes in memory that you want to decode parquet metadata from? |
Yes, exactly. But to get those bytes in memory I also have to write them somehow. The big picture use case is that I have a Currently I'm writing the In thinking about it more I don't think we need a new metadata loader. There are various places where metadata references byte ranges or offsets that apply to the entire file (e.g. the column index offsets) so there's always going to be a bit of friction trying to load metadata without the rest of the file. Maybe this is an indication that I'm abusing metadata and instead should be making a completely parallel structure but practically that's unjustifiable in terms of complexity and adding more conversions to load / dump when we already have a good serialization format. In any case, I think a simplified version of #6002 (comment) for reading would be okay:
#[derive(Debug, Clone)]
struct MetadataBytes {
    file_size: usize,
    serialized_parquet_metadata: Bytes,
}

impl MetadataBytes {
    fn new(file_size: usize, serialized_parquet_metadata: Bytes) -> Self {
        Self {
            file_size,
            serialized_parquet_metadata,
        }
    }
}

impl MetadataFetch for &mut MetadataBytes {
    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
        let available_range = self.file_size - self.serialized_parquet_metadata.len()..self.file_size;
        if !(available_range.start <= range.start && available_range.end >= range.end) {
            return async move {
                let err = format!(
                    "Attempted to fetch data from outside metadata section: range={range:?}, available_range={available_range:?}"
                );
                Err(parquet::errors::ParquetError::General(err))
            }
            .boxed();
        }
        // adjust the range to be within the data section
        let range = range.start - available_range.start..range.end - available_range.start;
        let data = self.serialized_parquet_metadata.slice(range.start..range.end);
        async move { Ok(data) }.boxed()
    }
}

pub async fn load_metadata(
    file_size: usize,
    serialized_parquet_metadata: Bytes,
) -> ParquetResult<Arc<ParquetMetaData>> {
    let mut reader = MetadataBytes::new(file_size, serialized_parquet_metadata.clone());
    let loader = MetadataLoader::load(&mut reader, file_size, None).await?;
    let loaded_metadata = loader.finish();
    let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata);
    metadata.load_page_index(true, true).await?;
    Ok(Arc::new(metadata.finish()))
}
There's still some friction here as visible in the complexity of the code, in particular the two-step loading of the page indexes and the false asyncness. The former I now understand is just because you need information from the metadata to know how to load the page indexes. The latter is not worth making a whole new API for. I don't know if you feel this code is worth committing to the project, I'm happy to just use it myself until someone comes along with another use case for loading ParquetMetadata from just the metadata bytes. |
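The offset arithmetic at the heart of `MetadataBytes::fetch` can be shown on its own. This is a minimal sketch, assuming the stored bytes are exactly the last `suffix_len` bytes of a file of `file_size` bytes. `to_suffix_range` is a hypothetical helper name.

```rust
use std::ops::Range;

/// Hypothetical helper: translate a range expressed in whole-file offsets
/// into a range relative to a stored suffix of that file.
fn to_suffix_range(
    file_size: usize,
    suffix_len: usize,
    range: Range<usize>,
) -> Result<Range<usize>, String> {
    // First file offset that the stored suffix can serve.
    let available_start = file_size
        .checked_sub(suffix_len)
        .ok_or_else(|| format!("suffix ({suffix_len} bytes) is larger than the file"))?;
    if range.start > range.end || range.start < available_start || range.end > file_size {
        return Err(format!(
            "range {range:?} is outside the available range {:?}",
            available_start..file_size
        ));
    }
    // Shift both ends so that offset 0 is the first byte of the suffix.
    Ok(range.start - available_start..range.end - available_start)
}
```

The friction the comment describes comes from this shift: offsets stored inside the metadata (for example the column index offsets) refer to the whole file, so every read has to be translated before it can be served from the saved bytes.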
In various conversations I have had the last few days, both internally at InfluxData as well as with others, this has come up Basically, I think having the ability to easily read/write For example I think @XiangpengHao is thinking about it in some contexts, and I know @crepererum and @NGA-TRAN are as well. Thus now that we have a vehicle for working on code for 53.0.0 ( I will find time to actively help and review |
I started working on an example here: #6081 (and tried to summarize what I think the usecase is). |
I think we have our first chunk of the writing side done here: #6197 See also my attempt to document more clearly how all these structures relate and the various APIs available |
I also updated the example in #6081 to use the API that @adriangb added in #6197 (and that I touched up in #6202) I would say that thanks to @adriangb and @etseidl the writing of ParquetMetaData is looking quite nice If someone (🎣 ) had time to make a similar API for reading I think we would be in great shape |
Reminder: here is what the metadata looks like
How to read this today
Using the code in #6081 as an example, here is the best way I have come up with for reading metadata without firing up a parquet file reader: Note this DOES NOT read the
/// Reads the metadata from a file
///
/// This function reads the format written by `write_metadata_to_file`
fn read_metadata_from_file(file: impl AsRef<Path>) -> ParquetMetaData {
    let mut file = std::fs::File::open(file).unwrap();
    // This API is kind of awkward compared to the writer
    let mut buffer = Vec::new();
    file.read_to_end(&mut buffer).unwrap();
    let len = buffer.len();
    let mut footer = [0; 8];
    footer.copy_from_slice(&buffer[len - 8..len]);
    let md_length = decode_footer(&footer).unwrap();
    // note this also doesn't contain the OffsetIndex or ColumnIndex
    // the metadata ends where the 8 byte footer begins
    let metadata_buffer = &buffer[len - 8 - md_length..len - 8];
    decode_metadata(metadata_buffer).unwrap()
}
Proposed API
Here is how I would like to interact with the data (this would apply equally to metadata stored in memory blobs too)
/// Reads the metadata from a file
///
/// This function reads the format written by `write_metadata_to_file`
fn read_metadata_from_file(file: impl AsRef<Path>) -> ParquetMetaData {
    let mut file = std::fs::File::open(file).unwrap();
    // This API is kind of awkward compared to the writer
    let mut buffer = Vec::new();
    file.read_to_end(&mut buffer).unwrap();
    let decoder = ParquetMetaDataDecoder::new()
        // read OffsetIndex and ColumnIndex (the page index), if present, populating
        // ParquetMetaData::column_index and ParquetMetaData::offset_index
        .with_page_index(true);
    decoder.decode(&buffer).unwrap()
}
Nuances
|
My suggestion is that we try to pull the decoding code into a structure like I am pretty sure there will be adjustments required, but that would be a good place to start I think Perhaps @adriangb you could give it a try given your interest https://github.com/apache/arrow-rs/pull/6081/files#r1706311772 |
So here's what I've been working with:
/// Load parquet metadata, including the page index, from bytes.
/// This assumes the entire metadata (and no more) is in the provided bytes.
/// Although this method is async, no IO is performed.
pub async fn load_metadata(
    file_size: usize,
    serialized_parquet_metadata: Bytes,
) -> ParquetResult<Arc<ParquetMetaData>> {
    let metadata_length = serialized_parquet_metadata.len();
    let mut reader = MaskedBytes::new(
        Box::new(AsyncBytes::new(serialized_parquet_metadata)),
        file_size - metadata_length..file_size,
    );
    let metadata = MetadataLoader::load(&mut reader, file_size, None).await?;
    let loaded_metadata = metadata.finish();
    let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata);
    metadata.load_page_index(true, true).await?;
    Ok(Arc::new(metadata.finish()))
}
Supporting code
/// Adapt a `Bytes` to a `MetadataFetch` implementation.
struct AsyncBytes {
    data: Bytes,
}

impl AsyncBytes {
    fn new(data: Bytes) -> Self {
        Self { data }
    }
}

impl MetadataFetch for AsyncBytes {
    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
        async move { Ok(self.data.slice(range.start..range.end)) }.boxed()
    }
}

/// A `MetadataFetch` implementation that reads from a subset of the full data
/// while accepting ranges that address the full data.
struct MaskedBytes {
    inner: Box<dyn MetadataFetch + Send>,
    inner_range: Range<usize>,
}

impl MaskedBytes {
    fn new(inner: Box<dyn MetadataFetch + Send>, inner_range: Range<usize>) -> Self {
        Self { inner, inner_range }
    }
}

impl MetadataFetch for &mut MaskedBytes {
    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
        // check that the range is within the metadata section
        let inner_range = self.inner_range.clone();
        if !(inner_range.start <= range.start && inner_range.end >= range.end) {
            return async move {
                let err = format!(
                    "Attempted to fetch data from outside metadata section: range={range:?}, available_range={inner_range:?}",
                );
                Err(parquet::errors::ParquetError::General(err))
            }
            .boxed();
        }
        // adjust the range to be within the data section
        let range = range.start - self.inner_range.start..range.end - self.inner_range.start;
        self.inner.fetch(range)
    }
}
Sorry I didn't fully understand the question. I think the API looks good on the surface and pending internal details should work. That offset adjustment would be 0 if you (1) have the whole file or (2) are loading metadata dumped by #6197. As you point out this might be hard to integrate with |
FWIW when I set out to write the MetadataLoader the initial goal was for it to be push-based, however, I struggled to come up with a suitable interface for this in the time I had available. One option might be to return a special Error that allows it to "request" a range be loaded, but it ends up pretty gnarly. IMO the trick is to share the sync decoding logic and expose it in an ergonomic way, and accept that the IO piece will have to be different for async vs non-async. This is broadly the pattern that is used throughout the parquet crate, and I don't really see a way around it. |
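One way to picture the push-based idea is a small state machine that never performs IO itself and instead tells the caller which byte range it needs next. The following is a minimal sketch under stated assumptions, not the design that was adopted in the crate: `PushDecoder`, `Step`, and `decode_from_slice` are hypothetical names, the "decoded" result is just the raw metadata bytes, and the file is assumed to end with a 4-byte little-endian length followed by `PAR1`.

```rust
use std::ops::Range;

/// What the decoder needs from the caller next.
#[derive(Debug, PartialEq)]
enum Step {
    /// Fetch this absolute byte range and pass it to the next `step` call.
    NeedData(Range<usize>),
    /// Finished; carries the raw (still Thrift-encoded) metadata bytes.
    Done(Vec<u8>),
}

#[derive(Debug, Clone, Copy)]
enum State {
    Start,
    WantFooter,
    WantMetadata,
    Finished,
}

/// Hypothetical IO-free decoder shared by sync and async drivers.
struct PushDecoder {
    file_size: usize,
    state: State,
}

impl PushDecoder {
    fn new(file_size: usize) -> Self {
        Self { file_size, state: State::Start }
    }

    /// Advance the state machine. `data` holds the bytes for the range
    /// requested by the previous call (`None` on the first call).
    fn step(&mut self, data: Option<&[u8]>) -> Result<Step, String> {
        match (self.state, data) {
            (State::Start, None) => {
                if self.file_size < 8 {
                    return Err("file too small for a footer".to_string());
                }
                self.state = State::WantFooter;
                Ok(Step::NeedData(self.file_size - 8..self.file_size))
            }
            (State::WantFooter, Some(footer)) => {
                if footer.len() != 8 || !footer.ends_with(b"PAR1") {
                    return Err("invalid footer".to_string());
                }
                let len =
                    u32::from_le_bytes([footer[0], footer[1], footer[2], footer[3]]) as usize;
                let end = self.file_size - 8;
                let start = end
                    .checked_sub(len)
                    .ok_or("metadata length exceeds file size")?;
                self.state = State::WantMetadata;
                Ok(Step::NeedData(start..end))
            }
            (State::WantMetadata, Some(bytes)) => {
                self.state = State::Finished;
                Ok(Step::Done(bytes.to_vec()))
            }
            _ => Err("step called with unexpected input".to_string()),
        }
    }
}

/// Synchronous driver: serves every requested range from an in-memory slice.
fn decode_from_slice(file: &[u8]) -> Result<Vec<u8>, String> {
    let mut decoder = PushDecoder::new(file.len());
    let mut input: Option<&[u8]> = None;
    loop {
        match decoder.step(input)? {
            Step::NeedData(range) => input = Some(&file[range]),
            Step::Done(metadata) => return Ok(metadata),
        }
    }
}
```

An async driver would run the same loop but `await` a fetch for each `NeedData` range, so only the IO piece differs between the two paths.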
To alleviate concerns about the API design, could we keep that private? That is, we'd have:
|
@adriangb I think #6002 (comment) is a great idea It also would make
Sorry for not being clear, I was just trying to say it would be good not to have two entirely separate paths for decoding the metadata. I think your idea of the "internal push based or whatever API decoder" sounds perfect |
The somewhat unfortunate formulation of MetadataLoader has also come up on #6157 |
Update here is that thanks to several PRs from @etseidl and myself I am going to claim this is now basically complete. It is possible to read/write parquet metadata and manipulate it much more easily now (using |
Amazing work thank you all! |
Hello, everyone! This API is a great improvement. I have adopted it in the Thank you, @etseidl, for implementing this. And thanks to everyone here who has joined the discussion. |
The only question left for me is whether I still need to implement It's a bit strange to write code like:
let reader = ParquetMetaDataReader::new().with_prefetch_hint(Some(self.prefetch_footer_size));
let size = self.content_length as usize;
// Use `self` inside a `fn get_metadata(&mut self)`
let meta = reader.load_and_finish(self, size).await?;
|
It does look a bit strange, but I am not sure what an alternate would look like |
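For readers puzzled by passing `self` (a `&mut Self`) where a fetcher is expected, the general Rust pattern involved can be sketched with a toy trait. This is an illustration only: `Fetch`, `InMemory`, and `load_tail` are hypothetical names and the trait is synchronous, unlike the crate's async fetch trait. The point is that a function taking the fetcher by value can still be handed a mutable reference when the trait is implemented for `&mut F`.

```rust
use std::ops::Range;

/// Toy stand-in for a fetch trait (synchronous for simplicity).
trait Fetch {
    fn fetch(&mut self, range: Range<usize>) -> Vec<u8>;
}

/// Forwarding impl: a mutable reference to a fetcher is itself a fetcher.
impl<F: Fetch + ?Sized> Fetch for &mut F {
    fn fetch(&mut self, range: Range<usize>) -> Vec<u8> {
        (**self).fetch(range)
    }
}

/// A fetcher backed by memory that counts how often it is called.
struct InMemory {
    data: Vec<u8>,
    calls: usize,
}

impl Fetch for InMemory {
    fn fetch(&mut self, range: Range<usize>) -> Vec<u8> {
        self.calls += 1;
        self.data[range].to_vec()
    }
}

/// Takes the fetcher by value, the way a loader that owns its input would.
fn load_tail<F: Fetch>(mut fetcher: F, file_size: usize, n: usize) -> Vec<u8> {
    fetcher.fetch(file_size - n..file_size)
}
```

With the forwarding impl in place, `load_tail(&mut reader, 8, 3)` borrows `reader` for the duration of the call and leaves it usable afterwards, which is the same shape as handing `self` to a loader from inside a `&mut self` method.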
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
There are several cases where we would like to have more control over the encoding/decoding of Parquet metadata:
SchemaDescriptorPtr across ParquetMetadata objects #5999
At the time of writing, the current API exposed, decode_metadata, has no way for finer grained control.
Describe the solution you'd like
I would like an API that allows more fine grained control over reading/writing metadata and that permits adding additional features over time in a backwards compatible way
Describe alternatives you've considered
Here is one potential idea -- to create Encoder/Decoder structs that can encode and decode the metadata along with various configuration options.
Ideally this struct would be integrated into the rest of the crate, e.g. used in SerializedFileWriter?
Similarly for decoding
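A builder-style struct is one common way to get configuration options that can grow without breaking callers. The following is a minimal sketch of that pattern only. `DecoderSketch` and its options are hypothetical, chosen for illustration, and do not describe an API in the crate.

```rust
/// Hypothetical decoder configuration. Fields are private, so new options
/// can be added later without breaking code that uses the builder methods.
#[derive(Debug, Clone, Default)]
struct DecoderSketch {
    page_index: bool,
    size_hint: Option<usize>,
}

impl DecoderSketch {
    /// Start from defaults: no page index, no size hint.
    fn new() -> Self {
        Self::default()
    }

    /// Request that the page index also be decoded (illustrative option).
    fn with_page_index(mut self, enabled: bool) -> Self {
        self.page_index = enabled;
        self
    }

    /// Provide an optional hint for how many trailing bytes to read first
    /// (illustrative option).
    fn with_size_hint(mut self, hint: Option<usize>) -> Self {
        self.size_hint = hint;
        self
    }

    fn page_index(&self) -> bool {
        self.page_index
    }

    fn size_hint(&self) -> Option<usize> {
        self.size_hint
    }
}
```

Callers write `DecoderSketch::new().with_page_index(true)` and are unaffected when another `with_*` method is introduced, which is the backwards-compatibility property the request asks for.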
Additional context
This ticket is based on the discussion with @adriangb here #5988
There are a bunch of discussions on metadata speed here #5770
Here is a PR with a proposed 'encode_metadata' function: #6000