chore(query): improve distributed runtime filter (#16862)
* chore(query): improve distributed runtime filter

* chore(code): refine code

* chore(code): refine code

* chore(code): fix make lint

* chore(query): add scan id to fix same table index

* chore(code): make lint

* chore(pipeline): fix deserializer runtime filter key
Dousir9 authored Nov 18, 2024
1 parent 1d55d57 commit ed6a0c7
Showing 24 changed files with 265 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/query/catalog/src/plan/datasource/datasource_plan.rs
@@ -47,6 +47,7 @@ pub struct DataSourcePlan {
pub data_mask_policy: Option<BTreeMap<FieldIndex, RemoteExpr>>,

pub table_index: usize,
pub scan_id: usize,
}

impl DataSourcePlan {
23 changes: 23 additions & 0 deletions src/query/catalog/src/runtime_filter_info.rs
@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_base::base::tokio::sync::watch;
use databend_common_base::base::tokio::sync::watch::Receiver;
use databend_common_base::base::tokio::sync::watch::Sender;
use databend_common_expression::Expr;
use xorf::BinaryFuse16;

@@ -62,4 +65,24 @@ impl RuntimeFilterInfo {
pub fn is_empty(&self) -> bool {
self.inlist.is_empty() && self.bloom.is_empty() && self.min_max.is_empty()
}

pub fn is_blooms_empty(&self) -> bool {
self.bloom.is_empty()
}
}

pub struct RuntimeFilterReady {
pub runtime_filter_watcher: Sender<Option<bool>>,
/// A dummy receiver to keep the runtime_filter_watcher channel open.
pub _runtime_filter_dummy_receiver: Receiver<Option<bool>>,
}

impl Default for RuntimeFilterReady {
fn default() -> Self {
let (watcher, dummy_receiver) = watch::channel(None);
Self {
runtime_filter_watcher: watcher,
_runtime_filter_dummy_receiver: dummy_receiver,
}
}
}
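
`RuntimeFilterReady` above is built on a tokio `watch` channel: holding one dummy receiver keeps the channel open, so the build side can `send` readiness before any scan has subscribed, and a late subscriber still observes the latest value. A minimal standalone sketch of that pattern (plain tokio, not code from this commit):

```rust
use tokio::sync::watch;

/// Same shape as `RuntimeFilterReady`: the kept receiver prevents the
/// channel from closing, so `send` cannot fail before a real subscriber
/// appears.
struct Ready {
    tx: watch::Sender<Option<bool>>,
    _keepalive: watch::Receiver<Option<bool>>,
}

impl Default for Ready {
    fn default() -> Self {
        let (tx, rx) = watch::channel(None);
        Self { tx, _keepalive: rx }
    }
}

#[tokio::main]
async fn main() {
    let ready = Ready::default();
    let mut rx = ready.tx.subscribe();

    // Build side publishes: Some(true) = a bloom filter was produced,
    // Some(false) = none is coming (empty build side, spill, ...).
    ready.tx.send(Some(true)).unwrap();

    // Scan side waits until the initial None is replaced by Some(_).
    while rx.borrow_and_update().is_none() {
        rx.changed().await.unwrap();
    }
    assert_eq!(*rx.borrow(), Some(true));
}
```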
9 changes: 9 additions & 0 deletions src/query/catalog/src/table_context.rs
@@ -74,6 +74,7 @@ use crate::plan::PartInfoPtr;
use crate::plan::Partitions;
use crate::query_kind::QueryKind;
use crate::runtime_filter_info::RuntimeFilterInfo;
use crate::runtime_filter_info::RuntimeFilterReady;
use crate::statistics::data_cache_statistics::DataCacheMetrics;
use crate::table::Table;

@@ -317,6 +318,14 @@ pub trait TableContext: Send + Sync {

fn set_runtime_filter(&self, filters: (usize, RuntimeFilterInfo));

fn set_runtime_filter_ready(&self, table_index: usize, ready: Arc<RuntimeFilterReady>);

fn get_runtime_filter_ready(&self, table_index: usize) -> Vec<Arc<RuntimeFilterReady>>;

fn set_wait_runtime_filter(&self, table_index: usize, need_to_wait: bool);

fn get_wait_runtime_filter(&self, table_index: usize) -> bool;

fn clear_runtime_filter(&self);

fn set_merge_into_join(&self, join: MergeIntoJoin);
@@ -137,6 +137,7 @@ impl CopyIntoTableInterpreter {
(
CopyIntoTableSource::Stage(Box::new(PhysicalPlan::TableScan(TableScan {
plan_id: 0,
scan_id: 0,
name_mapping,
stat_info: None,
table_index: None,
@@ -97,6 +97,7 @@ impl PipelineBuilder {
}

pub(crate) fn build_aggregate_partial(&mut self, aggregate: &AggregatePartial) -> Result<()> {
self.contain_sink_processor = true;
self.build_pipeline(&aggregate.input)?;

let max_block_size = self.settings.get_max_block_size()?;
1 change: 1 addition & 0 deletions src/query/service/src/pipelines/builders/builder_join.rs
@@ -163,6 +163,7 @@ impl PipelineBuilder {
join_state.clone(),
output_len,
)?;
build_state.add_runtime_filter_ready();

let create_sink_processor = |input| {
Ok(ProcessorPtr::create(TransformHashJoinBuild::try_create(
@@ -68,6 +68,7 @@ impl PipelineBuilder {
update_stream_columns: table.change_tracking_enabled(),
data_mask_policy: None,
table_index: usize::MAX,
scan_id: usize::MAX,
};

{
2 changes: 2 additions & 0 deletions src/query/service/src/pipelines/builders/builder_scan.rs
@@ -41,6 +41,8 @@ impl PipelineBuilder {
pub(crate) fn build_table_scan(&mut self, scan: &TableScan) -> Result<()> {
let table = self.ctx.build_table_from_source_plan(&scan.source)?;
self.ctx.set_partitions(scan.source.parts.clone())?;
self.ctx
.set_wait_runtime_filter(scan.scan_id, self.contain_sink_processor);
table.read_data(
self.ctx.clone(),
&scan.source,
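
The flag set here only takes effect if something later blocks on it. A hypothetical sketch of that scan-side gate (the function, its arguments, and the wiring are illustrative stand-ins, not this commit's actual read path): the scan waits only when the planner flagged it, and only until every registered readiness watcher has published `Some(_)`.

```rust
use std::sync::Arc;
use tokio::sync::watch;

/// Hypothetical scan-side gate: `need_to_wait` stands in for
/// `ctx.get_wait_runtime_filter(scan_id)`, and `watchers` for the senders
/// inside `ctx.get_runtime_filter_ready(scan_id)`.
async fn wait_runtime_filters(
    need_to_wait: bool,
    watchers: Vec<Arc<watch::Sender<Option<bool>>>>,
) {
    if !need_to_wait {
        return;
    }
    for tx in watchers {
        let mut rx = tx.subscribe();
        // Some(true): a bloom filter arrived; Some(false): the build side
        // opted out -- either way the scan may start reading.
        while rx.borrow_and_update().is_none() {
            if rx.changed().await.is_err() {
                break; // build side dropped; never block the scan forever
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, _keepalive) = watch::channel(None);
    let tx = Arc::new(tx);
    let notifier = {
        let tx = tx.clone();
        tokio::spawn(async move { tx.send(Some(true)).ok() })
    };
    wait_runtime_filters(true, vec![tx]).await;
    notifier.await.unwrap();
}
```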
3 changes: 3 additions & 0 deletions src/query/service/src/pipelines/pipeline_builder.rs
@@ -60,6 +60,8 @@ pub struct PipelineBuilder {

pub r_cte_scan_interpreters: Vec<CreateTableInterpreter>,
pub(crate) is_exchange_neighbor: bool,

pub contain_sink_processor: bool,
}

impl PipelineBuilder {
@@ -83,6 +85,7 @@
hash_join_states: HashMap::new(),
r_cte_scan_interpreters: vec![],
is_exchange_neighbor: false,
contain_sink_processor: false,
}
}

@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
use databend_common_expression::types::DataType;
use databend_common_expression::ColumnVec;
use databend_common_expression::DataBlock;
@@ -23,6 +26,7 @@ pub struct BuildState {
pub(crate) outer_scan_map: Vec<Vec<bool>>,
/// LeftMarkScan map, initialized at `HashJoinBuildState`, used in `HashJoinProbeState`
pub(crate) mark_scan_map: Vec<Vec<u8>>,
pub(crate) runtime_filter_ready: Vec<Arc<RuntimeFilterReady>>,
}

impl BuildState {
@@ -31,6 +35,7 @@ impl BuildState {
generation_state: BuildBlockGenerationState::new(),
outer_scan_map: Vec::new(),
mark_scan_map: Vec::new(),
runtime_filter_ready: Vec::new(),
}
}
}
@@ -23,6 +23,7 @@ use std::sync::Arc;
use databend_common_arrow::arrow::bitmap::Bitmap;
use databend_common_base::base::tokio::sync::Barrier;
use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
@@ -331,6 +332,7 @@ impl HashJoinBuildState {
.build_watcher
.send(HashTableType::Empty)
.map_err(|_| ErrorCode::TokioError("build_watcher channel is closed"))?;
self.set_bloom_filter_ready(false)?;
return Ok(());
}

@@ -348,6 +350,8 @@
// If spilling happened, skip adding runtime filter, because probe data is ready and spilled.
if self.hash_join_state.spilled_partitions.read().is_empty() {
self.add_runtime_filter(&build_chunks, build_num_rows)?;
} else {
self.set_bloom_filter_ready(false)?;
}

// Divide the finalize phase into multiple tasks.
@@ -848,7 +852,52 @@
Ok(())
}

pub fn add_runtime_filter_ready(&self) {
if self.ctx.get_cluster().is_empty() {
return;
}

let mut wait_runtime_filter_table_indexes = HashSet::new();
for (build_key, probe_key, table_index) in self
.hash_join_state
.hash_join_desc
.build_keys
.iter()
.zip(self.hash_join_state.hash_join_desc.probe_keys_rt.iter())
.filter_map(|(b, p)| p.as_ref().map(|(p, index)| (b, p, index)))
{
if !build_key.data_type().remove_nullable().is_numeric()
&& !build_key.data_type().remove_nullable().is_string()
{
continue;
}
if let Expr::ColumnRef { .. } = probe_key {
wait_runtime_filter_table_indexes.insert(*table_index);
}
}

let build_state = unsafe { &mut *self.hash_join_state.build_state.get() };
let runtime_filter_ready = &mut build_state.runtime_filter_ready;
for table_index in wait_runtime_filter_table_indexes.into_iter() {
let ready = Arc::new(RuntimeFilterReady::default());
runtime_filter_ready.push(ready.clone());
self.ctx.set_runtime_filter_ready(table_index, ready);
}
}

pub fn set_bloom_filter_ready(&self, ready: bool) -> Result<()> {
let build_state = unsafe { &mut *self.hash_join_state.build_state.get() };
for runtime_filter_ready in build_state.runtime_filter_ready.iter() {
runtime_filter_ready
.runtime_filter_watcher
.send(Some(ready))
.map_err(|_| ErrorCode::TokioError("watcher channel is closed"))?;
}
Ok(())
}

fn add_runtime_filter(&self, build_chunks: &[DataBlock], build_num_rows: usize) -> Result<()> {
let mut bloom_filter_ready = false;
for (build_key, probe_key, table_index) in self
.hash_join_state
.hash_join_desc
Expand Down Expand Up @@ -879,9 +928,11 @@ impl HashJoinBuildState {
)?;
}
if !runtime_filter.is_empty() {
bloom_filter_ready |= !runtime_filter.is_blooms_empty();
self.ctx.set_runtime_filter((*table_index, runtime_filter));
}
}
self.set_bloom_filter_ready(bloom_filter_ready)?;
Ok(())
}

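
In `add_runtime_filter_ready` above, a probe-side scan is registered for waiting only when a usable filter can exist: the method returns early on a single-node cluster, requires a numeric or string build key (nullability removed), and requires the probe key to be a plain column reference. A standalone model of that selection logic (minimal assumed types, not Databend code):

```rust
use std::collections::HashSet;

#[derive(Clone, Copy)]
enum KeyType {
    Numeric,
    Str,
    Other,
}

/// Minimal stand-in for one (build key, probe key) pair of the join.
struct JoinKey {
    build_type: KeyType,       // build key's type, nullability removed
    probe_is_column_ref: bool, // probe key is `Expr::ColumnRef { .. }`
    probe_table_index: usize,
}

/// Which probe tables should wait for a runtime filter.
fn tables_to_wait(keys: &[JoinKey]) -> HashSet<usize> {
    keys.iter()
        .filter(|k| matches!(k.build_type, KeyType::Numeric | KeyType::Str))
        .filter(|k| k.probe_is_column_ref)
        .map(|k| k.probe_table_index)
        .collect()
}

fn main() {
    let keys = [
        JoinKey { build_type: KeyType::Numeric, probe_is_column_ref: true, probe_table_index: 0 },
        JoinKey { build_type: KeyType::Other, probe_is_column_ref: true, probe_table_index: 1 },
        JoinKey { build_type: KeyType::Str, probe_is_column_ref: false, probe_table_index: 2 },
    ];
    // Only table 0 qualifies: supported key type and a column-ref probe key.
    assert_eq!(tables_to_wait(&keys), HashSet::from([0]));
}
```

The counterpart `set_bloom_filter_ready` then broadcasts `Some(ready)` to every registered watcher; note the diff also sends `false` on the empty-build and spilled-partition paths, so scans never wait for filters that will not arrive.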
@@ -509,6 +509,7 @@ impl PhysicalPlanReplacer for ReplaceReadSource {

Ok(PhysicalPlan::TableScan(TableScan {
plan_id: plan.plan_id,
scan_id: plan.scan_id,
source: Box::new(source),
name_mapping: plan.name_mapping.clone(),
table_index: plan.table_index,
34 changes: 34 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
@@ -50,6 +50,7 @@ use databend_common_catalog::plan::Partitions;
use databend_common_catalog::plan::StageTableInfo;
use databend_common_catalog::query_kind::QueryKind;
use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics;
use databend_common_catalog::table_args::TableArgs;
use databend_common_catalog::table_context::ContextError;
@@ -1177,6 +1178,39 @@ impl TableContext for QueryContext {
}
}

fn set_runtime_filter_ready(&self, table_index: usize, ready: Arc<RuntimeFilterReady>) {
let mut runtime_filter_ready = self.shared.runtime_filter_ready.write();
match runtime_filter_ready.entry(table_index) {
Entry::Vacant(v) => {
v.insert(vec![ready]);
}
Entry::Occupied(mut v) => {
v.get_mut().push(ready);
}
}
}

fn get_runtime_filter_ready(&self, table_index: usize) -> Vec<Arc<RuntimeFilterReady>> {
let runtime_filter_ready = self.shared.runtime_filter_ready.read();
match runtime_filter_ready.get(&table_index) {
Some(v) => v.to_vec(),
None => vec![],
}
}

fn set_wait_runtime_filter(&self, table_index: usize, need_to_wait: bool) {
let mut wait_runtime_filter = self.shared.wait_runtime_filter.write();
wait_runtime_filter.insert(table_index, need_to_wait);
}

fn get_wait_runtime_filter(&self, table_index: usize) -> bool {
let wait_runtime_filter = self.shared.wait_runtime_filter.read();
match wait_runtime_filter.get(&table_index) {
Some(v) => *v,
None => false,
}
}

fn get_merge_into_join(&self) -> MergeIntoJoin {
let merge_into_join = self.shared.merge_into_join.read();
MergeIntoJoin {
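
The four context methods above are thin accessors over the per-index registries added to `QueryContextShared` in the next file. A standalone sketch of that registry shape, assuming std locks in place of Databend's RwLock and illustrative names:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tokio::sync::watch;

type IndexType = usize;
type Ready = watch::Sender<Option<bool>>;

/// Illustrative stand-in for the two maps added to QueryContextShared:
/// readiness watchers and wait flags, both keyed by scan/table index.
#[derive(Default)]
struct FilterRegistry {
    ready: RwLock<HashMap<IndexType, Vec<Arc<Ready>>>>,
    wait: RwLock<HashMap<IndexType, bool>>,
}

impl FilterRegistry {
    fn set_runtime_filter_ready(&self, index: IndexType, ready: Arc<Ready>) {
        // entry().or_default() matches the Vacant/Occupied match in the diff.
        self.ready.write().unwrap().entry(index).or_default().push(ready);
    }

    fn get_runtime_filter_ready(&self, index: IndexType) -> Vec<Arc<Ready>> {
        self.ready.read().unwrap().get(&index).cloned().unwrap_or_default()
    }

    fn set_wait_runtime_filter(&self, index: IndexType, need_to_wait: bool) {
        self.wait.write().unwrap().insert(index, need_to_wait);
    }

    fn get_wait_runtime_filter(&self, index: IndexType) -> bool {
        // Indexes that were never flagged default to not waiting.
        self.wait.read().unwrap().get(&index).copied().unwrap_or(false)
    }
}

fn main() {
    let reg = FilterRegistry::default();
    let (tx, _rx) = watch::channel(None);
    reg.set_runtime_filter_ready(7, Arc::new(tx));
    reg.set_wait_runtime_filter(7, true);
    assert_eq!(reg.get_runtime_filter_ready(7).len(), 1);
    assert!(reg.get_wait_runtime_filter(7));
    assert!(!reg.get_wait_runtime_filter(8)); // unflagged scans don't wait
}
```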
8 changes: 8 additions & 0 deletions src/query/service/src/sessions/query_ctx_shared.rs
@@ -31,6 +31,7 @@ use databend_common_catalog::catalog::CatalogManager;
use databend_common_catalog::merge_into_join::MergeIntoJoin;
use databend_common_catalog::query_kind::QueryKind;
use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics;
use databend_common_catalog::table_context::ContextError;
use databend_common_catalog::table_context::MaterializedCtesBlocks;
@@ -132,6 +133,11 @@ pub struct QueryContextShared {

pub(in crate::sessions) runtime_filters: Arc<RwLock<HashMap<IndexType, RuntimeFilterInfo>>>,

pub(in crate::sessions) runtime_filter_ready:
Arc<RwLock<HashMap<IndexType, Vec<Arc<RuntimeFilterReady>>>>>,

pub(in crate::sessions) wait_runtime_filter: Arc<RwLock<HashMap<IndexType, bool>>>,

pub(in crate::sessions) merge_into_join: Arc<RwLock<MergeIntoJoin>>,

// Records query level data cache metrics
@@ -189,6 +195,8 @@ impl QueryContextShared {
query_cache_metrics: DataCacheMetrics::new(),
query_profiles: Arc::new(RwLock::new(HashMap::new())),
runtime_filters: Default::default(),
runtime_filter_ready: Default::default(),
wait_runtime_filter: Default::default(),
merge_into_join: Default::default(),
multi_table_insert_status: Default::default(),
query_queued_duration: Arc::new(RwLock::new(Duration::from_secs(0))),
17 changes: 17 additions & 0 deletions src/query/service/tests/it/sql/exec/get_table_bind_test.rs
@@ -33,6 +33,7 @@ use databend_common_catalog::plan::PartInfoPtr;
use databend_common_catalog::plan::Partitions;
use databend_common_catalog::query_kind::QueryKind;
use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::ContextError;
@@ -928,6 +929,22 @@ impl TableContext for CtxDelegation {
todo!()
}

fn set_runtime_filter_ready(&self, _table_index: usize, _ready: Arc<RuntimeFilterReady>) {
todo!()
}

fn get_runtime_filter_ready(&self, _table_index: usize) -> Vec<Arc<RuntimeFilterReady>> {
todo!()
}

fn set_wait_runtime_filter(&self, _table_index: usize, _need_to_wait: bool) {
todo!()
}

fn get_wait_runtime_filter(&self, _table_index: usize) -> bool {
todo!()
}

fn clear_runtime_filter(&self) {
todo!()
}
17 changes: 17 additions & 0 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
@@ -32,6 +32,7 @@ use databend_common_catalog::plan::PartInfoPtr;
use databend_common_catalog::plan::Partitions;
use databend_common_catalog::query_kind::QueryKind;
use databend_common_catalog::runtime_filter_info::RuntimeFilterInfo;
use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
use databend_common_catalog::statistics::data_cache_statistics::DataCacheMetrics;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::ContextError;
@@ -808,6 +809,22 @@ impl TableContext for CtxDelegation {
todo!()
}

fn set_runtime_filter_ready(&self, _table_index: usize, _ready: Arc<RuntimeFilterReady>) {
todo!()
}

fn get_runtime_filter_ready(&self, _table_index: usize) -> Vec<Arc<RuntimeFilterReady>> {
todo!()
}

fn set_wait_runtime_filter(&self, _table_index: usize, _need_to_wait: bool) {
todo!()
}

fn get_wait_runtime_filter(&self, _table_index: usize) -> bool {
todo!()
}

fn clear_runtime_filter(&self) {
todo!()
}