From 7de6c06a3b786117aa8f61b0e110d1a250a3e927 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 16 Dec 2025 16:26:07 +0800 Subject: [PATCH 1/4] [refactor](predicate) Initialize topN predicate with correct cid --- be/src/exec/olap_common.h | 44 +---------- be/src/olap/shared_predicate.h | 1 - be/src/olap/tablet_reader.cpp | 25 ------ be/src/olap/tablet_reader.h | 3 - be/src/pipeline/exec/file_scan_operator.h | 2 - be/src/pipeline/exec/olap_scan_operator.cpp | 15 ++++ be/src/pipeline/exec/olap_scan_operator.h | 13 +++ be/src/pipeline/exec/scan_operator.cpp | 88 +++++---------------- be/src/pipeline/exec/scan_operator.h | 23 +----- be/src/runtime/runtime_predicate.cpp | 14 +--- be/src/runtime/runtime_predicate.h | 5 +- be/src/vec/exec/scan/file_scanner.h | 2 - 12 files changed, 57 insertions(+), 178 deletions(-) diff --git a/be/src/exec/olap_common.h b/be/src/exec/olap_common.h index eb2f543fca6081..ac6ed60848b8d2 100644 --- a/be/src/exec/olap_common.h +++ b/be/src/exec/olap_common.h @@ -207,27 +207,6 @@ class ColumnValueRange { _contain_null = _is_nullable_col && contain_null; } - void attach_profile_counter( - int runtime_filter_id, - std::shared_ptr predicate_filtered_rows_counter, - std::shared_ptr predicate_input_rows_counter, - std::shared_ptr predicate_always_true_rows_counter) { - DCHECK(predicate_filtered_rows_counter != nullptr); - DCHECK(predicate_input_rows_counter != nullptr); - - _runtime_filter_id = runtime_filter_id; - - if (predicate_filtered_rows_counter != nullptr) { - _predicate_filtered_rows_counter = predicate_filtered_rows_counter; - } - if (predicate_input_rows_counter != nullptr) { - _predicate_input_rows_counter = predicate_input_rows_counter; - } - if (predicate_always_true_rows_counter != nullptr) { - _predicate_always_true_rows_counter = predicate_always_true_rows_counter; - } - } - int precision() const { return _precision; } int scale() const { return _scale; } @@ -294,15 +273,6 @@ class ColumnValueRange { primitive_type == PrimitiveType::TYPE_DATETIMEV2 || primitive_type == PrimitiveType::TYPE_TIMESTAMPTZ || primitive_type == PrimitiveType::TYPE_DECIMAL256; - - int _runtime_filter_id = -1; - - std::shared_ptr _predicate_filtered_rows_counter = - std::make_shared(TUnit::UNIT, 0); - std::shared_ptr _predicate_input_rows_counter = - std::make_shared(TUnit::UNIT, 0); - std::shared_ptr _predicate_always_true_rows_counter = - std::make_shared(TUnit::UNIT, 0); }; template <> const typename ColumnValueRange::CppType ColumnValueRange::TYPE_MIN; @@ -315,12 +285,6 @@ const typename ColumnValueRange::CppType ColumnValueRange Status extend_scan_key(ColumnValueRange& range, int32_t max_scan_key_num, @@ -358,10 +322,10 @@ class OlapScanKeys { private: std::vector _begin_scan_keys; std::vector _end_scan_keys; - bool _has_range_value; - bool _begin_include; - bool _end_include; - bool _is_convertible; + bool _has_range_value = false; + bool _begin_include = false; + bool _end_include = false; + bool _is_convertible = false; }; using ColumnValueRangeType = std::variant< diff --git a/be/src/olap/shared_predicate.h b/be/src/olap/shared_predicate.h index da6b131db97e0d..c06591cc79c728 100644 --- a/be/src/olap/shared_predicate.h +++ b/be/src/olap/shared_predicate.h @@ -54,7 +54,6 @@ class SharedPredicate final : public ColumnPredicate { ColumnPredicate::debug_string(), _nested ? _nested->debug_string() : "null"); return fmt::to_string(debug_string_buffer); } - void set_column_id(uint32_t column_id) { _column_id = column_id; } std::shared_ptr clone(uint32_t column_id) const override { // All scanner thread should share the same SharedPredicate object. return std::const_pointer_cast(shared_from_this()); diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp index 258d1d97752c03..8cc909dd42f654 100644 --- a/be/src/olap/tablet_reader.cpp +++ b/be/src/olap/tablet_reader.cpp @@ -116,31 +116,6 @@ Status TabletReader::init(const ReaderParams& read_params) { return res; } -// When only one rowset has data, and this rowset is nonoverlapping, we can read directly without aggregation -bool TabletReader::_optimize_for_single_rowset( - const std::vector& rs_readers) { - bool has_delete_rowset = false; - bool has_overlapping = false; - int nonoverlapping_count = 0; - for (const auto& rs_reader : rs_readers) { - if (rs_reader->rowset()->rowset_meta()->delete_flag()) { - has_delete_rowset = true; - break; - } - if (rs_reader->rowset()->rowset_meta()->num_rows() > 0) { - if (rs_reader->rowset()->rowset_meta()->is_segments_overlapping()) { - // when there are overlapping segments, can not do directly read - has_overlapping = true; - break; - } else if (++nonoverlapping_count > 1) { - break; - } - } - } - - return !has_overlapping && nonoverlapping_count == 1 && !has_delete_rowset; -} - Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) { SCOPED_RAW_TIMER(&_stats.tablet_reader_capture_rs_readers_timer_ns); if (read_params.rs_splits.empty()) { diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h index 0dad5424611a81..7e8c3630f510da 100644 --- a/be/src/olap/tablet_reader.h +++ b/be/src/olap/tablet_reader.h @@ -162,7 +162,6 @@ class TabletReader { std::vector* origin_return_columns = nullptr; std::unordered_set* tablet_columns_convert_to_null_set = nullptr; TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE; - vectorized::VExpr* remaining_vconjunct_root = nullptr; std::vector remaining_conjunct_roots; vectorized::VExprContextSPtrs common_expr_ctxs_push_down; @@ -255,8 +254,6 @@ class TabletReader { Status _capture_rs_readers(const ReaderParams& read_params); - bool _optimize_for_single_rowset(const std::vector& rs_readers); - Status _init_keys_param(const ReaderParams& read_params); Status _init_orderby_keys_param(const ReaderParams& read_params); diff --git a/be/src/pipeline/exec/file_scan_operator.h b/be/src/pipeline/exec/file_scan_operator.h index 12b303a02c9375..a2d834bb0d1bf2 100644 --- a/be/src/pipeline/exec/file_scan_operator.h +++ b/be/src/pipeline/exec/file_scan_operator.h @@ -83,8 +83,6 @@ class FileScanOperatorX final : public ScanOperatorX { Status prepare(RuntimeState* state) override; - bool is_file_scan_operator() const override { return true; } - // There's only one scan range for each backend in batch split mode. Each backend only starts up one ScanNode instance. int parallelism(RuntimeState* state) const override { return _batch_split_mode ? 1 : ScanOperatorX::parallelism(state); diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 090e59fa5d0d04..7738c195474f64 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -961,6 +961,21 @@ OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, i << ", sort_limit: " << _olap_scan_node.sort_limit << ", isset.sort_limit: " << _olap_scan_node.__isset.sort_limit; }) + + if (_olap_scan_node.__isset.columns_desc && !_olap_scan_node.columns_desc.empty() && + _olap_scan_node.columns_desc[0].col_unique_id >= 0) { + _tablet_schema = std::make_shared(); + _tablet_schema->clear_columns(); + for (const auto& column_desc : _olap_scan_node.columns_desc) { + _tablet_schema->append_column(TabletColumn(column_desc)); + } + if (_olap_scan_node.__isset.schema_version) { + _tablet_schema->set_schema_version(_olap_scan_node.schema_version); + } + if (_olap_scan_node.__isset.indexes_desc) { + _tablet_schema->update_indexes_from_thrift(_olap_scan_node.indexes_desc); + } + } } #include "common/compile_check_end.h" diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index ed7da700d48929..eb583d839618f0 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -93,6 +93,11 @@ class OlapScanLocalState final : public ScanLocalState { if (!predicate.target_is_slot(_parent->node_id())) { return false; } + if (!olap_scan_node().__isset.columns_desc || olap_scan_node().columns_desc.empty() || + olap_scan_node().columns_desc[0].col_unique_id < 0) { + // Disable topN filter if there is no schema info + return false; + } return _is_key_column(predicate.get_col_name(_parent->node_id())); } @@ -298,10 +303,18 @@ class OlapScanOperatorX final : public ScanOperatorX { const DescriptorTbl& descs, int parallel_tasks, const TQueryCacheParam& cache_param); + uint32_t _get_column_id(const std::string& col_name) const override { + if (!_tablet_schema) { + return -1; + } + const auto& column = *DORIS_TRY(_tablet_schema->column(col_name)); + return _tablet_schema->field_index(column.unique_id()); + } private: friend class OlapScanLocalState; TOlapScanNode _olap_scan_node; TQueryCacheParam _cache_param; + TabletSchemaSPtr _tablet_schema; }; #include "common/compile_check_end.h" diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 1a9d2ee9ee608c..7ea93008d49483 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -298,45 +298,6 @@ Status ScanLocalState::_normalize_predicate(vectorized::VExprContext* c vectorized::VExprSPtr& output_expr) { const auto expr_root = context->root(); static constexpr auto is_leaf = [](auto&& expr) { return !expr->is_and_expr(); }; - auto in_predicate_checker = [&](const vectorized::VExprSPtrs& children, - SlotDescriptor** slot_desc, ColumnValueRangeType** range) { - if (children.empty() || children[0]->node_type() != TExprNodeType::SLOT_REF) { - // not a slot ref(column) - return false; - } - std::shared_ptr slot = - std::dynamic_pointer_cast(children[0]); - *slot_desc = - _parent->cast()._slot_id_to_slot_desc[slot->slot_id()]; - return _is_predicate_acting_on_slot(slot, range); - }; - auto eq_predicate_checker = [&](const vectorized::VExprSPtrs& children, - SlotDescriptor** slot_desc, ColumnValueRangeType** range) { - if (children.empty() || children[0]->node_type() != TExprNodeType::SLOT_REF) { - // not a slot ref(column) - return false; - } - std::shared_ptr slot = - std::dynamic_pointer_cast(children[0]); - CHECK(slot != nullptr); - *slot_desc = - _parent->cast()._slot_id_to_slot_desc[slot->slot_id()]; - return _is_predicate_acting_on_slot(slot, range); - }; - auto topn_predicate_checker = [&](const vectorized::VExprSPtrs& children, - SlotDescriptor** slot_desc, ColumnValueRangeType** range) { - if (children.empty() || children[0]->node_type() != TExprNodeType::SLOT_REF) { - // not a slot ref(column) - return false; - } - std::shared_ptr slot = - std::dynamic_pointer_cast(children[0]); - CHECK(slot != nullptr); - *slot_desc = - _parent->cast()._slot_id_to_slot_desc[slot->slot_id()]; - return _is_predicate_acting_on_slot(slot, range); - }; - if (expr_root != nullptr) { if (is_leaf(expr_root)) { if (dynamic_cast(expr_root.get())) { @@ -363,30 +324,10 @@ Status ScanLocalState::_normalize_predicate(vectorized::VExprContext* c slotref = std::dynamic_pointer_cast( vectorized::VExpr::expr_without_cast(child)); } - if (in_predicate_checker(expr_root->children(), &slot, &range) || - eq_predicate_checker(expr_root->children(), &slot, &range) || - topn_predicate_checker(expr_root->children(), &slot, &range)) { + if (_is_predicate_acting_on_slot(expr_root->children(), &slot, &range)) { Status status = Status::OK(); std::visit( [&](auto& value_range) { - bool need_set_runtime_filter_id = value_range.is_whole_value_range() && - expr_root->is_rf_wrapper(); - Defer set_runtime_filter_id {[&]() { - // rf predicates is always appended to the end of conjuncts. We need to ensure that there is no non-rf predicate after rf-predicate - // If it is not a whole range, it means that the column has other non-rf predicates, so it cannot be marked as rf predicate. - // If the range where non-rf predicates are located is incorrectly marked as rf, can_ignore will return true, resulting in the predicate not taking effect and getting an incorrect result. - if (need_set_runtime_filter_id) { - auto* rf_expr = assert_cast( - expr_root.get()); - DCHECK(rf_expr->predicate_filtered_rows_counter() != nullptr); - DCHECK(rf_expr->predicate_input_rows_counter() != nullptr); - value_range.attach_profile_counter( - rf_expr->filter_id(), - rf_expr->predicate_filtered_rows_counter(), - rf_expr->predicate_input_rows_counter(), - rf_expr->predicate_always_true_rows_counter()); - } - }}; RETURN_IF_PUSH_DOWN( _normalize_in_and_eq_predicate( context, slot, _slot_id_to_predicates[slot->id()], @@ -562,8 +503,17 @@ Status ScanLocalState::_normalize_function_filters(vectorized::VExprCon } template -bool ScanLocalState::_is_predicate_acting_on_slot( - const std::shared_ptr& slot_ref, ColumnValueRangeType** range) { +bool ScanLocalState::_is_predicate_acting_on_slot(const vectorized::VExprSPtrs& children, + SlotDescriptor** slot_desc, + ColumnValueRangeType** range) { + if (children.empty() || children[0]->node_type() != TExprNodeType::SLOT_REF) { + // not a slot ref(column) + return false; + } + std::shared_ptr slot_ref = + std::dynamic_pointer_cast(children[0]); + *slot_desc = + _parent->cast()._slot_id_to_slot_desc[slot_ref->slot_id()]; auto entry = _slot_id_to_predicates.find(slot_ref->slot_id()); if (_slot_id_to_predicates.end() == entry) { return false; @@ -1173,11 +1123,6 @@ TPushAggOp::type ScanLocalState::get_push_down_agg_type() { return _parent->cast()._push_down_agg_type; } -template -int64_t ScanLocalState::get_push_down_count() { - return _parent->cast()._push_down_count; -} - template int64_t ScanLocalState::limit_per_scanner() { return _parent->cast()._limit_per_scanner; @@ -1386,9 +1331,14 @@ Status ScanOperatorX::prepare(RuntimeState* state) { continue; } + auto col_name = _slot_id_to_slot_desc[state->get_query_ctx() + ->get_runtime_predicate(id) + .get_texpr(node_id()) + .nodes[0] + .slot_ref.slot_id] + ->col_name(); RETURN_IF_ERROR(state->get_query_ctx()->get_runtime_predicate(id).init_target( - node_id(), _slot_id_to_slot_desc, - OperatorX::intermediate_row_desc())); + node_id(), _slot_id_to_slot_desc, _get_column_id(col_name))); } RETURN_IF_CANCELLED(state); diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index f6d45bc3354d8e..f9545c1ed7c0ed 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -53,16 +53,6 @@ enum class PushDownType { PARTIAL_ACCEPTABLE }; -struct FilterPredicates { - // Save all runtime filter predicates which may be pushed down to data source. - // column name -> bloom filter function - std::vector>> bloom_filters; - - std::vector>> bitmap_filters; - - std::vector>> in_filters; -}; - class ScanLocalStateBase : public PipelineXLocalState<> { public: ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent) @@ -83,7 +73,6 @@ class ScanLocalStateBase : public PipelineXLocalState<> { const std::vector& scan_ranges) = 0; virtual TPushAggOp::type get_push_down_agg_type() = 0; - virtual int64_t get_push_down_count() = 0; // If scan operator is serial operator(like topn), its real parallelism is 1. // Otherwise, its real parallelism is query_parallel_instance_num. // query_parallel_instance_num of olap table is usually equal to session var parallel_pipeline_task_num. @@ -122,7 +111,6 @@ class ScanLocalStateBase : public PipelineXLocalState<> { RuntimeProfile::Counter* _scan_cpu_timer = nullptr; // time of filter output block from scanner RuntimeProfile::Counter* _filter_timer = nullptr; - RuntimeProfile::Counter* _memory_usage_counter = nullptr; // rows read from the scanner (including those discarded by (pre)filters) RuntimeProfile::Counter* _rows_read_counter = nullptr; @@ -170,8 +158,6 @@ class ScanLocalState : public ScanLocalStateBase { TPushAggOp::type get_push_down_agg_type() override; - int64_t get_push_down_count() override; - std::vector execution_dependencies() override { if (_filter_dependencies.empty()) { return {}; @@ -266,8 +252,8 @@ class ScanLocalState : public ScanLocalStateBase { Status _normalize_function_filters(vectorized::VExprContext* expr_ctx, SlotDescriptor* slot, PushDownType* pdt); - bool _is_predicate_acting_on_slot(const std::shared_ptr& slot_ref, - ColumnValueRangeType** range); + bool _is_predicate_acting_on_slot(const vectorized::VExprSPtrs& children, + SlotDescriptor** slot_desc, ColumnValueRangeType** range); template Status _normalize_in_and_eq_predicate(vectorized::VExprContext* expr_ctx, SlotDescriptor* slot, @@ -357,14 +343,14 @@ class ScanOperatorX : public OperatorX { } [[nodiscard]] bool is_source() const override { return true; } - [[nodiscard]] virtual bool is_file_scan_operator() const { return false; } - [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state) override; const std::vector& runtime_filter_descs() override { return _runtime_filter_descs; } + [[nodiscard]] virtual uint32_t _get_column_id(const std::string& col_name) const { return -1; } + TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; } DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { @@ -383,7 +369,6 @@ class ScanOperatorX : public OperatorX { } } - int64_t get_push_down_count() const { return _push_down_count; } using OperatorX::node_id; using OperatorX::operator_id; using OperatorX::get_local_state; diff --git a/be/src/runtime/runtime_predicate.cpp b/be/src/runtime/runtime_predicate.cpp index aa14e49a532974..80dbecd7fa2ccc 100644 --- a/be/src/runtime/runtime_predicate.cpp +++ b/be/src/runtime/runtime_predicate.cpp @@ -57,25 +57,13 @@ RuntimePredicate::RuntimePredicate(const TTopnFilterDesc& desc) Status RuntimePredicate::init_target( int32_t target_node_id, phmap::flat_hash_map slot_id_to_slot_desc, - const doris::RowDescriptor& desc) { + const uint32_t column_id) { std::unique_lock wlock(_rwlock); check_target_node_id(target_node_id); if (target_is_slot(target_node_id)) { _contexts[target_node_id].col_name = slot_id_to_slot_desc[get_texpr(target_node_id).nodes[0].slot_ref.slot_id] ->col_name(); - auto slot_id = get_texpr(target_node_id).nodes[0].slot_ref.slot_id; - auto column_id = desc.get_column_id(slot_id); - if (column_id < 0) { - return Status::Error( - "RuntimePredicate has invalid slot id: {}, name: {}, desc: {}, slot_desc: {}", - slot_id, - slot_id_to_slot_desc[get_texpr(target_node_id).nodes[0].slot_ref.slot_id] - ->col_name(), - desc.debug_string(), - slot_id_to_slot_desc[get_texpr(target_node_id).nodes[0].slot_ref.slot_id] - ->debug_string()); - } _contexts[target_node_id].predicate = SharedPredicate::create_shared(column_id); } _detected_target = true; diff --git a/be/src/runtime/runtime_predicate.h b/be/src/runtime/runtime_predicate.h index 34ada1ad53cb52..b25456e4936c78 100644 --- a/be/src/runtime/runtime_predicate.h +++ b/be/src/runtime/runtime_predicate.h @@ -46,7 +46,7 @@ class RuntimePredicate { Status init_target(int32_t target_node_id, phmap::flat_hash_map slot_id_to_slot_desc, - const doris::RowDescriptor& desc); + const uint32_t column_id); bool enable() const { // when sort node and scan node are not in the same fragment, predicate will be disabled @@ -67,10 +67,7 @@ class RuntimePredicate { } RETURN_IF_ERROR(tablet_schema->have_column(_contexts[target_node_id].col_name)); _contexts[target_node_id].tablet_schema = tablet_schema; - int64_t index = DORIS_TRY(_contexts[target_node_id].get_field_index()); DCHECK(_contexts[target_node_id].predicate != nullptr); - assert_cast(_contexts[target_node_id].predicate.get()) - ->set_column_id(cast_set(index)); return Status::OK(); } diff --git a/be/src/vec/exec/scan/file_scanner.h b/be/src/vec/exec/scan/file_scanner.h index d26186eeef621b..f5e86dc0ee53cf 100644 --- a/be/src/vec/exec/scan/file_scanner.h +++ b/be/src/vec/exec/scan/file_scanner.h @@ -283,8 +283,6 @@ class FileScanner : public Scanner { : _local_state->get_push_down_agg_type(); } - int64_t _get_push_down_count() { return _local_state->get_push_down_count(); } - // enable the file meta cache only when // 1. max_external_file_meta_cache_num is > 0 // 2. the file number is less than 1/3 of cache's capacibility From 2bc3d40e3bc1645e788f9a76db8d2748d3660fb5 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 16 Dec 2025 17:46:55 +0800 Subject: [PATCH 2/4] update --- be/src/pipeline/exec/olap_scan_operator.h | 3 ++- be/src/pipeline/exec/scan_operator.cpp | 2 +- be/src/pipeline/exec/scan_operator.h | 2 +- be/src/runtime/runtime_predicate.cpp | 3 +++ 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index eb583d839618f0..9944cc0bed970e 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -303,13 +303,14 @@ class OlapScanOperatorX final : public ScanOperatorX { const DescriptorTbl& descs, int parallel_tasks, const TQueryCacheParam& cache_param); - uint32_t _get_column_id(const std::string& col_name) const override { + uint32_t get_column_id(const std::string& col_name) const override { if (!_tablet_schema) { return -1; } const auto& column = *DORIS_TRY(_tablet_schema->column(col_name)); return _tablet_schema->field_index(column.unique_id()); } + private: friend class OlapScanLocalState; TOlapScanNode _olap_scan_node; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 7ea93008d49483..4658643eb8544f 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -1338,7 +1338,7 @@ Status ScanOperatorX::prepare(RuntimeState* state) { .slot_ref.slot_id] ->col_name(); RETURN_IF_ERROR(state->get_query_ctx()->get_runtime_predicate(id).init_target( - node_id(), _slot_id_to_slot_desc, _get_column_id(col_name))); + node_id(), _slot_id_to_slot_desc, get_column_id(col_name))); } RETURN_IF_CANCELLED(state); diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index f9545c1ed7c0ed..648e70063739e5 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -349,7 +349,7 @@ class ScanOperatorX : public OperatorX { return _runtime_filter_descs; } - [[nodiscard]] virtual uint32_t _get_column_id(const std::string& col_name) const { return -1; } + [[nodiscard]] virtual uint32_t get_column_id(const std::string& col_name) const { return -1; } TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; } diff --git a/be/src/runtime/runtime_predicate.cpp b/be/src/runtime/runtime_predicate.cpp index 80dbecd7fa2ccc..1f987eae5976da 100644 --- a/be/src/runtime/runtime_predicate.cpp +++ b/be/src/runtime/runtime_predicate.cpp @@ -58,6 +58,9 @@ RuntimePredicate::RuntimePredicate(const TTopnFilterDesc& desc) Status RuntimePredicate::init_target( int32_t target_node_id, phmap::flat_hash_map slot_id_to_slot_desc, const uint32_t column_id) { + if (column_id < 0) { + return Status::OK(); + } std::unique_lock wlock(_rwlock); check_target_node_id(target_node_id); if (target_is_slot(target_node_id)) { From 2a1c277e3193ef9e2f72531d905cbe04e1688402 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 17 Dec 2025 10:22:22 +0800 Subject: [PATCH 3/4] update --- be/src/pipeline/exec/olap_scan_operator.h | 2 +- be/src/pipeline/exec/scan_operator.h | 2 +- be/src/runtime/runtime_predicate.cpp | 5 +++-- be/src/runtime/runtime_predicate.h | 2 +- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index 9944cc0bed970e..331091a36504c4 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -303,7 +303,7 @@ class OlapScanOperatorX final : public ScanOperatorX { const DescriptorTbl& descs, int parallel_tasks, const TQueryCacheParam& cache_param); - uint32_t get_column_id(const std::string& col_name) const override { + int get_column_id(const std::string& col_name) const override { if (!_tablet_schema) { return -1; } diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 648e70063739e5..8a984cedc6cddd 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -349,7 +349,7 @@ class ScanOperatorX : public OperatorX { return _runtime_filter_descs; } - [[nodiscard]] virtual uint32_t get_column_id(const std::string& col_name) const { return -1; } + [[nodiscard]] virtual int get_column_id(const std::string& col_name) const { return -1; } TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; } diff --git a/be/src/runtime/runtime_predicate.cpp b/be/src/runtime/runtime_predicate.cpp index 1f987eae5976da..e8ff7a284a7839 100644 --- a/be/src/runtime/runtime_predicate.cpp +++ b/be/src/runtime/runtime_predicate.cpp @@ -57,7 +57,7 @@ RuntimePredicate::RuntimePredicate(const TTopnFilterDesc& desc) Status RuntimePredicate::init_target( int32_t target_node_id, phmap::flat_hash_map slot_id_to_slot_desc, - const uint32_t column_id) { + const int column_id) { if (column_id < 0) { return Status::OK(); } @@ -67,7 +67,8 @@ Status RuntimePredicate::init_target( _contexts[target_node_id].col_name = slot_id_to_slot_desc[get_texpr(target_node_id).nodes[0].slot_ref.slot_id] ->col_name(); - _contexts[target_node_id].predicate = SharedPredicate::create_shared(column_id); + _contexts[target_node_id].predicate = + SharedPredicate::create_shared(cast_set(column_id)); } _detected_target = true; return Status::OK(); diff --git a/be/src/runtime/runtime_predicate.h b/be/src/runtime/runtime_predicate.h index b25456e4936c78..aa1e52522f8550 100644 --- a/be/src/runtime/runtime_predicate.h +++ b/be/src/runtime/runtime_predicate.h @@ -46,7 +46,7 @@ class RuntimePredicate { Status init_target(int32_t target_node_id, phmap::flat_hash_map slot_id_to_slot_desc, - const uint32_t column_id); + const int column_id); bool enable() const { // when sort node and scan node are not in the same fragment, predicate will be disabled From 293836a74862682e3bd3e27d1e665d3562c29e7d Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 17 Dec 2025 14:30:37 +0800 Subject: [PATCH 4/4] update --- be/src/pipeline/exec/scan_operator.cpp | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 4658643eb8544f..9c61e75d114719 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -1331,14 +1331,19 @@ Status ScanOperatorX::prepare(RuntimeState* state) { continue; } - auto col_name = _slot_id_to_slot_desc[state->get_query_ctx() - ->get_runtime_predicate(id) - .get_texpr(node_id()) - .nodes[0] - .slot_ref.slot_id] - ->col_name(); + int cid = -1; + if (state->get_query_ctx()->get_runtime_predicate(id).target_is_slot(node_id())) { + auto s = _slot_id_to_slot_desc[state->get_query_ctx() + ->get_runtime_predicate(id) + .get_texpr(node_id()) + .nodes[0] + .slot_ref.slot_id]; + DCHECK(s != nullptr); + auto col_name = s->col_name(); + cid = get_column_id(col_name); + } RETURN_IF_ERROR(state->get_query_ctx()->get_runtime_predicate(id).init_target( - node_id(), _slot_id_to_slot_desc, get_column_id(col_name))); + node_id(), _slot_id_to_slot_desc, cid)); } RETURN_IF_CANCELLED(state);