Skip to content

Commit

Permalink
Merge pull request #9648 from BohuTANG/dev-storage-index
Browse files Browse the repository at this point in the history
refactor(storage/index): make the storage/index more index
  • Loading branch information
BohuTANG authored Jan 17, 2023
2 parents e80d713 + 54f8aa7 commit 148fb5d
Show file tree
Hide file tree
Showing 17 changed files with 83 additions and 90 deletions.
4 changes: 2 additions & 2 deletions src/query/service/tests/it/storages/fuse/block_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use common_storages_fuse::io::WriteSettings;
use common_storages_fuse::FuseStorageFormat;
use opendal::Operator;
use storages_common_blocks::blocks_to_parquet;
use storages_common_index::BlockFilter;
use storages_common_index::BloomIndex;
use storages_common_table_meta::meta::BlockMeta;
use storages_common_table_meta::meta::ClusterStatistics;
use storages_common_table_meta::meta::Compression;
Expand Down Expand Up @@ -104,7 +104,7 @@ impl<'a> BlockWriter<'a> {
.block_bloom_index_location(&block_id);

let bloom_index =
BlockFilter::try_create(FunctionContext::default(), schema, location.1, &[block])?;
BloomIndex::try_create(FunctionContext::default(), schema, location.1, &[block])?;
if let Some(bloom_index) = bloom_index {
let index_block = bloom_index.filter_block;
let mut data = Vec::with_capacity(DEFAULT_BLOOM_INDEX_WRITE_BUFFER_SIZE);
Expand Down
10 changes: 5 additions & 5 deletions src/query/storages/common/index/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
[package]
name = "storages-common-index"
version = "0.1.0"
edition = "2021"
authors = ["Databend Authors <opensource@datafuselabs.com>"]
license = "Apache-2.0"
publish = false
version = { workspace = true }
authors = { workspace = true }
license = { workspace = true }
publish = { workspace = true }
edition = { workspace = true }

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
Expand Down
10 changes: 5 additions & 5 deletions src/query/storages/common/index/benches/build_from_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use rand::SeedableRng;
use storages_common_index::filters::Filter;
use storages_common_index::filters::FilterBuilder;
use storages_common_index::filters::Xor8Builder;
use storages_common_index::BlockFilter;
use storages_common_index::BloomIndex;

/// Benchmark building BloomIndex from DataBlock.
///
Expand Down Expand Up @@ -102,7 +102,7 @@ fn bench_u64_using_digests(c: &mut Criterion) {

let mut builder = Xor8Builder::create();
let func_ctx = FunctionContext::default();
let col = BlockFilter::calculate_column_digest(
let col = BloomIndex::calculate_column_digest(
func_ctx,
&column,
&DataType::Number(NumberDataType::Int64),
Expand All @@ -124,7 +124,7 @@ fn bench_u64_using_digests(c: &mut Criterion) {
b.iter(|| {
let mut builder = Xor8Builder::create();
let func_ctx = FunctionContext::default();
let col = BlockFilter::calculate_column_digest(
let col = BloomIndex::calculate_column_digest(
func_ctx,
&column,
&DataType::Number(NumberDataType::Int64),
Expand All @@ -144,7 +144,7 @@ fn bench_string_using_digests(c: &mut Criterion) {

let mut builder = Xor8Builder::create();
let func_ctx = FunctionContext::default();
let col = BlockFilter::calculate_column_digest(
let col = BloomIndex::calculate_column_digest(
func_ctx,
&column,
&DataType::String,
Expand All @@ -166,7 +166,7 @@ fn bench_string_using_digests(c: &mut Criterion) {
b.iter(|| {
let mut builder = Xor8Builder::create();
let func_ctx = FunctionContext::default();
let col = BlockFilter::calculate_column_digest(
let col = BloomIndex::calculate_column_digest(
func_ctx,
&column,
&DataType::String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::filters::Filter;
use crate::filters::FilterBuilder;
use crate::filters::Xor8Builder;
use crate::filters::Xor8Filter;
use crate::SupportedType;
use crate::Index;

/// BloomIndex represents multiple per-column filters (bloom filter or xor filter etc.) for a data block.
///
Expand All @@ -69,7 +69,7 @@ use crate::SupportedType;
/// | 123456789abcd | ac2345bcd |
/// +----------------+--------------+
/// ```
pub struct BlockFilter {
pub struct BloomIndex {
pub func_ctx: FunctionContext,

/// The schema of the source table, which the filter work for.
Expand Down Expand Up @@ -100,7 +100,7 @@ pub enum FilterEvalResult {
Uncertain,
}

impl BlockFilter {
impl BloomIndex {
/// Load a filter directly from the source table's schema and the corresponding filter parquet file.
#[tracing::instrument(level = "debug", skip_all)]
pub fn from_filter_block(
Expand Down Expand Up @@ -137,7 +137,7 @@ impl BlockFilter {
let mut columns = Vec::new();
for i in 0..blocks[0].num_columns() {
let data_type = &blocks[0].get_by_offset(i).data_type;
if Xor8Filter::is_supported_type(data_type) {
if Xor8Filter::supported_type(data_type) {
let source_field = source_schema.field(i);
let return_type = if data_type.is_nullable() {
DataType::Nullable(Box::new(DataType::Number(NumberDataType::UInt64)))
Expand Down Expand Up @@ -227,7 +227,7 @@ impl BlockFilter {
///
/// Otherwise return `Uncertain`.
#[tracing::instrument(level = "debug", name = "block_filter_index_eval", skip_all)]
pub fn eval(
pub fn apply(
&self,
mut expr: Expr<String>,
scalar_map: &HashMap<Scalar, u64>,
Expand Down Expand Up @@ -303,7 +303,7 @@ impl BlockFilter {
pub fn find_eq_columns(expr: &Expr<String>) -> Result<Vec<(String, Scalar, DataType)>> {
let mut cols = Vec::new();
visit_expr_column_eq_constant(&mut expr.clone(), &mut |_, col_name, scalar, ty, _| {
if Xor8Filter::is_supported_type(ty) && !scalar.is_null() {
if Xor8Filter::supported_type(ty) && !scalar.is_null() {
cols.push((col_name.to_string(), scalar.clone(), ty.clone()));
}
Ok(None)
Expand All @@ -327,7 +327,7 @@ impl BlockFilter {
let filter_column = &Self::build_filter_column_name(column_name);

if !self.filter_schema.has_field(filter_column)
|| !Xor8Filter::is_supported_type(ty)
|| !Xor8Filter::supported_type(ty)
|| target.is_null()
{
// The column doesn't have a filter.
Expand Down
6 changes: 3 additions & 3 deletions src/query/storages/common/index/src/filters/xor8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use xorfilter::Xor8;

use crate::filters::Filter;
use crate::filters::FilterBuilder;
use crate::SupportedType;
use crate::Index;

/// A builder that builds a xor8 filter.
///
Expand Down Expand Up @@ -126,8 +126,8 @@ impl Filter for Xor8Filter {
}
}

impl SupportedType for Xor8Filter {
fn is_supported_type(data_type: &DataType) -> bool {
impl Index for Xor8Filter {
fn supported_type(data_type: &DataType) -> bool {
let inner_type = data_type.remove_nullable();
matches!(
inner_type,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 Datafuse Labs.
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -12,11 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

/// MinMaxIndex is absorbed into the table meta statistics
use crate::SupportedType;
use common_expression::types::DataType;

/// Min and Max index.
pub struct MinMaxIndex {}

/// using the default implementation
impl SupportedType for MinMaxIndex {}
/// Common interface shared by storage indexes (e.g. the bloom index and range index).
pub trait Index {
    /// Returns true if a column of `data_type` can be covered by an index.
    ///
    /// Nullable columns are supported by inspecting the wrapped inner type;
    /// Null values themselves are not added into an index.
    fn supported_type(data_type: &DataType) -> bool {
        // Unwrap Nullable(T) to T: only the non-null inner type decides support.
        let inner_type = data_type.remove_nullable();
        matches!(
            inner_type,
            DataType::Number(_) | DataType::Date | DataType::Timestamp | DataType::String
        )
    }
}
31 changes: 7 additions & 24 deletions src/query/storages/common/index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,12 @@

#![allow(clippy::uninlined_format_args)]

mod bloom;
mod bloom_index;
pub mod filters;
pub mod index_min_max;
pub mod range_filter;
mod index;
mod range_index;

pub use bloom::BlockFilter;
pub use bloom::FilterEvalResult;
use common_expression::types::DataType;
pub use index_min_max::*;
pub use range_filter::*;

#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum IndexSchemaVersion {
V1,
}

pub trait SupportedType {
fn is_supported_type(data_type: &DataType) -> bool {
// we support nullable column but Nulls are not added into the bloom filter.
let inner_type = data_type.remove_nullable();
matches!(
inner_type,
DataType::Number(_) | DataType::Date | DataType::Timestamp | DataType::String
)
}
}
pub use bloom_index::BloomIndex;
pub use bloom_index::FilterEvalResult;
pub use index::Index;
pub use range_index::RangeIndex;
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,16 @@ use common_functions::scalars::BUILTIN_FUNCTIONS;
use storages_common_table_meta::meta::ColumnStatistics;
use storages_common_table_meta::meta::StatisticsOfColumns;

use crate::Index;

#[derive(Clone)]
pub struct RangeFilter {
pub struct RangeIndex {
expr: Expr<String>,
func_ctx: FunctionContext,
column_indices: HashMap<String, usize>,
}

impl RangeFilter {
impl RangeIndex {
pub fn try_create(
func_ctx: FunctionContext,
exprs: &[Expr<String>],
Expand Down Expand Up @@ -73,7 +75,7 @@ impl RangeFilter {
})
}

pub fn try_eval_const(&self) -> Result<bool> {
pub fn try_apply_const(&self) -> Result<bool> {
// Only return false, which means to skip this block, when the expression is folded to a constant false.
Ok(!matches!(self.expr, Expr::Constant {
scalar: Scalar::Boolean(false),
Expand All @@ -82,7 +84,7 @@ impl RangeFilter {
}

#[tracing::instrument(level = "debug", name = "range_filter_eval", skip_all)]
pub fn eval(&self, stats: &StatisticsOfColumns) -> Result<bool> {
pub fn apply(&self, stats: &StatisticsOfColumns) -> Result<bool> {
let input_domains = self
.expr
.column_refs()
Expand Down Expand Up @@ -159,3 +161,5 @@ fn statistics_to_domain(stat: Option<&ColumnStatistics>, data_type: &DataType) -
_ => Domain::full(data_type),
})
}

// RangeIndex relies on the trait's default `supported_type` implementation.
impl Index for RangeIndex {}
12 changes: 6 additions & 6 deletions src/query/storages/common/index/tests/it/filters/bloom_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use common_expression::TableField;
use common_expression::TableSchema;
use common_expression::Value;
use common_functions::scalars::BUILTIN_FUNCTIONS;
use storages_common_index::BlockFilter;
use storages_common_index::BloomIndex;
use storages_common_index::FilterEvalResult;
use storages_common_table_meta::meta::BlockFilter as LatestBloom;
use storages_common_table_meta::meta::Versioned;
Expand Down Expand Up @@ -66,7 +66,7 @@ fn test_bloom_filter() -> Result<()> {
];
let blocks_ref = blocks.iter().collect::<Vec<_>>();

let index = BlockFilter::try_create(
let index = BloomIndex::try_create(
FunctionContext::default(),
schema,
LatestBloom::VERSION,
Expand Down Expand Up @@ -117,7 +117,7 @@ fn test_bloom_filter() -> Result<()> {
Ok(())
}

fn eval_index(index: &BlockFilter, col_name: &str, val: Scalar, ty: DataType) -> FilterEvalResult {
fn eval_index(index: &BloomIndex, col_name: &str, val: Scalar, ty: DataType) -> FilterEvalResult {
let expr = check_function(
None,
"eq",
Expand All @@ -138,16 +138,16 @@ fn eval_index(index: &BlockFilter, col_name: &str, val: Scalar, ty: DataType) ->
)
.unwrap();

let point_query_cols = BlockFilter::find_eq_columns(&expr).unwrap();
let point_query_cols = BloomIndex::find_eq_columns(&expr).unwrap();

let mut scalar_map = HashMap::<Scalar, u64>::new();
let func_ctx = FunctionContext::default();
for (_, scalar, ty) in point_query_cols.iter() {
if !scalar_map.contains_key(scalar) {
let digest = BlockFilter::calculate_scalar_digest(func_ctx, scalar, ty).unwrap();
let digest = BloomIndex::calculate_scalar_digest(func_ctx, scalar, ty).unwrap();
scalar_map.insert(scalar.clone(), digest);
}
}

index.eval(expr, &scalar_map).unwrap()
index.apply(expr, &scalar_map).unwrap()
}
10 changes: 5 additions & 5 deletions src/query/storages/common/pruner/src/range_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use common_exception::Result;
use common_expression::Expr;
use common_expression::FunctionContext;
use common_expression::TableSchemaRef;
use storages_common_index::RangeFilter;
use storages_common_index::RangeIndex;
use storages_common_table_meta::meta::StatisticsOfColumns;

pub trait RangePruner {
Expand All @@ -42,9 +42,9 @@ impl RangePruner for KeepFalse {
}
}

impl RangePruner for RangeFilter {
impl RangePruner for RangeIndex {
fn should_keep(&self, stats: &StatisticsOfColumns) -> bool {
match self.eval(stats) {
match self.apply(stats) {
Ok(r) => r,
Err(e) => {
// swallow exceptions intentionally, corrupted index should not prevent execution
Expand All @@ -68,8 +68,8 @@ impl RangePrunerCreator {
) -> Result<Arc<dyn RangePruner + Send + Sync>> {
Ok(match filter_expr {
Some(exprs) if !exprs.is_empty() => {
let range_filter = RangeFilter::try_create(func_ctx, exprs, schema.clone())?;
match range_filter.try_eval_const() {
let range_filter = RangeIndex::try_create(func_ctx, exprs, schema.clone())?;
match range_filter.try_apply_const() {
Ok(v) => {
if v {
Arc::new(range_filter)
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/fuse/src/operations/fuse_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl BloomIndexState {
location: Location,
) -> Result<Option<Self>> {
// write index
let bloom_index = BlockFilter::try_create(
let bloom_index = BloomIndex::try_create(
ctx.try_get_function_context()?,
source_schema,
location.1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use common_expression::DataBlock;
use common_expression::TableSchemaRef;
use opendal::Operator;
use storages_common_blocks::blocks_to_parquet;
use storages_common_index::BlockFilter;
use storages_common_index::BloomIndex;
use storages_common_table_meta::caches::CacheManager;
use storages_common_table_meta::meta::BlockMeta;
use storages_common_table_meta::meta::SegmentInfo;
Expand Down Expand Up @@ -232,7 +232,7 @@ impl Processor for CompactTransform {

// build block index.
let func_ctx = self.ctx.try_get_function_context()?;
let bloom_index = BlockFilter::try_create(
let bloom_index = BloomIndex::try_create(
func_ctx,
self.schema.clone(),
block_location.1,
Expand Down
Loading

1 comment on commit 148fb5d

@vercel
Copy link

@vercel vercel bot commented on 148fb5d Jan 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend.vercel.app
databend.rs
databend-databend.vercel.app
databend-git-main-databend.vercel.app

Please sign in to comment.