From a53ca1c2d019dc29ba40fb1271c6e07b54b90db5 Mon Sep 17 00:00:00 2001 From: Ashin Gau Date: Tue, 7 Nov 2023 09:50:35 +0800 Subject: [PATCH] [fix](runtime filter) append late arrival runtime filters in vfilescanner (#25996) `VFileScanner` will try to append late arrival runtime filters in each loop of `ScannerScheduler::_scanner_scan`. However, `VFileScanner::_get_next_reader` only generates the `_push_down_conjuncts` in the first loop, so the late arrival runtime filters are ignored. --- be/src/vec/exec/scan/vfile_scanner.cpp | 44 ++++++++++++++++---------- be/src/vec/exec/scan/vfile_scanner.h | 2 ++ 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 35e1d3dff53f68..8eda9c1714b3f8 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -166,6 +166,8 @@ Status VFileScanner::prepare( ADD_TIMER(_parent->_scanner_profile, "FileScannerConvertOuputBlockTime"); _empty_file_counter = ADD_COUNTER(_parent->_scanner_profile, "EmptyFileNum", TUnit::UNIT); _file_counter = ADD_COUNTER(_parent->_scanner_profile, "FileNumber", TUnit::UNIT); + _has_fully_rf_file_counter = + ADD_COUNTER(_parent->_scanner_profile, "HasFullyRfFileNumber", TUnit::UNIT); } else { _get_block_timer = ADD_TIMER(_local_state->scanner_profile(), "FileScannerGetBlockTime"); _open_reader_timer = @@ -182,6 +184,8 @@ Status VFileScanner::prepare( _empty_file_counter = ADD_COUNTER(_local_state->scanner_profile(), "EmptyFileNum", TUnit::UNIT); _file_counter = ADD_COUNTER(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT); + _has_fully_rf_file_counter = + ADD_COUNTER(_local_state->scanner_profile(), "HasFullyRfFileNumber", TUnit::UNIT); } _file_cache_statistics.reset(new io::FileCacheStatistics()); @@ -222,7 +226,9 @@ Status VFileScanner::prepare( } Status VFileScanner::_process_conjuncts_for_dict_filter() { - for (auto& conjunct : _conjuncts) { + 
_slot_id_to_filter_conjuncts.clear(); + _not_single_slot_filter_conjuncts.clear(); + for (auto& conjunct : _push_down_conjuncts) { auto impl = conjunct->root()->get_impl(); // If impl is not null, which means this a conjuncts from runtime filter. auto cur_expr = impl ? impl : conjunct->root(); @@ -250,6 +256,22 @@ Status VFileScanner::_process_conjuncts_for_dict_filter() { return Status::OK(); } +Status VFileScanner::_process_late_arrival_conjuncts() { + if (_push_down_conjuncts.size() < _conjuncts.size()) { + _push_down_conjuncts.clear(); + _push_down_conjuncts.resize(_conjuncts.size()); + for (size_t i = 0; i != _conjuncts.size(); ++i) { + RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i])); + } + RETURN_IF_ERROR(_process_conjuncts_for_dict_filter()); + _discard_conjuncts(); + } + if (_applied_rf_num == _total_rf_num) { + COUNTER_UPDATE(_has_fully_rf_file_counter, 1); + } + return Status::OK(); +} + void VFileScanner::_get_slot_ids(VExpr* expr, std::vector* slot_ids) { for (auto& child_expr : expr->children()) { if (child_expr->is_slot_ref()) { @@ -766,12 +788,8 @@ Status VFileScanner::_get_next_reader() { SCOPED_TIMER(_open_reader_timer); RETURN_IF_ERROR(parquet_reader->open()); } - if (push_down_predicates && _push_down_conjuncts.empty() && !_conjuncts.empty()) { - _push_down_conjuncts.resize(_conjuncts.size()); - for (size_t i = 0; i != _conjuncts.size(); ++i) { - RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i])); - } - _discard_conjuncts(); + if (push_down_predicates) { + RETURN_IF_ERROR(_process_late_arrival_conjuncts()); } if (range.__isset.table_format_params && range.table_format_params.table_format_type == "iceberg") { @@ -802,12 +820,8 @@ Status VFileScanner::_get_next_reader() { std::unique_ptr orc_reader = OrcReader::create_unique( _profile, _state, *_params, range, _state->query_options().batch_size, _state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat); - if (push_down_predicates && 
_push_down_conjuncts.empty() && !_conjuncts.empty()) { - _push_down_conjuncts.resize(_conjuncts.size()); - for (size_t i = 0; i != _conjuncts.size(); ++i) { - RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i])); - } - _discard_conjuncts(); + if (push_down_predicates) { + RETURN_IF_ERROR(_process_late_arrival_conjuncts()); } if (range.__isset.table_format_params && range.table_format_params.table_format_type == "transactional_hive") { @@ -1080,10 +1094,6 @@ Status VFileScanner::_init_expr_ctxes() { } } } - // TODO: It should can move to scan node to process. - if (!_conjuncts.empty()) { - static_cast(_process_conjuncts_for_dict_filter()); - } return Status::OK(); } diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index b7ceefe775cbce..4524abb1fdbd7b 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -176,6 +176,7 @@ class VFileScanner : public VScanner { RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr; RuntimeProfile::Counter* _empty_file_counter = nullptr; RuntimeProfile::Counter* _file_counter = nullptr; + RuntimeProfile::Counter* _has_fully_rf_file_counter = nullptr; const std::unordered_map* _col_name_to_slot_id; // single slot filter conjuncts @@ -206,6 +207,7 @@ class VFileScanner : public VScanner { Status _generate_fill_columns(); Status _handle_dynamic_block(Block* block); Status _process_conjuncts_for_dict_filter(); + Status _process_late_arrival_conjuncts(); void _get_slot_ids(VExpr* expr, std::vector* slot_ids); void _reset_counter() {