From 7d5e837cbee071f5f2afa89916cb19551ec050b7 Mon Sep 17 00:00:00 2001 From: Ashin Gau Date: Tue, 7 Nov 2023 09:50:35 +0800 Subject: [PATCH] [fix](runtime filter) append late arrival runtime filters in vfilecanner (#25996) `VFileScanner` will try to append late arrival runtime filters in each loop of `ScannerScheduler::_scanner_scan`. However, `VFileScanner::_get_next_reader` only generates the `_push_down_conjuncts` in the first loop, so the late arrival runtime filters are ignored. --- be/src/vec/exec/scan/vfile_scanner.cpp | 42 +++++++++++++++----------- be/src/vec/exec/scan/vfile_scanner.h | 2 ++ 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index af723021ca0bd3..7f35422ec030a1 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -136,6 +136,8 @@ Status VFileScanner::prepare( ADD_TIMER(_parent->_scanner_profile, "FileScannerConvertOuputBlockTime"); _empty_file_counter = ADD_COUNTER(_parent->_scanner_profile, "EmptyFileNum", TUnit::UNIT); _file_counter = ADD_COUNTER(_parent->_scanner_profile, "FileNumber", TUnit::UNIT); + _has_fully_rf_file_counter = + ADD_COUNTER(_parent->_scanner_profile, "HasFullyRfFileNumber", TUnit::UNIT); _file_cache_statistics.reset(new io::FileCacheStatistics()); _io_ctx.reset(new io::IOContext()); @@ -175,7 +177,9 @@ Status VFileScanner::prepare( } Status VFileScanner::_process_conjuncts_for_dict_filter() { - for (auto& conjunct : _conjuncts) { + _slot_id_to_filter_conjuncts.clear(); + _not_single_slot_filter_conjuncts.clear(); + for (auto& conjunct : _push_down_conjuncts) { auto impl = conjunct->root()->get_impl(); // If impl is not null, which means this a conjuncts from runtime filter. auto cur_expr = impl ? impl : conjunct->root(); @@ -203,6 +207,22 @@ Status VFileScanner::_process_conjuncts_for_dict_filter() { return Status::OK(); } +Status VFileScanner::_process_late_arrival_conjuncts() { + if (_push_down_conjuncts.size() < _conjuncts.size()) { + _push_down_conjuncts.clear(); + _push_down_conjuncts.resize(_conjuncts.size()); + for (size_t i = 0; i != _conjuncts.size(); ++i) { + RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i])); + } + RETURN_IF_ERROR(_process_conjuncts_for_dict_filter()); + _discard_conjuncts(); + } + if (_applied_rf_num == _total_rf_num) { + COUNTER_UPDATE(_has_fully_rf_file_counter, 1); + } + return Status::OK(); +} + void VFileScanner::_get_slot_ids(VExpr* expr, std::vector* slot_ids) { for (auto& child_expr : expr->children()) { if (child_expr->is_slot_ref()) { @@ -703,12 +723,8 @@ Status VFileScanner::_get_next_reader() { SCOPED_TIMER(_open_reader_timer); RETURN_IF_ERROR(parquet_reader->open()); } - if (push_down_predicates && _push_down_conjuncts.empty() && !_conjuncts.empty()) { - _push_down_conjuncts.resize(_conjuncts.size()); - for (size_t i = 0; i != _conjuncts.size(); ++i) { - RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i])); - } - _discard_conjuncts(); + if (push_down_predicates) { + RETURN_IF_ERROR(_process_late_arrival_conjuncts()); } if (range.__isset.table_format_params && range.table_format_params.table_format_type == "iceberg") { @@ -739,12 +755,8 @@ Status VFileScanner::_get_next_reader() { std::unique_ptr orc_reader = OrcReader::create_unique( _profile, _state, *_params, range, _state->query_options().batch_size, _state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat); - if (push_down_predicates && _push_down_conjuncts.empty() && !_conjuncts.empty()) { - _push_down_conjuncts.resize(_conjuncts.size()); - for (size_t i = 0; i != _conjuncts.size(); ++i) { - RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i])); - } - _discard_conjuncts(); + if (push_down_predicates) { + RETURN_IF_ERROR(_process_late_arrival_conjuncts()); } if (range.__isset.table_format_params && range.table_format_params.table_format_type == "transactional_hive") { @@ -1017,10 +1029,6 @@ Status VFileScanner::_init_expr_ctxes() { _is_dynamic_schema = _output_tuple_desc && _output_tuple_desc->slots().back()->type().is_variant_type(); - // TODO: It should can move to scan node to process. - if (!_conjuncts.empty()) { - _process_conjuncts_for_dict_filter(); - } return Status::OK(); } diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index b2781c35c551e0..3611785625d338 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -176,6 +176,7 @@ class VFileScanner : public VScanner { RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr; RuntimeProfile::Counter* _empty_file_counter = nullptr; RuntimeProfile::Counter* _file_counter = nullptr; + RuntimeProfile::Counter* _has_fully_rf_file_counter = nullptr; const std::unordered_map* _col_name_to_slot_id; // single slot filter conjuncts @@ -206,6 +207,7 @@ class VFileScanner : public VScanner { Status _generate_fill_columns(); Status _handle_dynamic_block(Block* block); Status _process_conjuncts_for_dict_filter(); + Status _process_late_arrival_conjuncts(); void _get_slot_ids(VExpr* expr, std::vector* slot_ids); void _reset_counter() {