diff --git a/src/analytic_engine/src/sst/parquet/async_reader.rs b/src/analytic_engine/src/sst/parquet/async_reader.rs
index c56836c9d1..94feeab2c5 100644
--- a/src/analytic_engine/src/sst/parquet/async_reader.rs
+++ b/src/analytic_engine/src/sst/parquet/async_reader.rs
@@ -71,7 +71,7 @@ use crate::{
         metrics::MaybeTableLevelMetrics,
         parquet::{
             encoding::ParquetDecoder,
-            meta_data::{ColumnValueSet, ParquetFilter},
+            meta_data::{filter::ParquetFilter, ColumnValueSet},
            row_group_pruner::RowGroupPruner,
        },
        reader::{error::*, Result, SstReader},
diff --git a/src/analytic_engine/src/sst/parquet/meta_data.rs b/src/analytic_engine/src/sst/parquet/meta_data/filter.rs
similarity index 59%
rename from src/analytic_engine/src/sst/parquet/meta_data.rs
rename to src/analytic_engine/src/sst/parquet/meta_data/filter.rs
index 4c8b51ce85..d64ad974cc 100644
--- a/src/analytic_engine/src/sst/parquet/meta_data.rs
+++ b/src/analytic_engine/src/sst/parquet/meta_data/filter.rs
@@ -15,66 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// MetaData for SST based on parquet.
-
-use std::{collections::HashSet, fmt, ops::Index, sync::Arc};
-
-use bytes_ext::Bytes;
-use common_types::{
-    datum::DatumKind,
-    schema::{RecordSchemaWithKey, Schema},
-    time::TimeRange,
-    SequenceNumber,
-};
-use horaedbproto::{schema as schema_pb, sst as sst_pb};
-use macros::define_result;
-use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
-use xorfilter::xor8::{Xor8, Xor8Builder};
+// TODO: A better module name would be `index`.
 
-use crate::sst::writer::MetaData;
-
-/// Error of sst file.
-#[derive(Debug, Snafu)]
-pub enum Error {
-    #[snafu(display("Time range is not found.\nBacktrace\n:{}", backtrace))]
-    TimeRangeNotFound { backtrace: Backtrace },
-
-    #[snafu(display("Table schema is not found.\nBacktrace\n:{}", backtrace))]
-    TableSchemaNotFound { backtrace: Backtrace },
-
-    #[snafu(display(
-        "Failed to parse Xor8Filter from bytes, err:{}.\nBacktrace\n:{}",
-        source,
-        backtrace
-    ))]
-    ParseXor8Filter {
-        source: std::io::Error,
-        backtrace: Backtrace,
-    },
-
-    #[snafu(display(
-        "Failed to build Xor8Filter, err:{}.\nBacktrace\n:{}",
-        source,
-        backtrace
-    ))]
-    BuildXor8Filter {
-        source: xorfilter::Error,
-        backtrace: Backtrace,
-    },
-
-    #[snafu(display("Failed to convert time range, err:{}", source))]
-    ConvertTimeRange { source: common_types::time::Error },
-
-    #[snafu(display("Failed to convert table schema, err:{}", source))]
-    ConvertTableSchema { source: common_types::schema::Error },
-}
+use std::{fmt, ops::Index};
 
-define_result!(Error);
+use common_types::{datum::DatumKind, schema::Schema};
+use horaedbproto::sst as sst_pb;
+use snafu::ResultExt;
+use xorfilter::xor8::{Xor8, Xor8Builder};
+use crate::sst::parquet::meta_data::{BuildXor8Filter, Error, ParseXor8Filter, Result};
+
+// TODO: move this to sst module, and add a FilterBuild trait
 /// Filter can be used to test whether an element is a member of a set.
 /// False positive matches are possible if space-efficient probabilistic data
 /// structure are used.
-// TODO: move this to sst module, and add a FilterBuild trait
 trait Filter: fmt::Debug {
     fn r#type(&self) -> FilterType;
 
@@ -89,7 +44,7 @@ trait Filter: fmt::Debug {
         self.to_bytes().len()
     }
 
-    /// Deserialize the binary array to bitmap index.
+    /// Deserialize the binary array to a specific filter.
     fn from_bytes(buf: Vec<u8>) -> Result<Self>
     where
         Self: Sized;
@@ -140,13 +95,19 @@ pub struct RowGroupFilterBuilder {
 }
 
 impl RowGroupFilterBuilder {
-    pub(crate) fn new(record_schema: &RecordSchemaWithKey) -> Self {
-        let builders = record_schema
+    pub(crate) fn new(schema: &Schema) -> Self {
+        let builders = schema
             .columns()
             .iter()
             .enumerate()
             .map(|(i, col)| {
-                if record_schema.is_primary_key_index(i) {
+                // No need to create filter index over the timestamp column.
+                if schema.timestamp_index() == i {
+                    return None;
+                }
+
+                // No need to create filter index over the tsid column.
+                if schema.index_of_tsid().map(|idx| idx == i).unwrap_or(false) {
                     return None;
                 }
@@ -340,185 +301,6 @@ impl TryFrom<sst_pb::ParquetFilter> for ParquetFilter {
     }
 }
 
-/// Meta data of a sst file
-#[derive(Clone, PartialEq)]
-pub struct ParquetMetaData {
-    pub min_key: Bytes,
-    pub max_key: Bytes,
-    /// Time Range of the sst
-    pub time_range: TimeRange,
-    /// Max sequence number in the sst
-    pub max_sequence: SequenceNumber,
-    pub schema: Schema,
-    pub parquet_filter: Option<ParquetFilter>,
-    pub column_values: Option<Vec<Option<ColumnValueSet>>>,
-}
-
-pub type ParquetMetaDataRef = Arc<ParquetMetaData>;
-
-impl From<&MetaData> for ParquetMetaData {
-    fn from(meta: &MetaData) -> Self {
-        Self {
-            min_key: meta.min_key.clone(),
-            max_key: meta.max_key.clone(),
-            time_range: meta.time_range,
-            max_sequence: meta.max_sequence,
-            schema: meta.schema.clone(),
-            parquet_filter: None,
-            column_values: None,
-        }
-    }
-}
-
-impl From<ParquetMetaData> for MetaData {
-    fn from(meta: ParquetMetaData) -> Self {
-        Self {
-            min_key: meta.min_key,
-            max_key: meta.max_key,
-            time_range: meta.time_range,
-            max_sequence: meta.max_sequence,
-            schema: meta.schema,
-        }
-    }
-}
-
-impl From<Arc<ParquetMetaData>> for MetaData {
-    fn from(meta: Arc<ParquetMetaData>) -> Self {
-        Self {
-            min_key: meta.min_key.clone(),
-            max_key: meta.max_key.clone(),
-            time_range: meta.time_range,
-            max_sequence: meta.max_sequence,
-            schema: meta.schema.clone(),
-        }
-    }
-}
-
-impl fmt::Debug for ParquetMetaData {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("ParquetMetaData")
-            .field("min_key", &hex::encode(&self.min_key))
-            .field("max_key", &hex::encode(&self.max_key))
-            .field("time_range", &self.time_range)
-            .field("max_sequence", &self.max_sequence)
-            .field("schema", &self.schema)
-            .field("column_values", &self.column_values)
-            .field(
-                "filter_size",
-                &self
-                    .parquet_filter
-                    .as_ref()
-                    .map(|filter| filter.size())
-                    .unwrap_or(0),
-            )
-            .finish()
-    }
-}
-
-impl From<ParquetMetaData> for sst_pb::ParquetMetaData {
-    fn from(src: ParquetMetaData) -> Self {
-        let column_values = if let Some(v) = src.column_values {
-            v.into_iter()
-                .map(|col| sst_pb::ColumnValueSet {
-                    value: col.map(|col| col.into()),
-                })
-                .collect()
-        } else {
-            Vec::new()
-        };
-        sst_pb::ParquetMetaData {
-            min_key: src.min_key.to_vec(),
-            max_key: src.max_key.to_vec(),
-            max_sequence: src.max_sequence,
-            time_range: Some(src.time_range.into()),
-            schema: Some(schema_pb::TableSchema::from(&src.schema)),
-            filter: src.parquet_filter.map(|v| v.into()),
-            // collapsible_cols_idx is used in hybrid format ,and it's deprecated.
-            collapsible_cols_idx: Vec::new(),
-            column_values,
-        }
-    }
-}
-
-impl TryFrom<sst_pb::ParquetMetaData> for ParquetMetaData {
-    type Error = Error;
-
-    fn try_from(src: sst_pb::ParquetMetaData) -> Result<Self> {
-        let time_range = {
-            let time_range = src.time_range.context(TimeRangeNotFound)?;
-            TimeRange::try_from(time_range).context(ConvertTimeRange)?
-        };
-        let schema = {
-            let schema = src.schema.context(TableSchemaNotFound)?;
-            Schema::try_from(schema).context(ConvertTableSchema)?
-        };
-        let parquet_filter = src.filter.map(ParquetFilter::try_from).transpose()?;
-        let column_values = if src.column_values.is_empty() {
-            // Old version sst don't has this, so set to none.
-            None
-        } else {
-            Some(
-                src.column_values
-                    .into_iter()
-                    .map(|v| v.value.map(|v| v.into()))
-                    .collect(),
-            )
-        };
-
-        Ok(Self {
-            min_key: src.min_key.into(),
-            max_key: src.max_key.into(),
-            time_range,
-            max_sequence: src.max_sequence,
-            schema,
-            parquet_filter,
-            column_values,
-        })
-    }
-}
-
-#[derive(Debug, PartialEq, Clone)]
-pub enum ColumnValueSet {
-    StringValue(HashSet<String>),
-}
-
-impl ColumnValueSet {
-    pub fn is_empty(&self) -> bool {
-        match self {
-            Self::StringValue(sv) => sv.is_empty(),
-        }
-    }
-
-    pub fn len(&self) -> usize {
-        match self {
-            Self::StringValue(sv) => sv.len(),
-        }
-    }
-}
-
-impl From<ColumnValueSet> for sst_pb::column_value_set::Value {
-    fn from(value: ColumnValueSet) -> Self {
-        match value {
-            ColumnValueSet::StringValue(values) => {
-                let values = values.into_iter().collect();
-                sst_pb::column_value_set::Value::StringSet(sst_pb::column_value_set::StringSet {
-                    values,
-                })
-            }
-        }
-    }
-}
-
-impl From<sst_pb::column_value_set::Value> for ColumnValueSet {
-    fn from(value: sst_pb::column_value_set::Value) -> Self {
-        match value {
-            sst_pb::column_value_set::Value::StringSet(ss) => {
-                ColumnValueSet::StringValue(HashSet::from_iter(ss.values))
-            }
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use common_types::tests::build_schema;
@@ -569,8 +351,7 @@ mod tests {
     fn test_row_group_filter_builder() {
         // (key1(varbinary), key2(timestamp), field1(double), field2(string))
         let schema = build_schema();
-        let record_schema = schema.to_record_schema_with_key();
-        let mut builders = RowGroupFilterBuilder::new(&record_schema);
+        let mut builders = RowGroupFilterBuilder::new(&schema);
         for key in ["host-123", "host-456", "host-789"] {
             builders.add_key(3, key.as_bytes());
         }
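The behavioral change hidden in the rename above is in `RowGroupFilterBuilder::new`: filters used to be skipped for every primary-key column (via `RecordSchemaWithKey::is_primary_key_index`), and are now skipped only for the timestamp and tsid columns, so the remaining key columns can get filter indexes too. Below is a minimal, self-contained sketch of the new selection rule; the `Schema` here is a toy stand-in for `common_types::schema::Schema` that models only the two accessors the hunk uses, and any further per-column-type checks elided from the hunk are out of scope.

```rust
/// Toy stand-in for `common_types::schema::Schema`; only the two accessors
/// used by the new `RowGroupFilterBuilder::new` are modeled.
struct Schema {
    ts_idx: usize,
    tsid_idx: Option<usize>,
    num_cols: usize,
}

impl Schema {
    fn timestamp_index(&self) -> usize {
        self.ts_idx
    }

    fn index_of_tsid(&self) -> Option<usize> {
        self.tsid_idx
    }
}

/// Which columns get a filter under the new rule: skip the timestamp and
/// tsid columns, keep everything else.
fn filter_eligibility(schema: &Schema) -> Vec<bool> {
    (0..schema.num_cols)
        .map(|i| {
            // No need to create a filter index over the timestamp column.
            if schema.timestamp_index() == i {
                return false;
            }
            // No need to create a filter index over the tsid column.
            if schema.index_of_tsid().map(|idx| idx == i).unwrap_or(false) {
                return false;
            }
            true
        })
        .collect()
}

fn main() {
    // Mirrors the test schema below: (key1, key2(timestamp), field1, field2).
    let schema = Schema {
        ts_idx: 1,
        tsid_idx: None,
        num_cols: 4,
    };
    assert_eq!(filter_eligibility(&schema), [true, false, true, true]);
}
```

Note that under the old rule, key1 (index 0) would have been skipped as a primary-key column; under the new rule it is eligible for a filter.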
diff --git a/src/analytic_engine/src/sst/parquet/meta_data/mod.rs b/src/analytic_engine/src/sst/parquet/meta_data/mod.rs
new file mode 100644
index 0000000000..0120d64944
--- /dev/null
+++ b/src/analytic_engine/src/sst/parquet/meta_data/mod.rs
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// MetaData for SST based on parquet.
+
+use std::{collections::HashSet, fmt, sync::Arc};
+
+use bytes_ext::Bytes;
+use common_types::{schema::Schema, time::TimeRange, SequenceNumber};
+use horaedbproto::{schema as schema_pb, sst as sst_pb};
+use macros::define_result;
+use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
+
+use crate::sst::{parquet::meta_data::filter::ParquetFilter, writer::MetaData};
+
+pub mod filter;
+
+/// Error of sst file.
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("Time range is not found.\nBacktrace\n:{}", backtrace))]
+    TimeRangeNotFound { backtrace: Backtrace },
+
+    #[snafu(display("Table schema is not found.\nBacktrace\n:{}", backtrace))]
+    TableSchemaNotFound { backtrace: Backtrace },
+
+    #[snafu(display(
+        "Failed to parse Xor8Filter from bytes, err:{}.\nBacktrace\n:{}",
+        source,
+        backtrace
+    ))]
+    ParseXor8Filter {
+        source: std::io::Error,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display(
+        "Failed to build Xor8Filter, err:{}.\nBacktrace\n:{}",
+        source,
+        backtrace
+    ))]
+    BuildXor8Filter {
+        source: xorfilter::Error,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Failed to convert time range, err:{}", source))]
+    ConvertTimeRange { source: common_types::time::Error },
+
+    #[snafu(display("Failed to convert table schema, err:{}", source))]
+    ConvertTableSchema { source: common_types::schema::Error },
+}
+
+define_result!(Error);
+
+/// Meta data of a sst file
+#[derive(Clone, PartialEq)]
+pub struct ParquetMetaData {
+    pub min_key: Bytes,
+    pub max_key: Bytes,
+    /// Time Range of the sst
+    pub time_range: TimeRange,
+    /// Max sequence number in the sst
+    pub max_sequence: SequenceNumber,
+    pub schema: Schema,
+    pub parquet_filter: Option<ParquetFilter>,
+    pub column_values: Option<Vec<Option<ColumnValueSet>>>,
+}
+
+pub type ParquetMetaDataRef = Arc<ParquetMetaData>;
+
+impl From<&MetaData> for ParquetMetaData {
+    fn from(meta: &MetaData) -> Self {
+        Self {
+            min_key: meta.min_key.clone(),
+            max_key: meta.max_key.clone(),
+            time_range: meta.time_range,
+            max_sequence: meta.max_sequence,
+            schema: meta.schema.clone(),
+            parquet_filter: None,
+            column_values: None,
+        }
+    }
+}
+
+impl From<ParquetMetaData> for MetaData {
+    fn from(meta: ParquetMetaData) -> Self {
+        Self {
+            min_key: meta.min_key,
+            max_key: meta.max_key,
+            time_range: meta.time_range,
+            max_sequence: meta.max_sequence,
+            schema: meta.schema,
+        }
+    }
+}
+
+impl From<Arc<ParquetMetaData>> for MetaData {
+    fn from(meta: Arc<ParquetMetaData>) -> Self {
+        Self {
+            min_key: meta.min_key.clone(),
+            max_key: meta.max_key.clone(),
+            time_range: meta.time_range,
+            max_sequence: meta.max_sequence,
+            schema: meta.schema.clone(),
+        }
+    }
+}
+
+impl fmt::Debug for ParquetMetaData {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("ParquetMetaData")
+            .field("min_key", &hex::encode(&self.min_key))
+            .field("max_key", &hex::encode(&self.max_key))
+            .field("time_range", &self.time_range)
+            .field("max_sequence", &self.max_sequence)
+            .field("schema", &self.schema)
+            .field("column_values", &self.column_values)
+            .field(
+                "filter_size",
+                &self
+                    .parquet_filter
+                    .as_ref()
+                    .map(|filter| filter.size())
+                    .unwrap_or(0),
+            )
+            .finish()
+    }
+}
+
+impl From<ParquetMetaData> for sst_pb::ParquetMetaData {
+    fn from(src: ParquetMetaData) -> Self {
+        let column_values = if let Some(v) = src.column_values {
+            v.into_iter()
+                .map(|col| sst_pb::ColumnValueSet {
+                    value: col.map(|col| col.into()),
+                })
+                .collect()
+        } else {
+            Vec::new()
+        };
+        sst_pb::ParquetMetaData {
+            min_key: src.min_key.to_vec(),
+            max_key: src.max_key.to_vec(),
+            max_sequence: src.max_sequence,
+            time_range: Some(src.time_range.into()),
+            schema: Some(schema_pb::TableSchema::from(&src.schema)),
+            filter: src.parquet_filter.map(|v| v.into()),
+            // collapsible_cols_idx is used in the hybrid format, and it's deprecated.
+            collapsible_cols_idx: Vec::new(),
+            column_values,
+        }
+    }
+}
+
+impl TryFrom<sst_pb::ParquetMetaData> for ParquetMetaData {
+    type Error = Error;
+
+    fn try_from(src: sst_pb::ParquetMetaData) -> Result<Self> {
+        let time_range = {
+            let time_range = src.time_range.context(TimeRangeNotFound)?;
+            TimeRange::try_from(time_range).context(ConvertTimeRange)?
+        };
+        let schema = {
+            let schema = src.schema.context(TableSchemaNotFound)?;
+            Schema::try_from(schema).context(ConvertTableSchema)?
+        };
+        let parquet_filter = src.filter.map(ParquetFilter::try_from).transpose()?;
+        let column_values = if src.column_values.is_empty() {
+            // Old version ssts don't have this, so set it to None.
+            None
+        } else {
+            Some(
+                src.column_values
+                    .into_iter()
+                    .map(|v| v.value.map(|v| v.into()))
+                    .collect(),
+            )
+        };
+
+        Ok(Self {
+            min_key: src.min_key.into(),
+            max_key: src.max_key.into(),
+            time_range,
+            max_sequence: src.max_sequence,
+            schema,
+            parquet_filter,
+            column_values,
+        })
+    }
+}
+
+#[derive(Debug, PartialEq, Clone)]
+pub enum ColumnValueSet {
+    StringValue(HashSet<String>),
+}
+
+impl ColumnValueSet {
+    pub fn is_empty(&self) -> bool {
+        match self {
+            Self::StringValue(sv) => sv.is_empty(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        match self {
+            Self::StringValue(sv) => sv.len(),
+        }
+    }
+}
+
+impl From<ColumnValueSet> for sst_pb::column_value_set::Value {
+    fn from(value: ColumnValueSet) -> Self {
+        match value {
+            ColumnValueSet::StringValue(values) => {
+                let values = values.into_iter().collect();
+                sst_pb::column_value_set::Value::StringSet(sst_pb::column_value_set::StringSet {
+                    values,
+                })
+            }
+        }
+    }
+}
+
+impl From<sst_pb::column_value_set::Value> for ColumnValueSet {
+    fn from(value: sst_pb::column_value_set::Value) -> Self {
+        match value {
+            sst_pb::column_value_set::Value::StringSet(ss) => {
+                ColumnValueSet::StringValue(HashSet::from_iter(ss.values))
+            }
+        }
+    }
+}
diff --git a/src/analytic_engine/src/sst/parquet/row_group_pruner.rs b/src/analytic_engine/src/sst/parquet/row_group_pruner.rs
index a101ff05e2..3aa0c43cc0 100644
--- a/src/analytic_engine/src/sst/parquet/row_group_pruner.rs
+++ b/src/analytic_engine/src/sst/parquet/row_group_pruner.rs
@@ -40,7 +40,7 @@ use snafu::ensure;
 use trace_metric::{MetricsCollector, TraceMetricWhenDrop};
 
 use crate::sst::{
-    parquet::meta_data::{ColumnValueSet, ParquetFilter},
+    parquet::meta_data::{filter::ParquetFilter, ColumnValueSet},
     reader::error::{OtherNoCause, Result},
 };
 
diff --git a/src/analytic_engine/src/sst/parquet/writer.rs b/src/analytic_engine/src/sst/parquet/writer.rs
index 5be8712c8e..8829349752 100644
--- a/src/analytic_engine/src/sst/parquet/writer.rs
+++ b/src/analytic_engine/src/sst/parquet/writer.rs
@@ -21,7 +21,8 @@ use std::collections::{HashMap, HashSet};
 
 use async_trait::async_trait;
 use common_types::{
-    datum::DatumKind, record_batch::FetchedRecordBatch, request_id::RequestId, time::TimeRange,
+    datum::DatumKind, record_batch::FetchedRecordBatch, request_id::RequestId, schema::Schema,
+    time::TimeRange,
 };
 use datafusion::parquet::basic::Compression;
 use futures::StreamExt;
@@ -39,14 +40,13 @@ use crate::{
         parquet::{
             encoding::{encode_sst_meta_data, ColumnEncoding, EncodeOptions, ParquetEncoder},
             meta_data::{
-                ColumnValueSet, ParquetFilter, ParquetMetaData, RowGroupFilter,
-                RowGroupFilterBuilder,
+                filter::{ParquetFilter, RowGroupFilter, RowGroupFilterBuilder},
+                ColumnValueSet, ParquetMetaData,
             },
         },
         writer::{
-            self, BuildParquetFilter, BuildParquetFilterNoCause, EncodePbData, EncodeRecordBatch,
-            ExpectTimestampColumn, Io, MetaData, PollRecordBatch, RecordBatchStream, Result,
-            SstInfo, SstWriter, Storage,
+            self, BuildParquetFilter, EncodePbData, EncodeRecordBatch, ExpectTimestampColumn, Io,
+            MetaData, PollRecordBatch, RecordBatchStream, Result, SstInfo, SstWriter, Storage,
         },
     },
     table::sst_util,
@@ -237,15 +237,10 @@ impl<'a> RecordBatchGroupWriter<'a> {
     /// Build the parquet filter for the given `row_group`.
     fn build_row_group_filter(
         &self,
+        schema: &Schema,
         row_group_batch: &[FetchedRecordBatch],
     ) -> Result<RowGroupFilter> {
-        let schema_with_key =
-            row_group_batch[0]
-                .schema_with_key()
-                .with_context(|| BuildParquetFilterNoCause {
-                    msg: "primary key indexes not exist",
-                })?;
-        let mut builder = RowGroupFilterBuilder::new(&schema_with_key);
+        let mut builder = RowGroupFilterBuilder::new(schema);
 
         for partial_batch in row_group_batch {
             for (col_idx, column) in partial_batch.columns().iter().enumerate() {
@@ -356,7 +351,9 @@ impl<'a> RecordBatchGroupWriter<'a> {
         let timestamp_index = self.meta_data.schema.timestamp_index();
         while !row_group.is_empty() {
             if let Some(filter) = &mut parquet_filter {
-                filter.push_row_group_filter(self.build_row_group_filter(&row_group)?);
+                filter.push_row_group_filter(
+                    self.build_row_group_filter(&self.meta_data.schema, &row_group)?,
+                );
             }
             let num_batches = row_group.len();
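Finally, the writer-side change: `build_row_group_filter` no longer derives a `RecordSchemaWithKey` from the first batch of the row group (which failed with a `BuildParquetFilterNoCause` error when primary-key indexes were absent, hence the removed error-variant import) and instead receives the table schema from `self.meta_data`. A rough sketch of the resulting control flow, using hypothetical stand-in types rather than the real writer:

```rust
/// Hypothetical stand-ins: only the shape of the control flow mirrors the
/// updated `RecordBatchGroupWriter`; none of these are the real types.
struct Schema;
struct FetchedRecordBatch;

#[derive(Default)]
struct RowGroupFilter;

#[derive(Default)]
struct ParquetFilter {
    row_group_filters: Vec<RowGroupFilter>,
}

impl ParquetFilter {
    fn push_row_group_filter(&mut self, f: RowGroupFilter) {
        self.row_group_filters.push(f);
    }
}

/// The table schema now arrives as an explicit argument; previously this
/// step re-derived a key schema from `row_group_batch[0]` and failed when
/// primary-key indexes were absent.
fn build_row_group_filter(
    _schema: &Schema,
    _row_group_batch: &[FetchedRecordBatch],
) -> RowGroupFilter {
    RowGroupFilter::default()
}

fn main() {
    let schema = Schema;
    let row_groups = vec![vec![FetchedRecordBatch], vec![FetchedRecordBatch]];

    let mut parquet_filter = ParquetFilter::default();
    for row_group in &row_groups {
        // Mirrors the new call site: the schema comes from the writer's
        // meta data, not from the data batches themselves.
        parquet_filter.push_row_group_filter(build_row_group_filter(&schema, row_group));
    }
    assert_eq!(parquet_filter.row_group_filters.len(), 2);
}
```

The design upside is that filter construction can no longer fail on batches that lack primary-key information, which is consistent with the builder change in `filter.rs` that stopped depending on `RecordSchemaWithKey` altogether.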