Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 4 additions & 40 deletions be/src/exec/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,27 +207,6 @@ class ColumnValueRange {
_contain_null = _is_nullable_col && contain_null;
}

void attach_profile_counter(
int runtime_filter_id,
std::shared_ptr<RuntimeProfile::Counter> predicate_filtered_rows_counter,
std::shared_ptr<RuntimeProfile::Counter> predicate_input_rows_counter,
std::shared_ptr<RuntimeProfile::Counter> predicate_always_true_rows_counter) {
DCHECK(predicate_filtered_rows_counter != nullptr);
DCHECK(predicate_input_rows_counter != nullptr);

_runtime_filter_id = runtime_filter_id;

if (predicate_filtered_rows_counter != nullptr) {
_predicate_filtered_rows_counter = predicate_filtered_rows_counter;
}
if (predicate_input_rows_counter != nullptr) {
_predicate_input_rows_counter = predicate_input_rows_counter;
}
if (predicate_always_true_rows_counter != nullptr) {
_predicate_always_true_rows_counter = predicate_always_true_rows_counter;
}
}

// Returns the value of the _precision member.
int precision() const { return _precision; }

// Returns the value of the _scale member.
int scale() const { return _scale; }
Expand Down Expand Up @@ -294,15 +273,6 @@ class ColumnValueRange {
primitive_type == PrimitiveType::TYPE_DATETIMEV2 ||
primitive_type == PrimitiveType::TYPE_TIMESTAMPTZ ||
primitive_type == PrimitiveType::TYPE_DECIMAL256;

// Runtime filter id associated with this range; defaults to -1.
int _runtime_filter_id = -1;

// Profile counters for this range. Each one is default-initialized to its own
// RuntimeProfile::Counter constructed with (TUnit::UNIT, 0), so these pointers are
// non-null even before any counter is attached.
std::shared_ptr<RuntimeProfile::Counter> _predicate_filtered_rows_counter =
std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
std::shared_ptr<RuntimeProfile::Counter> _predicate_input_rows_counter =
std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
std::shared_ptr<RuntimeProfile::Counter> _predicate_always_true_rows_counter =
std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
};
// Explicit-specialization declaration of the TYPE_MIN member for ColumnValueRange<TYPE_FLOAT>.
// No initializer is given, so this line only declares the specialization; its definition
// is not part of this view.
template <>
const typename ColumnValueRange<TYPE_FLOAT>::CppType ColumnValueRange<TYPE_FLOAT>::TYPE_MIN;
Expand All @@ -315,12 +285,6 @@ const typename ColumnValueRange<TYPE_DOUBLE>::CppType ColumnValueRange<TYPE_DOUB

class OlapScanKeys {
public:
// Default constructor. Initial flag values: _has_range_value is false, while
// _begin_include, _end_include and _is_convertible are all true.
OlapScanKeys()
: _has_range_value(false),
_begin_include(true),
_end_include(true),
_is_convertible(true) {}

// TODO(gabriel): use ColumnPredicate to extend scan key
template <PrimitiveType primitive_type>
Status extend_scan_key(ColumnValueRange<primitive_type>& range, int32_t max_scan_key_num,
Expand Down Expand Up @@ -358,10 +322,10 @@ class OlapScanKeys {
private:
// Scan key tuples for the begin and end side of the scan.
std::vector<OlapTuple> _begin_scan_keys;
std::vector<OlapTuple> _end_scan_keys;
// Flag declarations without initializers (values come from a constructor).
bool _has_range_value;
bool _begin_include;
bool _end_include;
bool _is_convertible;
// Flag declarations with in-class default initializers.
// NOTE(review): every default here is false, whereas the OlapScanKeys() constructor shown
// earlier in this diff initializes _begin_include, _end_include and _is_convertible to true.
// If these initializers replace that constructor, the initial values of those three flags
// flip from true to false - confirm that this change is intended.
bool _has_range_value = false;
bool _begin_include = false;
bool _end_include = false;
bool _is_convertible = false;
};

using ColumnValueRangeType = std::variant<
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/shared_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ class SharedPredicate final : public ColumnPredicate {
ColumnPredicate::debug_string(), _nested ? _nested->debug_string() : "null");
return fmt::to_string(debug_string_buffer);
}
// Overwrites the _column_id member with the given column id.
void set_column_id(uint32_t column_id) { _column_id = column_id; }
std::shared_ptr<ColumnPredicate> clone(uint32_t column_id) const override {
// All scanner thread should share the same SharedPredicate object.
return std::const_pointer_cast<ColumnPredicate>(shared_from_this());
Expand Down
25 changes: 0 additions & 25 deletions be/src/olap/tablet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,31 +116,6 @@ Status TabletReader::init(const ReaderParams& read_params) {
return res;
}

// Decides whether the rowsets can be read directly, without aggregation.
// Returns true only when exactly one rowset holds rows, that rowset's segments do not
// overlap, and no delete rowset was seen before the scan stopped.
bool TabletReader::_optimize_for_single_rowset(
        const std::vector<RowsetReaderSharedPtr>& rs_readers) {
    int rowsets_with_data = 0;
    for (const auto& reader : rs_readers) {
        // A delete rowset rules the optimization out.
        if (reader->rowset()->rowset_meta()->delete_flag()) {
            return false;
        }
        // Rowsets without rows do not take part in the decision.
        if (!(reader->rowset()->rowset_meta()->num_rows() > 0)) {
            continue;
        }
        // when there are overlapping segments, can not do directly read
        if (reader->rowset()->rowset_meta()->is_segments_overlapping()) {
            return false;
        }
        // A second rowset with data means more than one source has to be combined.
        if (++rowsets_with_data > 1) {
            return false;
        }
    }
    return rowsets_with_data == 1;
}

Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) {
SCOPED_RAW_TIMER(&_stats.tablet_reader_capture_rs_readers_timer_ns);
if (read_params.rs_splits.empty()) {
Expand Down
3 changes: 0 additions & 3 deletions be/src/olap/tablet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ class TabletReader {
std::vector<ColumnId>* origin_return_columns = nullptr;
std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = nullptr;
TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE;
vectorized::VExpr* remaining_vconjunct_root = nullptr;
std::vector<vectorized::VExprSPtr> remaining_conjunct_roots;
vectorized::VExprContextSPtrs common_expr_ctxs_push_down;

Expand Down Expand Up @@ -255,8 +254,6 @@ class TabletReader {

Status _capture_rs_readers(const ReaderParams& read_params);

bool _optimize_for_single_rowset(const std::vector<RowsetReaderSharedPtr>& rs_readers);

Status _init_keys_param(const ReaderParams& read_params);

Status _init_orderby_keys_param(const ReaderParams& read_params);
Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/file_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ class FileScanOperatorX final : public ScanOperatorX<FileScanLocalState> {

Status prepare(RuntimeState* state) override;

// Override that always returns true for this operator.
bool is_file_scan_operator() const override { return true; }

// There's only one scan range for each backend in batch split mode. Each backend only starts up one ScanNode instance.
int parallelism(RuntimeState* state) const override {
return _batch_split_mode ? 1 : ScanOperatorX<FileScanLocalState>::parallelism(state);
Expand Down
15 changes: 15 additions & 0 deletions be/src/pipeline/exec/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,21 @@ OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, i
<< ", sort_limit: " << _olap_scan_node.sort_limit
<< ", isset.sort_limit: " << _olap_scan_node.__isset.sort_limit;
})

if (_olap_scan_node.__isset.columns_desc && !_olap_scan_node.columns_desc.empty() &&
_olap_scan_node.columns_desc[0].col_unique_id >= 0) {
_tablet_schema = std::make_shared<TabletSchema>();
_tablet_schema->clear_columns();
for (const auto& column_desc : _olap_scan_node.columns_desc) {
_tablet_schema->append_column(TabletColumn(column_desc));
}
if (_olap_scan_node.__isset.schema_version) {
_tablet_schema->set_schema_version(_olap_scan_node.schema_version);
}
if (_olap_scan_node.__isset.indexes_desc) {
_tablet_schema->update_indexes_from_thrift(_olap_scan_node.indexes_desc);
}
}
}

#include "common/compile_check_end.h"
Expand Down
14 changes: 14 additions & 0 deletions be/src/pipeline/exec/olap_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
if (!predicate.target_is_slot(_parent->node_id())) {
return false;
}
if (!olap_scan_node().__isset.columns_desc || olap_scan_node().columns_desc.empty() ||
olap_scan_node().columns_desc[0].col_unique_id < 0) {
// Disable topN filter if there is no schema info
return false;
}
return _is_key_column(predicate.get_col_name(_parent->node_id()));
}

Expand Down Expand Up @@ -298,10 +303,19 @@ class OlapScanOperatorX final : public ScanOperatorX<OlapScanLocalState> {
const DescriptorTbl& descs, int parallel_tasks,
const TQueryCacheParam& cache_param);

// Resolves a column name to its field index in _tablet_schema.
// Returns -1 when _tablet_schema is null.
// NOTE(review): DORIS_TRY presumably returns early from this function when the lookup by
// name fails; what value an int-returning function yields on that path is not visible
// here - confirm against the macro definition.
int get_column_id(const std::string& col_name) const override {
if (!_tablet_schema) {
return -1;
}
// Look the column up by name, then map its unique id to a field index.
const auto& column = *DORIS_TRY(_tablet_schema->column(col_name));
return _tablet_schema->field_index(column.unique_id());
}

private:
friend class OlapScanLocalState;
TOlapScanNode _olap_scan_node;
TQueryCacheParam _cache_param;
TabletSchemaSPtr _tablet_schema;
};

#include "common/compile_check_end.h"
Expand Down
93 changes: 24 additions & 69 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,45 +298,6 @@ Status ScanLocalState<Derived>::_normalize_predicate(vectorized::VExprContext* c
vectorized::VExprSPtr& output_expr) {
const auto expr_root = context->root();
static constexpr auto is_leaf = [](auto&& expr) { return !expr->is_and_expr(); };
auto in_predicate_checker = [&](const vectorized::VExprSPtrs& children,
SlotDescriptor** slot_desc, ColumnValueRangeType** range) {
if (children.empty() || children[0]->node_type() != TExprNodeType::SLOT_REF) {
// not a slot ref(column)
return false;
}
std::shared_ptr<vectorized::VSlotRef> slot =
std::dynamic_pointer_cast<vectorized::VSlotRef>(children[0]);
*slot_desc =
_parent->cast<typename Derived::Parent>()._slot_id_to_slot_desc[slot->slot_id()];
return _is_predicate_acting_on_slot(slot, range);
};
auto eq_predicate_checker = [&](const vectorized::VExprSPtrs& children,
SlotDescriptor** slot_desc, ColumnValueRangeType** range) {
if (children.empty() || children[0]->node_type() != TExprNodeType::SLOT_REF) {
// not a slot ref(column)
return false;
}
std::shared_ptr<vectorized::VSlotRef> slot =
std::dynamic_pointer_cast<vectorized::VSlotRef>(children[0]);
CHECK(slot != nullptr);
*slot_desc =
_parent->cast<typename Derived::Parent>()._slot_id_to_slot_desc[slot->slot_id()];
return _is_predicate_acting_on_slot(slot, range);
};
auto topn_predicate_checker = [&](const vectorized::VExprSPtrs& children,
SlotDescriptor** slot_desc, ColumnValueRangeType** range) {
if (children.empty() || children[0]->node_type() != TExprNodeType::SLOT_REF) {
// not a slot ref(column)
return false;
}
std::shared_ptr<vectorized::VSlotRef> slot =
std::dynamic_pointer_cast<vectorized::VSlotRef>(children[0]);
CHECK(slot != nullptr);
*slot_desc =
_parent->cast<typename Derived::Parent>()._slot_id_to_slot_desc[slot->slot_id()];
return _is_predicate_acting_on_slot(slot, range);
};

if (expr_root != nullptr) {
if (is_leaf(expr_root)) {
if (dynamic_cast<vectorized::VirtualSlotRef*>(expr_root.get())) {
Expand All @@ -363,30 +324,10 @@ Status ScanLocalState<Derived>::_normalize_predicate(vectorized::VExprContext* c
slotref = std::dynamic_pointer_cast<vectorized::VSlotRef>(
vectorized::VExpr::expr_without_cast(child));
}
if (in_predicate_checker(expr_root->children(), &slot, &range) ||
eq_predicate_checker(expr_root->children(), &slot, &range) ||
topn_predicate_checker(expr_root->children(), &slot, &range)) {
if (_is_predicate_acting_on_slot(expr_root->children(), &slot, &range)) {
Status status = Status::OK();
std::visit(
[&](auto& value_range) {
bool need_set_runtime_filter_id = value_range.is_whole_value_range() &&
expr_root->is_rf_wrapper();
Defer set_runtime_filter_id {[&]() {
// rf predicates is always appended to the end of conjuncts. We need to ensure that there is no non-rf predicate after rf-predicate
// If it is not a whole range, it means that the column has other non-rf predicates, so it cannot be marked as rf predicate.
// If the range where non-rf predicates are located is incorrectly marked as rf, can_ignore will return true, resulting in the predicate not taking effect and getting an incorrect result.
if (need_set_runtime_filter_id) {
auto* rf_expr = assert_cast<vectorized::VRuntimeFilterWrapper*>(
expr_root.get());
DCHECK(rf_expr->predicate_filtered_rows_counter() != nullptr);
DCHECK(rf_expr->predicate_input_rows_counter() != nullptr);
value_range.attach_profile_counter(
rf_expr->filter_id(),
rf_expr->predicate_filtered_rows_counter(),
rf_expr->predicate_input_rows_counter(),
rf_expr->predicate_always_true_rows_counter());
}
}};
RETURN_IF_PUSH_DOWN(
_normalize_in_and_eq_predicate(
context, slot, _slot_id_to_predicates[slot->id()],
Expand Down Expand Up @@ -562,8 +503,17 @@ Status ScanLocalState<Derived>::_normalize_function_filters(vectorized::VExprCon
}

template <typename Derived>
bool ScanLocalState<Derived>::_is_predicate_acting_on_slot(
const std::shared_ptr<vectorized::VSlotRef>& slot_ref, ColumnValueRangeType** range) {
bool ScanLocalState<Derived>::_is_predicate_acting_on_slot(const vectorized::VExprSPtrs& children,
SlotDescriptor** slot_desc,
ColumnValueRangeType** range) {
if (children.empty() || children[0]->node_type() != TExprNodeType::SLOT_REF) {
// not a slot ref(column)
return false;
}
std::shared_ptr<vectorized::VSlotRef> slot_ref =
std::dynamic_pointer_cast<vectorized::VSlotRef>(children[0]);
*slot_desc =
_parent->cast<typename Derived::Parent>()._slot_id_to_slot_desc[slot_ref->slot_id()];
auto entry = _slot_id_to_predicates.find(slot_ref->slot_id());
if (_slot_id_to_predicates.end() == entry) {
return false;
Expand Down Expand Up @@ -1173,11 +1123,6 @@ TPushAggOp::type ScanLocalState<Derived>::get_push_down_agg_type() {
return _parent->cast<typename Derived::Parent>()._push_down_agg_type;
}

// Returns the _push_down_count value held by the parent operator.
template <typename Derived>
int64_t ScanLocalState<Derived>::get_push_down_count() {
    auto&& parent_op = _parent->cast<typename Derived::Parent>();
    return parent_op._push_down_count;
}

template <typename Derived>
int64_t ScanLocalState<Derived>::limit_per_scanner() {
return _parent->cast<typename Derived::Parent>()._limit_per_scanner;
Expand Down Expand Up @@ -1386,9 +1331,19 @@ Status ScanOperatorX<LocalStateType>::prepare(RuntimeState* state) {
continue;
}

int cid = -1;
if (state->get_query_ctx()->get_runtime_predicate(id).target_is_slot(node_id())) {
auto s = _slot_id_to_slot_desc[state->get_query_ctx()
->get_runtime_predicate(id)
.get_texpr(node_id())
.nodes[0]
.slot_ref.slot_id];
DCHECK(s != nullptr);
auto col_name = s->col_name();
cid = get_column_id(col_name);
}
RETURN_IF_ERROR(state->get_query_ctx()->get_runtime_predicate(id).init_target(
node_id(), _slot_id_to_slot_desc,
OperatorX<LocalStateType>::intermediate_row_desc()));
node_id(), _slot_id_to_slot_desc, cid));
}

RETURN_IF_CANCELLED(state);
Expand Down
Loading
Loading