Skip to content

Commit

Permalink
No longer write Parquet column metadata after column chunks *and* in …
Browse files Browse the repository at this point in the history
…the footer (#6117)

* bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight` (#6041)

* bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight`

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* fix example tests

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

---------

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

* Remove `impl<T: AsRef<[u8]>> From<T> for Buffer`  that easily accidentally copies data (#6043)

* deprecate auto copy, ask explicit reference

* update comments

* make cargo doc happy

* Make display of interval types more pretty (#6006)

* improve dispaly for interval.

* update test in pretty, and fix display problem.

* tmp

* fix tests in arrow-cast.

* fix tests in pretty.

* fix style.

* Update snafu (#5930)

* Update Parquet thrift generated structures (#6045)

* update to latest thrift (as of 11 Jul 2024) from parquet-format

* pass None for optional size statistics

* escape HTML tags

* don't need to escape brackets in arrays

* Revert "Revert "Write Bloom filters between row groups instead of the end  (#…" (#5933)

This reverts commit 22e0b44.

* Revert "Update snafu (#5930)" (#6069)

This reverts commit 756b1fb.

* Update pyo3 requirement from 0.21.1 to 0.22.1 (fixed) (#6075)

* Update pyo3 requirement from 0.21.1 to 0.22.1

Updates the requirements on [pyo3](https://github.com/pyo3/pyo3) to permit the latest version.
- [Release notes](https://github.com/pyo3/pyo3/releases)
- [Changelog](https://github.com/PyO3/pyo3/blob/main/CHANGELOG.md)
- [Commits](PyO3/pyo3@v0.21.1...v0.22.1)

---
updated-dependencies:
- dependency-name: pyo3
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>

* refactor: remove deprecated `FromPyArrow::from_pyarrow`

"GIL Refs" are being phased out.

* chore: update `pyo3` in integration tests

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* remove repeated codes to make the codes more concise. (#6080)

* Add `unencoded_byte_array_data_bytes` to `ParquetMetaData` (#6068)

* update to latest thrift (as of 11 Jul 2024) from parquet-format

* pass None for optional size statistics

* escape HTML tags

* don't need to escape brackets in arrays

* add support for unencoded_byte_array_data_bytes

* add comments

* change sig of ColumnMetrics::update_variable_length_bytes()

* rename ParquetOffsetIndex to OffsetSizeIndex

* rename some functions

* suggestion from review

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* add Default trait to ColumnMetrics as suggested in review

* rename OffsetSizeIndex to OffsetIndexMetaData

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Update pyo3 requirement from 0.21.1 to 0.22.2 (#6085)

Updates the requirements on [pyo3](https://github.com/pyo3/pyo3) to permit the latest version.
- [Release notes](https://github.com/pyo3/pyo3/releases)
- [Changelog](https://github.com/PyO3/pyo3/blob/v0.22.2/CHANGELOG.md)
- [Commits](PyO3/pyo3@v0.21.1...v0.22.2)

---
updated-dependencies:
- dependency-name: pyo3
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Deprecate read_page_locations() and simplify offset index in `ParquetMetaData` (#6095)

* deprecate read_page_locations

* add to_thrift() to OffsetIndexMetaData

* no longer write inline column metadata

* Update parquet/src/column/writer/mod.rs

Co-authored-by: Ed Seidl <etseidl@users.noreply.github.com>

* suggestion from review

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* add some more documentation

* remove write_metadata from PageWriter

---------

Signed-off-by: Bugen Zhao <i@bugenzhao.com>
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: Bugen Zhao <i@bugenzhao.com>
Co-authored-by: Xiangpeng Hao <haoxiangpeng123@gmail.com>
Co-authored-by: kamille <caoruiqiu.crq@antgroup.com>
Co-authored-by: Jesse <github@jessebakker.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: Marco Neumann <marco@crepererum.net>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
  • Loading branch information
8 people authored Aug 2, 2024
1 parent ee6fb87 commit f2de2cd
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 42 deletions.
7 changes: 1 addition & 6 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::column::writer::{
};
use crate::data_type::{ByteArray, FixedLenByteArray};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaData};
use crate::file::metadata::{KeyValue, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
Expand Down Expand Up @@ -489,11 +489,6 @@ impl PageWriter for ArrowPageWriter {
Ok(spec)
}

fn write_metadata(&mut self, _metadata: &ColumnChunkMetaData) -> Result<()> {
// Skip writing metadata as won't be copied anyway
Ok(())
}

fn close(&mut self) -> Result<()> {
Ok(())
}
Expand Down
8 changes: 1 addition & 7 deletions parquet/src/column/page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use bytes::Bytes;

use crate::basic::{Encoding, PageType};
use crate::errors::{ParquetError, Result};
use crate::file::{metadata::ColumnChunkMetaData, statistics::Statistics};
use crate::file::statistics::Statistics;
use crate::format::PageHeader;

/// Parquet Page definition.
Expand Down Expand Up @@ -350,12 +350,6 @@ pub trait PageWriter: Send {
/// either data page or dictionary page.
fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;

/// Writes column chunk metadata into the output stream/sink.
///
/// This method is called once before page writer is closed, normally when writes are
/// finalised in column writer.
fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()>;

/// Closes resources and flushes underlying sink.
/// Page writer should not be used after this method is called.
fn close(&mut self) -> Result<()>;
Expand Down
18 changes: 3 additions & 15 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
self.write_dictionary_page()?;
}
self.flush_data_pages()?;
let metadata = self.write_column_metadata()?;
let metadata = self.build_column_metadata()?;
self.page_writer.close()?;

let boundary_order = match (
Expand Down Expand Up @@ -1041,24 +1041,18 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Ok(())
}

/// Assembles and writes column chunk metadata.
fn write_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
/// Assembles column chunk metadata.
fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
let total_compressed_size = self.column_metrics.total_compressed_size as i64;
let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
let num_values = self.column_metrics.total_num_values as i64;
let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
// If data page offset is not set, then no pages have been written
let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;

let file_offset = match dict_page_offset {
Some(dict_offset) => dict_offset + total_compressed_size,
None => data_page_offset + total_compressed_size,
};

let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
.set_compression(self.codec)
.set_encodings(self.encodings.iter().cloned().collect())
.set_file_offset(file_offset)
.set_total_compressed_size(total_compressed_size)
.set_total_uncompressed_size(total_uncompressed_size)
.set_num_values(num_values)
Expand Down Expand Up @@ -1138,8 +1132,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
}

let metadata = builder.build()?;
self.page_writer.write_metadata(&metadata)?;

Ok(metadata)
}

Expand Down Expand Up @@ -3589,10 +3581,6 @@ mod tests {
Ok(res)
}

fn write_metadata(&mut self, _metadata: &ColumnChunkMetaData) -> Result<()> {
Ok(())
}

fn close(&mut self) -> Result<()> {
Ok(())
}
Expand Down
16 changes: 14 additions & 2 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,12 @@ impl ColumnChunkMetaData {
self.file_path.as_deref()
}

/// Byte offset in `file_path()`.
/// Byte offset of `ColumnMetaData` in `file_path()`.
///
/// Note that the meaning of this field has been inconsistent between implementations
/// so its use has since been deprecated in the Parquet specification. Modern implementations
/// will set this to `0` to indicate that the `ColumnMetaData` is solely contained in the
/// `ColumnChunk` struct.
pub fn file_offset(&self) -> i64 {
self.file_offset
}
Expand Down Expand Up @@ -1040,6 +1045,14 @@ impl ColumnChunkMetaDataBuilder {
}

/// Sets file offset in bytes.
///
/// This field was meant to provide an alternate to storing `ColumnMetadata` directly in
/// the `ColumnChunkMetadata`. However, most Parquet readers assume the `ColumnMetadata`
/// is stored inline and ignore this field.
#[deprecated(
since = "53.0.0",
note = "The Parquet specification requires this field to be 0"
)]
pub fn set_file_offset(mut self, value: i64) -> Self {
self.0.file_offset = value;
self
Expand Down Expand Up @@ -1453,7 +1466,6 @@ mod tests {
let col_metadata = ColumnChunkMetaData::builder(column_descr.clone())
.set_encodings(vec![Encoding::PLAIN, Encoding::RLE])
.set_file_path("file_path".to_owned())
.set_file_offset(100)
.set_num_values(1000)
.set_compression(Compression::SNAPPY)
.set_total_compressed_size(2000)
Expand Down
12 changes: 0 additions & 12 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,13 +649,10 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
));
}

let file_offset = self.buf.bytes_written() as i64;

let map_offset = |x| x - src_offset + write_offset as i64;
let mut builder = ColumnChunkMetaData::builder(metadata.column_descr_ptr())
.set_compression(metadata.compression())
.set_encodings(metadata.encodings().clone())
.set_file_offset(file_offset)
.set_total_compressed_size(metadata.compressed_size())
.set_total_uncompressed_size(metadata.uncompressed_size())
.set_num_values(metadata.num_values())
Expand All @@ -680,7 +677,6 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
}
}

SerializedPageWriter::new(self.buf).write_metadata(&metadata)?;
let (_, on_close) = self.get_on_close();
on_close(close)
}
Expand Down Expand Up @@ -808,14 +804,6 @@ impl<'a, W: Write + Send> PageWriter for SerializedPageWriter<'a, W> {
Ok(spec)
}

fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
metadata
.to_column_metadata_thrift()
.write_to_out_protocol(&mut protocol)?;
Ok(())
}

fn close(&mut self) -> Result<()> {
self.sink.flush()?;
Ok(())
Expand Down

0 comments on commit f2de2cd

Please sign in to comment.