Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/multi_cast_data_stream_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ Status MultiCastDataStreamerSourceOperator::prepare(doris::RuntimeState* state)
}

Status MultiCastDataStreamerSourceOperator::open(doris::RuntimeState* state) {
return _acquire_runtime_filter(state);
return _acquire_runtime_filter();
}

bool MultiCastDataStreamerSourceOperator::runtime_filters_are_ready_or_timeout() {
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc
}
IRuntimeFilter* filter;
RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx, &_query_ctx->obj_pool, &desc,

&options, RuntimeFilterRole::CONSUMER, node_id,
&filter, build_bf_exactly));
_consumer_map[key].emplace_back(node_id, filter);
Expand Down
7 changes: 3 additions & 4 deletions be/src/vec/exec/runtime_filter_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,19 @@ bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
return true;
}

Status RuntimeFilterConsumer::_acquire_runtime_filter(bool wait) {
Status RuntimeFilterConsumer::_acquire_runtime_filter() {
SCOPED_TIMER(_acquire_runtime_filter_timer);
VExprSPtrs vexprs;
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
bool ready = runtime_filter->is_ready();
if (!ready && wait) {
if (!ready) {
ready = runtime_filter->await();
}
if (ready && !_runtime_filter_ctxs[i].apply_mark) {
RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs));
_runtime_filter_ctxs[i].apply_mark = true;
} else if ((wait || !runtime_filter->is_ready_or_timeout()) &&
runtime_filter->current_state() == RuntimeFilterState::NOT_READY &&
} else if (runtime_filter->current_state() == RuntimeFilterState::NOT_READY &&
!_runtime_filter_ctxs[i].apply_mark) {
_blocked_by_rf = true;
} else if (!_runtime_filter_ctxs[i].apply_mark) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/runtime_filter_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class RuntimeFilterConsumer {
// Register and get all runtime filters at Init phase.
Status _register_runtime_filter();
// Get all arrived runtime filters at Open phase.
Status _acquire_runtime_filter(bool wait = true);
Status _acquire_runtime_filter();
// Append late-arrival runtime filters to the vconjunct_ctx.
Status _append_rf_into_conjuncts(const VExprSPtrs& vexprs);

Expand Down