Commit b4191fa

rename sst filter to parquet filter

jiacai2050 committed Feb 13, 2023
1 parent 842e5c9 commit b4191fa

Showing 10 changed files with 92 additions and 83 deletions.
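The change is a pure rename: the `SstFilter` type, its `sst_filter` fields, and the matching protobuf message become `ParquetFilter`/`parquet_filter`, reflecting that the filter lives in the Parquet-specific SST module. A minimal, self-contained sketch of the post-commit shape (the stand-in `RowGroupFilter` body and the `main` driver below are illustrative, not the crate's real definitions):

```rust
// Sketch of the post-commit shape: the per-SST filter is now ParquetFilter,
// holding one RowGroupFilter per row group. The real definitions live in
// analytic_engine/src/sst/parquet/meta_data.rs.
#[derive(Debug, Default, Clone, PartialEq)]
struct RowGroupFilter; // stand-in for the real column-filter collection

#[derive(Debug, Default, Clone, PartialEq)]
struct ParquetFilter {
    row_group_filters: Vec<RowGroupFilter>,
}

impl ParquetFilter {
    fn len(&self) -> usize {
        self.row_group_filters.len()
    }
}

fn main() {
    // Metadata now exposes Option<ParquetFilter> via a `parquet_filter` field
    // (previously `sst_filter: Option<SstFilter>`); the structure is unchanged.
    let parquet_filter: Option<ParquetFilter> = Some(ParquetFilter::default());
    assert_eq!(parquet_filter.map(|f| f.len()), Some(0));
}
```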
2 changes: 1 addition & 1 deletion analytic_engine/src/compaction/picker.rs
@@ -627,7 +627,7 @@ mod tests {
time_range,
max_sequence: 200,
schema: build_schema(),
-sst_filter: Default::default(),
+parquet_filter: Default::default(),
collapsible_cols_idx: Vec::new(),
};

2 changes: 1 addition & 1 deletion analytic_engine/src/sst/meta_data/cache.rs
@@ -47,7 +47,7 @@ impl MetaData {
let mut sst_meta =
encoding::decode_sst_meta_data(&kv_metas[0]).context(DecodeCustomMetaData)?;
if ignore_sst_filter {
-sst_meta.sst_filter = None;
+sst_meta.parquet_filter = None;
}

Arc::new(sst_meta)
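For context, `ignore_sst_filter` lets the caller drop the decoded filter, presumably so cached meta-data entries stay small when the filter will never be consulted. A hedged sketch of that pattern (the `MetaData` shape and `decode_meta` function below are illustrative, not the crate's API):

```rust
// Sketch: optionally drop a large, optional field right after decoding so the
// value kept in the cache stays small. Names here are stand-ins.
#[derive(Debug)]
struct MetaData {
    parquet_filter: Option<Vec<u8>>, // stand-in for the real ParquetFilter
}

fn decode_meta(ignore_filter: bool) -> MetaData {
    let mut meta = MetaData {
        parquet_filter: Some(vec![0u8; 1024]),
    };
    if ignore_filter {
        // Callers that never consult the filter avoid caching its bytes.
        meta.parquet_filter = None;
    }
    meta
}

fn main() {
    assert!(decode_meta(false).parquet_filter.is_some());
    assert!(decode_meta(true).parquet_filter.is_none());
}
```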
20 changes: 10 additions & 10 deletions analytic_engine/src/sst/parquet/async_reader.rs
@@ -44,7 +44,7 @@ use crate::sst::{
metrics,
parquet::{
encoding::ParquetDecoder,
-meta_data::{ParquetMetaDataRef, SstFilter},
+meta_data::{ParquetFilter, ParquetMetaDataRef},
row_group_pruner::RowGroupPruner,
},
reader::{error::*, Result, SstReader},
@@ -147,10 +147,10 @@ impl<'a> Reader<'a> {
&self,
schema: SchemaRef,
row_groups: &[RowGroupMetaData],
-sst_filter: Option<&SstFilter>,
+parquet_filter: Option<&ParquetFilter>,
) -> Result<Vec<usize>> {
let pruner =
-RowGroupPruner::try_new(&schema, row_groups, sst_filter, self.predicate.exprs())?;
+RowGroupPruner::try_new(&schema, row_groups, parquet_filter, self.predicate.exprs())?;

Ok(pruner.prune())
}
@@ -165,38 +165,38 @@ impl<'a> Reader<'a> {
let row_projector = self.row_projector.as_ref().unwrap();

// Get target row groups.
-let pruned_row_groups = self.prune_row_groups(
+let target_row_groups = self.prune_row_groups(
meta_data.custom().schema.to_arrow_schema_ref(),
meta_data.parquet().row_groups(),
-meta_data.custom().sst_filter.as_ref(),
+meta_data.custom().parquet_filter.as_ref(),
)?;

info!(
"Reader fetch record batches, path:{}, row_groups total:{}, after prune:{}",
self.path,
meta_data.parquet().num_row_groups(),
-pruned_row_groups.len(),
+target_row_groups.len(),
);

-if pruned_row_groups.is_empty() {
+if target_row_groups.is_empty() {
return Ok(Vec::new());
}

// Partition the batches by `read_parallelism`.
let suggest_read_parallelism = read_parallelism;
-let read_parallelism = std::cmp::min(pruned_row_groups.len(), suggest_read_parallelism);
+let read_parallelism = std::cmp::min(target_row_groups.len(), suggest_read_parallelism);

// TODO: we only support parallel reads when `batch_size` ==
// `num_rows_per_row_group`, so this placement strategy is fine; we should
// adjust it when other cases are supported.
let chunks_num = read_parallelism;
-let chunk_size = pruned_row_groups.len() / read_parallelism + 1;
+let chunk_size = target_row_groups.len() / read_parallelism + 1;
info!(
"Reader fetch record batches parallelly, parallelism suggest:{}, real:{}, chunk_size:{}",
suggest_read_parallelism, read_parallelism, chunk_size
);
let mut filtered_row_group_chunks = vec![Vec::with_capacity(chunk_size); chunks_num];
-for (row_group_idx, row_group) in pruned_row_groups.into_iter().enumerate() {
+for (row_group_idx, row_group) in target_row_groups.into_iter().enumerate() {
let chunk_idx = row_group_idx % chunks_num;
filtered_row_group_chunks[chunk_idx].push(row_group);
}
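The partitioning above distributes the surviving row groups round-robin across at most `read_parallelism` readers. A standalone sketch of that placement (function and variable names here are illustrative, not the crate's API):

```rust
// Round-robin placement of row-group indices into parallel read chunks,
// mirroring the chunking logic in the reader above.
fn partition_row_groups(
    target_row_groups: Vec<usize>,
    suggest_read_parallelism: usize,
) -> Vec<Vec<usize>> {
    // Never spawn more readers than there are row groups; the `.max(1)` guard
    // (an addition here, not in the original) avoids dividing by zero.
    let parallelism = target_row_groups
        .len()
        .min(suggest_read_parallelism)
        .max(1);
    let chunk_size = target_row_groups.len() / parallelism + 1;
    let mut chunks = vec![Vec::with_capacity(chunk_size); parallelism];
    for (row_group_idx, row_group) in target_row_groups.into_iter().enumerate() {
        chunks[row_group_idx % parallelism].push(row_group);
    }
    chunks
}

fn main() {
    // Five surviving row groups spread over two readers:
    // reader 0 gets [0, 2, 4], reader 1 gets [1, 3].
    let chunks = partition_row_groups(vec![0, 1, 2, 3, 4], 2);
    assert_eq!(chunks, vec![vec![0, 2, 4], vec![1, 3]]);
}
```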
2 changes: 1 addition & 1 deletion analytic_engine/src/sst/parquet/encoding.rs
@@ -862,7 +862,7 @@ mod tests {
time_range: TimeRange::new_unchecked(Timestamp::new(100), Timestamp::new(101)),
max_sequence: 200,
schema: schema.clone(),
-sst_filter: Default::default(),
+parquet_filter: Default::default(),
collapsible_cols_idx: Vec::new(),
};
let mut encoder =
70 changes: 38 additions & 32 deletions analytic_engine/src/sst/parquet/meta_data.rs
@@ -43,7 +43,7 @@ pub enum Error {
},

#[snafu(display(
"Unsupported sst_filter version, version:{}.\nBacktrace\n:{}",
"Unsupported parquet_filter version, version:{}.\nBacktrace\n:{}",
version,
backtrace
))]
@@ -63,6 +63,7 @@ const DEFAULT_FILTER_VERSION: u32 = 0;
/// Filter can be used to test whether an element is a member of a set.
/// False positive matches are possible if space-efficient probabilistic data
/// structures are used.
+// TODO: move this to sst module, and add a FilterBuild trait
trait Filter: fmt::Debug {
/// Check the key is in the bitmap index.
fn contains(&self, key: &[u8]) -> bool;
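To make the trait's false-positive contract concrete, here is a toy exact-set implementation of the same `contains` interface (a sketch only: the crate's real filters, such as the Xor8-based one exercised in the tests below, are probabilistic and space-efficient, so their `contains` may over-approximate membership):

```rust
use std::collections::HashSet;
use std::fmt;

// Same shape as the trait in the diff above.
trait Filter: fmt::Debug {
    /// Check whether the key may be in the set.
    fn contains(&self, key: &[u8]) -> bool;
}

// Exact-set filter: no false positives, but memory grows with the key set.
// Probabilistic filters (Bloom, Xor8) trade a small false-positive rate for
// far less space, which is why a positive answer cannot be trusted there.
#[derive(Debug, Default)]
struct HashSetFilter {
    keys: HashSet<Vec<u8>>,
}

impl Filter for HashSetFilter {
    fn contains(&self, key: &[u8]) -> bool {
        self.keys.contains(key)
    }
}

fn main() {
    let mut filter = HashSetFilter::default();
    filter.keys.insert(b"host-01".to_vec());
    assert!(filter.contains(b"host-01"));
    assert!(!filter.contains(b"host-02"));
}
```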
@@ -190,14 +191,13 @@ impl RowGroupFilter {
}
}

-// TODO: move this to sst module
#[derive(Debug, Clone, PartialEq, Default)]
-pub struct SstFilter {
+pub struct ParquetFilter {
/// Every filter is a row-group filter consisting of column filters.
row_group_filters: Vec<RowGroupFilter>,
}

-impl SstFilter {
+impl ParquetFilter {
pub fn new(row_group_filters: Vec<RowGroupFilter>) -> Self {
Self { row_group_filters }
}
@@ -211,17 +211,17 @@ impl SstFilter {
}
}

-impl Index<usize> for SstFilter {
+impl Index<usize> for ParquetFilter {
type Output = RowGroupFilter;

fn index(&self, index: usize) -> &Self::Output {
&self.row_group_filters[index]
}
}

-impl From<SstFilter> for sst_pb::SstFilter {
-fn from(sst_filter: SstFilter) -> Self {
-let row_group_filters = sst_filter
+impl From<ParquetFilter> for sst_pb::ParquetFilter {
+fn from(parquet_filter: ParquetFilter) -> Self {
+let row_group_filters = parquet_filter
.row_group_filters
.into_iter()
.map(|row_group_filter| {
@@ -235,21 +235,21 @@ impl From<SstFilter> for sst_pb::SstFilter {
.unwrap_or_default()
})
.collect::<Vec<_>>();
-sst_pb::sst_filter::RowGroupFilter { column_filters }
+sst_pb::parquet_filter::RowGroupFilter { column_filters }
})
.collect::<Vec<_>>();

-sst_pb::SstFilter {
+sst_pb::ParquetFilter {
version: DEFAULT_FILTER_VERSION,
row_group_filters,
}
}
}

-impl TryFrom<sst_pb::SstFilter> for SstFilter {
+impl TryFrom<sst_pb::ParquetFilter> for ParquetFilter {
type Error = Error;

-fn try_from(src: sst_pb::SstFilter) -> Result<Self> {
+fn try_from(src: sst_pb::ParquetFilter) -> Result<Self> {
ensure!(
src.version == DEFAULT_FILTER_VERSION,
UnsupportedSstFilter {
@@ -280,7 +280,7 @@ impl TryFrom<sst_pb::SstFilter> for SstFilter {
})
.collect::<Result<Vec<_>>>()?;

-Ok(SstFilter { row_group_filters })
+Ok(ParquetFilter { row_group_filters })
}
}

@@ -294,7 +294,7 @@ pub struct ParquetMetaData {
/// Max sequence number in the sst
pub max_sequence: SequenceNumber,
pub schema: Schema,
-pub sst_filter: Option<SstFilter>,
+pub parquet_filter: Option<ParquetFilter>,
pub collapsible_cols_idx: Vec<u32>,
}

@@ -308,7 +308,7 @@ impl From<MetaData> for ParquetMetaData {
time_range: meta.time_range,
max_sequence: meta.max_sequence,
schema: meta.schema,
-sst_filter: None,
+parquet_filter: None,
collapsible_cols_idx: Vec::new(),
}
}
@@ -335,7 +335,7 @@ impl fmt::Debug for ParquetMetaData {
.field("max_sequence", &self.max_sequence)
.field("schema", &self.schema)
// Avoid the messy output from filter.
.field("has_filter", &self.sst_filter.is_some())
.field("has_filter", &self.parquet_filter.is_some())
.field("collapsible_cols_idx", &self.collapsible_cols_idx)
.finish()
}
@@ -349,7 +349,7 @@ impl From<ParquetMetaData> for sst_pb::ParquetMetaData {
max_sequence: src.max_sequence,
time_range: Some(src.time_range.into()),
schema: Some(common_pb::TableSchema::from(&src.schema)),
-filter: src.sst_filter.map(|v| v.into()),
+filter: src.parquet_filter.map(|v| v.into()),
collapsible_cols_idx: src.collapsible_cols_idx,
}
}
@@ -367,15 +367,15 @@ impl TryFrom<sst_pb::ParquetMetaData> for ParquetMetaData {
let schema = src.schema.context(TableSchemaNotFound)?;
Schema::try_from(schema).context(ConvertTableSchema)?
};
-let sst_filter = src.filter.map(SstFilter::try_from).transpose()?;
+let parquet_filter = src.filter.map(ParquetFilter::try_from).transpose()?;

Ok(Self {
min_key: src.min_key.into(),
max_key: src.max_key.into(),
time_range,
max_sequence: src.max_sequence,
schema,
-sst_filter,
+parquet_filter,
collapsible_cols_idx: src.collapsible_cols_idx,
})
}
@@ -386,8 +386,8 @@ mod tests {
use super::*;

#[test]
-fn test_conversion_sst_filter() {
-let sst_filter = SstFilter {
+fn test_conversion_parquet_filter() {
+let parquet_filter = ParquetFilter {
row_group_filters: vec![
RowGroupFilter {
column_filters: vec![None, Some(Box::new(Xor8Filter::default()))],
@@ -398,23 +398,29 @@
],
};

-let sst_filter_pb: sst_pb::SstFilter = sst_filter.clone().into();
-assert_eq!(sst_filter_pb.version, DEFAULT_FILTER_VERSION);
-assert_eq!(sst_filter_pb.row_group_filters.len(), 2);
-assert_eq!(sst_filter_pb.row_group_filters[0].column_filters.len(), 2);
-assert_eq!(sst_filter_pb.row_group_filters[1].column_filters.len(), 2);
-assert!(sst_filter_pb.row_group_filters[0].column_filters[0].is_empty());
+let parquet_filter_pb: sst_pb::ParquetFilter = parquet_filter.clone().into();
+assert_eq!(parquet_filter_pb.version, DEFAULT_FILTER_VERSION);
+assert_eq!(parquet_filter_pb.row_group_filters.len(), 2);
assert_eq!(
-sst_filter_pb.row_group_filters[0].column_filters[1].len(),
+parquet_filter_pb.row_group_filters[0].column_filters.len(),
+2
+);
+assert_eq!(
+parquet_filter_pb.row_group_filters[1].column_filters.len(),
+2
+);
+assert!(parquet_filter_pb.row_group_filters[0].column_filters[0].is_empty());
+assert_eq!(
+parquet_filter_pb.row_group_filters[0].column_filters[1].len(),
24
);
assert_eq!(
-sst_filter_pb.row_group_filters[1].column_filters[0].len(),
+parquet_filter_pb.row_group_filters[1].column_filters[0].len(),
24
);
-assert!(sst_filter_pb.row_group_filters[1].column_filters[1].is_empty());
+assert!(parquet_filter_pb.row_group_filters[1].column_filters[1].is_empty());

-let decoded_sst_filter = SstFilter::try_from(sst_filter_pb).unwrap();
-assert_eq!(decoded_sst_filter, sst_filter);
+let decoded_parquet_filter = ParquetFilter::try_from(parquet_filter_pb).unwrap();
+assert_eq!(decoded_parquet_filter, parquet_filter);
}
}
31 changes: 16 additions & 15 deletions analytic_engine/src/sst/parquet/row_group_pruner.rs
@@ -16,61 +16,61 @@ use parquet_ext::prune::{
use snafu::ensure;

use crate::sst::{
-parquet::meta_data::SstFilter,
+parquet::meta_data::ParquetFilter,
reader::error::{OtherNoCause, Result},
};

/// RowGroupPruner is used to prune row groups according to the provided
/// predicates and filters.
///
/// Currently, two kinds of filters will be applied to such filtering:
-/// min max & sst_filter.
+/// min max & parquet_filter.
pub struct RowGroupPruner<'a> {
schema: &'a SchemaRef,
row_groups: &'a [RowGroupMetaData],
-sst_filter: Option<&'a SstFilter>,
+parquet_filter: Option<&'a ParquetFilter>,
predicates: &'a [Expr],
}

impl<'a> RowGroupPruner<'a> {
pub fn try_new(
schema: &'a SchemaRef,
row_groups: &'a [RowGroupMetaData],
-sst_filter: Option<&'a SstFilter>,
+parquet_filter: Option<&'a ParquetFilter>,
predicates: &'a [Expr],
) -> Result<Self> {
-if let Some(f) = sst_filter {
+if let Some(f) = parquet_filter {
ensure!(f.len() == row_groups.len(), OtherNoCause {
msg: format!("expect the same number of ss_filter as the number of row groups, num_sst_filters:{}, num_row_groups:{}", f.len(), row_groups.len()),
msg: format!("expect the same number of ss_filter as the number of row groups, num_parquet_filters:{}, num_row_groups:{}", f.len(), row_groups.len()),
});
}

Ok(Self {
schema,
row_groups,
-sst_filter,
+parquet_filter,
predicates,
})
}

pub fn prune(&self) -> Vec<usize> {
debug!(
"Begin to prune row groups, total_row_groups:{}, sst_filter:{}, predicates:{:?}",
"Begin to prune row groups, total_row_groups:{}, parquet_filter:{}, predicates:{:?}",
self.row_groups.len(),
-self.sst_filter.is_some(),
+self.parquet_filter.is_some(),
self.predicates,
);

let pruned0 = self.prune_by_min_max();
-match self.sst_filter {
+match self.parquet_filter {
Some(v) => {
// TODO: we could prune incrementally on top of `pruned0` to reduce the
// filtering cost.
let pruned1 = self.prune_by_filters(v);
let pruned = Self::intersect_pruned_row_groups(&pruned0, &pruned1);

debug!(
"Finish prune row groups by sst_filter and min_max, total_row_groups:{}, pruned_by_min_max:{}, pruned_by_blooms:{}, pruned_by_both:{}",
"Finish pruning row groups by parquet_filter and min_max, total_row_groups:{}, pruned_by_min_max:{}, pruned_by_blooms:{}, pruned_by_both:{}",
self.row_groups.len(),
pruned0.len(),
pruned1.len(),
@@ -95,15 +95,16 @@ impl<'a> RowGroupPruner<'a> {
}

/// Prune row groups according to the filter.
-fn prune_by_filters(&self, sst_filter: &SstFilter) -> Vec<usize> {
+fn prune_by_filters(&self, parquet_filter: &ParquetFilter) -> Vec<usize> {
let is_equal =
|col_pos: ColumnPosition, val: &ScalarValue, negated: bool| -> Option<bool> {
let datum = Datum::from_scalar_value(val)?;
-let exist = sst_filter[col_pos.row_group_idx]
+let exist = parquet_filter[col_pos.row_group_idx]
.contains_column_data(col_pos.column_idx, &datum.to_bytes())?;
if exist {
-// sst_filter has false positivity, that is to say we are unsure whether this
-// value exists even if the sst_filter says it exists.
+// parquet_filter can produce false positives, that is, we are unsure whether
+// this value really exists even if the parquet_filter says it does.
None
} else {
Some(negated)
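Because the filter admits false positives, a "contains" answer proves nothing, while a negative answer is definitive and lets the pruner drop the row group; the final set of row groups to read is the intersection of the min-max survivors and the filter survivors. A sketch of that intersection step (function names here are illustrative, not the crate's API):

```rust
use std::collections::HashSet;

// Intersect the row groups surviving min-max pruning with those surviving
// filter pruning: a row group is read only if both passes keep it.
fn intersect_pruned(pruned_by_min_max: &[usize], pruned_by_filter: &[usize]) -> Vec<usize> {
    let filter_set: HashSet<usize> = pruned_by_filter.iter().copied().collect();
    pruned_by_min_max
        .iter()
        .copied()
        .filter(|idx| filter_set.contains(idx))
        .collect()
}

fn main() {
    // Min-max keeps {0, 1, 3}; the probabilistic filter keeps {1, 2, 3}
    // (it may keep too much, never too little), so only {1, 3} are read.
    assert_eq!(intersect_pruned(&[0, 1, 3], &[1, 2, 3]), vec![1, 3]);
}
```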