Skip to content

Commit

Permalink
Add optional page row count limit for parquet WriterProperties (#2941) (#2942)
Browse files Browse the repository at this point in the history

* Add page row count limit (#2941)

* Apply suggestions from code review

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
tustvold and alamb authored Oct 27, 2022
1 parent 66ea66b commit 880c4d9
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 6 deletions.
6 changes: 4 additions & 2 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,11 +569,13 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
//
// In such a scenario the dictionary encoder may return an estimated encoded
// size in excess of the page size limit, even when there are no buffered values
if self.encoder.num_values() == 0 {
if self.page_metrics.num_buffered_values == 0 {
return false;
}

self.encoder.estimated_data_page_size() >= self.props.data_pagesize_limit()
self.page_metrics.num_buffered_rows as usize
>= self.props.data_page_row_count_limit()
|| self.encoder.estimated_data_page_size() >= self.props.data_pagesize_limit()
}

/// Performs dictionary fallback.
Expand Down
43 changes: 40 additions & 3 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub type WriterPropertiesPtr = Arc<WriterProperties>;
pub struct WriterProperties {
data_pagesize_limit: usize,
dictionary_pagesize_limit: usize,
data_page_row_count_limit: usize,
write_batch_size: usize,
max_row_group_size: usize,
writer_version: WriterVersion,
Expand All @@ -112,15 +113,29 @@ impl WriterProperties {
}

/// Returns data page size limit.
///
/// Note: this is a best effort limit based on the write batch size
pub fn data_pagesize_limit(&self) -> usize {
self.data_pagesize_limit
}

/// Returns dictionary page size limit.
///
/// Note: this is a best effort limit based on the write batch size
pub fn dictionary_pagesize_limit(&self) -> usize {
self.dictionary_pagesize_limit
}

/// Returns the maximum page row count
///
/// This can be used to limit the number of rows within a page to
/// yield better page pruning
///
/// Note: this is a best effort limit based on the write batch size
pub fn data_page_row_count_limit(&self) -> usize {
self.data_page_row_count_limit
}

/// Returns configured batch size for writes.
///
/// When writing a batch of data, this setting allows splitting it internally into
Expand Down Expand Up @@ -222,6 +237,7 @@ impl WriterProperties {
pub struct WriterPropertiesBuilder {
data_pagesize_limit: usize,
dictionary_pagesize_limit: usize,
data_page_row_count_limit: usize,
write_batch_size: usize,
max_row_group_size: usize,
writer_version: WriterVersion,
Expand All @@ -237,6 +253,7 @@ impl WriterPropertiesBuilder {
Self {
data_pagesize_limit: DEFAULT_PAGE_SIZE,
dictionary_pagesize_limit: DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT,
data_page_row_count_limit: usize::MAX,
write_batch_size: DEFAULT_WRITE_BATCH_SIZE,
max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE,
writer_version: DEFAULT_WRITER_VERSION,
Expand All @@ -252,6 +269,7 @@ impl WriterPropertiesBuilder {
WriterProperties {
data_pagesize_limit: self.data_pagesize_limit,
dictionary_pagesize_limit: self.dictionary_pagesize_limit,
data_page_row_count_limit: self.data_page_row_count_limit,
write_batch_size: self.write_batch_size,
max_row_group_size: self.max_row_group_size,
writer_version: self.writer_version,
Expand All @@ -271,19 +289,38 @@ impl WriterPropertiesBuilder {
self
}

/// Sets data page size limit.
/// Sets best effort maximum size of a data page in bytes
///
/// Note: this is a best effort limit based on the write batch size
pub fn set_data_pagesize_limit(mut self, value: usize) -> Self {
self.data_pagesize_limit = value;
self
}

/// Sets dictionary page size limit.
/// Sets best effort maximum number of rows in a data page
///
/// This can be used to limit the number of rows within a page to
/// yield better page pruning
///
/// Note: this is a best effort limit based on the write batch size
pub fn set_data_page_row_count_limit(mut self, value: usize) -> Self {
self.data_page_row_count_limit = value;
self
}

/// Sets best effort maximum dictionary page size, in bytes
///
/// Note: this is a best effort limit based on the write batch size
pub fn set_dictionary_pagesize_limit(mut self, value: usize) -> Self {
self.dictionary_pagesize_limit = value;
self
}

/// Sets write batch size.
/// Sets write batch size
///
/// Data is written in batches of this size, acting as an upper-bound on
/// the enforcement granularity of page limits
pub fn set_write_batch_size(mut self, value: usize) -> Self {
self.write_batch_size = value;
self
Expand Down
30 changes: 29 additions & 1 deletion parquet/tests/arrow_writer_layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ fn test_primitive() {

do_test(LayoutTest {
props,
batches: vec![batch],
batches: vec![batch.clone()],
layout: Layout {
row_groups: vec![RowGroup {
columns: vec![ColumnChunk {
Expand Down Expand Up @@ -308,6 +308,34 @@ fn test_primitive() {
}],
},
});

// Test row count limit
let props = WriterProperties::builder()
.set_dictionary_enabled(false)
.set_data_page_row_count_limit(100)
.set_write_batch_size(100)
.build();

do_test(LayoutTest {
props,
batches: vec![batch],
layout: Layout {
row_groups: vec![RowGroup {
columns: vec![ColumnChunk {
pages: (0..20)
.map(|_| Page {
rows: 100,
page_header_size: 34,
compressed_size: 400,
encoding: Encoding::PLAIN,
page_type: PageType::DATA_PAGE,
})
.collect(),
dictionary_page: None,
}],
}],
},
});
}

#[test]
Expand Down

0 comments on commit 880c4d9

Please sign in to comment.