Skip to content

Commit

Permalink
multi match function add
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzxl1993 committed Jul 15, 2024
1 parent 8d249a2 commit 1a438ae
Show file tree
Hide file tree
Showing 22 changed files with 685 additions and 65 deletions.
4 changes: 3 additions & 1 deletion be/src/olap/rowset/segment_v2/inverted_index_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ class InvertedIndexIterator;
class InvertedIndexQueryCacheHandle;
class InvertedIndexFileReader;
struct InvertedIndexQueryInfo;

class InvertedIndexReader : public std::enable_shared_from_this<InvertedIndexReader> {
public:
explicit InvertedIndexReader(
Expand Down Expand Up @@ -147,6 +146,7 @@ class InvertedIndexReader : public std::enable_shared_from_this<InvertedIndexRea
TabletIndex _index_meta;
bool _has_null = true;
};
using InvertedIndexReaderPtr = std::shared_ptr<InvertedIndexReader>;

class FullTextIndexReader : public InvertedIndexReader {
ENABLE_FACTORY_CREATOR(FullTextIndexReader);
Expand Down Expand Up @@ -380,6 +380,8 @@ class InvertedIndexIterator {
[[nodiscard]] const std::map<string, string>& get_index_properties() const;
[[nodiscard]] bool has_null() { return _reader->has_null(); };

const InvertedIndexReaderPtr& reader() { return _reader; }

private:
OlapReaderStatistics* _stats = nullptr;
RuntimeState* _runtime_state = nullptr;
Expand Down
183 changes: 156 additions & 27 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ Status SegmentIterator::_init_impl(const StorageReadOptions& opts) {
for (auto& expr : _remaining_conjunct_roots) {
_calculate_pred_in_remaining_conjunct_root(expr);
}
_calculate_func_in_remaining_conjunct_root();

_column_predicate_info.reset(new ColumnPredicateInfo());
if (_schema->rowid_col_idx() > 0) {
Expand Down Expand Up @@ -560,6 +561,7 @@ Status SegmentIterator::_get_row_ranges_by_column_conditions() {
}
}
_col_preds_except_leafnode_of_andnode.clear();
compound_func_exprs.clear();
// 1. if all conditions in the compound hit the inverted index and there are no other expr to handle.
// 2. then there is no need to generate index_result_column.
if (_enable_common_expr_pushdown && _remaining_conjunct_roots.empty()) {
Expand Down Expand Up @@ -809,25 +811,32 @@ Status SegmentIterator::_execute_predicates_except_leafnode_of_andnode(
auto v_literal_expr = std::dynamic_pointer_cast<doris::vectorized::VLiteral>(expr);
_column_predicate_info->query_values.insert(v_literal_expr->value());
} else if (node_type == TExprNodeType::BINARY_PRED || node_type == TExprNodeType::MATCH_PRED ||
node_type == TExprNodeType::IN_PRED) {
if (node_type == TExprNodeType::MATCH_PRED) {
_column_predicate_info->query_op = "match";
} else if (node_type == TExprNodeType::IN_PRED) {
if (expr->op() == TExprOpcode::type::FILTER_IN) {
_column_predicate_info->query_op = "in";
node_type == TExprNodeType::IN_PRED || node_type == TExprNodeType::FUNCTION_CALL) {
std::string result_sign;
if (node_type == TExprNodeType::FUNCTION_CALL) {
result_sign =
BeConsts::BLOCK_TEMP_COLUMN_PREFIX + std::to_string(expr->index_unique_id());
} else {
if (node_type == TExprNodeType::MATCH_PRED) {
_column_predicate_info->query_op = "match";
} else if (node_type == TExprNodeType::IN_PRED) {
if (expr->op() == TExprOpcode::type::FILTER_IN) {
_column_predicate_info->query_op = "in";
} else {
_column_predicate_info->query_op = "not_in";
}
} else {
_column_predicate_info->query_op = "not_in";
_column_predicate_info->query_op = expr->fn().name.function_name;
}
} else {
_column_predicate_info->query_op = expr->fn().name.function_name;
result_sign = _gen_predicate_result_sign(_column_predicate_info.get());
}

// get child condition result in compound conditions
auto pred_result_sign = _gen_predicate_result_sign(_column_predicate_info.get());
_column_predicate_info.reset(new ColumnPredicateInfo());
VLOG_DEBUG << "_gen_predicate_result_sign " << pred_result_sign;
if (_rowid_result_for_index.count(pred_result_sign) > 0 &&
_rowid_result_for_index[pred_result_sign].first) {
auto apply_result = _rowid_result_for_index[pred_result_sign].second;
VLOG_DEBUG << "result_sign " << result_sign;
if (_rowid_result_for_index.count(result_sign) > 0 &&
_rowid_result_for_index[result_sign].first) {
auto apply_result = _rowid_result_for_index[result_sign].second;
_pred_except_leafnode_of_andnode_evaluate_result.push_back(apply_result);
}
} else if (node_type == TExprNodeType::COMPOUND_PRED) {
Expand Down Expand Up @@ -871,7 +880,7 @@ Status SegmentIterator::_execute_compound_fn(const std::string& function_name) {

bool SegmentIterator::_can_filter_by_preds_except_leafnode_of_andnode() {
// no compound predicates push down, so no need to filter
if (_col_preds_except_leafnode_of_andnode.size() == 0) {
if (_col_preds_except_leafnode_of_andnode.empty() && compound_func_exprs.empty()) {
return false;
}
for (auto pred : _col_preds_except_leafnode_of_andnode) {
Expand All @@ -885,6 +894,14 @@ bool SegmentIterator::_can_filter_by_preds_except_leafnode_of_andnode() {
return false;
}
}
for (const auto& func_expr_pair : compound_func_exprs) {
const auto& expr = func_expr_pair.first;
std::string pred_result_sign =
BeConsts::BLOCK_TEMP_COLUMN_PREFIX + std::to_string(expr->index_unique_id());
if (!_rowid_result_for_index.contains(pred_result_sign)) {
return false;
}
}
return true;
}

Expand Down Expand Up @@ -992,6 +1009,16 @@ Status SegmentIterator::_apply_index_except_leafnode_of_andnode() {
}
}

for (const auto& func_expr_pair : compound_func_exprs) {
const auto& expr = func_expr_pair.first;
const auto& expr_ctx = func_expr_pair.second;
auto result = std::make_shared<roaring::Roaring>();
RETURN_IF_ERROR(execute_func_expr(expr, expr_ctx, result));
std::string result_sign =
BeConsts::BLOCK_TEMP_COLUMN_PREFIX + std::to_string(expr->index_unique_id());
_rowid_result_for_index.emplace(result_sign, std::make_pair(true, std::move(*result)));
}

return Status::OK();
}

Expand Down Expand Up @@ -1246,18 +1273,6 @@ Status SegmentIterator::_apply_inverted_index() {
std::vector<ColumnPredicate*> remaining_predicates;
std::set<const ColumnPredicate*> no_need_to_pass_column_predicate_set;

// TODO:Comment out this code before introducing range query functionality
/*for (const auto& entry : _opts.col_id_to_predicates) {
ColumnId column_id = entry.first;
auto pred = entry.second;
bool continue_apply = true;
RETURN_IF_ERROR(_apply_inverted_index_on_block_column_predicate(
column_id, pred.get(), no_need_to_pass_column_predicate_set, &continue_apply));
if (!continue_apply) {
break;
}
}*/

for (auto pred : _col_predicates) {
if (no_need_to_pass_column_predicate_set.count(pred) > 0) {
continue;
Expand Down Expand Up @@ -1293,6 +1308,23 @@ Status SegmentIterator::_apply_inverted_index() {
}
}

for (const auto& func_expr_pair : no_compound_func_exprs) {
const auto& expr = func_expr_pair.first;
const auto& expr_ctx = func_expr_pair.second;
auto result = std::make_shared<roaring::Roaring>();
RETURN_IF_ERROR(execute_func_expr(expr, expr_ctx, result));
_row_bitmap &= *result;
for (auto it = _remaining_conjunct_roots.begin(); it != _remaining_conjunct_roots.end();) {
if (*it == expr) {
std::erase_if(_common_expr_ctxs_push_down,
[&it](const auto& iter) { return iter->root() == *it; });
it = _remaining_conjunct_roots.erase(it);
} else {
++it;
}
}
}

// add a switch for inverted index filter
if (_opts.runtime_state &&
_opts.runtime_state->enable_common_expr_pushdown_for_inverted_index()) {
Expand Down Expand Up @@ -1431,6 +1463,18 @@ Status SegmentIterator::_init_inverted_index_iterators() {
return Status::OK();
}

Status SegmentIterator::_init_inverted_index_iterators(ColumnId cid) {
if (_inverted_index_iterators[cid] == nullptr) {
return _init_single_inverted_index_iterator.call([&] {
return _segment->new_inverted_index_iterator(
_opts.tablet_schema->column(cid),
_segment->_tablet_schema->get_inverted_index(_opts.tablet_schema->column(cid)),
_opts, &_inverted_index_iterators[cid]);
});
}
return Status::OK();
}

Status SegmentIterator::_lookup_ordinal(const RowCursor& key, bool is_include, rowid_t upper_bound,
rowid_t* rowid) {
if (_segment->_tablet_schema->keys_type() == UNIQUE_KEYS &&
Expand Down Expand Up @@ -2832,6 +2876,64 @@ void SegmentIterator::_calculate_pred_in_remaining_conjunct_root(
}
}

void SegmentIterator::_calculate_func_in_remaining_conjunct_root() {
auto hash = [](const vectorized::VExprSPtr& expr) -> std::size_t {
return std::hash<std::string>()(expr->expr_name());
};
auto equal = [](const vectorized::VExprSPtr& lhs, const vectorized::VExprSPtr& rhs) -> bool {
return lhs->equals(*rhs);
};

uint32_t next_id = 0;
std::unordered_map<vectorized::VExprSPtr, uint32_t, decltype(hash), decltype(equal)> unique_map(
0, hash, equal);

auto gen_func_unique_id = [&unique_map, &next_id](const vectorized::VExprSPtr& expr) {
auto it = unique_map.find(expr);
if (it != unique_map.end()) {
return it->second;
} else {
unique_map[expr] = ++next_id;
return next_id;
}
};

for (const auto& root_expr_ctx : _common_expr_ctxs_push_down) {
const auto& root_expr = root_expr_ctx->root();
if (root_expr == nullptr) {
continue;
}

std::stack<std::pair<vectorized::VExprSPtr, bool>> stack;
stack.emplace(root_expr, false);

while (!stack.empty()) {
const auto& [expr, has_compound_pred] = stack.top();
stack.pop();

bool current_has_compound_pred =
has_compound_pred || (expr->node_type() == TExprNodeType::COMPOUND_PRED);

if (expr->node_type() == TExprNodeType::FUNCTION_CALL &&
expr->can_push_down_to_index()) {
expr->set_index_unique_id(gen_func_unique_id(expr));
if (current_has_compound_pred) {
compound_func_exprs.emplace_back(expr, root_expr_ctx);
} else {
no_compound_func_exprs.emplace_back(expr, root_expr_ctx);
}
}

const auto& children = expr->children();
for (int32_t i = children.size() - 1; i >= 0; --i) {
if (!children[i]->children().empty()) {
stack.emplace(children[i], current_has_compound_pred);
}
}
}
}
}

bool SegmentIterator::_no_need_read_key_data(ColumnId cid, vectorized::MutableColumnPtr& column,
size_t nrows_read) {
if (_opts.runtime_state && !_opts.runtime_state->query_options().enable_no_need_read_data_opt) {
Expand Down Expand Up @@ -2895,12 +2997,39 @@ bool SegmentIterator::_can_opt_topn_reads() const {
return false;
}

if (!_common_expr_ctxs_push_down.empty() || !_remaining_conjunct_roots.empty()) {
return false;
}

if (!_col_predicates.empty() || !_col_preds_except_leafnode_of_andnode.empty()) {
return false;
}

if (!no_compound_func_exprs.empty() || !compound_func_exprs.empty()) {
return false;
}

return true;
}

Status SegmentIterator::execute_func_expr(const vectorized::VExprSPtr& expr,
const vectorized::VExprContextSPtr& expr_ctx,
std::shared_ptr<roaring::Roaring>& result) {
const auto& expr0 = expr->get_child(0);
if (!expr0 || expr0->node_type() != TExprNodeType::SLOT_REF) {
return Status::RuntimeError("cannot perform index filtering");
}

FuncExprParams params;
auto slot_expr = std::static_pointer_cast<vectorized::VSlotRef>(expr0);
params._column_id = _schema->column_id(slot_expr->column_id());
params._unique_id = _schema->unique_id(slot_expr->column_id());
params._column_name = _opts.tablet_schema->column(params._column_id).name();
params._segment_iterator = this;
params.result = result;

return expr->eval_inverted_index(expr_ctx.get(), params);
}

} // namespace segment_v2
} // namespace doris
28 changes: 28 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ struct ColumnPredicateInfo {
int32_t column_id;
};

class SegmentIterator;
struct FuncExprParams {
ColumnId _column_id = 0;
uint32_t _unique_id = 0;
std::string _column_name;
SegmentIterator* _segment_iterator = nullptr;
std::shared_ptr<roaring::Roaring> result;
};

class SegmentIterator : public RowwiseIterator {
public:
SegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr schema);
Expand All @@ -123,6 +132,8 @@ class SegmentIterator : public RowwiseIterator {
std::vector<RowLocation>* block_row_locations) override;

const Schema& schema() const override { return *_schema; }
Segment& segment() { return *_segment; }
StorageReadOptions& storage_read_options() { return _opts; }
bool is_lazy_materialization_read() const override { return _lazy_materialization_read; }
uint64_t data_id() const override { return _segment->id(); }
RowsetId rowset_id() const { return _segment->rowset_id(); }
Expand All @@ -142,6 +153,11 @@ class SegmentIterator : public RowwiseIterator {
return updated;
}

std::vector<std::unique_ptr<InvertedIndexIterator>>& inverted_index_iterators() {
return _inverted_index_iterators;
}
[[nodiscard]] Status _init_inverted_index_iterators(ColumnId cid);

private:
Status _next_batch_internal(vectorized::Block* block);

Expand Down Expand Up @@ -310,6 +326,7 @@ class SegmentIterator : public RowwiseIterator {
bool _check_column_pred_all_push_down(const std::string& column_name, bool in_compound = false,
bool is_match = false);
void _calculate_pred_in_remaining_conjunct_root(const vectorized::VExprSPtr& expr);
void _calculate_func_in_remaining_conjunct_root();

// todo(wb) remove this method after RowCursor is removed
void _convert_rowcursor_to_short_key(const RowCursor& key, size_t num_keys) {
Expand Down Expand Up @@ -392,6 +409,10 @@ class SegmentIterator : public RowwiseIterator {
void _initialize_predicate_results();
bool _check_all_predicates_passed_inverted_index_for_column(ColumnId cid);

Status execute_func_expr(const vectorized::VExprSPtr& expr,
const vectorized::VExprContextSPtr& expr_ctx,
std::shared_ptr<roaring::Roaring>& result);

class BitmapRangeIterator;
class BackwardBitmapRangeIterator;

Expand Down Expand Up @@ -458,6 +479,11 @@ class SegmentIterator : public RowwiseIterator {
// make a copy of `_opts.column_predicates` in order to make local changes
std::vector<ColumnPredicate*> _col_predicates;
std::vector<ColumnPredicate*> _col_preds_except_leafnode_of_andnode;

using FuncExprPair = std::pair<vectorized::VExprSPtr, vectorized::VExprContextSPtr>;
std::vector<FuncExprPair> no_compound_func_exprs;
std::vector<FuncExprPair> compound_func_exprs;

vectorized::VExprContextSPtrs _common_expr_ctxs_push_down;
bool _enable_common_expr_pushdown = false;
std::vector<vectorized::VExprSPtr> _remaining_conjunct_roots;
Expand Down Expand Up @@ -504,6 +530,8 @@ class SegmentIterator : public RowwiseIterator {

std::unordered_map<int, std::unordered_map<std::string, bool>>
_column_predicate_inverted_index_status;

DorisCallOnce<Status> _init_single_inverted_index_iterator;
};

} // namespace segment_v2
Expand Down
Loading

0 comments on commit 1a438ae

Please sign in to comment.