diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index e5b12248bbb079..924db8a72973ca 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -71,7 +71,7 @@ Status MultiCastDataStreamerSourceOperator::prepare(doris::RuntimeState* state) } Status MultiCastDataStreamerSourceOperator::open(doris::RuntimeState* state) { - return _acquire_runtime_filter(state); + return _acquire_runtime_filter(); } bool MultiCastDataStreamerSourceOperator::runtime_filters_are_ready_or_timeout() { diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index e2b4c525acf2c8..14ee165684ef16 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -132,7 +132,6 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc } IRuntimeFilter* filter; RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx, &_query_ctx->obj_pool, &desc, - &options, RuntimeFilterRole::CONSUMER, node_id, &filter, build_bf_exactly)); _consumer_map[key].emplace_back(node_id, filter); diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp index ed74d7ecd0568e..4f51d99ccccea5 100644 --- a/be/src/vec/exec/runtime_filter_consumer.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -76,20 +76,19 @@ bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() { return true; } -Status RuntimeFilterConsumer::_acquire_runtime_filter(bool wait) { +Status RuntimeFilterConsumer::_acquire_runtime_filter() { SCOPED_TIMER(_acquire_runtime_filter_timer); VExprSPtrs vexprs; for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; bool ready = runtime_filter->is_ready(); - if (!ready && wait) { + if (!ready) { ready = runtime_filter->await(); } if (ready && !_runtime_filter_ctxs[i].apply_mark) { RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs)); _runtime_filter_ctxs[i].apply_mark = true; - } else if ((wait || !runtime_filter->is_ready_or_timeout()) && - runtime_filter->current_state() == RuntimeFilterState::NOT_READY && + } else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY && !_runtime_filter_ctxs[i].apply_mark) { _blocked_by_rf = true; } else if (!_runtime_filter_ctxs[i].apply_mark) { diff --git a/be/src/vec/exec/runtime_filter_consumer.h b/be/src/vec/exec/runtime_filter_consumer.h index 18e92abc909d64..c938e8510b580c 100644 --- a/be/src/vec/exec/runtime_filter_consumer.h +++ b/be/src/vec/exec/runtime_filter_consumer.h @@ -41,7 +41,7 @@ class RuntimeFilterConsumer { // Register and get all runtime filters at Init phase. Status _register_runtime_filter(); // Get all arrived runtime filters at Open phase. - Status _acquire_runtime_filter(bool wait = true); + Status _acquire_runtime_filter(); // Append late-arrival runtime filters to the vconjunct_ctx. Status _append_rf_into_conjuncts(const VExprSPtrs& vexprs);