Skip to content

Commit

Permalink
[fix](runtime filter) append late arrival runtime filters in vfilecanner
Browse files Browse the repository at this point in the history
  • Loading branch information
AshinGau committed Nov 2, 2023
1 parent 3e9e8be commit 874cd01
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 24 deletions.
3 changes: 3 additions & 0 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1233,6 +1233,9 @@ bool IRuntimeFilter::is_ready_or_timeout() {

void IRuntimeFilter::signal() {
DCHECK(is_consumer());
if (is_ready()) {
LOG(FATAL) << "signal runtime filter " << _filter_id << " twice";
}
if (_enable_pipeline_exec) {
_rf_state_atomic.store(RuntimeFilterState::READY);
if (!_filter_timer.empty()) {
Expand Down
67 changes: 43 additions & 24 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ Status VFileScanner::prepare(
ADD_TIMER(_parent->_scanner_profile, "FileScannerConvertOuputBlockTime");
_empty_file_counter = ADD_COUNTER(_parent->_scanner_profile, "EmptyFileNum", TUnit::UNIT);
_file_counter = ADD_COUNTER(_parent->_scanner_profile, "FileNumber", TUnit::UNIT);
_has_fully_rf_file_counter =
ADD_COUNTER(_parent->_scanner_profile, "HasFullyRfFileNumber", TUnit::UNIT);
_before_push_down_rows =
ADD_COUNTER(_parent->_scanner_profile, "BeforePushDownRows", TUnit::UNIT);
} else {
_get_block_timer = ADD_TIMER(_local_state->scanner_profile(), "FileScannerGetBlockTime");
_open_reader_timer =
Expand All @@ -182,6 +186,10 @@ Status VFileScanner::prepare(
_empty_file_counter =
ADD_COUNTER(_local_state->scanner_profile(), "EmptyFileNum", TUnit::UNIT);
_file_counter = ADD_COUNTER(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT);
_has_fully_rf_file_counter =
ADD_COUNTER(_local_state->scanner_profile(), "HasFullyRfFileNumber", TUnit::UNIT);
_before_push_down_rows =
ADD_COUNTER(_local_state->scanner_profile(), "BeforePushDownRows", TUnit::UNIT);
}

_file_cache_statistics.reset(new io::FileCacheStatistics());
Expand Down Expand Up @@ -222,7 +230,9 @@ Status VFileScanner::prepare(
}

Status VFileScanner::_process_conjuncts_for_dict_filter() {
for (auto& conjunct : _conjuncts) {
_slot_id_to_filter_conjuncts.clear();
_not_single_slot_filter_conjuncts.clear();
for (auto& conjunct : _push_down_conjuncts) {
auto impl = conjunct->root()->get_impl();
// If impl is not null, which means this a conjuncts from runtime filter.
auto cur_expr = impl ? impl : conjunct->root();
Expand Down Expand Up @@ -250,6 +260,22 @@ Status VFileScanner::_process_conjuncts_for_dict_filter() {
return Status::OK();
}

Status VFileScanner::_late_arrival_push_down_conjuncts() {
if (_push_down_conjuncts.size() < _conjuncts.size()) {
_push_down_conjuncts.clear();
_push_down_conjuncts.resize(_conjuncts.size());
for (size_t i = 0; i != _conjuncts.size(); ++i) {
RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i]));
}
return _process_conjuncts_for_dict_filter();
_discard_conjuncts();
}
if (_applied_rf_num == _total_rf_num) {
COUNTER_UPDATE(_has_fully_rf_file_counter, 1);
}
return Status::OK();
}

void VFileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) {
for (auto& child_expr : expr->children()) {
if (child_expr->is_slot_ref()) {
Expand Down Expand Up @@ -290,6 +316,7 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo
do {
if (_cur_reader == nullptr || _cur_reader_eof) {
RETURN_IF_ERROR(_get_next_reader());
_file_rows = 0;
}

if (_scanner_eof) {
Expand All @@ -308,6 +335,14 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo
// Some of column in block may not be filled (column not exist in file)
RETURN_IF_ERROR(
_cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof));
read_rows = _src_block_ptr->rows();
COUNTER_UPDATE(_before_push_down_rows, read_rows);
_file_rows += read_rows;
if (_cur_reader_eof) {
const TFileRangeDesc& range = _ranges[_next_range - 1];
LOG(WARNING) << range.path << " has runtime filters(" << _applied_rf_num << "/"
<< _total_rf_num << "), and output rows=" << _file_rows;
}
}
if (_params->format_type == TFileFormatType::FORMAT_WAL) {
block->swap(*_src_block_ptr);
Expand Down Expand Up @@ -340,6 +375,9 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo
}
} while (true);

RETURN_IF_ERROR(VExprContext::filter_block(_push_down_conjuncts, _src_block_ptr,
_src_block_ptr->columns()));

// Update filtered rows and unselected rows for load, reset counter.
// {
// state->update_num_rows_load_filtered(_counter.num_rows_filtered);
Expand Down Expand Up @@ -719,13 +757,6 @@ Status VFileScanner::_get_next_reader() {
// JNI reader can only push down column value range
bool push_down_predicates =
!_is_load && _params->format_type != TFileFormatType::FORMAT_JNI;
if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params &&
range.table_format_params.table_format_type == "hudi") {
if (range.table_format_params.hudi_params.delta_logs.empty()) {
// fall back to native reader if there is no log file
format_type = TFileFormatType::FORMAT_PARQUET;
}
}
bool need_to_get_parsed_schema = false;
switch (format_type) {
case TFileFormatType::FORMAT_JNI: {
Expand Down Expand Up @@ -766,12 +797,8 @@ Status VFileScanner::_get_next_reader() {
SCOPED_TIMER(_open_reader_timer);
RETURN_IF_ERROR(parquet_reader->open());
}
if (push_down_predicates && _push_down_conjuncts.empty() && !_conjuncts.empty()) {
_push_down_conjuncts.resize(_conjuncts.size());
for (size_t i = 0; i != _conjuncts.size(); ++i) {
RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i]));
}
_discard_conjuncts();
if (push_down_predicates) {
RETURN_IF_ERROR(_late_arrival_push_down_conjuncts());
}
if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "iceberg") {
Expand Down Expand Up @@ -802,12 +829,8 @@ Status VFileScanner::_get_next_reader() {
std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
_profile, _state, *_params, range, _state->query_options().batch_size,
_state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat);
if (push_down_predicates && _push_down_conjuncts.empty() && !_conjuncts.empty()) {
_push_down_conjuncts.resize(_conjuncts.size());
for (size_t i = 0; i != _conjuncts.size(); ++i) {
RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i]));
}
_discard_conjuncts();
if (push_down_predicates) {
RETURN_IF_ERROR(_late_arrival_push_down_conjuncts());
}
if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "transactional_hive") {
Expand Down Expand Up @@ -1080,10 +1103,6 @@ Status VFileScanner::_init_expr_ctxes() {
}
}
}
// TODO: It should can move to scan node to process.
if (!_conjuncts.empty()) {
static_cast<void>(_process_conjuncts_for_dict_filter());
}
return Status::OK();
}

Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/exec/scan/vfile_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ class VFileScanner : public VScanner {
RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
RuntimeProfile::Counter* _empty_file_counter = nullptr;
RuntimeProfile::Counter* _file_counter = nullptr;
RuntimeProfile::Counter* _has_fully_rf_file_counter = nullptr;
RuntimeProfile::Counter* _before_push_down_rows = nullptr;
size_t _file_rows = 0;

const std::unordered_map<std::string, int>* _col_name_to_slot_id;
// single slot filter conjuncts
Expand Down Expand Up @@ -206,6 +209,7 @@ class VFileScanner : public VScanner {
Status _generate_fill_columns();
Status _handle_dynamic_block(Block* block);
Status _process_conjuncts_for_dict_filter();
Status _late_arrival_push_down_conjuncts();
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);

void _reset_counter() {
Expand Down

0 comments on commit 874cd01

Please sign in to comment.