From b4191fa7b64531719965d0462b9ba29bd93fbc70 Mon Sep 17 00:00:00 2001
From: jiacai2050
Date: Mon, 13 Feb 2023 14:51:50 +0800
Subject: [PATCH] rename sst filter to parquet filter

---
 analytic_engine/src/compaction/picker.rs      |  2 +-
 analytic_engine/src/sst/meta_data/cache.rs    |  2 +-
 .../src/sst/parquet/async_reader.rs           | 20 +++---
 analytic_engine/src/sst/parquet/encoding.rs   |  2 +-
 analytic_engine/src/sst/parquet/meta_data.rs  | 70 ++++++++++---------
 .../src/sst/parquet/row_group_pruner.rs       | 31 ++++----
 analytic_engine/src/sst/parquet/writer.rs     | 25 ++++---
 analytic_engine/src/sst/writer.rs             |  3 +
 components/parquet_ext/src/prune/equal.rs     |  4 +-
 proto/protos/sst.proto                        | 16 ++---
 10 files changed, 92 insertions(+), 83 deletions(-)

diff --git a/analytic_engine/src/compaction/picker.rs b/analytic_engine/src/compaction/picker.rs
index ec5f97504f..3e10dd1379 100644
--- a/analytic_engine/src/compaction/picker.rs
+++ b/analytic_engine/src/compaction/picker.rs
@@ -627,7 +627,7 @@ mod tests {
             time_range,
             max_sequence: 200,
             schema: build_schema(),
-            sst_filter: Default::default(),
+            parquet_filter: Default::default(),
             collapsible_cols_idx: Vec::new(),
         };
 
diff --git a/analytic_engine/src/sst/meta_data/cache.rs b/analytic_engine/src/sst/meta_data/cache.rs
index 6f1778c948..ea773334a2 100644
--- a/analytic_engine/src/sst/meta_data/cache.rs
+++ b/analytic_engine/src/sst/meta_data/cache.rs
@@ -47,7 +47,7 @@ impl MetaData {
             let mut sst_meta =
                 encoding::decode_sst_meta_data(&kv_metas[0]).context(DecodeCustomMetaData)?;
             if ignore_sst_filter {
-                sst_meta.sst_filter = None;
+                sst_meta.parquet_filter = None;
             }
 
             Arc::new(sst_meta)
diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs
index 7548e1f689..a843ecdd65 100644
--- a/analytic_engine/src/sst/parquet/async_reader.rs
+++ b/analytic_engine/src/sst/parquet/async_reader.rs
@@ -44,7 +44,7 @@ use crate::sst::{
     metrics,
     parquet::{
         encoding::ParquetDecoder,
-        meta_data::{ParquetMetaDataRef, SstFilter},
+        meta_data::{ParquetFilter, ParquetMetaDataRef},
         row_group_pruner::RowGroupPruner,
     },
     reader::{error::*, Result, SstReader},
@@ -147,10 +147,10 @@ impl<'a> Reader<'a> {
         &self,
         schema: SchemaRef,
         row_groups: &[RowGroupMetaData],
-        sst_filter: Option<&SstFilter>,
+        parquet_filter: Option<&ParquetFilter>,
     ) -> Result<Vec<usize>> {
         let pruner =
-            RowGroupPruner::try_new(&schema, row_groups, sst_filter, self.predicate.exprs())?;
+            RowGroupPruner::try_new(&schema, row_groups, parquet_filter, self.predicate.exprs())?;
 
         Ok(pruner.prune())
     }
@@ -165,38 +165,38 @@ impl<'a> Reader<'a> {
         let row_projector = self.row_projector.as_ref().unwrap();
 
         // Get target row groups.
-        let pruned_row_groups = self.prune_row_groups(
+        let target_row_groups = self.prune_row_groups(
             meta_data.custom().schema.to_arrow_schema_ref(),
             meta_data.parquet().row_groups(),
-            meta_data.custom().sst_filter.as_ref(),
+            meta_data.custom().parquet_filter.as_ref(),
         )?;
 
         info!(
             "Reader fetch record batches, path:{}, row_groups total:{}, after prune:{}",
             self.path,
             meta_data.parquet().num_row_groups(),
-            pruned_row_groups.len(),
+            target_row_groups.len(),
         );
 
-        if pruned_row_groups.is_empty() {
+        if target_row_groups.is_empty() {
             return Ok(Vec::new());
         }
 
         // Partition the batches by `read_parallelism`.
         let suggest_read_parallelism = read_parallelism;
-        let read_parallelism = std::cmp::min(pruned_row_groups.len(), suggest_read_parallelism);
+        let read_parallelism = std::cmp::min(target_row_groups.len(), suggest_read_parallelism);
         // TODO: we only support parallel reads when `batch_size` ==
         // `num_rows_per_row_group`, so this placement method is ok; we should
        // adjust it when supporting other situations.
         let chunks_num = read_parallelism;
-        let chunk_size = pruned_row_groups.len() / read_parallelism + 1;
+        let chunk_size = target_row_groups.len() / read_parallelism + 1;
         info!(
             "Reader fetch record batches parallelly, parallelism suggest:{}, real:{}, chunk_size:{}",
             suggest_read_parallelism, read_parallelism, chunk_size
         );
 
         let mut filtered_row_group_chunks = vec![Vec::with_capacity(chunk_size); chunks_num];
-        for (row_group_idx, row_group) in pruned_row_groups.into_iter().enumerate() {
+        for (row_group_idx, row_group) in target_row_groups.into_iter().enumerate() {
             let chunk_idx = row_group_idx % chunks_num;
             filtered_row_group_chunks[chunk_idx].push(row_group);
         }
diff --git a/analytic_engine/src/sst/parquet/encoding.rs b/analytic_engine/src/sst/parquet/encoding.rs
index 17e7849b6c..671cbcf32b 100644
--- a/analytic_engine/src/sst/parquet/encoding.rs
+++ b/analytic_engine/src/sst/parquet/encoding.rs
@@ -862,7 +862,7 @@ mod tests {
             time_range: TimeRange::new_unchecked(Timestamp::new(100), Timestamp::new(101)),
             max_sequence: 200,
             schema: schema.clone(),
-            sst_filter: Default::default(),
+            parquet_filter: Default::default(),
             collapsible_cols_idx: Vec::new(),
         };
         let mut encoder =
diff --git a/analytic_engine/src/sst/parquet/meta_data.rs b/analytic_engine/src/sst/parquet/meta_data.rs
index 3ebb6ebecc..06e97e3535 100644
--- a/analytic_engine/src/sst/parquet/meta_data.rs
+++ b/analytic_engine/src/sst/parquet/meta_data.rs
@@ -43,7 +43,7 @@ pub enum Error {
     },
 
     #[snafu(display(
-        "Unsupported sst_filter version, version:{}.\nBacktrace\n:{}",
+        "Unsupported parquet_filter version, version:{}.\nBacktrace:\n{}",
         version,
         backtrace
     ))]
@@ -63,6 +63,7 @@ const DEFAULT_FILTER_VERSION: u32 = 0;
 /// Filter can be used to test whether an element is a member of a set.
 /// False positive matches are possible if space-efficient probabilistic data
 /// structures are used.
+// TODO: move this to sst module, and add a FilterBuild trait
 trait Filter: fmt::Debug {
     /// Check the key is in the bitmap index.
     fn contains(&self, key: &[u8]) -> bool;
@@ -190,14 +191,13 @@ impl RowGroupFilter {
     }
 }
 
-// TODO: move this to sst module
 #[derive(Debug, Clone, PartialEq, Default)]
-pub struct SstFilter {
+pub struct ParquetFilter {
     /// Every filter is a row group filter consisting of column filters.
     row_group_filters: Vec<RowGroupFilter>,
 }
 
-impl SstFilter {
+impl ParquetFilter {
     pub fn new(row_group_filters: Vec<RowGroupFilter>) -> Self {
         Self { row_group_filters }
     }
@@ -211,7 +211,7 @@
     }
 }
 
-impl Index<usize> for SstFilter {
+impl Index<usize> for ParquetFilter {
     type Output = RowGroupFilter;
 
     fn index(&self, index: usize) -> &Self::Output {
@@ -219,9 +219,9 @@
     }
 }
 
-impl From<SstFilter> for sst_pb::SstFilter {
-    fn from(sst_filter: SstFilter) -> Self {
-        let row_group_filters = sst_filter
+impl From<ParquetFilter> for sst_pb::ParquetFilter {
+    fn from(parquet_filter: ParquetFilter) -> Self {
+        let row_group_filters = parquet_filter
             .row_group_filters
             .into_iter()
             .map(|row_group_filter| {
@@ -235,21 +235,21 @@ impl From<SstFilter> for sst_pb::SstFilter {
                             .unwrap_or_default()
                     })
                     .collect::<Vec<_>>();
-                sst_pb::sst_filter::RowGroupFilter { column_filters }
+                sst_pb::parquet_filter::RowGroupFilter { column_filters }
             })
             .collect::<Vec<_>>();
 
-        sst_pb::SstFilter {
+        sst_pb::ParquetFilter {
             version: DEFAULT_FILTER_VERSION,
             row_group_filters,
         }
     }
 }
 
-impl TryFrom<sst_pb::SstFilter> for SstFilter {
+impl TryFrom<sst_pb::ParquetFilter> for ParquetFilter {
     type Error = Error;
 
-    fn try_from(src: sst_pb::SstFilter) -> Result<Self> {
+    fn try_from(src: sst_pb::ParquetFilter) -> Result<Self> {
         ensure!(
             src.version == DEFAULT_FILTER_VERSION,
             UnsupportedSstFilter {
@@ -280,7 +280,7 @@ impl TryFrom<sst_pb::SstFilter> for SstFilter {
             })
             .collect::<Result<Vec<_>>>()?;
 
-        Ok(SstFilter { row_group_filters })
+        Ok(ParquetFilter { row_group_filters })
     }
 }
 
@@ -294,7 +294,7 @@ pub struct ParquetMetaData {
     /// Max sequence number in the sst
     pub max_sequence: SequenceNumber,
     pub schema: Schema,
-    pub sst_filter: Option<SstFilter>,
+    pub parquet_filter: Option<ParquetFilter>,
     pub collapsible_cols_idx: Vec<u32>,
 }
 
@@ -308,7 +308,7 @@ impl From<MetaData> for ParquetMetaData {
             time_range: meta.time_range,
             max_sequence: meta.max_sequence,
             schema: meta.schema,
-            sst_filter: None,
+            parquet_filter: None,
             collapsible_cols_idx: Vec::new(),
         }
     }
@@ -335,7 +335,7 @@ impl fmt::Debug for ParquetMetaData {
             .field("max_sequence", &self.max_sequence)
             .field("schema", &self.schema)
             // Avoid the messy output from filter.
-            .field("has_filter", &self.sst_filter.is_some())
+            .field("has_filter", &self.parquet_filter.is_some())
             .field("collapsible_cols_idx", &self.collapsible_cols_idx)
             .finish()
     }
@@ -349,7 +349,7 @@ impl From<ParquetMetaData> for sst_pb::ParquetMetaData {
             max_sequence: src.max_sequence,
             time_range: Some(src.time_range.into()),
             schema: Some(common_pb::TableSchema::from(&src.schema)),
-            filter: src.sst_filter.map(|v| v.into()),
+            filter: src.parquet_filter.map(|v| v.into()),
             collapsible_cols_idx: src.collapsible_cols_idx,
         }
     }
@@ -367,7 +367,7 @@ impl TryFrom<sst_pb::ParquetMetaData> for ParquetMetaData {
             let schema = src.schema.context(TableSchemaNotFound)?;
             Schema::try_from(schema).context(ConvertTableSchema)?
         };
-        let sst_filter = src.filter.map(SstFilter::try_from).transpose()?;
+        let parquet_filter = src.filter.map(ParquetFilter::try_from).transpose()?;
 
         Ok(Self {
             min_key: src.min_key.into(),
@@ -375,7 +375,7 @@ impl TryFrom<sst_pb::ParquetMetaData> for ParquetMetaData {
             time_range,
             max_sequence: src.max_sequence,
             schema,
-            sst_filter,
+            parquet_filter,
             collapsible_cols_idx: src.collapsible_cols_idx,
         })
     }
@@ -386,8 +386,8 @@ mod tests {
     use super::*;
 
     #[test]
-    fn test_conversion_sst_filter() {
-        let sst_filter = SstFilter {
+    fn test_conversion_parquet_filter() {
+        let parquet_filter = ParquetFilter {
             row_group_filters: vec![
                 RowGroupFilter {
                     column_filters: vec![None, Some(Box::new(Xor8Filter::default()))],
@@ -398,23 +398,29 @@ mod tests {
             ],
         };
 
-        let sst_filter_pb: sst_pb::SstFilter = sst_filter.clone().into();
-        assert_eq!(sst_filter_pb.version, DEFAULT_FILTER_VERSION);
-        assert_eq!(sst_filter_pb.row_group_filters.len(), 2);
-        assert_eq!(sst_filter_pb.row_group_filters[0].column_filters.len(), 2);
-        assert_eq!(sst_filter_pb.row_group_filters[1].column_filters.len(), 2);
-        assert!(sst_filter_pb.row_group_filters[0].column_filters[0].is_empty());
+        let parquet_filter_pb: sst_pb::ParquetFilter = parquet_filter.clone().into();
+        assert_eq!(parquet_filter_pb.version, DEFAULT_FILTER_VERSION);
+        assert_eq!(parquet_filter_pb.row_group_filters.len(), 2);
+        assert_eq!(
+            parquet_filter_pb.row_group_filters[0].column_filters.len(),
+            2
+        );
+        assert_eq!(
+            parquet_filter_pb.row_group_filters[1].column_filters.len(),
+            2
+        );
+        assert!(parquet_filter_pb.row_group_filters[0].column_filters[0].is_empty());
         assert_eq!(
-            sst_filter_pb.row_group_filters[0].column_filters[1].len(),
+            parquet_filter_pb.row_group_filters[0].column_filters[1].len(),
             24
         );
         assert_eq!(
-            sst_filter_pb.row_group_filters[1].column_filters[0].len(),
+            parquet_filter_pb.row_group_filters[1].column_filters[0].len(),
             24
         );
-        assert!(sst_filter_pb.row_group_filters[1].column_filters[1].is_empty());
+        assert!(parquet_filter_pb.row_group_filters[1].column_filters[1].is_empty());
 
-        let decoded_sst_filter = SstFilter::try_from(sst_filter_pb).unwrap();
-        assert_eq!(decoded_sst_filter, sst_filter);
+        let decoded_parquet_filter = ParquetFilter::try_from(parquet_filter_pb).unwrap();
+        assert_eq!(decoded_parquet_filter, parquet_filter);
     }
 }
diff --git a/analytic_engine/src/sst/parquet/row_group_pruner.rs b/analytic_engine/src/sst/parquet/row_group_pruner.rs
index 6e505a75c0..b67a24d128 100644
--- a/analytic_engine/src/sst/parquet/row_group_pruner.rs
+++ b/analytic_engine/src/sst/parquet/row_group_pruner.rs
@@ -16,7 +16,7 @@ use parquet_ext::prune::{
 use snafu::ensure;
 
 use crate::sst::{
-    parquet::meta_data::SstFilter,
+    parquet::meta_data::ParquetFilter,
     reader::error::{OtherNoCause, Result},
 };
 
@@ -24,11 +24,11 @@ use crate::sst::{
 /// predicates and filters.
 ///
 /// Currently, two kinds of filters will be applied to such filtering:
-/// min max & sst_filter.
+/// min max & parquet_filter.
 pub struct RowGroupPruner<'a> {
     schema: &'a SchemaRef,
     row_groups: &'a [RowGroupMetaData],
-    sst_filter: Option<&'a SstFilter>,
+    parquet_filter: Option<&'a ParquetFilter>,
     predicates: &'a [Expr],
 }
 
@@ -36,33 +36,33 @@ impl<'a> RowGroupPruner<'a> {
     pub fn try_new(
         schema: &'a SchemaRef,
         row_groups: &'a [RowGroupMetaData],
-        sst_filter: Option<&'a SstFilter>,
+        parquet_filter: Option<&'a ParquetFilter>,
         predicates: &'a [Expr],
     ) -> Result<Self> {
-        if let Some(f) = sst_filter {
+        if let Some(f) = parquet_filter {
             ensure!(f.len() == row_groups.len(), OtherNoCause {
-                msg: format!("expect the same number of ss_filter as the number of row groups, num_sst_filters:{}, num_row_groups:{}", f.len(), row_groups.len()),
+                msg: format!("expect the same number of parquet_filter as the number of row groups, num_parquet_filters:{}, num_row_groups:{}", f.len(), row_groups.len()),
             });
         }
 
         Ok(Self {
             schema,
             row_groups,
-            sst_filter,
+            parquet_filter,
             predicates,
         })
     }
 
     pub fn prune(&self) -> Vec<usize> {
         debug!(
-            "Begin to prune row groups, total_row_groups:{}, sst_filter:{}, predicates:{:?}",
+            "Begin to prune row groups, total_row_groups:{}, parquet_filter:{}, predicates:{:?}",
             self.row_groups.len(),
-            self.sst_filter.is_some(),
+            self.parquet_filter.is_some(),
             self.predicates,
         );
 
         let pruned0 = self.prune_by_min_max();
-        match self.sst_filter {
+        match self.parquet_filter {
             Some(v) => {
                 // TODO: We can do continuous prune based on the `pruned0` to reduce the
                 // filtering cost.
@@ -70,7 +70,7 @@ impl<'a> RowGroupPruner<'a> {
                 let pruned1 = self.prune_by_filters(v);
                 let pruned = Self::intersect_pruned_row_groups(&pruned0, &pruned1);
 
                 debug!(
-                    "Finish prune row groups by sst_filter and min_max, total_row_groups:{}, pruned_by_min_max:{}, pruned_by_blooms:{}, pruned_by_both:{}",
+                    "Finish pruning row groups by parquet_filter and min_max, total_row_groups:{}, pruned_by_min_max:{}, pruned_by_blooms:{}, pruned_by_both:{}",
                     self.row_groups.len(),
                     pruned0.len(),
                     pruned1.len(),
@@ -95,15 +95,16 @@ impl<'a> RowGroupPruner<'a> {
     }
 
     /// Prune row groups according to the filter.
-    fn prune_by_filters(&self, sst_filter: &SstFilter) -> Vec<usize> {
+    fn prune_by_filters(&self, parquet_filter: &ParquetFilter) -> Vec<usize> {
         let is_equal =
             |col_pos: ColumnPosition, val: &ScalarValue, negated: bool| -> Option<bool> {
                 let datum = Datum::from_scalar_value(val)?;
-                let exist = sst_filter[col_pos.row_group_idx]
+                let exist = parquet_filter[col_pos.row_group_idx]
                     .contains_column_data(col_pos.column_idx, &datum.to_bytes())?;
                 if exist {
-                    // sst_filter has false positivity, that is to say we are unsure whether this
-                    // value exists even if the sst_filter says it exists.
+                    // parquet_filter can return false positives, that is to say we are
+                    // unsure whether this value really exists even if the parquet_filter
+                    // says it does.
                     None
                 } else {
                     Some(negated)
diff --git a/analytic_engine/src/sst/parquet/writer.rs b/analytic_engine/src/sst/parquet/writer.rs
index 2cddacf83b..9a5589a0d9 100644
--- a/analytic_engine/src/sst/parquet/writer.rs
+++ b/analytic_engine/src/sst/parquet/writer.rs
@@ -21,11 +21,11 @@ use crate::{
     factory::{ObjectStorePickerRef, SstWriteOptions},
     parquet::{
         encoding::ParquetEncoder,
-        meta_data::{ParquetMetaData, RowGroupFilterBuilder, SstFilter},
+        meta_data::{ParquetFilter, ParquetMetaData, RowGroupFilterBuilder},
     },
     writer::{
-        self, EncodeRecordBatch, MetaData, PollRecordBatch, RecordBatchStream, Result, SstInfo,
-        SstWriter, Storage,
+        self, BuildParquetFilter, EncodeRecordBatch, MetaData, PollRecordBatch,
+        RecordBatchStream, Result, SstInfo, SstWriter, Storage,
     },
 },
 table_options::StorageFormat,
@@ -149,10 +149,10 @@ impl RecordBytesReader {
         Ok(curr_row_group)
     }
 
-    fn build_sst_filter(&self) -> SstFilter {
-        // TODO: support sst filter in hybrid storage format [#435](https://github.com/CeresDB/ceresdb/issues/435)
+    fn build_parquet_filter(&self) -> Result<ParquetFilter> {
+        // TODO: support filter in hybrid storage format [#435](https://github.com/CeresDB/ceresdb/issues/435)
         if self.hybrid_encoding {
-            return SstFilter::default();
+            return Ok(ParquetFilter::default());
         }
         let filters = self
             .partitioned_record_batch
@@ -171,21 +171,20 @@ impl RecordBytesReader {
                     }
                 }
 
-                // remove unwrap
-                builder.build().unwrap()
+                builder.build().box_err().context(BuildParquetFilter)
             })
-            .collect::<Vec<_>>();
+            .collect::<Result<Vec<_>>>()?;
 
-        SstFilter::new(filters)
+        Ok(ParquetFilter::new(filters))
     }
 
     async fn read_all(mut self) -> Result<Vec<Bytes>> {
         self.partition_record_batch().await?;
 
         let parquet_meta_data = {
-            let sst_filter = self.build_sst_filter();
+            let sst_filter = self.build_parquet_filter()?;
             let mut parquet_meta_data = ParquetMetaData::from(self.meta_data);
-            parquet_meta_data.sst_filter = Some(sst_filter);
+            parquet_meta_data.parquet_filter = Some(sst_filter);
             parquet_meta_data
         };
 
@@ -394,7 +393,7 @@ mod tests {
             .clone();
         // sst filter is built inside the sst writer, so overwrite it to default for
         // comparison.
-        sst_meta_readback.sst_filter = Default::default();
+        sst_meta_readback.parquet_filter = Default::default();
         assert_eq!(&sst_meta_readback, &ParquetMetaData::from(sst_meta));
         assert_eq!(
             expected_num_rows,
diff --git a/analytic_engine/src/sst/writer.rs b/analytic_engine/src/sst/writer.rs
index 4dc90467c3..87a78e30b2 100644
--- a/analytic_engine/src/sst/writer.rs
+++ b/analytic_engine/src/sst/writer.rs
@@ -40,6 +40,9 @@ pub mod error {
             backtrace: Backtrace,
         },
 
+        #[snafu(display("Failed to build parquet filter, err:{}", source))]
+        BuildParquetFilter { source: GenericError },
+
         #[snafu(display("Failed to poll record batch, err:{}", source))]
         PollRecordBatch { source: GenericError },
 
diff --git a/components/parquet_ext/src/prune/equal.rs b/components/parquet_ext/src/prune/equal.rs
index f594339140..2c67def187 100644
--- a/components/parquet_ext/src/prune/equal.rs
+++ b/components/parquet_ext/src/prune/equal.rs
@@ -477,9 +477,9 @@ mod tests {
             Field::new("c1", DataType::Int32, false),
             Field::new("c2", DataType::Int32, false),
         ]);
-        let filtered_row_groups =
+        let target_row_groups =
             prune_row_groups(Arc::new(schema), &vec![predicate1, predicate2], 3, is_equal);
-        assert_eq!(vec![1, 2], filtered_row_groups)
+        assert_eq!(vec![1, 2], target_row_groups)
     }
 }
diff --git a/proto/protos/sst.proto b/proto/protos/sst.proto
index e2608ed868..1c63521e6f 100644
--- a/proto/protos/sst.proto
+++ b/proto/protos/sst.proto
@@ -7,7 +7,13 @@ package sst;
 import "common.proto";
 import "analytic_common.proto";
 
-message SstFilter {
+message SstMetaData {
+  oneof MetaData {
+    ParquetMetaData parquet = 1;
+  }
+}
+
+message ParquetFilter {
   message RowGroupFilter {
     repeated bytes column_filters = 1;
   };
 
   repeated RowGroupFilter row_group_filters = 2;
 }
 
-message SstMetaData {
-  oneof MetaData {
-    ParquetMetaData parquet = 1;
-  }
-}
-
 /// Used by ssts encoded by parquet, including columnar & hybrid storage formats.
 message ParquetMetaData {
   // Min key in the sst
   bytes min_key = 1;
   // Max key in the sst
   bytes max_key = 2;
   // Max sequence number in the sst
   uint64 max_sequence = 3;
   // The time range of the sst
   common.TimeRange time_range = 4;
   common.TableSchema schema = 5;
-  SstFilter filter = 6;
+  ParquetFilter filter = 6;
   repeated uint32 collapsible_cols_idx = 7;
 }
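
Note on the renamed types (illustrative sketch, not code from this commit): the patch renames SstFilter to ParquetFilter, where a ParquetFilter holds one RowGroupFilter per row group and each RowGroupFilter holds one optional filter per column. The standalone Rust sketch below mirrors that shape to show how a pruner consults it; a std HashSet stands in for the crate's real Xor8 probabilistic filter, and every name in it is hypothetical. Since the real filter admits false positives, a hit only means "maybe present", so a row group may be discarded only on a definite miss.

// Illustrative sketch only. `SetFilter` stands in for the Xor8-based filter,
// which may answer `true` for absent keys but never `false` for present ones.
use std::collections::HashSet;

#[derive(Default)]
struct SetFilter(HashSet<Vec<u8>>);

impl SetFilter {
    fn insert(&mut self, key: &[u8]) {
        self.0.insert(key.to_vec());
    }

    fn contains(&self, key: &[u8]) -> bool {
        self.0.contains(key)
    }
}

// One optional filter per column of a row group (`None` = column not indexed).
#[derive(Default)]
struct RowGroupFilter {
    column_filters: Vec<Option<SetFilter>>,
}

impl RowGroupFilter {
    // Mirrors `contains_column_data`: `None` when the column has no filter.
    fn contains_column_data(&self, column_idx: usize, data: &[u8]) -> Option<bool> {
        self.column_filters[column_idx].as_ref().map(|f| f.contains(data))
    }
}

// One `RowGroupFilter` per row group, like the renamed `ParquetFilter`.
#[derive(Default)]
struct ParquetFilter {
    row_group_filters: Vec<RowGroupFilter>,
}

// Keep the row groups that may contain `key` in column `column_idx`. A filter
// hit means "maybe present" (false positives are possible), so only a definite
// miss allows dropping a row group.
fn prune_by_key(filter: &ParquetFilter, column_idx: usize, key: &[u8]) -> Vec<usize> {
    (0..filter.row_group_filters.len())
        .filter(|&rg| {
            filter.row_group_filters[rg]
                .contains_column_data(column_idx, key)
                .unwrap_or(true) // unindexed column: cannot prune
        })
        .collect()
}

fn main() {
    let mut f0 = SetFilter::default();
    f0.insert(b"host-a");
    let mut f1 = SetFilter::default();
    f1.insert(b"host-b");

    let filter = ParquetFilter {
        row_group_filters: vec![
            RowGroupFilter { column_filters: vec![Some(f0), None] },
            RowGroupFilter { column_filters: vec![Some(f1), None] },
        ],
    };

    // Only row group 0 may contain "host-a" in column 0.
    assert_eq!(prune_by_key(&filter, 0, b"host-a"), vec![0]);
    // Column 1 carries no filters, so nothing can be pruned.
    assert_eq!(prune_by_key(&filter, 1, b"host-a"), vec![0, 1]);
}

This is also why prune_by_filters above maps a filter hit to None (undecidable) and only a definite miss to Some(negated): intersecting that result with min/max pruning preserves correctness while still cutting read I/O.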