Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/vec/core/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ void Block::initialize_index_by_name() {
void Block::insert(size_t position, const ColumnWithTypeAndName& elem) {
if (position > data.size()) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"invalid input position, position={}, data.size{}, names={}", position,
"invalid input position, position={}, data.size={}, names={}", position,
data.size(), dump_names());
}

Expand All @@ -164,7 +164,7 @@ void Block::insert(size_t position, const ColumnWithTypeAndName& elem) {
void Block::insert(size_t position, ColumnWithTypeAndName&& elem) {
if (position > data.size()) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"invalid input position, position={}, data.size{}, names={}", position,
"invalid input position, position={}, data.size={}, names={}", position,
data.size(), dump_names());
}

Expand Down
162 changes: 156 additions & 6 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,29 @@
#include <fmt/format.h>
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/Opcodes_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <glog/logging.h>

#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <iterator>
#include <map>
#include <ranges>
#include <tuple>
#include <unordered_map>
#include <utility>

#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "io/cache/block_file_cache_profile.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "util/runtime_profile.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
Expand Down Expand Up @@ -67,6 +74,7 @@
#include "vec/exec/scan/vscan_node.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/exprs/vexpr_fwd.h"
#include "vec/exprs/vslot_ref.h"
#include "vec/functions/function.h"
#include "vec/functions/function_string.h"
Expand Down Expand Up @@ -130,12 +138,17 @@ Status VFileScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conju
ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerPreFilterTimer", 1);
_convert_to_output_block_timer = ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(),
"FileScannerConvertOuputBlockTime", 1);
_runtime_filter_partition_prune_timer = ADD_TIMER_WITH_LEVEL(
_local_state->scanner_profile(), "FileScannerRuntimeFilterPartitionPruningTime", 1);
_empty_file_counter =
ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "EmptyFileNum", TUnit::UNIT, 1);
_not_found_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
"NotFoundFileNum", TUnit::UNIT, 1);
_file_counter =
ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1);
_runtime_filter_partition_pruned_range_counter =
ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
"RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT, 1);

_file_cache_statistics.reset(new io::FileCacheStatistics());
_io_ctx.reset(new io::IOContext());
Expand Down Expand Up @@ -174,6 +187,113 @@ Status VFileScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conju
return Status::OK();
}

// Decide whether `expr` qualifies as a partition pruning expression.
//   - A slot reference qualifies only if its slot id is a key of
//     _partition_slot_index_map.
//   - A literal always qualifies.
//   - Any other node qualifies when every one of its children qualifies
//     (so a node without children qualifies as well).
bool VFileScanner::_check_partition_prune_expr(const VExprSPtr& expr) {
    if (expr->is_slot_ref()) {
        auto* slot = static_cast<VSlotRef*>(expr.get());
        return _partition_slot_index_map.count(slot->slot_id()) > 0;
    }
    if (expr->is_literal()) {
        return true;
    }
    // Recurse into the children; a single non-qualifying child rejects the whole tree.
    for (const auto& child : expr->children()) {
        if (!_check_partition_prune_expr(child)) {
            return false;
        }
    }
    return true;
}

void VFileScanner::_init_runtime_filter_partition_prune_ctxs() {
_runtime_filter_partition_prune_ctxs.clear();
for (auto& conjunct : _conjuncts) {
auto impl = conjunct->root()->get_impl();
// If impl is not null, which means this a conjuncts from runtime filter.
auto expr = impl ? impl : conjunct->root();
if (_check_partition_prune_expr(expr)) {
_runtime_filter_partition_prune_ctxs.emplace_back(conjunct);
}
}
}

// Append one empty column to _runtime_filter_partition_prune_block for every slot of
// _real_tuple_desc that needs to be materialized, in slot order.
void VFileScanner::_init_runtime_filter_partition_prune_block() {
    for (auto const* slot : _real_tuple_desc->slots()) {
        // Slots that do not need materializing are ignored from reading, so they get no column.
        if (slot->need_materialize()) {
            _runtime_filter_partition_prune_block.insert(ColumnWithTypeAndName(
                    slot->get_empty_mutable_column(), slot->get_data_type_ptr(),
                    slot->col_name()));
        }
    }
}

// Evaluate the partition pruning conjuncts against the partition values of the
// current range.
//
// can_filter_all: forwarded as the output argument of VExprContext::execute_conjuncts.
//                 It is left untouched when there is nothing to evaluate (no pruning
//                 contexts or no partition column descriptors).
// Returns Status::OK() on success, or the first error reported while deserializing a
// partition value or executing the conjuncts.
//
// NOTE(review): the partition columns are added with Block::insert(position, ...) on
// every call; if that inserts rather than replaces, the prune block gains columns per
// processed range — confirm against Block::insert.
Status VFileScanner::_process_runtime_filters_partition_prune(bool& can_filter_all) {
    SCOPED_TIMER(_runtime_filter_partition_prune_timer);
    if (_runtime_filter_partition_prune_ctxs.empty() || _partition_col_descs.empty()) {
        return Status::OK();
    }
    // Each partition value is materialized as a single-row column.
    size_t partition_value_column_size = 1;

    // 1. Deserialize every partition key value into a column of the slot's data type.
    std::unordered_map<SlotId, MutableColumnPtr> columns_by_slot_id;
    for (auto const& desc_entry : _partition_col_descs) {
        const auto& [value_text, value_slot_desc] = desc_entry.second;
        auto value_type = value_slot_desc->get_data_type_ptr();
        auto serde = value_type->get_serde();
        auto value_column = value_type->create_column();
        auto* raw_column = static_cast<IColumn*>(value_column.get());
        Slice value_slice(value_text.data(), value_text.size());
        int num_deserialized = 0;
        RETURN_IF_ERROR(serde->deserialize_column_from_fixed_json(
                *raw_column, value_slice, partition_value_column_size, &num_deserialized, {}));
        columns_by_slot_id[value_slot_desc->id()] = std::move(value_column);
    }

    // 2. Fill _runtime_filter_partition_prune_block from the partition columns, then
    //    execute the conjuncts on it.
    // 2.1 Place each partition column at the position of its materialized slot so the
    //     block layout matches what the conjuncts expect.
    size_t position = 0;
    bool first_column_filled = false;
    for (auto const* slot_desc : _real_tuple_desc->slots()) {
        if (!slot_desc->need_materialize()) {
            // Ignored from reading: does not occupy a position in the block.
            continue;
        }
        auto found = columns_by_slot_id.find(slot_desc->id());
        if (found != columns_by_slot_id.end()) {
            auto data_type = slot_desc->get_data_type_ptr();
            auto value_column = std::move(found->second);
            if (data_type->is_nullable()) {
                // Wrap the value with an all-zero null map of the same length.
                _runtime_filter_partition_prune_block.insert(
                        position,
                        ColumnWithTypeAndName(
                                ColumnNullable::create(
                                        std::move(value_column),
                                        ColumnUInt8::create(partition_value_column_size, 0)),
                                data_type, slot_desc->col_name()));
            } else {
                _runtime_filter_partition_prune_block.insert(
                        position, ColumnWithTypeAndName(std::move(value_column), data_type,
                                                        slot_desc->col_name()));
            }
            first_column_filled = first_column_filled || (position == 0);
        }
        ++position;
    }

    // 2.2 Execute conjuncts.
    if (!first_column_filled) {
        // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
        // The following process may be tricky and time-consuming, but we have no other way.
        _runtime_filter_partition_prune_block.get_by_position(0).column->assume_mutable()->resize(
                partition_value_column_size);
    }
    IColumn::Filter result_filter(_runtime_filter_partition_prune_block.rows(), 1);
    RETURN_IF_ERROR(VExprContext::execute_conjuncts(_runtime_filter_partition_prune_ctxs, nullptr,
                                                    &_runtime_filter_partition_prune_block,
                                                    &result_filter, &can_filter_all));
    return Status::OK();
}

Status VFileScanner::_process_conjuncts_for_dict_filter() {
_slot_id_to_filter_conjuncts.clear();
_not_single_slot_filter_conjuncts.clear();
Expand Down Expand Up @@ -237,6 +357,11 @@ Status VFileScanner::open(RuntimeState* state) {
RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
if (_first_scan_range) {
RETURN_IF_ERROR(_init_expr_ctxes());
if (_state->query_options().enable_runtime_filter_partition_prune &&
!_partition_slot_index_map.empty()) {
_init_runtime_filter_partition_prune_ctxs();
_init_runtime_filter_partition_prune_block();
}
} else {
// there's no scan range in split source. stop scanner directly.
_scanner_eof = true;
Expand Down Expand Up @@ -716,6 +841,29 @@ Status VFileScanner::_get_next_reader() {
const TFileRangeDesc& range = _current_range;
_current_range_path = range.path;

if (!_partition_slot_descs.empty()) {
// we need to get partition columns first for runtime filter partition pruning
RETURN_IF_ERROR(_generate_parititon_columns());

if (_state->query_options().enable_runtime_filter_partition_prune) {
// if enable_runtime_filter_partition_prune is true, we need to check whether this range can be filtered out
// by runtime filter partition prune
if (_push_down_conjuncts.size() < _conjuncts.size()) {
// there are new runtime filters, need to re-init runtime filter partition pruning ctxs
_init_runtime_filter_partition_prune_ctxs();
}

bool can_filter_all = false;
RETURN_IF_ERROR(_process_runtime_filters_partition_prune(can_filter_all));
if (can_filter_all) {
// this range can be filtered out by runtime filter partition pruning
// so we need to skip this range
COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
continue;
}
}
}

// create reader for specific format
Status init_status;
// for compatibility, if format_type is not set in range, use the format type of params
Expand Down Expand Up @@ -976,7 +1124,8 @@ Status VFileScanner::_get_next_reader() {
_missing_cols.clear();
RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols));
_cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
RETURN_IF_ERROR(_generate_fill_columns());
RETURN_IF_ERROR(_generate_missing_columns());
RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs));
if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
fmt::memory_buffer col_buf;
for (auto& col : _missing_cols) {
Expand Down Expand Up @@ -1006,10 +1155,8 @@ Status VFileScanner::_get_next_reader() {
return Status::OK();
}

Status VFileScanner::_generate_fill_columns() {
Status VFileScanner::_generate_parititon_columns() {
_partition_col_descs.clear();
_missing_col_descs.clear();

const TFileRangeDesc& range = _current_range;
if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
for (const auto& slot_desc : _partition_slot_descs) {
Expand All @@ -1030,7 +1177,11 @@ Status VFileScanner::_generate_fill_columns() {
}
}
}
return Status::OK();
}

Status VFileScanner::_generate_missing_columns() {
_missing_col_descs.clear();
if (!_missing_cols.empty()) {
for (auto slot_desc : _real_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
Expand All @@ -1048,8 +1199,7 @@ Status VFileScanner::_generate_fill_columns() {
_missing_col_descs.emplace(slot_desc->col_name(), it->second);
}
}

return _cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs);
return Status::OK();
}

Status VFileScanner::_init_expr_ctxes() {
Expand Down
12 changes: 11 additions & 1 deletion be/src/vec/exec/scan/vfile_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "vec/core/block.h"
#include "vec/exec/format/generic_reader.h"
#include "vec/exec/scan/vscanner.h"
#include "vec/exprs/vexpr_fwd.h"

namespace doris {
class RuntimeState;
Expand Down Expand Up @@ -160,6 +161,8 @@ class VFileScanner : public VScanner {
Block _src_block;

VExprContextSPtrs _push_down_conjuncts;
VExprContextSPtrs _runtime_filter_partition_prune_ctxs;
Block _runtime_filter_partition_prune_block;

std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
std::unique_ptr<io::IOContext> _io_ctx;
Expand All @@ -175,9 +178,11 @@ class VFileScanner : public VScanner {
RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr;
RuntimeProfile::Counter* _pre_filter_timer = nullptr;
RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
RuntimeProfile::Counter* _runtime_filter_partition_prune_timer = nullptr;
RuntimeProfile::Counter* _empty_file_counter = nullptr;
RuntimeProfile::Counter* _not_found_file_counter = nullptr;
RuntimeProfile::Counter* _file_counter = nullptr;
RuntimeProfile::Counter* _runtime_filter_partition_pruned_range_counter = nullptr;

const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;
// single slot filter conjuncts
Expand Down Expand Up @@ -205,7 +210,12 @@ class VFileScanner : public VScanner {
Status _convert_to_output_block(Block* block);
Status _truncate_char_or_varchar_columns(Block* block);
void _truncate_char_or_varchar_column(Block* block, int idx, int len);
Status _generate_fill_columns();
Status _generate_parititon_columns();
Status _generate_missing_columns();
bool _check_partition_prune_expr(const VExprSPtr& expr);
void _init_runtime_filter_partition_prune_ctxs();
void _init_runtime_filter_partition_prune_block();
Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
Status _process_conjuncts_for_dict_filter();
Status _process_late_arrival_conjuncts();
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,9 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_RUNTIME_FILTER_PRUNE =
"enable_runtime_filter_prune";

public static final String ENABLE_RUNTIME_FILTER_PARTITION_PRUNE =
"enable_runtime_filter_partition_prune";

static final String SESSION_CONTEXT = "session_context";

public static final String DEFAULT_ORDER_BY_LIMIT = "default_order_by_limit";
Expand Down Expand Up @@ -1500,6 +1503,9 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
@VariableMgr.VarAttr(name = ENABLE_RUNTIME_FILTER_PRUNE, needForward = true)
public boolean enableRuntimeFilterPrune = true;

@VariableMgr.VarAttr(name = ENABLE_RUNTIME_FILTER_PARTITION_PRUNE, needForward = true, fuzzy = true)
public boolean enableRuntimeFilterPartitionPrune = true;

/**
* The client can pass some special information by setting this session variable in the format: "k1:v1;k2:v2".
* For example, trace_id can be passed to trace the query request sent by the user.
Expand Down Expand Up @@ -3795,6 +3801,14 @@ public void setEnableRuntimeFilterPrune(boolean enableRuntimeFilterPrune) {
this.enableRuntimeFilterPrune = enableRuntimeFilterPrune;
}

/**
 * Returns the current value of the {@code enable_runtime_filter_partition_prune}
 * session variable.
 */
public boolean isEnableRuntimeFilterPartitionPrune() {
    return this.enableRuntimeFilterPartitionPrune;
}

/**
 * Sets the {@code enable_runtime_filter_partition_prune} session variable.
 *
 * @param enableRuntimeFilterPartitionPrune the new value of the variable
 */
public void setEnableRuntimeFilterPartitionPrune(boolean enableRuntimeFilterPartitionPrune) {
this.enableRuntimeFilterPartitionPrune = enableRuntimeFilterPartitionPrune;
}

/**
 * Stores the given value in {@code fragmentTransmissionCompressionCodec}.
 *
 * @param codec the codec name to store; it is assigned as-is, with no validation here
 */
public void setFragmentTransmissionCompressionCodec(String codec) {
this.fragmentTransmissionCompressionCodec = codec;
}
Expand Down Expand Up @@ -4142,7 +4156,7 @@ public TQueryOptions toThrift() {
tResult.setIgnoreRuntimeFilterError(ignoreRuntimeFilterError);

tResult.setNewIsIpAddressInRange(newIsIpAddressInRange);

tResult.setEnableRuntimeFilterPartitionPrune(enableRuntimeFilterPartitionPrune);
return tResult;
}

Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,8 @@ struct TQueryOptions {

146: optional bool fuzzy_disable_runtime_filter_in_be = false;

150: optional bool enable_runtime_filter_partition_prune = true;

163: optional bool inverted_index_compatible_read = false
// upgrade options. keep them same in every branch.
200: optional bool new_is_ip_address_in_range = false;
Expand Down
Loading
Loading