From fc4bad85e2b62634a2570518126329672007e3bc Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 9 Dec 2022 21:10:30 +0000 Subject: [PATCH] Use BufWriter when writing bloom filters (#3318) Disable bloom filters for most tests --- parquet/src/arrow/arrow_writer/mod.rs | 55 +++++++++++++++++++++++---- parquet/src/bloom_filter/mod.rs | 7 +++- parquet/src/file/properties.rs | 14 +++---- 3 files changed, 59 insertions(+), 17 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index a609b992a393..53ca71d28077 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -1225,16 +1225,44 @@ mod tests { file } + struct RoundTripOptions { + values: ArrayRef, + schema: SchemaRef, + bloom_filter: bool, + } + + impl RoundTripOptions { + fn new(values: ArrayRef, nullable: bool) -> Self { + let data_type = values.data_type().clone(); + let schema = Schema::new(vec![Field::new("col", data_type, nullable)]); + Self { + values, + schema: Arc::new(schema), + bloom_filter: false, + } + } + } + fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec { - let data_type = values.data_type().clone(); - let schema = Schema::new(vec![Field::new("col", data_type, nullable)]); - one_column_roundtrip_with_schema(values, Arc::new(schema)) + one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable)) } fn one_column_roundtrip_with_schema( values: ArrayRef, schema: SchemaRef, ) -> Vec { + let mut options = RoundTripOptions::new(values, false); + options.schema = schema; + one_column_roundtrip_with_options(options) + } + + fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec { + let RoundTripOptions { + values, + schema, + bloom_filter, + } = options; + let encodings = match values.data_type() { DataType::Utf8 | DataType::LargeUtf8 @@ -1270,7 +1298,7 @@ mod tests { .set_dictionary_enabled(dictionary_size != 0) .set_dictionary_pagesize_limit(dictionary_size.max(1)) .set_encoding(*encoding) - .set_bloom_filter_enabled(true) + .set_bloom_filter_enabled(bloom_filter) .build(); files.push(roundtrip_opts(&expected_batch, props)) @@ -1596,8 +1624,11 @@ mod tests { #[test] fn i32_column_bloom_filter() { - let positive_values: Vec = (0..SMALL_SIZE as i32).collect(); - let files = values_required::(positive_values); + let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32)); + let mut options = RoundTripOptions::new(array, false); + options.bloom_filter = true; + + let files = one_column_roundtrip_with_options(options); check_bloom_filter( files, "col".to_string(), @@ -1612,7 +1643,11 @@ mod tests { let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect(); let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice()); - let files = values_required::(many_vecs_iter); + let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter)); + let mut options = RoundTripOptions::new(array, false); + options.bloom_filter = true; + + let files = one_column_roundtrip_with_options(options); check_bloom_filter( files, "col".to_string(), @@ -1626,7 +1661,11 @@ mod tests { let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect(); let raw_strs = raw_values.iter().map(|s| s.as_str()); - let files = values_optional::(raw_strs); + let array = Arc::new(StringArray::from_iter_values(raw_strs)); + let mut options = RoundTripOptions::new(array, false); + options.bloom_filter = true; + + let files = one_column_roundtrip_with_options(options); let optional_raw_values: Vec<_> = raw_values .iter() diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs index 5bb89bf3f4d2..9334fbd7a05c 100644 --- a/parquet/src/bloom_filter/mod.rs +++ b/parquet/src/bloom_filter/mod.rs @@ -28,7 +28,7 @@ use crate::format::{ }; use bytes::{Buf, Bytes}; use std::hash::Hasher; -use std::io::Write; +use std::io::{BufWriter, Write}; use std::sync::Arc; use thrift::protocol::{ TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol, TSerializable, @@ -177,7 +177,9 @@ impl Sbbf { } /// Write the bloom filter data (header and then bitset) to the output - pub(crate) fn write(&self, mut writer: W) -> Result<(), ParquetError> { + pub(crate) fn write(&self, writer: W) -> Result<(), ParquetError> { + // Use a BufWriter to avoid costs of writing individual blocks + let mut writer = BufWriter::new(writer); let mut protocol = TCompactOutputProtocol::new(&mut writer); let header = self.header(); header.write_to_out_protocol(&mut protocol).map_err(|e| { @@ -185,6 +187,7 @@ impl Sbbf { })?; protocol.flush()?; self.write_bitset(&mut writer)?; + writer.flush()?; Ok(()) } diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index ae13eff201bd..7d20b736ea0c 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -630,7 +630,7 @@ struct ColumnProperties { statistics_enabled: Option, max_statistics_size: Option, /// bloom filter related properties - bloom_filter_properies: Option, + bloom_filter_properties: Option, } impl ColumnProperties { @@ -674,10 +674,10 @@ impl ColumnProperties { /// otherwise it is a no-op. /// If `value` is `false`, resets bloom filter properties to `None`. fn set_bloom_filter_enabled(&mut self, value: bool) { - if value && self.bloom_filter_properies.is_none() { - self.bloom_filter_properies = Some(Default::default()) + if value && self.bloom_filter_properties.is_none() { + self.bloom_filter_properties = Some(Default::default()) } else if !value { - self.bloom_filter_properies = None + self.bloom_filter_properties = None } } @@ -694,7 +694,7 @@ impl ColumnProperties { value ); - self.bloom_filter_properies + self.bloom_filter_properties .get_or_insert_with(Default::default) .fpp = value; } @@ -702,7 +702,7 @@ impl ColumnProperties { /// Sets the number of distinct (unique) values for bloom filter for this column, and implicitly /// enables bloom filter if not previously enabled. fn set_bloom_filter_ndv(&mut self, value: u64) { - self.bloom_filter_properies + self.bloom_filter_properties .get_or_insert_with(Default::default) .ndv = value; } @@ -737,7 +737,7 @@ impl ColumnProperties { /// Returns the bloom filter properties, or `None` if not enabled fn bloom_filter_properties(&self) -> Option<&BloomFilterProperties> { - self.bloom_filter_properies.as_ref() + self.bloom_filter_properties.as_ref() } }