Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions be/src/exec/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class ColumnValueRange {

size_t get_fixed_value_size() const { return _fixed_values.size(); }

void to_olap_filter(std::vector<FilterOlapParam<TCondition>>& filters) {
void to_olap_filter(std::vector<FilterOlapParam<TCondition>>& filters) const {
if (is_fixed_value_range()) {
// 1. convert to in filter condition
to_in_condition(filters, true);
Expand Down Expand Up @@ -259,7 +259,8 @@ class ColumnValueRange {
}
}

void to_in_condition(std::vector<FilterOlapParam<TCondition>>& filters, bool is_in = true) {
void to_in_condition(std::vector<FilterOlapParam<TCondition>>& filters,
bool is_in = true) const {
TCondition condition;
condition.__set_column_name(_column_name);
condition.__set_condition_op(is_in ? "*=" : "!*=");
Expand Down
20 changes: 13 additions & 7 deletions be/src/pipeline/exec/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -892,10 +892,16 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() {
for (int column_index = 0; column_index < column_names.size() &&
!_scan_keys.has_range_value() && !eos && !should_break;
++column_index) {
auto iter = _colname_to_value_range.find(column_names[column_index]);
if (_colname_to_value_range.end() == iter) {
if (p._colname_to_slot_id.find(column_names[column_index]) ==
p._colname_to_slot_id.end()) {
break;
}
auto iter =
_slot_id_to_value_range.find(p._colname_to_slot_id[column_names[column_index]]);
if (_slot_id_to_value_range.end() == iter) {
break;
}
const auto& value_range = iter->second.second;

RETURN_IF_ERROR(std::visit(
[&](auto&& range) {
Expand All @@ -908,28 +914,28 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() {
_scan_keys.extend_scan_key(temp_range, p._max_scan_key_num,
&exact_range, &eos, &should_break));
if (exact_range) {
_colname_to_value_range.erase(iter->first);
_slot_id_to_value_range.erase(iter->first);
}
} else {
// if exceed max_pushdown_conditions_per_column, use whole_value_range instead
// and will not erase from _colname_to_value_range, it must be not exact_range
// and will not erase from _slot_id_to_value_range, it must be not exact_range
temp_range.set_whole_value_range();
RETURN_IF_ERROR(
_scan_keys.extend_scan_key(temp_range, p._max_scan_key_num,
&exact_range, &eos, &should_break));
}
return Status::OK();
},
iter->second));
value_range));
}
if (eos) {
_eos = true;
_scan_dependency->set_ready();
}

for (auto& iter : _colname_to_value_range) {
for (auto& iter : _slot_id_to_value_range) {
std::vector<FilterOlapParam<TCondition>> filters;
std::visit([&](auto&& range) { range.to_olap_filter(filters); }, iter.second);
std::visit([&](auto&& range) { range.to_olap_filter(filters); }, iter.second.second);

for (const auto& filter : filters) {
_olap_filters.emplace_back(filter);
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ Status ScanLocalState<Derived>::_normalize_conjuncts(RuntimeState* state) {
}
++it;
}

for (auto& it : _slot_id_to_value_range) {
std::visit(
[&](auto&& range) {
Expand All @@ -245,7 +246,6 @@ Status ScanLocalState<Derived>::_normalize_conjuncts(RuntimeState* state) {
}
},
it.second.second);
_colname_to_value_range[it.second.first->col_name()] = it.second.second;
}

return Status::OK();
Expand Down
7 changes: 1 addition & 6 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -320,21 +320,16 @@ class ScanLocalState : public ScanLocalStateBase {
// Parsed from conjuncts
phmap::flat_hash_map<int, std::pair<SlotDescriptor*, ColumnValueRangeType>>
_slot_id_to_value_range;
// column -> ColumnValueRange
// We use _colname_to_value_range to store a column and its corresponding value ranges.
std::unordered_map<std::string, ColumnValueRangeType> _colname_to_value_range;

// But if a col is with value range, eg: 1 < col < 10, which is "!is_fixed_range",
// in this case we can not merge "1 < col < 10" with "col not in (2)".
// So we have to save "col not in (2)" to another structure: "_not_in_value_ranges".
// When the data source try to use the value ranges, it should use both ranges in
// "_colname_to_value_range" and in "_not_in_value_ranges"
// "_slot_id_to_value_range" and in "_not_in_value_ranges"
std::vector<ColumnValueRangeType> _not_in_value_ranges;

std::atomic<bool> _eos = false;

std::mutex _block_lock;

std::vector<std::shared_ptr<Dependency>> _filter_dependencies;

// ScanLocalState owns the ownership of scanner, scanner context only has its weakptr
Expand Down
Loading