Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2058,7 +2058,7 @@ Status OrcReader::get_next_block_impl(Block* block, size_t* read_rows, bool* eof
_execute_filter_position_delete_rowids(*_delete_rows_filter_ptr);
RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(
block, columns_to_filter, (*_delete_rows_filter_ptr)));
} else {
} else if (_position_delete_ordered_rowids != nullptr) {
std::unique_ptr<IColumn::Filter> filter(new IColumn::Filter(block->rows(), 1));
_execute_filter_position_delete_rowids(*filter);
RETURN_IF_CATCH_EXCEPTION(
Expand Down
83 changes: 64 additions & 19 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,17 +246,39 @@ Status VFileScanner::_process_runtime_filters_partition_prune(bool& can_filter_a
size_t partition_value_column_size = 1;

// 1. Get partition key values to string columns.
std::unordered_map<SlotId, MutableColumnPtr> parititon_slot_id_to_column;
std::unordered_map<SlotId, MutableColumnPtr> partition_slot_id_to_column;
for (auto const& partition_col_desc : _partition_col_descs) {
const auto& [partition_value, partition_slot_desc] = partition_col_desc.second;
auto test_serde = partition_slot_desc->get_data_type_ptr()->get_serde();
auto partition_value_column = partition_slot_desc->get_data_type_ptr()->create_column();
auto data_type = partition_slot_desc->get_data_type_ptr();
auto test_serde = data_type->get_serde();
auto partition_value_column = data_type->create_column();
auto* col_ptr = static_cast<IColumn*>(partition_value_column.get());
Slice slice(partition_value.data(), partition_value.size());
uint64_t num_deserialized = 0;
RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
*col_ptr, slice, partition_value_column_size, &num_deserialized, {}));
parititon_slot_id_to_column[partition_slot_desc->id()] = std::move(partition_value_column);
DataTypeSerDe::FormatOptions options {};
if (_partition_value_is_null.contains(partition_slot_desc->col_name())) {
// for iceberg/paimon table
// NOTICE: column is always be nullable for iceberg/paimon table now
DCHECK(data_type->is_nullable());
test_serde = test_serde->get_nested_serdes()[0];
auto* null_column = assert_cast<ColumnNullable*>(col_ptr);
if (_partition_value_is_null[partition_slot_desc->col_name()]) {
null_column->insert_many_defaults(partition_value_column_size);
} else {
// If the partition value is not null, we set null map to 0 and deserialize it normally.
null_column->get_null_map_column().insert_many_vals(0, partition_value_column_size);
RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
null_column->get_nested_column(), slice, partition_value_column_size,
&num_deserialized, options));
}
} else {
// for hive/hudi table, the null value is set as "\\N"
// TODO: this will be unified as iceberg/paimon table in the future
RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
*col_ptr, slice, partition_value_column_size, &num_deserialized, options));
}

partition_slot_id_to_column[partition_slot_desc->id()] = std::move(partition_value_column);
}

// 2. Fill _runtime_filter_partition_prune_block from the partition column, then execute conjuncts and filter block.
Expand All @@ -268,10 +290,10 @@ Status VFileScanner::_process_runtime_filters_partition_prune(bool& can_filter_a
// should be ignored from reading
continue;
}
if (parititon_slot_id_to_column.find(slot_desc->id()) !=
parititon_slot_id_to_column.end()) {
if (partition_slot_id_to_column.find(slot_desc->id()) !=
partition_slot_id_to_column.end()) {
auto data_type = slot_desc->get_data_type_ptr();
auto partition_value_column = std::move(parititon_slot_id_to_column[slot_desc->id()]);
auto partition_value_column = std::move(partition_slot_id_to_column[slot_desc->id()]);
if (data_type->is_nullable()) {
_runtime_filter_partition_prune_block.insert(
index, ColumnWithTypeAndName(
Expand Down Expand Up @@ -598,6 +620,9 @@ Status VFileScanner::_cast_to_input_block(Block* block) {
}

Status VFileScanner::_fill_columns_from_path(size_t rows) {
if (!_fill_partition_from_path) {
return Status::OK();
}
DataTypeSerDe::FormatOptions _text_formatOptions;
for (auto& kv : _partition_col_descs) {
auto doris_column = _src_block_ptr->get_by_name(kv.first).column;
Expand Down Expand Up @@ -892,7 +917,7 @@ Status VFileScanner::_get_next_reader() {

if (!_partition_slot_descs.empty()) {
// we need get partition columns first for runtime filter partition pruning
RETURN_IF_ERROR(_generate_parititon_columns());
RETURN_IF_ERROR(_generate_partition_columns());

if (_state->query_options().enable_runtime_filter_partition_prune) {
// if enable_runtime_filter_partition_prune is true, we need to check whether this range can be filtered out
Expand Down Expand Up @@ -1283,7 +1308,13 @@ Status VFileScanner::_get_next_reader() {
}

RETURN_IF_ERROR(_generate_missing_columns());
RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs));
if (_fill_partition_from_path) {
RETURN_IF_ERROR(
_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs));
} else {
// If the partition columns are not from path, we only fill the missing columns.
RETURN_IF_ERROR(_cur_reader->set_fill_columns({}, _missing_col_descs));
}
if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
fmt::memory_buffer col_buf;
for (auto& col : _missing_cols) {
Expand Down Expand Up @@ -1315,8 +1346,9 @@ Status VFileScanner::_get_next_reader() {
return Status::OK();
}

Status VFileScanner::_generate_parititon_columns() {
Status VFileScanner::_generate_partition_columns() {
_partition_col_descs.clear();
_partition_value_is_null.clear();
const TFileRangeDesc& range = _current_range;
if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
for (const auto& slot_desc : _partition_slot_descs) {
Expand All @@ -1327,13 +1359,12 @@ Status VFileScanner::_generate_parititon_columns() {
slot_desc->id());
}
const std::string& column_from_path = range.columns_from_path[it->second];
const char* data = column_from_path.c_str();
size_t size = column_from_path.size();
if (size == 4 && memcmp(data, "null", 4) == 0) {
data = const_cast<char*>("\\N");
}
_partition_col_descs.emplace(slot_desc->col_name(),
std::make_tuple(data, slot_desc));
std::make_tuple(column_from_path, slot_desc));
if (range.__isset.columns_from_path_is_null) {
_partition_value_is_null.emplace(slot_desc->col_name(),
range.columns_from_path_is_null[it->second]);
}
}
}
}
Expand Down Expand Up @@ -1399,7 +1430,21 @@ Status VFileScanner::_init_expr_ctxes() {
if (slot_info.is_file_slot) {
_file_slot_descs.emplace_back(it->second);
_file_col_names.push_back(it->second->col_name());
} else {
}

if (partition_name_to_key_index_map.find(it->second->col_name()) !=
partition_name_to_key_index_map.end()) {
if (slot_info.is_file_slot) {
// If there is slot which is both a partition column and a file column,
// we should not fill the partition column from path.
_fill_partition_from_path = false;
} else if (!_fill_partition_from_path) {
// This should not happen
return Status::InternalError(
"Partition column {} is not a file column, but there is already a column "
"which is both a partition column and a file column.",
it->second->col_name());
}
_partition_slot_descs.emplace_back(it->second);
if (_is_load) {
auto iti = full_src_index_map.find(slot_id);
Expand Down
5 changes: 4 additions & 1 deletion be/src/vec/exec/scan/vfile_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,11 @@ class VFileScanner : public VScanner {
std::unique_ptr<io::FileReaderStats> _file_reader_stats;
std::unique_ptr<io::IOContext> _io_ctx;

// Whether to fill partition columns from path, default is true.
bool _fill_partition_from_path = true;
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
_partition_col_descs;
std::unordered_map<std::string, bool> _partition_value_is_null;
std::unordered_map<std::string, VExprContextSPtr> _missing_col_descs;

// idx of skip_bitmap_col in _input_tuple_desc
Expand Down Expand Up @@ -220,7 +223,7 @@ class VFileScanner : public VScanner {
Status _convert_to_output_block(Block* block);
Status _truncate_char_or_varchar_columns(Block* block);
void _truncate_char_or_varchar_column(Block* block, int idx, int len);
Status _generate_parititon_columns();
Status _generate_partition_columns();
Status _generate_missing_columns();
bool _check_partition_prune_expr(const VExprSPtr& expr);
void _init_runtime_filter_partition_prune_ctxs();
Expand Down
Loading
Loading