diff --git a/be/src/olap/column_predicate.h b/be/src/olap/column_predicate.h index 04a798d373c0e4..c5b0e6f67ddbcd 100644 --- a/be/src/olap/column_predicate.h +++ b/be/src/olap/column_predicate.h @@ -158,7 +158,7 @@ struct PredicateTypeTraits { } \ } -class ColumnPredicate { +class ColumnPredicate : public std::enable_shared_from_this { public: explicit ColumnPredicate(uint32_t column_id, PrimitiveType primitive_type, bool opposite = false) diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index e5a9b8a9d7f1d1..0b2f64ee4f31ef 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -275,39 +275,6 @@ Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_o } } - if (!read_options.topn_filter_source_node_ids.empty()) { - auto* query_ctx = read_options.runtime_state->get_query_ctx(); - for (int id : read_options.topn_filter_source_node_ids) { - auto runtime_predicate = query_ctx->get_runtime_predicate(id).get_predicate( - read_options.topn_filter_target_node_id); - - AndBlockColumnPredicate and_predicate; - and_predicate.add_column_predicate( - SingleColumnBlockPredicate::create_unique(runtime_predicate)); - std::shared_ptr reader; - Status st = get_column_reader( - read_options.tablet_schema->column(runtime_predicate->column_id()), &reader, - read_options.stats); - if (st.is()) { - continue; - } - RETURN_IF_ERROR(st); - DCHECK(reader != nullptr); - if (can_apply_predicate_safely(runtime_predicate->column_id(), *schema, - read_options.target_cast_type_for_variants, - read_options)) { - bool matched = true; - RETURN_IF_ERROR(reader->match_condition(&and_predicate, &matched)); - if (!matched) { - // any condition not satisfied, return. - *iter = std::make_unique(*schema); - read_options.stats->filtered_segment_number++; - return Status::OK(); - } - } - } - } - { SCOPED_RAW_TIMER(&read_options.stats->segment_load_index_timer_ns); RETURN_IF_ERROR(load_index(read_options.stats)); diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h index e67d0ce86a47d3..9197c5a0a7b524 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -180,6 +180,7 @@ class Segment : public std::enable_shared_from_this, public MetadataAdd const std::map& target_cast_type_for_variants, const StorageReadOptions& read_options) { const doris::Field* col = schema.column(cid); + DCHECK(col != nullptr) << "Column not found in schema for cid=" << cid; vectorized::DataTypePtr storage_column_type = get_data_type_of(col->get_desc(), read_options); if (storage_column_type == nullptr || col->type() != FieldType::OLAP_FIELD_TYPE_VARIANT || diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 4616235cba6fbd..3e52a9815abd36 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -946,31 +946,6 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row RowRanges::ranges_intersection(*condition_row_ranges, zone_map_row_ranges, condition_row_ranges); - if (!_opts.topn_filter_source_node_ids.empty()) { - auto* query_ctx = _opts.runtime_state->get_query_ctx(); - for (int id : _opts.topn_filter_source_node_ids) { - std::shared_ptr runtime_predicate = - query_ctx->get_runtime_predicate(id).get_predicate( - _opts.topn_filter_target_node_id); - if (_segment->can_apply_predicate_safely(runtime_predicate->column_id(), *_schema, - _opts.target_cast_type_for_variants, - _opts)) { - AndBlockColumnPredicate and_predicate; - and_predicate.add_column_predicate( - SingleColumnBlockPredicate::create_unique(runtime_predicate)); - - RowRanges column_rp_row_ranges = RowRanges::create_single(num_rows()); - RETURN_IF_ERROR(_column_iterators[runtime_predicate->column_id()] - ->get_row_ranges_by_zone_map(&and_predicate, nullptr, - &column_rp_row_ranges)); - - // intersect different columns's row ranges to get final row ranges by zone map - RowRanges::ranges_intersection(zone_map_row_ranges, column_rp_row_ranges, - &zone_map_row_ranges); - } - } - } - size_t pre_size2 = condition_row_ranges->count(); RowRanges::ranges_intersection(*condition_row_ranges, zone_map_row_ranges, condition_row_ranges); @@ -1688,26 +1663,6 @@ Status SegmentIterator::_vec_init_lazy_materialization() { } } - // add runtime predicate to _col_predicates - // should NOT add for order by key, - // since key is already sorted and topn_next only need first N rows from each segment, - // but runtime predicate will filter some rows and read more than N rows. - // should add add for order by none-key column, since none-key column is not sorted and - // all rows should be read, so runtime predicate will reduce rows for topn node - if (!_opts.topn_filter_source_node_ids.empty() && - (_opts.read_orderby_key_columns == nullptr || _opts.read_orderby_key_columns->empty())) { - for (int id : _opts.topn_filter_source_node_ids) { - auto& runtime_predicate = - _opts.runtime_state->get_query_ctx()->get_runtime_predicate(id); - _col_predicates.push_back( - runtime_predicate.get_predicate(_opts.topn_filter_target_node_id)); - VLOG_DEBUG << fmt::format( - "After appending topn filter to col_predicates, " - "col_predicates size: {}, col_predicate: {}", - _col_predicates.size(), _col_predicates.back()->debug_string()); - } - } - // Step1: extract columns that can be lazy materialization if (!_col_predicates.empty() || !del_cond_id_set.empty()) { std::set short_cir_pred_col_id_set; // using set for distinct cid diff --git a/be/src/olap/shared_predicate.h b/be/src/olap/shared_predicate.h index 2b0c32c8246450..da6b131db97e0d 100644 --- a/be/src/olap/shared_predicate.h +++ b/be/src/olap/shared_predicate.h @@ -54,8 +54,10 @@ class SharedPredicate final : public ColumnPredicate { ColumnPredicate::debug_string(), _nested ? _nested->debug_string() : "null"); return fmt::to_string(debug_string_buffer); } + void set_column_id(uint32_t column_id) { _column_id = column_id; } std::shared_ptr clone(uint32_t column_id) const override { - return SharedPredicate::create_shared(*this, column_id); + // All scanner thread should share the same SharedPredicate object. + return std::const_pointer_cast(shared_from_this()); } PredicateType type() const override { diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index 210a725a8bf060..ed7da700d48929 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -79,6 +79,7 @@ class OlapScanLocalState final : public ScanLocalState { PushDownType& pdt) override; PushDownType _should_push_down_bloom_filter() override { return PushDownType::ACCEPTABLE; } + PushDownType _should_push_down_topn_filter() override { return PushDownType::ACCEPTABLE; } PushDownType _should_push_down_bitmap_filter() override { return PushDownType::ACCEPTABLE; } diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 4b936c9237e7a3..d3ea9f1ee193a7 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -325,6 +325,19 @@ Status ScanLocalState::_normalize_predicate(vectorized::VExprContext* c _parent->cast()._slot_id_to_slot_desc[slot->slot_id()]; return _is_predicate_acting_on_slot(slot, range); }; + auto topn_predicate_checker = [&](const vectorized::VExprSPtrs& children, + SlotDescriptor** slot_desc, ColumnValueRangeType** range) { + if (children.empty() || children[0]->node_type() != TExprNodeType::SLOT_REF) { + // not a slot ref(column) + return false; + } + std::shared_ptr slot = + std::dynamic_pointer_cast(children[0]); + CHECK(slot != nullptr); + *slot_desc = + _parent->cast()._slot_id_to_slot_desc[slot->slot_id()]; + return _is_predicate_acting_on_slot(slot, range); + }; if (expr_root != nullptr) { if (is_leaf(expr_root)) { @@ -353,7 +366,8 @@ Status ScanLocalState::_normalize_predicate(vectorized::VExprContext* c vectorized::VExpr::expr_without_cast(child)); } if (in_predicate_checker(expr_root->children(), &slot, &range) || - eq_predicate_checker(expr_root->children(), &slot, &range)) { + eq_predicate_checker(expr_root->children(), &slot, &range) || + topn_predicate_checker(expr_root->children(), &slot, &range)) { Status status = Status::OK(); std::visit( [&](auto& value_range) { @@ -403,7 +417,14 @@ Status ScanLocalState::_normalize_predicate(vectorized::VExprContext* c context, slot, _slot_id_to_predicates[slot->id()], &pdt), status); - + RETURN_IF_PUSH_DOWN(_normalize_bloom_filter( + context, slot, + _slot_id_to_predicates[slot->id()], &pdt), + status); + RETURN_IF_PUSH_DOWN(_normalize_topn_filter( + context, slot, + _slot_id_to_predicates[slot->id()], &pdt), + status); if (state()->enable_function_pushdown()) { RETURN_IF_PUSH_DOWN( _normalize_function_filters(context, slot, &pdt), status); @@ -465,6 +486,26 @@ Status ScanLocalState::_normalize_bloom_filter( return Status::OK(); } +template +Status ScanLocalState::_normalize_topn_filter( + vectorized::VExprContext* expr_ctx, SlotDescriptor* slot, + std::vector>& predicates, PushDownType* pdt) { + auto expr = expr_ctx->root()->is_rf_wrapper() ? expr_ctx->root()->get_impl() : expr_ctx->root(); + if (expr->is_topn_filter()) { + PushDownType temp_pdt = _should_push_down_topn_filter(); + if (temp_pdt != PushDownType::UNACCEPTABLE) { + auto& p = _parent->cast(); + auto& pred = _state->get_query_ctx()->get_runtime_predicate( + assert_cast(expr.get())->source_node_id()); + if (_push_down_topn(pred)) { + predicates.emplace_back(pred.get_predicate(p.node_id())); + *pdt = temp_pdt; + } + } + } + return Status::OK(); +} + template Status ScanLocalState::_normalize_bitmap_filter( vectorized::VExprContext* expr_ctx, SlotDescriptor* slot, @@ -1211,6 +1252,18 @@ Status ScanLocalState::_get_topn_filters(RuntimeState* state) { RETURN_IF_ERROR(conjunct->open(state)); _conjuncts.emplace_back(conjunct); } + for (auto id : get_topn_filter_source_node_ids(state, true)) { + const auto& pred = state->get_query_ctx()->get_runtime_predicate(id); + vectorized::VExprSPtr topn_pred; + RETURN_IF_ERROR(vectorized::VTopNPred::create_vtopn_pred(pred.get_texpr(p.node_id()), id, + topn_pred)); + + vectorized::VExprContextSPtr conjunct = vectorized::VExprContext::create_shared(topn_pred); + RETURN_IF_ERROR(conjunct->prepare( + state, _parent->cast().row_descriptor())); + RETURN_IF_ERROR(conjunct->open(state)); + _conjuncts.emplace_back(conjunct); + } return Status::OK(); } @@ -1335,8 +1388,9 @@ Status ScanOperatorX::prepare(RuntimeState* state) { continue; } - state->get_query_ctx()->get_runtime_predicate(id).init_target(node_id(), - _slot_id_to_slot_desc); + RETURN_IF_ERROR(state->get_query_ctx()->get_runtime_predicate(id).init_target( + node_id(), _slot_id_to_slot_desc, + OperatorX::intermediate_row_desc())); } RETURN_IF_CANCELLED(state); diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 0135fff01bcf95..f6d45bc3354d8e 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -217,6 +217,7 @@ class ScanLocalState : public ScanLocalStateBase { virtual bool _push_down_topn(const vectorized::RuntimePredicate& predicate) { return false; } virtual bool _is_key_column(const std::string& col_name) { return false; } virtual PushDownType _should_push_down_bloom_filter() { return PushDownType::UNACCEPTABLE; } + virtual PushDownType _should_push_down_topn_filter() { return PushDownType::UNACCEPTABLE; } virtual PushDownType _should_push_down_bitmap_filter() { return PushDownType::UNACCEPTABLE; } virtual PushDownType _should_push_down_is_null_predicate() { return PushDownType::UNACCEPTABLE; @@ -254,6 +255,9 @@ class ScanLocalState : public ScanLocalStateBase { Status _normalize_bloom_filter(vectorized::VExprContext* expr_ctx, SlotDescriptor* slot, std::vector>& predicates, PushDownType* pdt); + Status _normalize_topn_filter(vectorized::VExprContext* expr_ctx, SlotDescriptor* slot, + std::vector>& predicates, + PushDownType* pdt); Status _normalize_bitmap_filter(vectorized::VExprContext* expr_ctx, SlotDescriptor* slot, std::vector>& predicates, diff --git a/be/src/runtime/runtime_predicate.cpp b/be/src/runtime/runtime_predicate.cpp index b0ce2b7f7d43b3..a35ac186067e20 100644 --- a/be/src/runtime/runtime_predicate.cpp +++ b/be/src/runtime/runtime_predicate.cpp @@ -55,16 +55,31 @@ RuntimePredicate::RuntimePredicate(const TTopnFilterDesc& desc) : create_comparison_predicate0; } -void RuntimePredicate::init_target( - int32_t target_node_id, phmap::flat_hash_map slot_id_to_slot_desc) { +Status RuntimePredicate::init_target( + int32_t target_node_id, phmap::flat_hash_map slot_id_to_slot_desc, + const doris::RowDescriptor& desc) { std::unique_lock wlock(_rwlock); check_target_node_id(target_node_id); if (target_is_slot(target_node_id)) { _contexts[target_node_id].col_name = slot_id_to_slot_desc[get_texpr(target_node_id).nodes[0].slot_ref.slot_id] ->col_name(); + auto slot_id = get_texpr(target_node_id).nodes[0].slot_ref.slot_id; + auto column_id = desc.get_column_id(slot_id); + if (column_id < 0) { + return Status::Error( + "RuntimePredicate has invalid slot id: {}, name: {}, desc: {}, slot_desc: {}", + slot_id, + slot_id_to_slot_desc[get_texpr(target_node_id).nodes[0].slot_ref.slot_id] + ->col_name(), + desc.debug_string(), + slot_id_to_slot_desc[get_texpr(target_node_id).nodes[0].slot_ref.slot_id] + ->debug_string()); + } + _contexts[target_node_id].predicate = SharedPredicate::create_shared(column_id); } _detected_target = true; + return Status::OK(); } StringRef RuntimePredicate::_get_string_ref(const Field& field, const PrimitiveType type) { diff --git a/be/src/runtime/runtime_predicate.h b/be/src/runtime/runtime_predicate.h index adf90e9095a481..34ada1ad53cb52 100644 --- a/be/src/runtime/runtime_predicate.h +++ b/be/src/runtime/runtime_predicate.h @@ -44,8 +44,9 @@ class RuntimePredicate { public: RuntimePredicate(const TTopnFilterDesc& desc); - void init_target(int32_t target_node_id, - phmap::flat_hash_map slot_id_to_slot_desc); + Status init_target(int32_t target_node_id, + phmap::flat_hash_map slot_id_to_slot_desc, + const doris::RowDescriptor& desc); bool enable() const { // when sort node and scan node are not in the same fragment, predicate will be disabled @@ -66,9 +67,10 @@ class RuntimePredicate { } RETURN_IF_ERROR(tablet_schema->have_column(_contexts[target_node_id].col_name)); _contexts[target_node_id].tablet_schema = tablet_schema; - int64_t index = DORIS_TRY(_contexts[target_node_id].get_field_index()) - _contexts[target_node_id] - .predicate = SharedPredicate::create_shared(index); + int64_t index = DORIS_TRY(_contexts[target_node_id].get_field_index()); + DCHECK(_contexts[target_node_id].predicate != nullptr); + assert_cast(_contexts[target_node_id].predicate.get()) + ->set_column_id(cast_set(index)); return Status::OK(); } @@ -130,6 +132,7 @@ class RuntimePredicate { struct TargetContext { TExpr expr; std::string col_name; + // TODO(gabriel): remove this TabletSchemaSPtr tablet_schema; std::shared_ptr predicate; diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h index 1356350ad5788a..b33ac7aa1af0c0 100644 --- a/be/src/vec/exprs/vexpr.h +++ b/be/src/vec/exprs/vexpr.h @@ -209,6 +209,7 @@ class VExpr { return std::ranges::any_of(_children.begin(), _children.end(), [](VExprSPtr child) { return child->is_rf_wrapper(); }); } + virtual bool is_topn_filter() const { return false; } virtual void do_judge_selectivity(uint64_t filter_rows, uint64_t input_rows) { for (auto child : _children) { diff --git a/be/src/vec/exprs/vtopn_pred.h b/be/src/vec/exprs/vtopn_pred.h index 7b94672ebb4216..d7c7ea9f90aabd 100644 --- a/be/src/vec/exprs/vtopn_pred.h +++ b/be/src/vec/exprs/vtopn_pred.h @@ -45,6 +45,7 @@ class VTopNPred : public VExpr { _source_node_id(source_node_id), _expr_name(fmt::format("VTopNPred(source_node_id={})", _source_node_id)), _target_ctx(std::move(target_ctx)) {} + bool is_topn_filter() const override { return true; } static Status create_vtopn_pred(const TExpr& target_expr, int source_node_id, vectorized::VExprSPtr& expr) { @@ -63,6 +64,8 @@ class VTopNPred : public VExpr { return Status::OK(); } + int source_node_id() const { return _source_node_id; } + Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) override { _predicate = &state->get_query_ctx()->get_runtime_predicate(_source_node_id); RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));