feat(5851): ArrowWriter memory usage (#5967)
* refactor(5851): delineate the different memory estimates APIs for the ArrowWriter and column writers

* feat(5851): add memory size estimates to the ColumnValueEncoder implementations and the DictEncoder

* test(5851): add memory_size() to in-progress test

* chore(5851): update docs to make it more explicit what is the difference btwn memory_size vs get_estimated_total_byte

* feat(5851): clarify the ColumnValueEncoder::estimated_memory_size interface, and update impls to account for bloom filter size

* feat(5851): account for stats array size in the ByteArrayEncoder

* Refine documentation

* More accurate memory estimation

* Improve tests

* Update accounting for non dict encoded data

* Include more memory size calculations

* clean up async writer

* clippy

* tweak

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
wiedld and alamb authored Jul 2, 2024
1 parent e61fb62 commit 5c6f857
Showing 12 changed files with 270 additions and 18 deletions.
38 changes: 38 additions & 0 deletions parquet/src/arrow/arrow_writer/byte_array.rs
@@ -210,6 +210,10 @@ impl FallbackEncoder {
}
}

/// Returns an estimate of the data page size in bytes
///
/// This includes:
/// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
fn estimated_data_page_size(&self) -> usize {
match &self.encoder {
FallbackEncoderImpl::Plain { buffer, .. } => buffer.len(),
@@ -304,6 +308,12 @@ impl Storage for ByteArrayStorage {

key as u64
}

#[allow(dead_code)] // not used in parquet_derive, so is dead there
fn estimated_memory_size(&self) -> usize {
self.page.capacity() * std::mem::size_of::<u8>()
+ self.values.capacity() * std::mem::size_of::<std::ops::Range<usize>>()
}
}

/// A dictionary encoder for byte array data
@@ -334,6 +344,10 @@ impl DictEncoder {
num_required_bits(length.saturating_sub(1) as u64)
}

fn estimated_memory_size(&self) -> usize {
self.interner.estimated_memory_size() + self.indices.capacity() * std::mem::size_of::<u64>()
}

fn estimated_data_page_size(&self) -> usize {
let bit_width = self.bit_width();
1 + RleEncoder::max_buffer_size(bit_width, self.indices.len())
@@ -443,10 +457,34 @@ impl ColumnValueEncoder for ByteArrayEncoder {
self.dict_encoder.is_some()
}

fn estimated_memory_size(&self) -> usize {
let encoder_size = match &self.dict_encoder {
Some(encoder) => encoder.estimated_memory_size(),
// For the FallbackEncoder, these unflushed bytes are already encoded.
// Therefore, the size should be the same as estimated_data_page_size.
None => self.fallback.estimated_data_page_size(),
};

let bloom_filter_size = self
.bloom_filter
.as_ref()
.map(|bf| bf.estimated_memory_size())
.unwrap_or_default();

let stats_size = self.min_value.as_ref().map(|v| v.len()).unwrap_or_default()
+ self.max_value.as_ref().map(|v| v.len()).unwrap_or_default();

encoder_size + bloom_filter_size + stats_size
}

fn estimated_dict_page_size(&self) -> Option<usize> {
Some(self.dict_encoder.as_ref()?.estimated_dict_page_size())
}

/// Returns an estimate of the data page size in bytes
///
/// This includes:
/// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
fn estimated_data_page_size(&self) -> usize {
match &self.dict_encoder {
Some(encoder) => encoder.estimated_data_page_size(),
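A note on the accounting pattern above: these estimates use `Vec::capacity` rather than `Vec::len`, so they measure the bytes actually reserved by the allocator, not just the bytes in use. A minimal, self-contained sketch of the idea (the `vec_memory_size` helper is illustrative, not part of this diff):

```rust
// Sketch of the capacity-based accounting used by the encoders above:
// a Vec's footprint is its capacity times the element size, regardless
// of how many elements are currently stored.
fn vec_memory_size<T>(v: &Vec<T>) -> usize {
    v.capacity() * std::mem::size_of::<T>()
}

fn main() {
    let mut indices: Vec<u64> = Vec::with_capacity(1024);
    indices.push(42);
    // at least 1024 slots * 8 bytes are reserved, though only one is used
    assert!(vec_memory_size(&indices) >= 1024 * 8);
}
```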
83 changes: 71 additions & 12 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -84,15 +84,20 @@ mod levels;
///
/// ## Memory Limiting
///
/// The nature of parquet forces buffering of an entire row group before it can be flushed
/// to the underlying writer. Data is buffered in its encoded form, to reduce memory usage,
/// but if writing rows containing large strings or very nested data, this may still result in
/// non-trivial memory usage.
/// The nature of parquet forces buffering of an entire row group before it can
/// be flushed to the underlying writer. Data is mostly buffered in its encoded
/// form, reducing memory usage. However, some data such as dictionary keys,
/// large strings, or very nested data may still result in non-trivial memory
/// usage.
///
/// [`ArrowWriter::in_progress_size`] can be used to track the size of the buffered row group,
/// and potentially trigger an early flush of a row group based on a memory threshold and/or
/// global memory pressure. However, users should be aware that smaller row groups will result
/// in higher metadata overheads, and may worsen compression ratios and query performance.
/// See Also:
/// * [`ArrowWriter::memory_size`]: the current memory usage of the writer.
/// * [`ArrowWriter::in_progress_size`]: the estimated encoded size of the buffered row group.
///
/// Call [`Self::flush`] to trigger an early flush of a row group based on a
/// memory threshold and/or global memory pressure. However, smaller row groups
/// result in higher metadata overheads, and thus may worsen compression ratios
/// and query performance.
///
/// ```no_run
/// # use std::io::Write;
@@ -101,7 +106,7 @@ mod levels;
/// # let mut writer: ArrowWriter<Vec<u8>> = todo!();
/// # let batch: RecordBatch = todo!();
/// writer.write(&batch).unwrap();
/// // Trigger an early flush if buffered size exceeds 1_000_000
/// // Trigger an early flush if anticipated size exceeds 1_000_000
/// if writer.in_progress_size() > 1_000_000 {
/// writer.flush().unwrap();
/// }
@@ -203,7 +208,23 @@ impl<W: Write + Send> ArrowWriter<W> {
self.writer.flushed_row_groups()
}

/// Returns the estimated length in bytes of the current in progress row group
/// Estimated memory usage, in bytes, of this `ArrowWriter`
///
/// This estimate is formed by summing the values of
/// [`ArrowColumnWriter::memory_size`] for all in progress columns.
pub fn memory_size(&self) -> usize {
match &self.in_progress {
Some(in_progress) => in_progress.writers.iter().map(|x| x.memory_size()).sum(),
None => 0,
}
}

/// Anticipated encoded size of the in progress row group.
///
/// This estimates the size of the row group after it has been completely
/// encoded, and is formed by summing the values of
/// [`ArrowColumnWriter::get_estimated_total_bytes`] for all in progress
/// columns.
pub fn in_progress_size(&self) -> usize {
match &self.in_progress {
Some(in_progress) => in_progress
@@ -629,7 +650,30 @@ impl ArrowColumnWriter {
Ok(ArrowColumnChunk { data, close })
}

/// Returns the estimated total bytes for this column writer
/// Returns the estimated total memory usage by the writer.
///
/// Unlike [`Self::get_estimated_total_bytes`], this is an estimate
/// of the current memory usage and not the anticipated encoded size.
///
/// This includes:
/// 1. Data buffered in encoded form
/// 2. Data buffered in un-encoded form (e.g. `usize` dictionary keys)
///
/// This value should be greater than or equal to [`Self::get_estimated_total_bytes`]
pub fn memory_size(&self) -> usize {
match &self.writer {
ArrowColumnWriterImpl::ByteArray(c) => c.memory_size(),
ArrowColumnWriterImpl::Column(c) => c.memory_size(),
}
}

/// Returns the estimated total encoded bytes for this column writer.
///
/// This includes:
/// 1. Data buffered in encoded form
/// 2. An estimate of how large the data buffered in un-encoded form would be once encoded
///
/// This value should be less than or equal to [`Self::memory_size`]
pub fn get_estimated_total_bytes(&self) -> usize {
match &self.writer {
ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _,
@@ -2894,24 +2938,39 @@ mod tests {
// starts empty
assert_eq!(writer.in_progress_size(), 0);
assert_eq!(writer.in_progress_rows(), 0);
assert_eq!(writer.memory_size(), 0);
assert_eq!(writer.bytes_written(), 4); // Initial header
writer.write(&batch).unwrap();

// updated on write
let initial_size = writer.in_progress_size();
assert!(initial_size > 0);
assert_eq!(writer.in_progress_rows(), 5);
let initial_memory = writer.memory_size();
assert!(initial_memory > 0);
// memory estimate is at least as large as the estimated encoded size
assert!(
initial_size <= initial_memory,
"{initial_size} <= {initial_memory}"
);

// updated on second write
writer.write(&batch).unwrap();
assert!(writer.in_progress_size() > initial_size);
assert_eq!(writer.in_progress_rows(), 10);
assert!(writer.memory_size() > initial_memory);
assert!(
writer.in_progress_size() <= writer.memory_size(),
"in_progress_size {} <= memory_size {}",
writer.in_progress_size(),
writer.memory_size()
);

// in progress tracking is cleared, but the overall data written is updated
let pre_flush_bytes_written = writer.bytes_written();
writer.flush().unwrap();
assert_eq!(writer.in_progress_size(), 0);
assert_eq!(writer.in_progress_rows(), 0);
assert_eq!(writer.memory_size(), 0);
assert!(writer.bytes_written() > pre_flush_bytes_written);

writer.close().unwrap();
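The new API enables a memory-bounded write loop. A hedged usage sketch (the `write_bounded` helper and the budget value are ours; `write`, `memory_size`, and `flush` are the methods shown above):

```rust
// Hypothetical usage: flush the in-progress row group early when the
// writer's actual memory footprint (memory_size), rather than its
// anticipated encoded size (in_progress_size), exceeds a budget.
use arrow_array::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::errors::Result;

fn write_bounded<W: std::io::Write + Send>(
    writer: &mut ArrowWriter<W>,
    batch: &RecordBatch,
    memory_budget: usize,
) -> Result<()> {
    writer.write(batch)?;
    // memory_size() >= in_progress_size(), so this bounds real usage
    if writer.memory_size() > memory_budget {
        writer.flush()?;
    }
    Ok(())
}
```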
25 changes: 24 additions & 1 deletion parquet/src/arrow/async_writer/mod.rs
@@ -176,7 +176,16 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
self.sync_writer.flushed_row_groups()
}

/// Returns the estimated length in bytes of the current in progress row group
/// Estimated memory usage, in bytes, of this `ArrowWriter`
///
/// See [ArrowWriter::memory_size] for more information.
pub fn memory_size(&self) -> usize {
self.sync_writer.memory_size()
}

/// Anticipated encoded size of the in progress row group.
///
/// See [ArrowWriter::in_progress_size] for more information.
pub fn in_progress_size(&self) -> usize {
self.sync_writer.in_progress_size()
}
@@ -419,16 +428,30 @@ mod tests {
let initial_size = writer.in_progress_size();
assert!(initial_size > 0);
assert_eq!(writer.in_progress_rows(), batch.num_rows());
let initial_memory = writer.memory_size();
// memory estimate is at least as large as the estimated encoded size
assert!(
initial_size <= initial_memory,
"{initial_size} <= {initial_memory}"
);

// updated on second write
writer.write(&batch).await.unwrap();
assert!(writer.in_progress_size() > initial_size);
assert_eq!(writer.in_progress_rows(), batch.num_rows() * 2);
assert!(writer.memory_size() > initial_memory);
assert!(
writer.in_progress_size() <= writer.memory_size(),
"in_progress_size {} <= memory_size {}",
writer.in_progress_size(),
writer.memory_size()
);

// in progress tracking is cleared, but the overall data written is updated
let pre_flush_bytes_written = writer.bytes_written();
writer.flush().await.unwrap();
assert_eq!(writer.in_progress_size(), 0);
assert_eq!(writer.memory_size(), 0);
assert_eq!(writer.in_progress_rows(), 0);
assert!(writer.bytes_written() > pre_flush_bytes_written);

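The same budget check carries over to the async writer; a sketch assuming an async runtime such as tokio:

```rust
// Hypothetical async counterpart of the budget check, using the
// AsyncArrowWriter methods added in this diff.
use arrow_array::RecordBatch;
use parquet::arrow::async_writer::{AsyncArrowWriter, AsyncFileWriter};
use parquet::errors::Result;

async fn write_bounded<W: AsyncFileWriter>(
    writer: &mut AsyncArrowWriter<W>,
    batch: &RecordBatch,
    memory_budget: usize,
) -> Result<()> {
    writer.write(batch).await?;
    if writer.memory_size() > memory_budget {
        // per the tests above, flushing resets both memory_size
        // and in_progress_size to zero
        writer.flush().await?;
    }
    Ok(())
}
```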
5 changes: 5 additions & 0 deletions parquet/src/bloom_filter/mod.rs
@@ -383,6 +383,11 @@ impl Sbbf {
let block_index = self.hash_to_block_index(hash);
self.0[block_index].check(hash as u32)
}

/// Return the total in memory size of this bloom filter in bytes
pub(crate) fn estimated_memory_size(&self) -> usize {
self.0.capacity() * std::mem::size_of::<Block>()
}
}

// per spec we use xxHash with seed=0
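For a sense of scale: `Sbbf` holds a `Vec<Block>`, and in the parquet split block bloom filter format each block is 256 bits (8 × u32 = 32 bytes), so the estimate reduces to simple arithmetic. A worked sketch (the block size is taken from the spec, not this diff):

```rust
// Illustrative arithmetic for the Sbbf estimate above, assuming the
// spec's 256-bit split block (8 x u32 = 32 bytes per block).
const BLOCK_BYTES: usize = 8 * std::mem::size_of::<u32>(); // 32

fn sbbf_memory(num_blocks: usize) -> usize {
    num_blocks * BLOCK_BYTES
}

fn main() {
    // a filter with 1024 blocks reserves roughly 32 KiB
    assert_eq!(sbbf_memory(1024), 32 * 1024);
}
```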
29 changes: 27 additions & 2 deletions parquet/src/column/writer/encoder.rs
@@ -93,10 +93,17 @@ pub trait ColumnValueEncoder {
/// Returns true if this encoder has a dictionary page
fn has_dictionary(&self) -> bool;

/// Returns an estimate of the dictionary page size in bytes, or `None` if no dictionary
/// Returns the estimated total memory usage of the encoder
fn estimated_memory_size(&self) -> usize;

/// Returns an estimate of the encoded size of the dictionary page in bytes, or `None` if there is no dictionary
fn estimated_dict_page_size(&self) -> Option<usize>;

/// Returns an estimate of the data page size in bytes
/// Returns an estimate of the encoded data page size in bytes
///
/// This should include:
/// <already_written_encoded_byte_size> + <estimated_encoded_size_of_unflushed_bytes>
fn estimated_data_page_size(&self) -> usize;

/// Flush the dictionary page for this column chunk if any. Any subsequent calls to
@@ -227,6 +234,24 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
self.dict_encoder.is_some()
}

fn estimated_memory_size(&self) -> usize {
let encoder_size = self.encoder.estimated_memory_size();

let dict_encoder_size = self
.dict_encoder
.as_ref()
.map(|encoder| encoder.estimated_memory_size())
.unwrap_or_default();

let bloom_filter_size = self
.bloom_filter
.as_ref()
.map(|bf| bf.estimated_memory_size())
.unwrap_or_default();

encoder_size + dict_encoder_size + bloom_filter_size
}

fn estimated_dict_page_size(&self) -> Option<usize> {
Some(self.dict_encoder.as_ref()?.dict_encoded_size())
}
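To make the trait contract concrete, here is a hypothetical implementation (the `PlainI64Encoder` type is ours, not part of this diff) that distinguishes the two estimates:

```rust
// Hypothetical encoder illustrating the contract described above:
// estimated_memory_size() measures what is held in memory right now,
// while estimated_data_page_size() projects the encoded page size.
struct PlainI64Encoder {
    encoded: Vec<u8>,  // bytes already encoded into the page buffer
    pending: Vec<i64>, // values buffered but not yet encoded
}

impl PlainI64Encoder {
    fn estimated_memory_size(&self) -> usize {
        // actual footprint: reserved capacity of both buffers
        self.encoded.capacity() + self.pending.capacity() * std::mem::size_of::<i64>()
    }

    fn estimated_data_page_size(&self) -> usize {
        // <already_written_encoded_byte_size>
        //   + <estimated_encoded_size_of_unflushed_bytes>
        self.encoded.len() + self.pending.len() * std::mem::size_of::<i64>()
    }
}

fn main() {
    let e = PlainI64Encoder {
        encoded: Vec::with_capacity(256),
        pending: vec![1, 2, 3],
    };
    assert!(e.estimated_memory_size() >= e.estimated_data_page_size());
}
```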
22 changes: 19 additions & 3 deletions parquet/src/column/writer/mod.rs
@@ -72,7 +72,13 @@ pub enum ColumnWriter<'a> {
}

impl<'a> ColumnWriter<'a> {
/// Returns the estimated total bytes for this column writer
/// Returns the estimated total memory usage
#[cfg(feature = "arrow")]
pub(crate) fn memory_size(&self) -> usize {
downcast_writer!(self, typed, typed.memory_size())
}

/// Returns the estimated total encoded bytes for this column writer
#[cfg(feature = "arrow")]
pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
downcast_writer!(self, typed, typed.get_estimated_total_bytes())
@@ -419,6 +425,15 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
)
}

/// Returns the estimated total memory usage.
///
/// Unlike [`Self::get_estimated_total_bytes`] this is an estimate
/// of the current memory usage and not the final anticipated encoded size.
#[cfg(feature = "arrow")]
pub(crate) fn memory_size(&self) -> usize {
self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
}

/// Returns total number of bytes written by this column writer so far.
/// This value is also returned when column writer is closed.
///
@@ -428,10 +443,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
self.column_metrics.total_bytes_written
}

/// Returns the estimated total bytes for this column writer
/// Returns the estimated total encoded bytes for this column writer.
///
/// Unlike [`Self::get_total_bytes_written`] this includes an estimate
/// of any data that has not yet been flushed to a page
/// of any data that has not yet been flushed to a page, based on its
/// anticipated encoded size.
#[cfg(feature = "arrow")]
pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
self.column_metrics.total_bytes_written
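Taken together, the docs above imply an ordering between the three column-writer metrics; a sketch with made-up numbers (only the ordering, not the values, is asserted by the real tests):

```rust
// Hypothetical illustration of the documented ordering:
// total_bytes_written <= get_estimated_total_bytes <= memory_size.
fn check_ordering(total_bytes_written: u64, estimated_total_bytes: u64, memory_size: usize) {
    // flushed bytes are a lower bound on the anticipated encoded size
    assert!(total_bytes_written <= estimated_total_bytes);
    // the encoded-size estimate should not exceed the in-memory footprint
    assert!(estimated_total_bytes as usize <= memory_size);
}

fn main() {
    check_ordering(1_000, 1_200, 1_500);
}
```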
5 changes: 5 additions & 0 deletions parquet/src/encodings/encoding/byte_stream_split_encoder.rs
@@ -90,4 +90,9 @@ impl<T: DataType> Encoder<T> for ByteStreamSplitEncoder<T> {
self.buffer.clear();
Ok(encoded.into())
}

/// Return the estimated memory size of this encoder.
fn estimated_memory_size(&self) -> usize {
self.buffer.capacity() * std::mem::size_of::<u8>()
}
}