Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 69 additions & 14 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <memory>
#include <numeric>
#include <set>
#include <unordered_map>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -701,6 +702,8 @@ Status SegmentIterator::_apply_ann_topn_predicate() {
"Ann topn can not be evaluated by ann index, has_ann_index: {}, "
"has_common_expr_push_down: {}, has_column_predicate: {}",
has_ann_index, has_common_expr_push_down, has_column_predicate);
// Disable index-only scan on ann indexed column.
_need_read_data_indices[src_cid] = true;
return Status::OK();
}

Expand All @@ -712,11 +715,15 @@ Status SegmentIterator::_apply_ann_topn_predicate() {
if (_ann_topn_runtime->is_asc()) {
VLOG_DEBUG << fmt::format(
"Asc topn for inner product can not be evaluated by ann index");
// Disable index-only scan on ann indexed column.
_need_read_data_indices[src_cid] = true;
return Status::OK();
}
} else {
if (!_ann_topn_runtime->is_asc()) {
VLOG_DEBUG << fmt::format("Desc topn for l2/cosine can not be evaluated by ann index");
// Disable index-only scan on ann indexed column.
_need_read_data_indices[src_cid] = true;
return Status::OK();
}
}
Expand All @@ -727,6 +734,8 @@ Status SegmentIterator::_apply_ann_topn_predicate() {
"ann index",
metric_to_string(_ann_topn_runtime->get_metric_type()),
metric_to_string(ann_index_reader->get_metric_type()));
// Disable index-only scan on ann indexed column.
_need_read_data_indices[src_cid] = true;
return Status::OK();
}

Expand All @@ -738,6 +747,8 @@ Status SegmentIterator::_apply_ann_topn_predicate() {
"to "
"filter",
pre_size, rows_of_segment);
// Disable index-only scan on ann indexed column.
_need_read_data_indices[src_cid] = true;
return Status::OK();
}
vectorized::IColumn::MutablePtr result_column;
Expand Down Expand Up @@ -772,6 +783,10 @@ Status SegmentIterator::_apply_ann_topn_predicate() {
virtual_column_iter->prepare_materialization(std::move(result_column),
std::move(result_row_ids));

_need_read_data_indices[src_cid] = false;
VLOG_DEBUG << fmt::format(
"Enable ANN index-only scan for src column cid {} (skip reading data pages)", src_cid);

return Status::OK();
}

Expand Down Expand Up @@ -1044,9 +1059,9 @@ Status SegmentIterator::_apply_index_expr() {
segment_v2::AnnIndexStats ann_index_stats;
for (const auto& expr_ctx : _common_expr_ctxs_push_down) {
size_t origin_rows = _row_bitmap.cardinality();
RETURN_IF_ERROR(expr_ctx->evaluate_ann_range_search(_index_iterators, _schema->column_ids(),
_column_iterators, _row_bitmap,
ann_index_stats));
RETURN_IF_ERROR(expr_ctx->evaluate_ann_range_search(
_index_iterators, _schema->column_ids(), _column_iterators,
_common_expr_to_slotref_map, _row_bitmap, ann_index_stats));
_opts.stats->rows_ann_index_range_filtered += (origin_rows - _row_bitmap.cardinality());
_opts.stats->ann_index_load_ns += ann_index_stats.load_index_costs_ns.value();
_opts.stats->ann_index_range_search_ns += ann_index_stats.search_costs_ns.value();
Expand All @@ -1057,7 +1072,7 @@ Status SegmentIterator::_apply_index_expr() {
}

for (auto it = _common_expr_ctxs_push_down.begin(); it != _common_expr_ctxs_push_down.end();) {
if ((*it)->root()->has_been_executed()) {
if ((*it)->root()->ann_range_search_executedd()) {
_opts.stats->ann_index_range_search_cnt++;
it = _common_expr_ctxs_push_down.erase(it);
} else {
Expand Down Expand Up @@ -1808,14 +1823,6 @@ Status SegmentIterator::_vec_init_lazy_materialization() {
if (pred_id_set.find(cid) != pred_id_set.end()) {
_predicate_column_ids.push_back(cid);
}
// In the past, if schema columns > pred columns, the _lazy_materialization_read maybe == false, but
// we make sure using _lazy_materialization_read= true now, so these logic may never happens. I comment
// these lines and we could delete them in the future to make the code more clear.
// else if (non_pred_set.find(cid) != non_pred_set.end()) {
// _predicate_column_ids.push_back(cid);
// // when _lazy_materialization_read = false, non-predicate column should also be filtered by sel idx, so we regard it as pred columns
// _is_pred_column[cid] = true;
// }
}
} else if (_is_need_expr_eval) {
DCHECK(!_is_need_vec_eval && !_is_need_short_eval);
Expand Down Expand Up @@ -2029,8 +2036,9 @@ Status SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
if (column_in_block_is_nothing || column_is_normal) {
block->replace_by_position(loc, std::move(_current_return_columns[cid]));
VLOG_DEBUG << fmt::format(
"Output non-predicate column, cid: {}, loc: {}, col_name: {}", cid, loc,
_schema->column(cid)->name());
"Output non-predicate column, cid: {}, loc: {}, col_name: {}, rows {}", cid,
loc, _schema->column(cid)->name(),
block->get_by_position(loc).column->size());
}
// Means virtual column in block has been materialized(maybe by common expr).
// so do nothing here.
Expand Down Expand Up @@ -2073,6 +2081,8 @@ Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint16

for (auto cid : _predicate_column_ids) {
auto& column = _current_return_columns[cid];
VLOG_DEBUG << fmt::format("Reading column {}, col_name {}", cid,
_schema->column(cid)->name());
if (!_virtual_column_exprs.contains(cid)) {
if (_no_need_read_key_data(cid, column, nrows_read)) {
VLOG_DEBUG << fmt::format("Column {} no need to read.", cid);
Expand Down Expand Up @@ -2822,6 +2832,8 @@ void SegmentIterator::_calculate_expr_in_remaining_conjunct_root() {
if (root_expr == nullptr) {
continue;
}
_common_expr_to_slotref_map[root_expr_ctx.get()] =
std::unordered_map<ColumnId, vectorized::VExpr*>();

std::stack<vectorized::VExprSPtr> stack;
stack.emplace(root_expr);
Expand All @@ -2831,10 +2843,53 @@ void SegmentIterator::_calculate_expr_in_remaining_conjunct_root() {
stack.pop();

for (const auto& child : expr->children()) {
if (child->is_virtual_slot_ref()) {
// Expand virtual slot ref to its underlying expression tree and
// collect real slot refs used inside. We still associate those
// slot refs with the current parent expr node for inverted index
// tracking, just like normal slot refs.
auto* vir_slot_ref = assert_cast<vectorized::VirtualSlotRef*>(child.get());
auto vir_expr = vir_slot_ref->get_virtual_column_expr();
if (vir_expr) {
std::stack<vectorized::VExprSPtr> vir_stack;
vir_stack.emplace(vir_expr);

while (!vir_stack.empty()) {
const auto& vir_node = vir_stack.top();
vir_stack.pop();

for (const auto& vir_child : vir_node->children()) {
if (vir_child->is_slot_ref()) {
auto* inner_slot_ref =
assert_cast<vectorized::VSlotRef*>(vir_child.get());
_common_expr_inverted_index_status[_schema->column_id(
inner_slot_ref->column_id())][expr.get()] = false;
_common_expr_to_slotref_map[root_expr_ctx.get()]
[inner_slot_ref->column_id()] =
expr.get();
// Print debug info for virtual slot expansion
LOG(INFO) << fmt::format(
"common_expr_ctx_ptr: {}, expr_ptr: {}, "
"virtual_slotref_ptr: {}, inner_slotref_ptr: {}, "
"column_id: {}",
fmt::ptr(root_expr_ctx.get()), fmt::ptr(expr.get()),
fmt::ptr(child.get()), fmt::ptr(vir_child.get()),
inner_slot_ref->column_id());
}

if (!vir_child->children().empty()) {
vir_stack.emplace(vir_child);
}
}
}
}
}
if (child->is_slot_ref()) {
auto* column_slot_ref = assert_cast<vectorized::VSlotRef*>(child.get());
_common_expr_inverted_index_status[_schema->column_id(
column_slot_ref->column_id())][expr.get()] = false;
_common_expr_to_slotref_map[root_expr_ctx.get()][column_slot_ref->column_id()] =
expr.get();
}
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,9 @@ class SegmentIterator : public RowwiseIterator {

// key is column uid, value is the sparse column cache
std::unordered_map<int32_t, PathToSparseColumnCacheUPtr> _variant_sparse_column_cache;

std::unordered_map<vectorized::VExprContext*, std::unordered_map<ColumnId, vectorized::VExpr*>>
_common_expr_to_slotref_map;
};

} // namespace segment_v2
Expand Down
12 changes: 8 additions & 4 deletions be/src/olap/rowset/segment_v2/virtual_column_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cstring>
#include <memory>

#include "common/logging.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nothing.h"

Expand Down Expand Up @@ -79,12 +80,15 @@ void VirtualColumnIterator::prepare_materialization(vectorized::IColumn::Ptr col

_size = n;

std::string msg;
for (const auto& pair : _row_id_to_idx) {
msg += fmt::format("{}: {}, ", pair.first, pair.second);
if (VLOG_DEBUG_IS_ON) {
std::string msg;
for (const auto& pair : _row_id_to_idx) {
msg += fmt::format("{}: {}, ", pair.first, pair.second);
}

VLOG_DEBUG << fmt::format("virtual column iterator, row_idx_to_idx:\n{}", msg);
}

VLOG_DEBUG << fmt::format("virtual column iterator, row_idx_to_idx:\n{}", msg);
_filter = doris::vectorized::IColumn::Filter(_size, 0);
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* sc
auto& p = _parent->cast<OlapScanOperatorX>();

for (auto uid : p._olap_scan_node.output_column_unique_ids) {
_maybe_read_column_ids.emplace(uid);
_output_column_ids.emplace(uid);
}

// ranges constructed from scan keys
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/olap_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
OlapScanKeys _scan_keys;
std::vector<FilterOlapParam<TCondition>> _olap_filters;
// If column id in this set, indicate that we need to read data after index filtering
std::set<int32_t> _maybe_read_column_ids;
std::set<int32_t> _output_column_ids;

std::unique_ptr<RuntimeProfile> _segment_profile;
std::unique_ptr<RuntimeProfile> _index_filter_profile;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ Status OlapScanner::_init_tablet_reader_params(
_tablet_reader_params.vir_col_idx_to_type = _vir_col_idx_to_type;
_tablet_reader_params.score_runtime = _score_runtime;
_tablet_reader_params.output_columns =
((pipeline::OlapScanLocalState*)_local_state)->_maybe_read_column_ids;
((pipeline::OlapScanLocalState*)_local_state)->_output_column_ids;
_tablet_reader_params.ann_topn_runtime = _ann_topn_runtime;
for (const auto& ele :
((pipeline::OlapScanLocalState*)_local_state)->_cast_types_for_variants) {
Expand Down
24 changes: 20 additions & 4 deletions be/src/vec/exprs/vectorized_fn_call.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -605,15 +605,31 @@ Status VectorizedFnCall::evaluate_ann_range_search(
}
virtual_column_iterator->prepare_materialization(std::move(distance_col),
std::move(result.row_ids));
_virtual_column_is_fulfilled = true;
} else {
DCHECK(this->op() != TExprOpcode::LE && this->op() != TExprOpcode::LT)
<< "Should not have distance";
// Whether the ANN index should have produced distance depends on metric and operator:
// - L2: distance is produced for LE/LT; not produced for GE/GT
// - IP: distance is produced for GE/GT; not produced for LE/LT
#ifndef NDEBUG
const bool should_have_distance =
(range_search_runtime.is_le_or_lt &&
range_search_runtime.metric_type == AnnIndexMetric::L2) ||
(!range_search_runtime.is_le_or_lt &&
range_search_runtime.metric_type == AnnIndexMetric::IP);
// If we expected distance but didn't get it, assert in debug to catch logic errors.
DCHECK(!should_have_distance) << "Expected distance from ANN index but got none";
#endif
_virtual_column_is_fulfilled = false;
}
} else {
// Dest is not virtual column.
_virtual_column_is_fulfilled = true;
}

_has_been_executed = true;
VLOG_DEBUG << fmt::format("Ann range search filtered {} rows, origin {} rows",
origin_num - row_bitmap.cardinality(), origin_num);
VLOG_DEBUG << fmt::format(
"Ann range search filtered {} rows, origin {} rows, virtual column is full-filled: {}",
origin_num - row_bitmap.cardinality(), origin_num, _virtual_column_is_fulfilled);

ann_index_stats = *stats;
return Status::OK();
Expand Down
6 changes: 5 additions & 1 deletion be/src/vec/exprs/vexpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1002,9 +1002,13 @@ void VExpr::prepare_ann_range_search(const doris::VectorSearchUserParams& params
}
}

bool VExpr::has_been_executed() {
bool VExpr::ann_range_search_executedd() {
return _has_been_executed;
}

bool VExpr::ann_dist_is_fulfilled() const {
return _virtual_column_is_fulfilled;
}

#include "common/compile_check_end.h"
} // namespace doris::vectorized
11 changes: 10 additions & 1 deletion be/src/vec/exprs/vexpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ class VExpr {

bool is_slot_ref() const { return _node_type == TExprNodeType::SLOT_REF; }

bool is_virtual_slot_ref() const { return _node_type == TExprNodeType::VIRTUAL_SLOT_REF; }

bool is_column_ref() const { return _node_type == TExprNodeType::COLUMN_REF; }

virtual bool is_literal() const { return false; }
Expand Down Expand Up @@ -308,7 +310,9 @@ class VExpr {
segment_v2::AnnRangeSearchRuntime& range_search_runtime,
bool& suitable_for_ann_index);

bool has_been_executed();
bool ann_range_search_executedd();

bool ann_dist_is_fulfilled() const;

protected:
/// Simple debug string that provides no expr subclass-specific information
Expand Down Expand Up @@ -392,7 +396,12 @@ class VExpr {
uint32_t _index_unique_id = 0;
bool _enable_inverted_index_query = true;

// Indicates whether the expr row_bitmap has been updated.
bool _has_been_executed = false;
// Indicates whether the virtual column is fulfilled.
// NOTE, if there is no virtual column in the expr tree, and expr
// is evaluated by ann index, this flag is still true.
bool _virtual_column_is_fulfilled = false;
};

} // namespace vectorized
Expand Down
43 changes: 39 additions & 4 deletions be/src/vec/exprs/vexpr_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include "common/cast_set.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/exception.h"
#include "common/status.h"
#include "olap/olap_common.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "udf/udf.h"
Expand Down Expand Up @@ -463,12 +465,45 @@ Status VExprContext::evaluate_ann_range_search(
const std::vector<std::unique_ptr<segment_v2::IndexIterator>>& cid_to_index_iterators,
const std::vector<ColumnId>& idx_to_cid,
const std::vector<std::unique_ptr<segment_v2::ColumnIterator>>& column_iterators,
const std::unordered_map<vectorized::VExprContext*,
std::unordered_map<ColumnId, vectorized::VExpr*>>&
common_expr_to_slotref_map,
roaring::Roaring& row_bitmap, segment_v2::AnnIndexStats& ann_index_stats) {
if (_root != nullptr) {
return _root->evaluate_ann_range_search(_ann_range_search_runtime, cid_to_index_iterators,
idx_to_cid, column_iterators, row_bitmap,
ann_index_stats);
if (_root == nullptr) {
return Status::OK();
}

RETURN_IF_ERROR(_root->evaluate_ann_range_search(
_ann_range_search_runtime, cid_to_index_iterators, idx_to_cid, column_iterators,
row_bitmap, ann_index_stats));

if (!_root->ann_range_search_executedd()) {
return Status::OK();
}

if (!_root->ann_dist_is_fulfilled()) {
// Do not perform index scan in this case.
return Status::OK();
}

auto src_col_idx = _ann_range_search_runtime.src_col_idx;
auto slot_ref_map_it = common_expr_to_slotref_map.find(this);
if (slot_ref_map_it == common_expr_to_slotref_map.end()) {
return Status::OK();
}
auto& slot_ref_map = slot_ref_map_it->second;
ColumnId cid = idx_to_cid[src_col_idx];
if (slot_ref_map.find(cid) == slot_ref_map.end()) {
return Status::OK();
}
const VExpr* slot_ref_expr_addr = slot_ref_map.find(cid)->second;
_inverted_index_context->set_true_for_inverted_index_status(slot_ref_expr_addr,
idx_to_cid[cid]);

VLOG_DEBUG << fmt::format(
"Evaluate ann range search for expr {}, src_col_idx {}, cid {}, row_bitmap "
"cardinality {}",
_root->debug_string(), src_col_idx, cid, row_bitmap.cardinality());
return Status::OK();
}

Expand Down
Loading
Loading