Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/olap/column_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ struct PredicateTypeTraits {
} \
}

class ColumnPredicate {
class ColumnPredicate : public std::enable_shared_from_this<ColumnPredicate> {
public:
explicit ColumnPredicate(uint32_t column_id, PrimitiveType primitive_type,
bool opposite = false)
Expand Down
33 changes: 0 additions & 33 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,39 +275,6 @@ Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_o
}
}

if (!read_options.topn_filter_source_node_ids.empty()) {
auto* query_ctx = read_options.runtime_state->get_query_ctx();
for (int id : read_options.topn_filter_source_node_ids) {
auto runtime_predicate = query_ctx->get_runtime_predicate(id).get_predicate(
read_options.topn_filter_target_node_id);

AndBlockColumnPredicate and_predicate;
and_predicate.add_column_predicate(
SingleColumnBlockPredicate::create_unique(runtime_predicate));
std::shared_ptr<ColumnReader> reader;
Status st = get_column_reader(
read_options.tablet_schema->column(runtime_predicate->column_id()), &reader,
read_options.stats);
if (st.is<ErrorCode::NOT_FOUND>()) {
continue;
}
RETURN_IF_ERROR(st);
DCHECK(reader != nullptr);
if (can_apply_predicate_safely(runtime_predicate->column_id(), *schema,
read_options.target_cast_type_for_variants,
read_options)) {
bool matched = true;
RETURN_IF_ERROR(reader->match_condition(&and_predicate, &matched));
if (!matched) {
// any condition not satisfied, return.
*iter = std::make_unique<EmptySegmentIterator>(*schema);
read_options.stats->filtered_segment_number++;
return Status::OK();
}
}
}
}

{
SCOPED_RAW_TIMER(&read_options.stats->segment_load_index_timer_ns);
RETURN_IF_ERROR(load_index(read_options.stats));
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ class Segment : public std::enable_shared_from_this<Segment>, public MetadataAdd
const std::map<std::string, vectorized::DataTypePtr>& target_cast_type_for_variants,
const StorageReadOptions& read_options) {
const doris::Field* col = schema.column(cid);
DCHECK(col != nullptr) << "Column not found in schema for cid=" << cid;
vectorized::DataTypePtr storage_column_type =
get_data_type_of(col->get_desc(), read_options);
if (storage_column_type == nullptr || col->type() != FieldType::OLAP_FIELD_TYPE_VARIANT ||
Expand Down
45 changes: 0 additions & 45 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -946,31 +946,6 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row
RowRanges::ranges_intersection(*condition_row_ranges, zone_map_row_ranges,
condition_row_ranges);

if (!_opts.topn_filter_source_node_ids.empty()) {
auto* query_ctx = _opts.runtime_state->get_query_ctx();
for (int id : _opts.topn_filter_source_node_ids) {
std::shared_ptr<doris::ColumnPredicate> runtime_predicate =
query_ctx->get_runtime_predicate(id).get_predicate(
_opts.topn_filter_target_node_id);
if (_segment->can_apply_predicate_safely(runtime_predicate->column_id(), *_schema,
_opts.target_cast_type_for_variants,
_opts)) {
AndBlockColumnPredicate and_predicate;
and_predicate.add_column_predicate(
SingleColumnBlockPredicate::create_unique(runtime_predicate));

RowRanges column_rp_row_ranges = RowRanges::create_single(num_rows());
RETURN_IF_ERROR(_column_iterators[runtime_predicate->column_id()]
->get_row_ranges_by_zone_map(&and_predicate, nullptr,
&column_rp_row_ranges));

// intersect the row ranges of the different columns to get the final row ranges by zone map
RowRanges::ranges_intersection(zone_map_row_ranges, column_rp_row_ranges,
&zone_map_row_ranges);
}
}
}

size_t pre_size2 = condition_row_ranges->count();
RowRanges::ranges_intersection(*condition_row_ranges, zone_map_row_ranges,
condition_row_ranges);
Expand Down Expand Up @@ -1688,26 +1663,6 @@ Status SegmentIterator::_vec_init_lazy_materialization() {
}
}

// Add the runtime predicate to _col_predicates.
// It should NOT be added when ordering by key columns: keys are already sorted and
// topn_next only needs the first N rows from each segment, but the runtime predicate
// would filter some rows and cause more than N rows to be read.
// It should be added when ordering by non-key columns: non-key columns are not sorted and
// all rows must be read, so the runtime predicate reduces the rows for the topn node.
if (!_opts.topn_filter_source_node_ids.empty() &&
(_opts.read_orderby_key_columns == nullptr || _opts.read_orderby_key_columns->empty())) {
for (int id : _opts.topn_filter_source_node_ids) {
auto& runtime_predicate =
_opts.runtime_state->get_query_ctx()->get_runtime_predicate(id);
_col_predicates.push_back(
runtime_predicate.get_predicate(_opts.topn_filter_target_node_id));
VLOG_DEBUG << fmt::format(
"After appending topn filter to col_predicates, "
"col_predicates size: {}, col_predicate: {}",
_col_predicates.size(), _col_predicates.back()->debug_string());
}
}

// Step1: extract columns that can be lazy materialization
if (!_col_predicates.empty() || !del_cond_id_set.empty()) {
std::set<ColumnId> short_cir_pred_col_id_set; // using set for distinct cid
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/shared_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ class SharedPredicate final : public ColumnPredicate {
ColumnPredicate::debug_string(), _nested ? _nested->debug_string() : "null");
return fmt::to_string(debug_string_buffer);
}
// Overwrites the column id held by this predicate (_column_id) with `column_id`.
void set_column_id(uint32_t column_id) { _column_id = column_id; }
std::shared_ptr<ColumnPredicate> clone(uint32_t column_id) const override {
return SharedPredicate::create_shared(*this, column_id);
// All scanner threads should share the same SharedPredicate object.
return std::const_pointer_cast<ColumnPredicate>(shared_from_this());
}

PredicateType type() const override {
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/olap_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
PushDownType& pdt) override;

PushDownType _should_push_down_bloom_filter() override { return PushDownType::ACCEPTABLE; }
// Override for this scan local state: top-n filters are reported as
// PushDownType::ACCEPTABLE.
PushDownType _should_push_down_topn_filter() override { return PushDownType::ACCEPTABLE; }

PushDownType _should_push_down_bitmap_filter() override { return PushDownType::ACCEPTABLE; }

Expand Down
62 changes: 58 additions & 4 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,19 @@ Status ScanLocalState<Derived>::_normalize_predicate(vectorized::VExprContext* c
_parent->cast<typename Derived::Parent>()._slot_id_to_slot_desc[slot->slot_id()];
return _is_predicate_acting_on_slot(slot, range);
};
auto topn_predicate_checker = [&](const vectorized::VExprSPtrs& children,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我们为什么在一个函数里,定义这么多lambda,而不是写多个函数呢?

SlotDescriptor** slot_desc, ColumnValueRangeType** range) {
if (children.empty() || children[0]->node_type() != TExprNodeType::SLOT_REF) {
// not a slot ref(column)
return false;
}
std::shared_ptr<vectorized::VSlotRef> slot =
std::dynamic_pointer_cast<vectorized::VSlotRef>(children[0]);
CHECK(slot != nullptr);
*slot_desc =
_parent->cast<typename Derived::Parent>()._slot_id_to_slot_desc[slot->slot_id()];
return _is_predicate_acting_on_slot(slot, range);
};

if (expr_root != nullptr) {
if (is_leaf(expr_root)) {
Expand Down Expand Up @@ -353,7 +366,8 @@ Status ScanLocalState<Derived>::_normalize_predicate(vectorized::VExprContext* c
vectorized::VExpr::expr_without_cast(child));
}
if (in_predicate_checker(expr_root->children(), &slot, &range) ||
eq_predicate_checker(expr_root->children(), &slot, &range)) {
eq_predicate_checker(expr_root->children(), &slot, &range) ||
topn_predicate_checker(expr_root->children(), &slot, &range)) {
Status status = Status::OK();
std::visit(
[&](auto& value_range) {
Expand Down Expand Up @@ -403,7 +417,14 @@ Status ScanLocalState<Derived>::_normalize_predicate(vectorized::VExprContext* c
context, slot,
_slot_id_to_predicates[slot->id()], &pdt),
status);

RETURN_IF_PUSH_DOWN(_normalize_bloom_filter(
context, slot,
_slot_id_to_predicates[slot->id()], &pdt),
status);
RETURN_IF_PUSH_DOWN(_normalize_topn_filter(
context, slot,
_slot_id_to_predicates[slot->id()], &pdt),
status);
if (state()->enable_function_pushdown()) {
RETURN_IF_PUSH_DOWN(
_normalize_function_filters(context, slot, &pdt), status);
Expand Down Expand Up @@ -465,6 +486,26 @@ Status ScanLocalState<Derived>::_normalize_bloom_filter(
return Status::OK();
}

// Tries to turn a top-n filter conjunct into a column predicate.
//
// expr_ctx:   expression context whose root is inspected; when the root reports
//             is_rf_wrapper(), the expression returned by get_impl() is inspected instead.
// slot:       not read by this function.
// predicates: receives the runtime predicate's column predicate for this node when the
//             filter is pushed down.
// pdt:        set to the value of _should_push_down_topn_filter() when the filter is
//             pushed down; left untouched otherwise.
//
// Always returns Status::OK().
template <typename Derived>
Status ScanLocalState<Derived>::_normalize_topn_filter(
        vectorized::VExprContext* expr_ctx, SlotDescriptor* slot,
        std::vector<std::shared_ptr<ColumnPredicate>>& predicates, PushDownType* pdt) {
    auto candidate =
            expr_ctx->root()->is_rf_wrapper() ? expr_ctx->root()->get_impl() : expr_ctx->root();
    if (!candidate->is_topn_filter()) {
        // Only top-n filter expressions are handled here.
        return Status::OK();
    }

    const PushDownType push_down_type = _should_push_down_topn_filter();
    if (push_down_type == PushDownType::UNACCEPTABLE) {
        // This scan type does not accept top-n filter push down.
        return Status::OK();
    }

    auto& parent_op = _parent->cast<typename Derived::Parent>();
    // The runtime predicate is looked up by the source node id carried by the VTopNPred.
    auto& runtime_pred = _state->get_query_ctx()->get_runtime_predicate(
            assert_cast<vectorized::VTopNPred*>(candidate.get())->source_node_id());
    if (_push_down_topn(runtime_pred)) {
        predicates.emplace_back(runtime_pred.get_predicate(parent_op.node_id()));
        *pdt = push_down_type;
    }
    return Status::OK();
}

template <typename Derived>
Status ScanLocalState<Derived>::_normalize_bitmap_filter(
vectorized::VExprContext* expr_ctx, SlotDescriptor* slot,
Expand Down Expand Up @@ -1211,6 +1252,18 @@ Status ScanLocalState<Derived>::_get_topn_filters(RuntimeState* state) {
RETURN_IF_ERROR(conjunct->open(state));
_conjuncts.emplace_back(conjunct);
}
for (auto id : get_topn_filter_source_node_ids(state, true)) {
const auto& pred = state->get_query_ctx()->get_runtime_predicate(id);
vectorized::VExprSPtr topn_pred;
RETURN_IF_ERROR(vectorized::VTopNPred::create_vtopn_pred(pred.get_texpr(p.node_id()), id,
topn_pred));

vectorized::VExprContextSPtr conjunct = vectorized::VExprContext::create_shared(topn_pred);
RETURN_IF_ERROR(conjunct->prepare(
state, _parent->cast<typename Derived::Parent>().row_descriptor()));
RETURN_IF_ERROR(conjunct->open(state));
_conjuncts.emplace_back(conjunct);
}
return Status::OK();
}

Expand Down Expand Up @@ -1335,8 +1388,9 @@ Status ScanOperatorX<LocalStateType>::prepare(RuntimeState* state) {
continue;
}

state->get_query_ctx()->get_runtime_predicate(id).init_target(node_id(),
_slot_id_to_slot_desc);
RETURN_IF_ERROR(state->get_query_ctx()->get_runtime_predicate(id).init_target(
node_id(), _slot_id_to_slot_desc,
OperatorX<LocalStateType>::intermediate_row_desc()));
}

RETURN_IF_CANCELLED(state);
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ class ScanLocalState : public ScanLocalStateBase {
virtual bool _push_down_topn(const vectorized::RuntimePredicate& predicate) { return false; }
virtual bool _is_key_column(const std::string& col_name) { return false; }
virtual PushDownType _should_push_down_bloom_filter() { return PushDownType::UNACCEPTABLE; }
// Push-down decision for top-n filters. The default is PushDownType::UNACCEPTABLE;
// virtual so that a derived scan local state can return a different value.
virtual PushDownType _should_push_down_topn_filter() { return PushDownType::UNACCEPTABLE; }
virtual PushDownType _should_push_down_bitmap_filter() { return PushDownType::UNACCEPTABLE; }
virtual PushDownType _should_push_down_is_null_predicate() {
return PushDownType::UNACCEPTABLE;
Expand Down Expand Up @@ -254,6 +255,9 @@ class ScanLocalState : public ScanLocalStateBase {
Status _normalize_bloom_filter(vectorized::VExprContext* expr_ctx, SlotDescriptor* slot,
std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
PushDownType* pdt);
Status _normalize_topn_filter(vectorized::VExprContext* expr_ctx, SlotDescriptor* slot,
std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
PushDownType* pdt);

Status _normalize_bitmap_filter(vectorized::VExprContext* expr_ctx, SlotDescriptor* slot,
std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
Expand Down
19 changes: 17 additions & 2 deletions be/src/runtime/runtime_predicate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,31 @@ RuntimePredicate::RuntimePredicate(const TTopnFilterDesc& desc)
: create_comparison_predicate0<PredicateType::GE>;
}

void RuntimePredicate::init_target(
int32_t target_node_id, phmap::flat_hash_map<int, SlotDescriptor*> slot_id_to_slot_desc) {
Status RuntimePredicate::init_target(
int32_t target_node_id, phmap::flat_hash_map<int, SlotDescriptor*> slot_id_to_slot_desc,
const doris::RowDescriptor& desc) {
std::unique_lock<std::shared_mutex> wlock(_rwlock);
check_target_node_id(target_node_id);
if (target_is_slot(target_node_id)) {
_contexts[target_node_id].col_name =
slot_id_to_slot_desc[get_texpr(target_node_id).nodes[0].slot_ref.slot_id]
->col_name();
auto slot_id = get_texpr(target_node_id).nodes[0].slot_ref.slot_id;
auto column_id = desc.get_column_id(slot_id);
if (column_id < 0) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"RuntimePredicate has invalid slot id: {}, name: {}, desc: {}, slot_desc: {}",
slot_id,
slot_id_to_slot_desc[get_texpr(target_node_id).nodes[0].slot_ref.slot_id]
->col_name(),
desc.debug_string(),
slot_id_to_slot_desc[get_texpr(target_node_id).nodes[0].slot_ref.slot_id]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我们这个可能不对,之前的可能是对的

->debug_string());
}
_contexts[target_node_id].predicate = SharedPredicate::create_shared(column_id);
}
_detected_target = true;
return Status::OK();
}

StringRef RuntimePredicate::_get_string_ref(const Field& field, const PrimitiveType type) {
Expand Down
13 changes: 8 additions & 5 deletions be/src/runtime/runtime_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ class RuntimePredicate {
public:
RuntimePredicate(const TTopnFilterDesc& desc);

void init_target(int32_t target_node_id,
phmap::flat_hash_map<int, SlotDescriptor*> slot_id_to_slot_desc);
Status init_target(int32_t target_node_id,
phmap::flat_hash_map<int, SlotDescriptor*> slot_id_to_slot_desc,
const doris::RowDescriptor& desc);

bool enable() const {
// when sort node and scan node are not in the same fragment, predicate will be disabled
Expand All @@ -66,9 +67,10 @@ class RuntimePredicate {
}
RETURN_IF_ERROR(tablet_schema->have_column(_contexts[target_node_id].col_name));
_contexts[target_node_id].tablet_schema = tablet_schema;
int64_t index = DORIS_TRY(_contexts[target_node_id].get_field_index())
_contexts[target_node_id]
.predicate = SharedPredicate::create_shared(index);
int64_t index = DORIS_TRY(_contexts[target_node_id].get_field_index());
DCHECK(_contexts[target_node_id].predicate != nullptr);
assert_cast<SharedPredicate*>(_contexts[target_node_id].predicate.get())
->set_column_id(cast_set<uint32_t>(index));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里为什么要setcolumnid?

return Status::OK();
}

Expand Down Expand Up @@ -130,6 +132,7 @@ class RuntimePredicate {
struct TargetContext {
TExpr expr;
std::string col_name;
// TODO(gabriel): remove this
TabletSchemaSPtr tablet_schema;
std::shared_ptr<ColumnPredicate> predicate;

Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exprs/vexpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ class VExpr {
return std::ranges::any_of(_children.begin(), _children.end(),
[](VExprSPtr child) { return child->is_rf_wrapper(); });
}
// Reports whether this expression is a top-n filter. The base implementation
// returns false; virtual so that derived expressions can override it.
virtual bool is_topn_filter() const { return false; }

virtual void do_judge_selectivity(uint64_t filter_rows, uint64_t input_rows) {
for (auto child : _children) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/exprs/vtopn_pred.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class VTopNPred : public VExpr {
_source_node_id(source_node_id),
_expr_name(fmt::format("VTopNPred(source_node_id={})", _source_node_id)),
_target_ctx(std::move(target_ctx)) {}
// Marks this expression as a top-n filter (always returns true).
bool is_topn_filter() const override { return true; }

static Status create_vtopn_pred(const TExpr& target_expr, int source_node_id,
vectorized::VExprSPtr& expr) {
Expand All @@ -63,6 +64,8 @@ class VTopNPred : public VExpr {
return Status::OK();
}

// Returns the source node id stored in _source_node_id.
int source_node_id() const { return _source_node_id; }

Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) override {
_predicate = &state->get_query_ctx()->get_runtime_predicate(_source_node_id);
RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
Expand Down
Loading