From 09299f909ab1198ce34011f2ef37c5ce8d4448ce Mon Sep 17 00:00:00 2001 From: "xikai.wxk" Date: Fri, 16 Dec 2022 12:13:17 +0800 Subject: [PATCH] feat: make the bloom filter optional in sst meta --- analytic_engine/src/sst/file.rs | 12 ++----- .../src/sst/parquet/async_reader.rs | 4 +-- analytic_engine/src/sst/parquet/builder.rs | 4 +-- analytic_engine/src/sst/parquet/reader.rs | 2 +- .../src/sst/parquet/row_group_filter.rs | 33 +++++++++++-------- 5 files changed, 27 insertions(+), 28 deletions(-) diff --git a/analytic_engine/src/sst/file.rs b/analytic_engine/src/sst/file.rs index 3a62a6f706..37b92e0289 100644 --- a/analytic_engine/src/sst/file.rs +++ b/analytic_engine/src/sst/file.rs @@ -57,9 +57,6 @@ pub enum Error { #[snafu(display("Storage format options are not found.\nBacktrace\n:{}", backtrace))] StorageFormatOptionsNotFound { backtrace: Backtrace }, - #[snafu(display("Bloom filter options are not found.\nBacktrace\n:{}", backtrace))] - BloomFilterNotFound { backtrace: Backtrace }, - #[snafu(display( "Bloom filter should be 256 byte, current:{}.\nBacktrace\n:{}", size, @@ -516,7 +513,7 @@ pub struct SstMetaData { // total row number pub row_num: u64, pub storage_format_opts: StorageFormatOptions, - pub bloom_filter: BloomFilter, + pub bloom_filter: Option, } pub type SstMetaDataRef = Arc; @@ -538,7 +535,7 @@ impl From for sst_pb::SstMetaData { size: src.size, row_num: src.row_num, storage_format_opts: Some(src.storage_format_opts.into()), - bloom_filter: Some(src.bloom_filter.into()), + bloom_filter: src.bloom_filter.map(|v| v.into()), } } } @@ -559,10 +556,7 @@ impl TryFrom for SstMetaData { src.storage_format_opts .context(StorageFormatOptionsNotFound)?, ); - let bloom_filter = { - let pb_filter = src.bloom_filter.context(BloomFilterNotFound)?; - BloomFilter::try_from(pb_filter)? - }; + let bloom_filter = src.bloom_filter.map(BloomFilter::try_from).transpose()?; Ok(Self { min_key: src.min_key.into(), diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index 7c59d34c04..338514cd86 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -80,12 +80,12 @@ impl<'a> Reader<'a> { &self, schema: SchemaRef, row_groups: &[RowGroupMetaData], - bloom_filter: &BloomFilter, + bloom_filter: &Option, ) -> Result> { let filter = RowGroupFilter::try_new( &schema, row_groups, - bloom_filter.filters(), + bloom_filter.as_ref().map(|v| v.filters()), self.predicate.exprs(), )?; diff --git a/analytic_engine/src/sst/parquet/builder.rs b/analytic_engine/src/sst/parquet/builder.rs index cc5da7adc7..6fa9330937 100644 --- a/analytic_engine/src/sst/parquet/builder.rs +++ b/analytic_engine/src/sst/parquet/builder.rs @@ -159,8 +159,8 @@ impl RecordBytesReader { async fn read_all(mut self) -> Result> { self.partition_record_batch().await?; - let filters = self.build_bloom_filter(); - self.meta_data.bloom_filter = filters; + let filter = self.build_bloom_filter(); + self.meta_data.bloom_filter = Some(filter); let mut parquet_encoder = ParquetEncoder::try_new( self.num_rows_per_row_group, diff --git a/analytic_engine/src/sst/parquet/reader.rs b/analytic_engine/src/sst/parquet/reader.rs index 4c24db1a0b..dc4914b109 100644 --- a/analytic_engine/src/sst/parquet/reader.rs +++ b/analytic_engine/src/sst/parquet/reader.rs @@ -252,7 +252,7 @@ impl ProjectAndFilterReader { let filter = RowGroupFilter::try_new( self.schema.as_arrow_schema_ref(), row_groups, - self.meta_data.bloom_filter.filters(), + self.meta_data.bloom_filter.as_ref().map(|v| v.filters()), self.predicate.exprs(), )?; diff --git a/analytic_engine/src/sst/parquet/row_group_filter.rs b/analytic_engine/src/sst/parquet/row_group_filter.rs index 61d01d310a..33cd967a47 100644 --- a/analytic_engine/src/sst/parquet/row_group_filter.rs +++ b/analytic_engine/src/sst/parquet/row_group_filter.rs @@ -24,7 +24,7 @@ use crate::sst::reader::error::{OtherNoCause, Result}; pub struct RowGroupFilter<'a> { schema: &'a SchemaRef, row_groups: &'a [RowGroupMetaData], - blooms: &'a [Vec], + blooms: Option<&'a [Vec]>, predicates: &'a [Expr], } @@ -32,12 +32,14 @@ impl<'a> RowGroupFilter<'a> { pub fn try_new( schema: &'a SchemaRef, row_groups: &'a [RowGroupMetaData], - blooms: &'a [Vec], + blooms: Option<&'a [Vec]>, predicates: &'a [Expr], ) -> Result { - ensure!(blooms.len() == row_groups.len(), OtherNoCause { - msg: format!("expect the same number of bloom filter as the number of row groups, num_bloom_filters:{}, num_row_groups:{}", blooms.len(), row_groups.len()), - }); + if let Some(blooms) = blooms { + ensure!(blooms.len() == row_groups.len(), OtherNoCause { + msg: format!("expect the same number of bloom filter as the number of row groups, num_bloom_filters:{}, num_row_groups:{}", blooms.len(), row_groups.len()), + }); + } Ok(Self { schema, @@ -49,24 +51,27 @@ impl<'a> RowGroupFilter<'a> { pub fn filter(&self) -> Vec { let filtered0 = self.filter_by_min_max(); - // TODO: We can do continuous filtering based on the `filtered0` to reduce the - // filtering cost. - let filtered1 = self.filter_by_bloom(); - Self::intersect_filtered_row_groups(&filtered0, &filtered1) + match self.blooms { + Some(v) => { + // TODO: We can do continuous filtering based on the `filtered0` to reduce the + // filtering cost. + let filtered1 = self.filter_by_bloom(v); + Self::intersect_filtered_row_groups(&filtered0, &filtered1) + } + None => filtered0, + } } fn filter_by_min_max(&self) -> Vec { min_max::filter_row_groups(self.schema.clone(), self.predicates, self.row_groups) } - fn filter_by_bloom(&self) -> Vec { + /// Filter row groups according to the bloom filter. + fn filter_by_bloom(&self, blooms: &[Vec]) -> Vec { let is_equal = |col_pos: ColumnPosition, val: &ScalarValue, negated: bool| -> Option { let datum = Datum::from_scalar_value(val)?; - let col_bloom = self - .blooms - .get(col_pos.row_group_idx)? - .get(col_pos.column_idx)?; + let col_bloom = blooms.get(col_pos.row_group_idx)?.get(col_pos.column_idx)?; let exist = col_bloom.contains_input(Input::Raw(&datum.to_bytes())); if exist { // bloom filter has false positivity, that is to say we are unsure whether this