fix: missing filter index over the primary keys #1456

Merged 1 commit on Jan 23, 2024
2 changes: 1 addition & 1 deletion src/analytic_engine/src/sst/parquet/async_reader.rs
@@ -71,7 +71,7 @@ use crate::{
metrics::MaybeTableLevelMetrics,
parquet::{
encoding::ParquetDecoder,
meta_data::{ColumnValueSet, ParquetFilter},
meta_data::{filter::ParquetFilter, ColumnValueSet},
row_group_pruner::RowGroupPruner,
},
reader::{error::*, Result, SstReader},
@@ -15,66 +15,21 @@
// specific language governing permissions and limitations
// under the License.

// MetaData for SST based on parquet.

use std::{collections::HashSet, fmt, ops::Index, sync::Arc};

use bytes_ext::Bytes;
use common_types::{
datum::DatumKind,
schema::{RecordSchemaWithKey, Schema},
time::TimeRange,
SequenceNumber,
};
use horaedbproto::{schema as schema_pb, sst as sst_pb};
use macros::define_result;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use xorfilter::xor8::{Xor8, Xor8Builder};
// TODO: A better module name would be `index`.

use crate::sst::writer::MetaData;

/// Error of sst file.
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Time range is not found.\nBacktrace\n:{}", backtrace))]
TimeRangeNotFound { backtrace: Backtrace },

#[snafu(display("Table schema is not found.\nBacktrace\n:{}", backtrace))]
TableSchemaNotFound { backtrace: Backtrace },

#[snafu(display(
"Failed to parse Xor8Filter from bytes, err:{}.\nBacktrace\n:{}",
source,
backtrace
))]
ParseXor8Filter {
source: std::io::Error,
backtrace: Backtrace,
},

#[snafu(display(
"Failed to build Xor8Filter, err:{}.\nBacktrace\n:{}",
source,
backtrace
))]
BuildXor8Filter {
source: xorfilter::Error,
backtrace: Backtrace,
},

#[snafu(display("Failed to convert time range, err:{}", source))]
ConvertTimeRange { source: common_types::time::Error },

#[snafu(display("Failed to convert table schema, err:{}", source))]
ConvertTableSchema { source: common_types::schema::Error },
}
use std::{fmt, ops::Index};

define_result!(Error);
use common_types::{datum::DatumKind, schema::Schema};
use horaedbproto::sst as sst_pb;
use snafu::ResultExt;
use xorfilter::xor8::{Xor8, Xor8Builder};

use crate::sst::parquet::meta_data::{BuildXor8Filter, Error, ParseXor8Filter, Result};

/// Filter can be used to test whether an element is a member of a set.
/// False positive matches are possible if space-efficient probabilistic data
/// structures are used.
// TODO: move this to sst module, and add a FilterBuild trait
trait Filter: fmt::Debug {
fn r#type(&self) -> FilterType;

@@ -89,7 +89,7 @@ trait Filter: fmt::Debug {
self.to_bytes().len()
}

/// Deserialize the binary array to bitmap index.
/// Deserialize the binary array to a specific filter.
fn from_bytes(buf: Vec<u8>) -> Result<Self>
where
Self: Sized;
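
For context on the doc comment above: a xor filter answers set-membership queries with possible false positives but no false negatives, which is exactly the guarantee row-group pruning relies on. A minimal sketch of how a pruner might consult such a filter (the `contains` call is assumed from the xorfilter crate's Xor8 API; only the (de)serialization surface appears in this hunk):

use xorfilter::xor8::Xor8;

// Sketch only: `contains` is assumed from the xorfilter crate.
// `false` is authoritative: the key is definitely absent, so the row
// group can be pruned. `true` may be a false positive, so the row
// group must still be read.
fn may_contain_key(filter: &Xor8, key: &[u8]) -> bool {
    filter.contains(key)
}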
@@ -140,13 +95,19 @@ pub struct RowGroupFilterBuilder {
}

impl RowGroupFilterBuilder {
pub(crate) fn new(record_schema: &RecordSchemaWithKey) -> Self {
let builders = record_schema
pub(crate) fn new(schema: &Schema) -> Self {
let builders = schema
.columns()
.iter()
.enumerate()
.map(|(i, col)| {
if record_schema.is_primary_key_index(i) {
// No need to create a filter index over the timestamp column.
if schema.timestamp_index() == i {
return None;
}

// No need to create a filter index over the tsid column.
if schema.index_of_tsid().map(|idx| idx == i).unwrap_or(false) {
return None;
}
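
This hunk is the substance of the fix: the old code skipped every primary-key column via `is_primary_key_index`, so key columns other than the timestamp and tsid never received a filter index; the new code exempts only those two columns. A sketch of the resulting selection rule, as a hypothetical helper (not in the diff) built from the two accessors shown above:

use common_types::schema::Schema;

// Hypothetical helper: a column gets a xor filter unless it is the
// timestamp column or the auto-generated tsid column.
fn needs_filter(schema: &Schema, i: usize) -> bool {
    schema.timestamp_index() != i
        && schema.index_of_tsid().map_or(true, |idx| idx != i)
}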

@@ -340,185 +301,6 @@ impl TryFrom<sst_pb::ParquetFilter> for ParquetFilter {
}
}

/// Meta data of a sst file
#[derive(Clone, PartialEq)]
pub struct ParquetMetaData {
pub min_key: Bytes,
pub max_key: Bytes,
/// Time Range of the sst
pub time_range: TimeRange,
/// Max sequence number in the sst
pub max_sequence: SequenceNumber,
pub schema: Schema,
pub parquet_filter: Option<ParquetFilter>,
pub column_values: Option<Vec<Option<ColumnValueSet>>>,
}

pub type ParquetMetaDataRef = Arc<ParquetMetaData>;

impl From<&MetaData> for ParquetMetaData {
fn from(meta: &MetaData) -> Self {
Self {
min_key: meta.min_key.clone(),
max_key: meta.max_key.clone(),
time_range: meta.time_range,
max_sequence: meta.max_sequence,
schema: meta.schema.clone(),
parquet_filter: None,
column_values: None,
}
}
}

impl From<ParquetMetaData> for MetaData {
fn from(meta: ParquetMetaData) -> Self {
Self {
min_key: meta.min_key,
max_key: meta.max_key,
time_range: meta.time_range,
max_sequence: meta.max_sequence,
schema: meta.schema,
}
}
}

impl From<Arc<ParquetMetaData>> for MetaData {
fn from(meta: Arc<ParquetMetaData>) -> Self {
Self {
min_key: meta.min_key.clone(),
max_key: meta.max_key.clone(),
time_range: meta.time_range,
max_sequence: meta.max_sequence,
schema: meta.schema.clone(),
}
}
}

impl fmt::Debug for ParquetMetaData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ParquetMetaData")
.field("min_key", &hex::encode(&self.min_key))
.field("max_key", &hex::encode(&self.max_key))
.field("time_range", &self.time_range)
.field("max_sequence", &self.max_sequence)
.field("schema", &self.schema)
.field("column_values", &self.column_values)
.field(
"filter_size",
&self
.parquet_filter
.as_ref()
.map(|filter| filter.size())
.unwrap_or(0),
)
.finish()
}
}

impl From<ParquetMetaData> for sst_pb::ParquetMetaData {
fn from(src: ParquetMetaData) -> Self {
let column_values = if let Some(v) = src.column_values {
v.into_iter()
.map(|col| sst_pb::ColumnValueSet {
value: col.map(|col| col.into()),
})
.collect()
} else {
Vec::new()
};
sst_pb::ParquetMetaData {
min_key: src.min_key.to_vec(),
max_key: src.max_key.to_vec(),
max_sequence: src.max_sequence,
time_range: Some(src.time_range.into()),
schema: Some(schema_pb::TableSchema::from(&src.schema)),
filter: src.parquet_filter.map(|v| v.into()),
// collapsible_cols_idx is used in the hybrid format, and it's deprecated.
collapsible_cols_idx: Vec::new(),
column_values,
}
}
}

impl TryFrom<sst_pb::ParquetMetaData> for ParquetMetaData {
type Error = Error;

fn try_from(src: sst_pb::ParquetMetaData) -> Result<Self> {
let time_range = {
let time_range = src.time_range.context(TimeRangeNotFound)?;
TimeRange::try_from(time_range).context(ConvertTimeRange)?
};
let schema = {
let schema = src.schema.context(TableSchemaNotFound)?;
Schema::try_from(schema).context(ConvertTableSchema)?
};
let parquet_filter = src.filter.map(ParquetFilter::try_from).transpose()?;
let column_values = if src.column_values.is_empty() {
// Old version sst doesn't have this, so set to None.
None
} else {
Some(
src.column_values
.into_iter()
.map(|v| v.value.map(|v| v.into()))
.collect(),
)
};

Ok(Self {
min_key: src.min_key.into(),
max_key: src.max_key.into(),
time_range,
max_sequence: src.max_sequence,
schema,
parquet_filter,
column_values,
})
}
}
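
The two conversions above are intended to round-trip: encoding to protobuf and decoding back should reproduce the original metadata, except for the deprecated `collapsible_cols_idx`, which is always written as empty. A hedged sketch of that property, using the module's `Result` alias:

// Sketch: round-tripping ParquetMetaData through its protobuf form.
fn round_trip(meta: ParquetMetaData) -> Result<ParquetMetaData> {
    let pb = sst_pb::ParquetMetaData::from(meta);
    ParquetMetaData::try_from(pb)
}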

#[derive(Debug, PartialEq, Clone)]
pub enum ColumnValueSet {
StringValue(HashSet<String>),
}

impl ColumnValueSet {
pub fn is_empty(&self) -> bool {
match self {
Self::StringValue(sv) => sv.is_empty(),
}
}

pub fn len(&self) -> usize {
match self {
Self::StringValue(sv) => sv.len(),
}
}
}
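
A brief usage sketch for `ColumnValueSet` (the example values are made up): it records the distinct strings seen in a low-cardinality column so a reader can skip SSTs whose value set cannot satisfy a predicate.

use std::collections::HashSet;

// Sketch with assumed example values for a low-cardinality column.
fn example_column_values() -> ColumnValueSet {
    let values: HashSet<String> = ["us-east", "us-west"]
        .iter()
        .map(|s| s.to_string())
        .collect();
    ColumnValueSet::StringValue(values)
}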

impl From<ColumnValueSet> for sst_pb::column_value_set::Value {
fn from(value: ColumnValueSet) -> Self {
match value {
ColumnValueSet::StringValue(values) => {
let values = values.into_iter().collect();
sst_pb::column_value_set::Value::StringSet(sst_pb::column_value_set::StringSet {
values,
})
}
}
}
}

impl From<sst_pb::column_value_set::Value> for ColumnValueSet {
fn from(value: sst_pb::column_value_set::Value) -> Self {
match value {
sst_pb::column_value_set::Value::StringSet(ss) => {
ColumnValueSet::StringValue(HashSet::from_iter(ss.values))
}
}
}
}

#[cfg(test)]
mod tests {
use common_types::tests::build_schema;
@@ -569,8 +351,7 @@ mod tests {
fn test_row_group_filter_builder() {
// (key1(varbinary), key2(timestamp), field1(double), field2(string))
let schema = build_schema();
let record_schema = schema.to_record_schema_with_key();
let mut builders = RowGroupFilterBuilder::new(&record_schema);
let mut builders = RowGroupFilterBuilder::new(&schema);
for key in ["host-123", "host-456", "host-789"] {
builders.add_key(3, key.as_bytes());
}
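
Given the test schema noted above, only key2 (the timestamp, index 1) is skipped under the new builder; key1 at index 0 regains a filter, which is exactly the missing index named in the PR title. Hypothetical assertions (the accessor is assumed; the builder's internals are not shown in this diff):

// assert!(builder_has_filter(&builders, 0)); // key1: indexed again
// assert!(!builder_has_filter(&builders, 1)); // key2 (timestamp): skipped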