Skip to content

Commit

Permalink
[Bug](top-n) do not update topn filter when sort node and scan node a…
Browse files Browse the repository at this point in the history
…re not in the… (apache#32159)
  • Loading branch information
BiteTheDDDDt authored Mar 13, 2024
1 parent b4f4710 commit 99680e9
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 16 deletions.
2 changes: 0 additions & 2 deletions be/src/olap/tablet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -578,8 +578,6 @@ void TabletReader::_init_conditions_param_except_leafnode_of_andnode(
for (int id : read_params.topn_filter_source_node_ids) {
auto& runtime_predicate =
read_params.runtime_state->get_query_ctx()->get_runtime_predicate(id);
DCHECK(runtime_predicate.inited())
<< "runtime predicate not inited, source_node_id=" << id;
runtime_predicate.set_tablet_schema(_tablet_schema);
}
}
Expand Down
14 changes: 8 additions & 6 deletions be/src/pipeline/exec/sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,15 @@ Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in
local_state._mem_tracker->set_consumption(local_state._shared_state->sorter->data_size());
RETURN_IF_CANCELLED(state);

// update runtime predicate
if (_use_topn_opt) {
vectorized::Field new_top = local_state._shared_state->sorter->get_top_value();
if (!new_top.is_null() && new_top != local_state.old_top) {
auto* query_ctx = state->get_query_ctx();
RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_node_id).update(new_top));
local_state.old_top = std::move(new_top);
auto& predicate = state->get_query_ctx()->get_runtime_predicate(_node_id);
if (predicate.need_update()) {
vectorized::Field new_top = local_state._shared_state->sorter->get_top_value();
if (!new_top.is_null() && new_top != local_state.old_top) {
auto* query_ctx = state->get_query_ctx();
RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_node_id).update(new_top));
local_state.old_top = std::move(new_top);
}
}
}
if (!_reuse_mem) {
Expand Down
10 changes: 8 additions & 2 deletions be/src/runtime/runtime_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,19 @@ class RuntimePredicate {

Status init(PrimitiveType type, bool nulls_first, bool is_asc, const std::string& col_name);

bool inited() {
std::unique_lock<std::shared_mutex> wlock(_rwlock);
bool inited() const {
std::shared_lock<std::shared_mutex> rlock(_rwlock);
return _inited;
}

bool need_update() const {
std::shared_lock<std::shared_mutex> rlock(_rwlock);
return _inited && _tablet_schema;
}

void set_tablet_schema(TabletSchemaSPtr tablet_schema) {
std::unique_lock<std::shared_mutex> wlock(_rwlock);
// when sort node and scan node are not in the same backend, predicate will not be initialized
if (_tablet_schema || !_inited) {
return;
}
Expand Down
14 changes: 8 additions & 6 deletions be/src/vec/exec/vsort_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,15 @@ Status VSortNode::sink(RuntimeState* state, vectorized::Block* input_block, bool
RETURN_IF_ERROR(_sorter->append_block(input_block));
RETURN_IF_CANCELLED(state);

// update runtime predicate
if (_use_topn_opt) {
Field new_top = _sorter->get_top_value();
if (!new_top.is_null() && new_top != old_top) {
auto* query_ctx = state->get_query_ctx();
RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_id).update(new_top));
old_top = std::move(new_top);
auto& predicate = state->get_query_ctx()->get_runtime_predicate(_id);
if (predicate.need_update()) {
vectorized::Field new_top = _sorter->get_top_value();
if (!new_top.is_null() && new_top != old_top) {
auto* query_ctx = state->get_query_ctx();
RETURN_IF_ERROR(query_ctx->get_runtime_predicate(_id).update(new_top));
old_top = std::move(new_top);
}
}
}
if (!_reuse_mem) {
Expand Down

0 comments on commit 99680e9

Please sign in to comment.