Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: rebuild missing bloom index while pruning #15738

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/query/ee/tests/it/inverted_index/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn apply_block_pruning(
let segment_locs = table_snapshot.segments.clone();
let segment_locs = create_segment_location_vector(segment_locs, None);

FusePruner::create(&ctx, dal, schema, push_down, bloom_index_cols)?
FusePruner::create(&ctx, dal, schema, push_down, bloom_index_cols, None)?
.read_pruning(segment_locs)
.await
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/tests/it/storages/fuse/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async fn apply_block_pruning(
let ctx: Arc<dyn TableContext> = ctx;
let segment_locs = table_snapshot.segments.clone();
let segment_locs = create_segment_location_vector(segment_locs, None);
FusePruner::create(&ctx, op, schema, push_down, bloom_index_cols)?
FusePruner::create(&ctx, op, schema, push_down, bloom_index_cols, None)?
.read_pruning(segment_locs)
.await
.map(|v| v.into_iter().map(|(_, v)| v).collect())
Expand Down
7 changes: 7 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,13 @@ impl DefaultSettings {
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=u64::MAX)),
}),

("enable_auto_fix_missing_bloom_index", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "Enables auto fix missing bloom index",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("max_vacuum_temp_files_after_query", DefaultSettingValue {
value: UserSettingValue::UInt64(u64::MAX),
desc: "The maximum temp files will be removed after query. please enable vacuum feature. disable if 0",
Expand Down
5 changes: 5 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ impl Settings {
pub fn get_enable_clickhouse_handler(&self) -> Result<bool> {
Ok(self.try_get_u64("enable_clickhouse_handler")? != 0)
}

pub fn get_enable_auto_fix_missing_bloom_index(&self) -> Result<bool> {
Ok(self.try_get_u64("enable_auto_fix_missing_bloom_index")? != 0)
}

// Get max_block_size.
pub fn get_max_block_size(&self) -> Result<u64> {
self.try_get_u64("max_block_size")
Expand Down
6 changes: 6 additions & 0 deletions src/query/storages/common/index/src/bloom_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use databend_storages_common_table_meta::meta::Versioned;
use parquet::format::FileMetaData;

use crate::filters::BlockBloomFilterIndexVersion;
use crate::filters::BlockFilter;
use crate::filters::Filter;
use crate::filters::FilterBuilder;
use crate::filters::V2BloomBlock;
Expand Down Expand Up @@ -185,6 +186,11 @@ impl BloomIndex {
data_blocks_tobe_indexed: &[&DataBlock],
bloom_columns_map: BTreeMap<FieldIndex, TableField>,
) -> Result<Option<Self>> {
// TODO refactor :
// if only current version is allowed, just use the current version
// instead of passing it in
assert_eq!(version, BlockFilter::VERSION);

if data_blocks_tobe_indexed.is_empty() {
return Err(ErrorCode::BadArguments("block is empty"));
}
Expand Down
2 changes: 2 additions & 0 deletions src/query/storages/fuse/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub use write::write_data;
pub use write::BlockBuilder;
pub use write::BlockSerialization;
pub use write::BlockWriter;
pub use write::BloomIndexBuilder;
pub use write::BloomIndexState;
pub use write::CachedMetaWriter;
pub use write::InvertedIndexBuilder;
pub use write::InvertedIndexWriter;
Expand Down
111 changes: 91 additions & 20 deletions src/query/storages/fuse/src/io/write/block_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::time::Instant;
use chrono::Utc;
use databend_common_arrow::arrow::chunk::Chunk as ArrowChunk;
use databend_common_arrow::native::write::NativeWriter;
use databend_common_catalog::plan::Projection;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::ColumnId;
Expand Down Expand Up @@ -51,7 +52,9 @@ use log::info;
use opendal::Operator;

use crate::io::write::WriteSettings;
use crate::io::BlockReader;
use crate::io::InvertedIndexWriter;
use crate::io::ReadSettings;
use crate::io::TableMetaLocationGenerator;
use crate::operations::column_parquet_metas;
use crate::statistics::gen_columns_statistics;
Expand Down Expand Up @@ -126,8 +129,93 @@ pub struct BloomIndexState {
pub(crate) column_distinct_count: HashMap<FieldIndex, usize>,
}

/// Rebuilds a missing bloom index for a block by re-reading the block's data
/// from storage and recomputing the filters.
///
/// Constructed during pruning when the `enable_auto_fix_missing_bloom_index`
/// setting is on (see `read_partitions.rs` in this diff); passed down to
/// `FusePruner`/`PruningContext` as an `Option`.
pub struct BloomIndexBuilder {
// Query context: supplies the function context for filter evaluation and read settings.
pub table_ctx: Arc<dyn TableContext>,
// Schema of the table whose blocks are being (re)indexed; all fields are projected on read.
pub table_schema: TableSchemaRef,
// Data access layer (object storage operator) used to read block data back.
pub table_dal: Operator,
// Parquet or native — needed to decode the block when re-reading it.
pub storage_format: FuseStorageFormat,
// Map of field index -> table field for the columns that participate in the bloom index.
pub bloom_columns_map: BTreeMap<FieldIndex, TableField>,
}

impl BloomIndexBuilder {
    /// Builds a bloom index (and its serialized on-disk state) from an
    /// in-memory data block.
    ///
    /// Returns `Ok(None)` when no bloom index is produced for this block
    /// (i.e. `BloomIndex::try_create` yields `None` for the configured
    /// bloom columns).
    pub fn bloom_index_state_from_data_block(
        &self,
        block: &DataBlock,
        bloom_location: Location,
    ) -> Result<Option<(BloomIndexState, BloomIndex)>> {
        let maybe_bloom_index = BloomIndex::try_create(
            self.table_ctx.get_function_context()?,
            bloom_location.1,
            &[block],
            self.bloom_columns_map.clone(),
        )?;

        match maybe_bloom_index {
            None => Ok(None),
            Some(bloom_index) => {
                let state = BloomIndexState::from_bloom_index(&bloom_index, bloom_location)?;
                Ok(Some((state, bloom_index)))
            }
        }
    }

    /// Rebuilds the bloom index for `block_meta` by reading the block's
    /// columns back from storage and recomputing the filters.
    ///
    /// # Panics
    ///
    /// Panics if `block_meta.bloom_filter_index_location` is `None`: the
    /// caller must only pass block metas that expect a bloom index.
    pub async fn bloom_index_state_from_block_meta(
        &self,
        block_meta: &BlockMeta,
    ) -> Result<Option<(BloomIndexState, BloomIndex)>> {
        // Extract the target location once, stating the caller invariant at
        // the point of failure (replaces the previous assert! followed by a
        // separate clone().unwrap()).
        let bloom_location = block_meta
            .bloom_filter_index_location
            .clone()
            .expect("caller must not pass a block meta without a bloom index location");

        // Project every column of the table; the bloom columns are a subset,
        // and the index builder selects the ones it needs.
        let projection =
            Projection::Columns((0..self.table_schema.fields().len()).collect::<Vec<usize>>());

        let block_reader = BlockReader::create(
            self.table_ctx.clone(),
            self.table_dal.clone(),
            self.table_schema.clone(),
            projection,
            false,
            false,
            false,
        )?;

        let settings = ReadSettings::from_ctx(&self.table_ctx)?;

        let data_block = block_reader
            .read_by_meta(&settings, block_meta, &self.storage_format)
            .await?;

        self.bloom_index_state_from_data_block(&data_block, bloom_location)
    }
}

impl BloomIndexState {
pub fn try_create(
/// Serializes an already-built [`BloomIndex`] into the parquet blob that is
/// written next to the block, returning the ready-to-write state
/// (bytes, size, location, and per-column distinct counts).
pub fn from_bloom_index(bloom_index: &BloomIndex, location: Location) -> Result<Self> {
    // Turn the filters into a single data block, then encode it as an
    // uncompressed parquet buffer.
    let serialized_block = bloom_index.serialize_to_data_block()?;
    let mut buffer = Vec::with_capacity(DEFAULT_BLOCK_INDEX_BUFFER_SIZE);
    let _ = blocks_to_parquet(
        &bloom_index.filter_schema,
        vec![serialized_block],
        &mut buffer,
        TableCompression::None,
    )?;
    let size = buffer.len() as u64;
    Ok(Self {
        size,
        data: buffer,
        location,
        column_distinct_count: bloom_index.column_distinct_count.clone(),
    })
}
pub fn from_data_block(
ctx: Arc<dyn TableContext>,
block: &DataBlock,
location: Location,
Expand All @@ -141,24 +229,7 @@ impl BloomIndexState {
bloom_columns_map,
)?;
if let Some(bloom_index) = maybe_bloom_index {
let index_block = bloom_index.serialize_to_data_block()?;
let filter_schema = bloom_index.filter_schema;
let column_distinct_count = bloom_index.column_distinct_count;
let mut data = Vec::with_capacity(DEFAULT_BLOCK_INDEX_BUFFER_SIZE);
let index_block_schema = &filter_schema;
let _ = blocks_to_parquet(
index_block_schema,
vec![index_block],
&mut data,
TableCompression::None,
)?;
let data_size = data.len() as u64;
Ok(Some(Self {
data,
size: data_size,
location,
column_distinct_count,
}))
Ok(Some(Self::from_bloom_index(&bloom_index, location)?))
} else {
Ok(None)
}
Expand Down Expand Up @@ -276,7 +347,7 @@ impl BlockBuilder {
let (block_location, block_id) = self.meta_locations.gen_block_location();

let bloom_index_location = self.meta_locations.block_bloom_index_location(&block_id);
let bloom_index_state = BloomIndexState::try_create(
let bloom_index_state = BloomIndexState::from_data_block(
self.ctx.clone(),
&data_block,
bloom_index_location,
Expand Down
2 changes: 2 additions & 0 deletions src/query/storages/fuse/src/io/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub use block_writer::write_data;
pub use block_writer::BlockBuilder;
pub use block_writer::BlockSerialization;
pub use block_writer::BlockWriter;
pub use block_writer::BloomIndexBuilder;
pub use block_writer::BloomIndexState;
pub use block_writer::InvertedIndexBuilder;
pub(crate) use inverted_index_writer::create_index_schema;
pub(crate) use inverted_index_writer::create_tokenizer_manager;
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/operations/changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ impl FuseTable {
cluster_key_meta,
cluster_keys,
bloom_index_cols,
None,
)?;

let block_metas = pruner.stream_pruning(blocks).await?;
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ impl FuseTable {
self.schema_with_stream(),
&push_down,
self.bloom_index_cols(),
None,
)?;

if let Some(inverse) = filters.map(|f| f.inverted_filter) {
Expand Down
25 changes: 25 additions & 0 deletions src/query/storages/fuse/src/operations/read_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use databend_common_sql::field_default_value;
use databend_common_storage::ColumnNodes;
use databend_storages_common_cache::CacheAccessor;
use databend_storages_common_cache_manager::CachedObject;
use databend_storages_common_index::BloomIndex;
use databend_storages_common_pruner::BlockMetaIndex;
use databend_storages_common_table_meta::meta::BlockMeta;
use databend_storages_common_table_meta::meta::ColumnStatistics;
Expand All @@ -43,6 +44,7 @@ use sha2::Digest;
use sha2::Sha256;

use crate::fuse_part::FuseBlockPartInfo;
use crate::io::BloomIndexBuilder;
use crate::pruning::create_segment_location_vector;
use crate::pruning::FusePruner;
use crate::pruning::SegmentLocation;
Expand Down Expand Up @@ -168,13 +170,35 @@ impl FuseTable {
}
}

let bloom_index_builder = if ctx
.get_settings()
.get_enable_auto_fix_missing_bloom_index()?
{
let storage_format = self.storage_format;

let bloom_columns_map = self
.bloom_index_cols()
.bloom_index_fields(table_schema.clone(), BloomIndex::supported_type)?;

Some(BloomIndexBuilder {
table_ctx: ctx.clone(),
table_schema: table_schema.clone(),
table_dal: dal.clone(),
storage_format,
bloom_columns_map,
})
} else {
None
};

let mut pruner = if !self.is_native() || self.cluster_key_meta.is_none() {
FusePruner::create(
&ctx,
dal,
table_schema.clone(),
&push_downs,
self.bloom_index_cols(),
bloom_index_builder,
)?
} else {
let cluster_keys = self.cluster_keys(ctx.clone());
Expand All @@ -187,6 +211,7 @@ impl FuseTable {
self.cluster_key_meta.clone(),
cluster_keys,
self.bloom_index_cols(),
bloom_index_builder,
)?
};
let block_metas = pruner.read_pruning(segments_location).await?;
Expand Down
3 changes: 3 additions & 0 deletions src/query/storages/fuse/src/operations/recluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ impl FuseTable {
v
};

// during re-cluster, we do not rebuild missing bloom index
let bloom_index_builder = None;
// Only use push_down here.
let pruning_ctx = PruningContext::try_create(
ctx,
Expand All @@ -208,6 +210,7 @@ impl FuseTable {
vec![],
BloomIndexColumns::None,
max_concurrency,
bloom_index_builder,
)?;

let segment_pruner = SegmentPruner::create(pruning_ctx.clone(), schema)?;
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/fuse/src/pruning/block_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl BlockPruner {
pruning_stats.set_blocks_bloom_pruning_before(1);
}
let keep = bloom_pruner
.should_keep(&index_location, index_size, &block_meta.col_stats, column_ids)
.should_keep(&index_location, index_size, &block_meta.col_stats, column_ids, &block_meta)
.await
&& limit_pruner.within_limit(row_count);
if keep {
Expand Down
Loading
Loading