From ed6a0c7682eede432e39c1d2d43bdcb0a9ff7cf9 Mon Sep 17 00:00:00 2001 From: Jk Xu <54522439+Dousir9@users.noreply.github.com> Date: Mon, 18 Nov 2024 16:24:18 +0800 Subject: [PATCH] chore(query): improve distributed runtime filter (#16862) * chore(query): improve distributed runtime filter * chore(code): refine code * chore(code): refine code * chore(code); fix make lint * chore(query): add scan id to fix same table index * chore(code): make lint * chore(pipeline): fix deserializer runtime filter key --- .../src/plan/datasource/datasource_plan.rs | 1 + src/query/catalog/src/runtime_filter_info.rs | 23 +++++++++ src/query/catalog/src/table_context.rs | 9 ++++ .../interpreter_copy_into_table.rs | 1 + .../pipelines/builders/builder_aggregate.rs | 1 + .../src/pipelines/builders/builder_join.rs | 1 + .../pipelines/builders/builder_recluster.rs | 1 + .../src/pipelines/builders/builder_scan.rs | 2 + .../service/src/pipelines/pipeline_builder.rs | 3 ++ .../transforms/hash_join/build_state.rs | 5 ++ .../hash_join/hash_join_build_state.rs | 51 +++++++++++++++++++ .../src/schedulers/fragments/plan_fragment.rs | 1 + src/query/service/src/sessions/query_ctx.rs | 34 +++++++++++++ .../service/src/sessions/query_ctx_shared.rs | 8 +++ .../tests/it/sql/exec/get_table_bind_test.rs | 17 +++++++ .../it/storages/fuse/operations/commit.rs | 17 +++++++ .../physical_plans/physical_hash_join.rs | 9 +++- .../physical_plans/physical_table_scan.rs | 4 ++ src/query/sql/src/executor/table_read_plan.rs | 1 + src/query/sql/src/planner/binder/table.rs | 8 +++ src/query/sql/src/planner/metadata.rs | 26 +++++++++- .../optimizer/decorrelate/flatten_plan.rs | 1 + src/query/sql/src/planner/plans/scan.rs | 2 + .../read/parquet_data_source_deserializer.rs | 41 +++++++++++++++ 24 files changed, 265 insertions(+), 2 deletions(-) diff --git a/src/query/catalog/src/plan/datasource/datasource_plan.rs b/src/query/catalog/src/plan/datasource/datasource_plan.rs index 649c87da63bac..a8d56c2bacaef 100644 --- a/src/query/catalog/src/plan/datasource/datasource_plan.rs +++ b/src/query/catalog/src/plan/datasource/datasource_plan.rs @@ -47,6 +47,7 @@ pub struct DataSourcePlan { pub data_mask_policy: Option>, pub table_index: usize, + pub scan_id: usize, } impl DataSourcePlan { diff --git a/src/query/catalog/src/runtime_filter_info.rs b/src/query/catalog/src/runtime_filter_info.rs index 7c29f193879f0..5caeae4090789 100644 --- a/src/query/catalog/src/runtime_filter_info.rs +++ b/src/query/catalog/src/runtime_filter_info.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use databend_common_base::base::tokio::sync::watch; +use databend_common_base::base::tokio::sync::watch::Receiver; +use databend_common_base::base::tokio::sync::watch::Sender; use databend_common_expression::Expr; use xorf::BinaryFuse16; @@ -62,4 +65,24 @@ impl RuntimeFilterInfo { pub fn is_empty(&self) -> bool { self.inlist.is_empty() && self.bloom.is_empty() && self.min_max.is_empty() } + + pub fn is_blooms_empty(&self) -> bool { + self.bloom.is_empty() + } +} + +pub struct RuntimeFilterReady { + pub runtime_filter_watcher: Sender>, + /// A dummy receiver to make runtime_filter_watcher channel open. + pub _runtime_filter_dummy_receiver: Receiver>, +} + +impl Default for RuntimeFilterReady { + fn default() -> Self { + let (watcher, dummy_receiver) = watch::channel(None); + Self { + runtime_filter_watcher: watcher, + _runtime_filter_dummy_receiver: dummy_receiver, + } + } } diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 6683f176875a7..9ff28e69347ee 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -74,6 +74,7 @@ use crate::plan::PartInfoPtr; use crate::plan::Partitions; use crate::query_kind::QueryKind; use crate::runtime_filter_info::RuntimeFilterInfo; +use crate::runtime_filter_info::RuntimeFilterReady; use crate::statistics::data_cache_statistics::DataCacheMetrics; use crate::table::Table; @@ -317,6 +318,14 @@ pub trait TableContext: Send + Sync { fn set_runtime_filter(&self, filters: (usize, RuntimeFilterInfo)); + fn set_runtime_filter_ready(&self, table_index: usize, ready: Arc); + + fn get_runtime_filter_ready(&self, table_index: usize) -> Vec>; + + fn set_wait_runtime_filter(&self, table_index: usize, need_to_wait: bool); + + fn get_wait_runtime_filter(&self, table_index: usize) -> bool; + fn clear_runtime_filter(&self); fn set_merge_into_join(&self, join: MergeIntoJoin); diff --git a/src/query/service/src/interpreters/interpreter_copy_into_table.rs b/src/query/service/src/interpreters/interpreter_copy_into_table.rs index e03d5500c87ce..ff1894c9db06a 100644 --- a/src/query/service/src/interpreters/interpreter_copy_into_table.rs +++ b/src/query/service/src/interpreters/interpreter_copy_into_table.rs @@ -137,6 +137,7 @@ impl CopyIntoTableInterpreter { ( CopyIntoTableSource::Stage(Box::new(PhysicalPlan::TableScan(TableScan { plan_id: 0, + scan_id: 0, name_mapping, stat_info: None, table_index: None, diff --git a/src/query/service/src/pipelines/builders/builder_aggregate.rs b/src/query/service/src/pipelines/builders/builder_aggregate.rs index bdd562923c8c4..9c16033ae25d4 100644 --- a/src/query/service/src/pipelines/builders/builder_aggregate.rs +++ b/src/query/service/src/pipelines/builders/builder_aggregate.rs @@ -97,6 +97,7 @@ impl PipelineBuilder { } pub(crate) fn build_aggregate_partial(&mut self, aggregate: &AggregatePartial) -> Result<()> { + self.contain_sink_processor = true; self.build_pipeline(&aggregate.input)?; let max_block_size = self.settings.get_max_block_size()?; diff --git a/src/query/service/src/pipelines/builders/builder_join.rs b/src/query/service/src/pipelines/builders/builder_join.rs index fd5ef8c2f906a..07289a4ae51df 100644 --- a/src/query/service/src/pipelines/builders/builder_join.rs +++ b/src/query/service/src/pipelines/builders/builder_join.rs @@ -163,6 +163,7 @@ impl PipelineBuilder { join_state.clone(), output_len, )?; + build_state.add_runtime_filter_ready(); let create_sink_processor = |input| { Ok(ProcessorPtr::create(TransformHashJoinBuild::try_create( diff --git a/src/query/service/src/pipelines/builders/builder_recluster.rs b/src/query/service/src/pipelines/builders/builder_recluster.rs index 0690db51ebf4a..dfc414490f9bb 100644 --- a/src/query/service/src/pipelines/builders/builder_recluster.rs +++ b/src/query/service/src/pipelines/builders/builder_recluster.rs @@ -68,6 +68,7 @@ impl PipelineBuilder { update_stream_columns: table.change_tracking_enabled(), data_mask_policy: None, table_index: usize::MAX, + scan_id: usize::MAX, }; { diff --git a/src/query/service/src/pipelines/builders/builder_scan.rs b/src/query/service/src/pipelines/builders/builder_scan.rs index 2ba6f9ce135ef..4e85b4e6542e0 100644 --- a/src/query/service/src/pipelines/builders/builder_scan.rs +++ b/src/query/service/src/pipelines/builders/builder_scan.rs @@ -41,6 +41,8 @@ impl PipelineBuilder { pub(crate) fn build_table_scan(&mut self, scan: &TableScan) -> Result<()> { let table = self.ctx.build_table_from_source_plan(&scan.source)?; self.ctx.set_partitions(scan.source.parts.clone())?; + self.ctx + .set_wait_runtime_filter(scan.scan_id, self.contain_sink_processor); table.read_data( self.ctx.clone(), &scan.source, diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index 653324ffbdab3..12e8463ee028c 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -60,6 +60,8 @@ pub struct PipelineBuilder { pub r_cte_scan_interpreters: Vec, pub(crate) is_exchange_neighbor: bool, + + pub contain_sink_processor: bool, } impl PipelineBuilder { @@ -83,6 +85,7 @@ impl PipelineBuilder { hash_join_states: HashMap::new(), r_cte_scan_interpreters: vec![], is_exchange_neighbor: false, + contain_sink_processor: false, } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/build_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/build_state.rs index bddb87c0cf2a4..59dbf495053f9 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/build_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/build_state.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + +use databend_common_catalog::runtime_filter_info::RuntimeFilterReady; use databend_common_expression::types::DataType; use databend_common_expression::ColumnVec; use databend_common_expression::DataBlock; @@ -23,6 +26,7 @@ pub struct BuildState { pub(crate) outer_scan_map: Vec>, /// LeftMarkScan map, initialized at `HashJoinBuildState`, used in `HashJoinProbeState` pub(crate) mark_scan_map: Vec>, + pub(crate) runtime_filter_ready: Vec>, } impl BuildState { @@ -31,6 +35,7 @@ impl BuildState { generation_state: BuildBlockGenerationState::new(), outer_scan_map: Vec::new(), mark_scan_map: Vec::new(), + runtime_filter_ready: Vec::new(), } } } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs index 3af426c9401ac..54e6e73e52368 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_base::base::tokio::sync::Barrier; use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo; +use databend_common_catalog::runtime_filter_info::RuntimeFilterReady; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -331,6 +332,7 @@ impl HashJoinBuildState { .build_watcher .send(HashTableType::Empty) .map_err(|_| ErrorCode::TokioError("build_watcher channel is closed"))?; + self.set_bloom_filter_ready(false)?; return Ok(()); } @@ -348,6 +350,8 @@ impl HashJoinBuildState { // If spilling happened, skip adding runtime filter, because probe data is ready and spilled. if self.hash_join_state.spilled_partitions.read().is_empty() { self.add_runtime_filter(&build_chunks, build_num_rows)?; + } else { + self.set_bloom_filter_ready(false)?; } // Divide the finalize phase into multiple tasks. @@ -848,7 +852,52 @@ impl HashJoinBuildState { Ok(()) } + pub fn add_runtime_filter_ready(&self) { + if self.ctx.get_cluster().is_empty() { + return; + } + + let mut wait_runtime_filter_table_indexes = HashSet::new(); + for (build_key, probe_key, table_index) in self + .hash_join_state + .hash_join_desc + .build_keys + .iter() + .zip(self.hash_join_state.hash_join_desc.probe_keys_rt.iter()) + .filter_map(|(b, p)| p.as_ref().map(|(p, index)| (b, p, index))) + { + if !build_key.data_type().remove_nullable().is_numeric() + && !build_key.data_type().remove_nullable().is_string() + { + continue; + } + if let Expr::ColumnRef { .. } = probe_key { + wait_runtime_filter_table_indexes.insert(*table_index); + } + } + + let build_state = unsafe { &mut *self.hash_join_state.build_state.get() }; + let runtime_filter_ready = &mut build_state.runtime_filter_ready; + for table_index in wait_runtime_filter_table_indexes.into_iter() { + let ready = Arc::new(RuntimeFilterReady::default()); + runtime_filter_ready.push(ready.clone()); + self.ctx.set_runtime_filter_ready(table_index, ready); + } + } + + pub fn set_bloom_filter_ready(&self, ready: bool) -> Result<()> { + let build_state = unsafe { &mut *self.hash_join_state.build_state.get() }; + for runtime_filter_ready in build_state.runtime_filter_ready.iter() { + runtime_filter_ready + .runtime_filter_watcher + .send(Some(ready)) + .map_err(|_| ErrorCode::TokioError("watcher channel is closed"))?; + } + Ok(()) + } + fn add_runtime_filter(&self, build_chunks: &[DataBlock], build_num_rows: usize) -> Result<()> { + let mut bloom_filter_ready = false; for (build_key, probe_key, table_index) in self .hash_join_state .hash_join_desc @@ -879,9 +928,11 @@ impl HashJoinBuildState { )?; } if !runtime_filter.is_empty() { + bloom_filter_ready |= !runtime_filter.is_blooms_empty(); self.ctx.set_runtime_filter((*table_index, runtime_filter)); } } + self.set_bloom_filter_ready(bloom_filter_ready)?; Ok(()) } diff --git a/src/query/service/src/schedulers/fragments/plan_fragment.rs b/src/query/service/src/schedulers/fragments/plan_fragment.rs index db415a5bc29c9..da76a11358dfe 100644 --- a/src/query/service/src/schedulers/fragments/plan_fragment.rs +++ b/src/query/service/src/schedulers/fragments/plan_fragment.rs @@ -509,6 +509,7 @@ impl PhysicalPlanReplacer for ReplaceReadSource { Ok(PhysicalPlan::TableScan(TableScan { plan_id: plan.plan_id, + scan_id: plan.scan_id, source: Box::new(source), name_mapping: plan.name_mapping.clone(), table_index: plan.table_index, diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 30f64ea683775..c5822b6da492c 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -50,6 +50,7 @@ use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::StageTableInfo; use databend_common_catalog::query_kind::QueryKind; use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo; +use databend_common_catalog::runtime_filter_info::RuntimeFilterReady; use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics; use databend_common_catalog::table_args::TableArgs; use databend_common_catalog::table_context::ContextError; @@ -1177,6 +1178,39 @@ impl TableContext for QueryContext { } } + fn set_runtime_filter_ready(&self, table_index: usize, ready: Arc) { + let mut runtime_filter_ready = self.shared.runtime_filter_ready.write(); + match runtime_filter_ready.entry(table_index) { + Entry::Vacant(v) => { + v.insert(vec![ready]); + } + Entry::Occupied(mut v) => { + v.get_mut().push(ready); + } + } + } + + fn get_runtime_filter_ready(&self, table_index: usize) -> Vec> { + let runtime_filter_ready = self.shared.runtime_filter_ready.read(); + match runtime_filter_ready.get(&table_index) { + Some(v) => v.to_vec(), + None => vec![], + } + } + + fn set_wait_runtime_filter(&self, table_index: usize, need_to_wait: bool) { + let mut wait_runtime_filter = self.shared.wait_runtime_filter.write(); + wait_runtime_filter.insert(table_index, need_to_wait); + } + + fn get_wait_runtime_filter(&self, table_index: usize) -> bool { + let wait_runtime_filter = self.shared.wait_runtime_filter.read(); + match wait_runtime_filter.get(&table_index) { + Some(v) => *v, + None => false, + } + } + fn get_merge_into_join(&self) -> MergeIntoJoin { let merge_into_join = self.shared.merge_into_join.read(); MergeIntoJoin { diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 547babc3b6061..145057beb63e5 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -31,6 +31,7 @@ use databend_common_catalog::catalog::CatalogManager; use databend_common_catalog::merge_into_join::MergeIntoJoin; use databend_common_catalog::query_kind::QueryKind; use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo; +use databend_common_catalog::runtime_filter_info::RuntimeFilterReady; use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics; use databend_common_catalog::table_context::ContextError; use databend_common_catalog::table_context::MaterializedCtesBlocks; @@ -132,6 +133,11 @@ pub struct QueryContextShared { pub(in crate::sessions) runtime_filters: Arc>>, + pub(in crate::sessions) runtime_filter_ready: + Arc>>>>, + + pub(in crate::sessions) wait_runtime_filter: Arc>>, + pub(in crate::sessions) merge_into_join: Arc>, // Records query level data cache metrics @@ -189,6 +195,8 @@ impl QueryContextShared { query_cache_metrics: DataCacheMetrics::new(), query_profiles: Arc::new(RwLock::new(HashMap::new())), runtime_filters: Default::default(), + runtime_filter_ready: Default::default(), + wait_runtime_filter: Default::default(), merge_into_join: Default::default(), multi_table_insert_status: Default::default(), query_queued_duration: Arc::new(RwLock::new(Duration::from_secs(0))), diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index 9bd031cf55985..16a8824240a89 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -33,6 +33,7 @@ use databend_common_catalog::plan::PartInfoPtr; use databend_common_catalog::plan::Partitions; use databend_common_catalog::query_kind::QueryKind; use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo; +use databend_common_catalog::runtime_filter_info::RuntimeFilterReady; use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::ContextError; @@ -928,6 +929,22 @@ impl TableContext for CtxDelegation { todo!() } + fn set_runtime_filter_ready(&self, _table_index: usize, _ready: Arc) { + todo!() + } + + fn get_runtime_filter_ready(&self, _table_index: usize) -> Vec> { + todo!() + } + + fn set_wait_runtime_filter(&self, _table_index: usize, _need_to_wait: bool) { + todo!() + } + + fn get_wait_runtime_filter(&self, _table_index: usize) -> bool { + todo!() + } + fn clear_runtime_filter(&self) { todo!() } diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index 3453e4d96647d..94b3f39457c9e 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -32,6 +32,7 @@ use databend_common_catalog::plan::PartInfoPtr; use databend_common_catalog::plan::Partitions; use databend_common_catalog::query_kind::QueryKind; use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo; +use databend_common_catalog::runtime_filter_info::RuntimeFilterReady; use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::ContextError; @@ -808,6 +809,22 @@ impl TableContext for CtxDelegation { todo!() } + fn set_runtime_filter_ready(&self, _table_index: usize, _ready: Arc) { + todo!() + } + + fn get_runtime_filter_ready(&self, _table_index: usize) -> Vec> { + todo!() + } + + fn set_wait_runtime_filter(&self, _table_index: usize, _need_to_wait: bool) { + todo!() + } + + fn get_wait_runtime_filter(&self, _table_index: usize) -> bool { + todo!() + } + fn clear_runtime_filter(&self) { todo!() } diff --git a/src/query/sql/src/executor/physical_plans/physical_hash_join.rs b/src/query/sql/src/executor/physical_plans/physical_hash_join.rs index 0d947551bd4bb..641648cce95ee 100644 --- a/src/query/sql/src/executor/physical_plans/physical_hash_join.rs +++ b/src/query/sql/src/executor/physical_plans/physical_hash_join.rs @@ -215,6 +215,7 @@ impl PhysicalPlanBuilder { let mut left_join_conditions_rt = Vec::new(); let mut probe_to_build_index = Vec::new(); let mut table_index = None; + let mut scan_id = None; for condition in join.equi_conditions.iter() { let left_condition = &condition.left; let right_condition = &condition.right; @@ -243,13 +244,19 @@ impl PhysicalPlanBuilder { .table_index() .unwrap(), ); + scan_id = Some( + self.metadata + .read() + .base_column_scan_id(*column_idx) + .unwrap(), + ); } Some(( left_condition .as_raw_expr() .type_check(&*self.metadata.read())? .project_column_ref(|col| col.column_name.clone()), - table_index.unwrap(), + scan_id.unwrap(), )) } else { None diff --git a/src/query/sql/src/executor/physical_plans/physical_table_scan.rs b/src/query/sql/src/executor/physical_plans/physical_table_scan.rs index 09d3bc58b6a4b..891c892f7531f 100644 --- a/src/query/sql/src/executor/physical_plans/physical_table_scan.rs +++ b/src/query/sql/src/executor/physical_plans/physical_table_scan.rs @@ -71,6 +71,7 @@ use crate::DUMMY_TABLE_INDEX; pub struct TableScan { // A unique id of operator in a `PhysicalPlan` tree, only used for display. pub plan_id: u32, + pub scan_id: usize, pub name_mapping: BTreeMap, pub source: Box, pub internal_column: Option>, @@ -263,6 +264,7 @@ impl PhysicalPlanBuilder { } } source.table_index = scan.table_index; + source.scan_id = scan.scan_id; if let Some(agg_index) = &scan.agg_index { let source_schema = source.schema(); let push_down = source.push_downs.as_mut().unwrap(); @@ -283,6 +285,7 @@ impl PhysicalPlanBuilder { let mut plan = PhysicalPlan::TableScan(TableScan { plan_id: 0, + scan_id: scan.scan_id, name_mapping, source: Box::new(source), table_index: Some(scan.table_index), @@ -319,6 +322,7 @@ impl PhysicalPlanBuilder { .await?; Ok(PhysicalPlan::TableScan(TableScan { plan_id: 0, + scan_id: DUMMY_TABLE_INDEX, name_mapping: BTreeMap::from([("dummy".to_string(), DUMMY_COLUMN_INDEX)]), source: Box::new(source), table_index: Some(DUMMY_TABLE_INDEX), diff --git a/src/query/sql/src/executor/table_read_plan.rs b/src/query/sql/src/executor/table_read_plan.rs index 83a39b4e286c3..2d401e806503a 100644 --- a/src/query/sql/src/executor/table_read_plan.rs +++ b/src/query/sql/src/executor/table_read_plan.rs @@ -281,6 +281,7 @@ impl ToReadDataSourcePlan for dyn Table { data_mask_policy, // Set a dummy id, will be set real id later table_index: usize::MAX, + scan_id: usize::MAX, }) } } diff --git a/src/query/sql/src/planner/binder/table.rs b/src/query/sql/src/planner/binder/table.rs index c8136d07d0891..45c646fbc2c8d 100644 --- a/src/query/sql/src/planner/binder/table.rs +++ b/src/query/sql/src/planner/binder/table.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::BTreeMap; +use std::collections::HashMap; use std::default::Default; use std::sync::Arc; @@ -455,6 +456,8 @@ impl Binder { let table = self.metadata.read().table(table_index).clone(); let table_name = table.name(); let columns = self.metadata.read().columns_by_table_index(table_index); + let scan_id = self.metadata.write().next_scan_id(); + let mut base_column_scan_id = HashMap::new(); for column in columns.iter() { match column { ColumnEntry::BaseTableColumn(BaseTableColumn { @@ -484,6 +487,7 @@ impl Binder { .virtual_computed_expr(virtual_computed_expr.clone()) .build(); bind_context.add_column_binding(column_binding); + base_column_scan_id.insert(*column_index, scan_id); } other => { return Err(ErrorCode::Internal(format!( @@ -494,6 +498,9 @@ impl Binder { } } } + self.metadata + .write() + .add_base_column_scan_id(base_column_scan_id); Ok(( SExpr::create_leaf(Arc::new( @@ -503,6 +510,7 @@ impl Binder { statistics: Arc::new(Statistics::default()), change_type, sample: sample.clone(), + scan_id, ..Default::default() } .into(), diff --git a/src/query/sql/src/planner/metadata.rs b/src/query/sql/src/planner/metadata.rs index 7ddce096658de..042766c848d55 100644 --- a/src/query/sql/src/planner/metadata.rs +++ b/src/query/sql/src/planner/metadata.rs @@ -12,13 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::collections::HashSet; use std::collections::VecDeque; use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; -use ahash::HashMap; use databend_common_ast::ast::Expr; use databend_common_ast::ast::Literal; use databend_common_catalog::plan::DataSourcePlan; @@ -74,6 +74,11 @@ pub struct Metadata { table_row_id_index: HashMap, agg_indexes: HashMap>, max_column_position: usize, // for CSV + + /// Scan id of each scan operator. + next_scan_id: usize, + /// Mappings from base column index to scan id. + base_column_scan_id: HashMap, } impl Metadata { @@ -462,9 +467,24 @@ impl Metadata { pub fn set_max_column_position(&mut self, max_pos: usize) { self.max_column_position = max_pos } + pub fn get_max_column_position(&self) -> usize { self.max_column_position } + + pub fn next_scan_id(&mut self) -> usize { + let next_scan_id = self.next_scan_id; + self.next_scan_id += 1; + next_scan_id + } + + pub fn add_base_column_scan_id(&mut self, base_column_scan_id: HashMap) { + self.base_column_scan_id.extend(base_column_scan_id); + } + + pub fn base_column_scan_id(&self, column_index: usize) -> Option { + self.base_column_scan_id.get(&column_index).cloned() + } } #[derive(Clone)] @@ -569,6 +589,10 @@ impl TableEntry { pub fn is_consume(&self) -> bool { self.consume } + + pub fn update_table_index(&mut self, table_index: IndexType) { + self.index = table_index; + } } #[derive(Clone, Debug)] diff --git a/src/query/sql/src/planner/optimizer/decorrelate/flatten_plan.rs b/src/query/sql/src/planner/optimizer/decorrelate/flatten_plan.rs index 3d18059714edd..c208878c78fda 100644 --- a/src/query/sql/src/planner/optimizer/decorrelate/flatten_plan.rs +++ b/src/query/sql/src/planner/optimizer/decorrelate/flatten_plan.rs @@ -108,6 +108,7 @@ impl SubqueryRewriter { Scan { table_index, columns: scan_columns, + scan_id: metadata.next_scan_id(), ..Default::default() } .into(), diff --git a/src/query/sql/src/planner/plans/scan.rs b/src/query/sql/src/planner/plans/scan.rs index 72124bf43945d..7a7c8f2d18e36 100644 --- a/src/query/sql/src/planner/plans/scan.rs +++ b/src/query/sql/src/planner/plans/scan.rs @@ -107,6 +107,7 @@ pub struct Scan { // Lazy row fetch. pub is_lazy_table: bool, pub sample: Option, + pub scan_id: usize, pub statistics: Arc, } @@ -147,6 +148,7 @@ impl Scan { inverted_index: self.inverted_index.clone(), is_lazy_table: self.is_lazy_table, sample: self.sample.clone(), + scan_id: self.scan_id, } } diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs index c4c6d9d916f63..d58dffde4a7ab 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs @@ -25,7 +25,9 @@ use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfoPtr; +use databend_common_catalog::runtime_filter_info::RuntimeFilterReady; use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::types::DataType; use databend_common_expression::BlockMetaInfoDowncast; @@ -56,6 +58,7 @@ use crate::operations::read::runtime_filter_prunner::update_bitmap_with_bloom_fi pub struct DeserializeDataTransform { ctx: Arc, table_index: IndexType, + scan_id: usize, scan_progress: Arc, block_reader: Arc, @@ -74,6 +77,8 @@ pub struct DeserializeDataTransform { cached_runtime_filter: Option>, // for merge_into target build. need_reserve_block_info: bool, + need_wait_runtime_filter: bool, + runtime_filter_ready: Option>, } unsafe impl Send for DeserializeDataTransform {} @@ -89,6 +94,8 @@ impl DeserializeDataTransform { virtual_reader: Arc>, ) -> Result { let scan_progress = ctx.get_scan_progress(); + let need_wait_runtime_filter = + !ctx.get_cluster().is_empty() && ctx.get_wait_runtime_filter(plan.scan_id); let mut src_schema: DataSchema = (block_reader.schema().as_ref()).into(); if let Some(virtual_reader) = virtual_reader.as_ref() { @@ -110,6 +117,7 @@ impl DeserializeDataTransform { Ok(ProcessorPtr::create(Box::new(DeserializeDataTransform { ctx, table_index: plan.table_index, + scan_id: plan.scan_id, scan_progress, block_reader, input, @@ -124,6 +132,8 @@ impl DeserializeDataTransform { base_block_ids: plan.base_block_ids.clone(), cached_runtime_filter: None, need_reserve_block_info, + need_wait_runtime_filter, + runtime_filter_ready: None, }))) } @@ -169,6 +179,20 @@ impl DeserializeDataTransform { Ok(None) } } + + fn need_wait_runtime_filter(&mut self) -> bool { + if !self.need_wait_runtime_filter { + return false; + } + self.need_wait_runtime_filter = false; + let runtime_filter_ready = self.ctx.get_runtime_filter_ready(self.scan_id); + if runtime_filter_ready.len() == 1 { + self.runtime_filter_ready = Some(runtime_filter_ready[0].clone()); + true + } else { + false + } + } } #[async_trait::async_trait] @@ -182,6 +206,10 @@ impl Processor for DeserializeDataTransform { } fn event(&mut self) -> Result { + if self.need_wait_runtime_filter() { + return Ok(Event::Async); + } + if self.output.is_finished() { self.input.finish(); return Ok(Event::Finished); @@ -326,4 +354,17 @@ impl Processor for DeserializeDataTransform { Ok(()) } + + #[async_backtrace::framed] + async fn async_process(&mut self) -> Result<()> { + let runtime_filter_ready = self.runtime_filter_ready.as_mut().unwrap(); + let mut rx = runtime_filter_ready.runtime_filter_watcher.subscribe(); + if (*rx.borrow()).is_some() { + return Ok(()); + } + rx.changed() + .await + .map_err(|_| ErrorCode::TokioError("watcher's sender is dropped"))?; + Ok(()) + } }