Skip to content

Commit

Permalink
feat: make the bloom filter optional in sst meta
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Dec 19, 2022
1 parent beedea2 commit c175737
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 28 deletions.
12 changes: 3 additions & 9 deletions analytic_engine/src/sst/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ pub enum Error {
#[snafu(display("Storage format options are not found.\nBacktrace\n:{}", backtrace))]
StorageFormatOptionsNotFound { backtrace: Backtrace },

#[snafu(display("Bloom filter options are not found.\nBacktrace\n:{}", backtrace))]
BloomFilterNotFound { backtrace: Backtrace },

#[snafu(display(
"Bloom filter should be 256 byte, current:{}.\nBacktrace\n:{}",
size,
Expand Down Expand Up @@ -516,7 +513,7 @@ pub struct SstMetaData {
// total row number
pub row_num: u64,
pub storage_format_opts: StorageFormatOptions,
pub bloom_filter: BloomFilter,
pub bloom_filter: Option<BloomFilter>,
}

pub type SstMetaDataRef = Arc<SstMetaData>;
Expand All @@ -538,7 +535,7 @@ impl From<SstMetaData> for sst_pb::SstMetaData {
size: src.size,
row_num: src.row_num,
storage_format_opts: Some(src.storage_format_opts.into()),
bloom_filter: Some(src.bloom_filter.into()),
bloom_filter: src.bloom_filter.map(|v| v.into()),
}
}
}
Expand All @@ -559,10 +556,7 @@ impl TryFrom<sst_pb::SstMetaData> for SstMetaData {
src.storage_format_opts
.context(StorageFormatOptionsNotFound)?,
);
let bloom_filter = {
let pb_filter = src.bloom_filter.context(BloomFilterNotFound)?;
BloomFilter::try_from(pb_filter)?
};
let bloom_filter = src.bloom_filter.map(BloomFilter::try_from).transpose()?;

Ok(Self {
min_key: src.min_key.into(),
Expand Down
4 changes: 2 additions & 2 deletions analytic_engine/src/sst/parquet/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,12 @@ impl<'a> Reader<'a> {
&self,
schema: SchemaRef,
row_groups: &[RowGroupMetaData],
bloom_filter: &BloomFilter,
bloom_filter: &Option<BloomFilter>,
) -> Result<Vec<usize>> {
let filter = RowGroupFilter::try_new(
&schema,
row_groups,
bloom_filter.filters(),
bloom_filter.as_ref().map(|v| v.filters()),
self.predicate.exprs(),
)?;

Expand Down
4 changes: 2 additions & 2 deletions analytic_engine/src/sst/parquet/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ impl RecordBytesReader {

async fn read_all(mut self) -> Result<Vec<u8>> {
self.partition_record_batch().await?;
let filters = self.build_bloom_filter();
self.meta_data.bloom_filter = filters;
let filter = self.build_bloom_filter();
self.meta_data.bloom_filter = Some(filter);

let mut parquet_encoder = ParquetEncoder::try_new(
self.num_rows_per_row_group,
Expand Down
2 changes: 1 addition & 1 deletion analytic_engine/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl ProjectAndFilterReader {
let filter = RowGroupFilter::try_new(
self.schema.as_arrow_schema_ref(),
row_groups,
self.meta_data.bloom_filter.filters(),
self.meta_data.bloom_filter.as_ref().map(|v| v.filters()),
self.predicate.exprs(),
)?;

Expand Down
33 changes: 19 additions & 14 deletions analytic_engine/src/sst/parquet/row_group_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,22 @@ use crate::sst::reader::error::{OtherNoCause, Result};
pub struct RowGroupFilter<'a> {
schema: &'a SchemaRef,
row_groups: &'a [RowGroupMetaData],
blooms: &'a [Vec<Bloom>],
blooms: Option<&'a [Vec<Bloom>]>,
predicates: &'a [Expr],
}

impl<'a> RowGroupFilter<'a> {
pub fn try_new(
schema: &'a SchemaRef,
row_groups: &'a [RowGroupMetaData],
blooms: &'a [Vec<Bloom>],
blooms: Option<&'a [Vec<Bloom>]>,
predicates: &'a [Expr],
) -> Result<Self> {
ensure!(blooms.len() == row_groups.len(), OtherNoCause {
msg: format!("expect the same number of bloom filter as the number of row groups, num_bloom_filters:{}, num_row_groups:{}", blooms.len(), row_groups.len()),
});
if let Some(blooms) = blooms {
ensure!(blooms.len() == row_groups.len(), OtherNoCause {
msg: format!("expect the same number of bloom filter as the number of row groups, num_bloom_filters:{}, num_row_groups:{}", blooms.len(), row_groups.len()),
});
}

Ok(Self {
schema,
Expand All @@ -49,24 +51,27 @@ impl<'a> RowGroupFilter<'a> {

pub fn filter(&self) -> Vec<usize> {
let filtered0 = self.filter_by_min_max();
// TODO: We can do continuous filtering based on the `filtered0` to reduce the
// filtering cost.
let filtered1 = self.filter_by_bloom();
Self::intersect_filtered_row_groups(&filtered0, &filtered1)
match self.blooms {
Some(v) => {
// TODO: We can do continuous filtering based on the `filtered0` to reduce the
// filtering cost.
let filtered1 = self.filter_by_bloom(v);
Self::intersect_filtered_row_groups(&filtered0, &filtered1)
}
None => filtered0,
}
}

/// Filter row groups by delegating to `min_max::filter_row_groups`, passing
/// the schema, the predicates and the row group metadata held by this filter.
///
/// NOTE(review): the returned `Vec<usize>` is presumably the indexes of the
/// row groups that survive the min/max check -- confirm against `min_max`.
fn filter_by_min_max(&self) -> Vec<usize> {
// `self.schema` is a borrowed `SchemaRef`, so an owned handle is cloned for the
// callee (presumably a cheap ref-counted clone -- TODO confirm).
min_max::filter_row_groups(self.schema.clone(), self.predicates, self.row_groups)
}

fn filter_by_bloom(&self) -> Vec<usize> {
/// Filter row groups according to the bloom filter.
fn filter_by_bloom(&self, blooms: &[Vec<Bloom>]) -> Vec<usize> {
let is_equal =
|col_pos: ColumnPosition, val: &ScalarValue, negated: bool| -> Option<bool> {
let datum = Datum::from_scalar_value(val)?;
let col_bloom = self
.blooms
.get(col_pos.row_group_idx)?
.get(col_pos.column_idx)?;
let col_bloom = blooms.get(col_pos.row_group_idx)?.get(col_pos.column_idx)?;
let exist = col_bloom.contains_input(Input::Raw(&datum.to_bytes()));
if exist {
// bloom filter has false positivity, that is to say we are unsure whether this
Expand Down

0 comments on commit c175737

Please sign in to comment.