Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
747b120
intro data_lake_partition_values
suxiaogang223 Jul 10, 2025
902fc81
get partitionValues
suxiaogang223 Jul 15, 2025
82911e2
impl be
suxiaogang223 Jul 16, 2025
4639b3b
parse partition value
suxiaogang223 Jul 16, 2025
cafde45
fix
suxiaogang223 Jul 16, 2025
ebe0767
fix bug
suxiaogang223 Jul 16, 2025
07cb25c
impl rf paritition prune for paimon
suxiaogang223 Jul 17, 2025
6143731
impl rf partition prune for hudi
suxiaogang223 Jul 17, 2025
13229a8
impl test_iceberg_partition_filter_partition_pruning
suxiaogang223 Jul 17, 2025
13df2e3
fix
suxiaogang223 Jul 17, 2025
a3f3334
impl test_paimon_runtime_filter_partition_pruning
suxiaogang223 Jul 17, 2025
8920afd
flush paimon case
suxiaogang223 Jul 21, 2025
ee59ecd
add test_hudi_runtime_filter_partition_pruning
suxiaogang223 Jul 21, 2025
3f23745
fix iceberg
suxiaogang223 Jul 21, 2025
b2cdade
flush case
suxiaogang223 Jul 21, 2025
dbc7c54
fix paimon
suxiaogang223 Jul 21, 2025
3d69f15
flush out
suxiaogang223 Jul 21, 2025
4a7f5a2
fix timestamp with timezone problem
suxiaogang223 Jul 22, 2025
460c647
fix time
suxiaogang223 Jul 22, 2025
61b5a0a
fix case
suxiaogang223 Jul 22, 2025
be39221
flush paimon case
suxiaogang223 Jul 22, 2025
3b6e49a
flush iceberg cases
suxiaogang223 Jul 22, 2025
ecf4c9c
fix iceberg timestamp with time zone
suxiaogang223 Jul 22, 2025
e7be73d
fix binary as partition key
suxiaogang223 Jul 22, 2025
0279928
fix hudi
suxiaogang223 Jul 22, 2025
77fa6bc
hudi p2
suxiaogang223 Jul 22, 2025
0e09047
cache partitionMapInfos
suxiaogang223 Jul 24, 2025
b2fa196
fix
suxiaogang223 Jul 25, 2025
204f799
fix hudi case
suxiaogang223 Jul 29, 2025
6f2ad12
fix be
suxiaogang223 Jul 31, 2025
4c5199c
fix be
suxiaogang223 Jul 31, 2025
f81629a
fix fe
suxiaogang223 Jul 31, 2025
917b8b9
fix hudi
suxiaogang223 Jul 31, 2025
a80d81d
intro _fill_partition_from_path
suxiaogang223 Jul 31, 2025
f262168
fix build
suxiaogang223 Jul 31, 2025
d2439b5
fix cases
suxiaogang223 Jul 31, 2025
bacf778
fix bug about null partition
suxiaogang223 Aug 1, 2025
92ed4cd
add columns_from_path_is_null
suxiaogang223 Aug 4, 2025
4e58a45
fix be
suxiaogang223 Aug 4, 2025
82acb70
fix fe
suxiaogang223 Aug 4, 2025
9aa46a5
fix and add regression
suxiaogang223 Aug 4, 2025
f0b595b
fix regression
suxiaogang223 Aug 5, 2025
62ccb78
fix regression
suxiaogang223 Aug 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 65 additions & 20 deletions be/src/vec/exec/scan/file_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,17 +250,39 @@ Status FileScanner::_process_runtime_filters_partition_prune(bool& can_filter_al
size_t partition_value_column_size = 1;

// 1. Get partition key values to string columns.
std::unordered_map<SlotId, MutableColumnPtr> parititon_slot_id_to_column;
std::unordered_map<SlotId, MutableColumnPtr> partition_slot_id_to_column;
for (auto const& partition_col_desc : _partition_col_descs) {
const auto& [partition_value, partition_slot_desc] = partition_col_desc.second;
auto test_serde = partition_slot_desc->get_data_type_ptr()->get_serde();
auto partition_value_column = partition_slot_desc->get_data_type_ptr()->create_column();
auto data_type = partition_slot_desc->get_data_type_ptr();
auto test_serde = data_type->get_serde();
auto partition_value_column = data_type->create_column();
auto* col_ptr = static_cast<IColumn*>(partition_value_column.get());
Slice slice(partition_value.data(), partition_value.size());
uint64_t num_deserialized = 0;
RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
*col_ptr, slice, partition_value_column_size, &num_deserialized, {}));
parititon_slot_id_to_column[partition_slot_desc->id()] = std::move(partition_value_column);
DataTypeSerDe::FormatOptions options {};
if (_partition_value_is_null.contains(partition_slot_desc->col_name())) {
// for iceberg/paimon table
// NOTICE: column is always be nullable for iceberg/paimon table now
DCHECK(data_type->is_nullable());
test_serde = test_serde->get_nested_serdes()[0];
auto* null_column = assert_cast<ColumnNullable*>(col_ptr);
if (_partition_value_is_null[partition_slot_desc->col_name()]) {
null_column->insert_many_defaults(partition_value_column_size);
} else {
// If the partition value is not null, we set null map to 0 and deserialize it normally.
null_column->get_null_map_column().insert_many_vals(0, partition_value_column_size);
RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
null_column->get_nested_column(), slice, partition_value_column_size,
&num_deserialized, options));
}
} else {
// for hive/hudi table, the null value is set as "\\N"
// TODO: this will be unified as iceberg/paimon table in the future
RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
*col_ptr, slice, partition_value_column_size, &num_deserialized, options));
}

partition_slot_id_to_column[partition_slot_desc->id()] = std::move(partition_value_column);
}

// 2. Fill _runtime_filter_partition_prune_block from the partition column, then execute conjuncts and filter block.
Expand All @@ -272,10 +294,10 @@ Status FileScanner::_process_runtime_filters_partition_prune(bool& can_filter_al
// should be ignored from reading
continue;
}
if (parititon_slot_id_to_column.find(slot_desc->id()) !=
parititon_slot_id_to_column.end()) {
if (partition_slot_id_to_column.find(slot_desc->id()) !=
partition_slot_id_to_column.end()) {
auto data_type = slot_desc->get_data_type_ptr();
auto partition_value_column = std::move(parititon_slot_id_to_column[slot_desc->id()]);
auto partition_value_column = std::move(partition_slot_id_to_column[slot_desc->id()]);
if (data_type->is_nullable()) {
_runtime_filter_partition_prune_block.insert(
index, ColumnWithTypeAndName(
Expand Down Expand Up @@ -602,6 +624,9 @@ Status FileScanner::_cast_to_input_block(Block* block) {
}

Status FileScanner::_fill_columns_from_path(size_t rows) {
if (!_fill_partition_from_path) {
return Status::OK();
}
DataTypeSerDe::FormatOptions _text_formatOptions;
for (auto& kv : _partition_col_descs) {
auto doris_column = _src_block_ptr->get_by_name(kv.first).column;
Expand Down Expand Up @@ -915,7 +940,7 @@ Status FileScanner::_get_next_reader() {

if (!_partition_slot_descs.empty()) {
// we need get partition columns first for runtime filter partition pruning
RETURN_IF_ERROR(_generate_parititon_columns());
RETURN_IF_ERROR(_generate_partition_columns());

if (_state->query_options().enable_runtime_filter_partition_prune) {
// if enable_runtime_filter_partition_prune is true, we need to check whether this range can be filtered out
Expand Down Expand Up @@ -1332,7 +1357,12 @@ Status FileScanner::_set_fill_or_truncate_columns(bool need_to_get_parsed_schema
}

RETURN_IF_ERROR(_generate_missing_columns());
RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs));
if (_fill_partition_from_path) {
RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs));
} else {
// If the partition columns are not from path, we only fill the missing columns.
RETURN_IF_ERROR(_cur_reader->set_fill_columns({}, _missing_col_descs));
}
if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
fmt::memory_buffer col_buf;
for (auto& col : _missing_cols) {
Expand Down Expand Up @@ -1393,7 +1423,7 @@ Status FileScanner::read_lines_from_range(const TFileRangeDesc& range,
const ExternalFileMappingInfo& external_info,
int64_t* init_reader_ms, int64_t* get_block_ms) {
_current_range = range;
RETURN_IF_ERROR(_generate_parititon_columns());
RETURN_IF_ERROR(_generate_partition_columns());

TFileFormatType::type format_type = _get_current_format_type();
Status init_status = Status::OK();
Expand Down Expand Up @@ -1455,8 +1485,9 @@ Status FileScanner::read_lines_from_range(const TFileRangeDesc& range,
return Status::OK();
}

Status FileScanner::_generate_parititon_columns() {
Status FileScanner::_generate_partition_columns() {
_partition_col_descs.clear();
_partition_value_is_null.clear();
const TFileRangeDesc& range = _current_range;
if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
for (const auto& slot_desc : _partition_slot_descs) {
Expand All @@ -1467,13 +1498,12 @@ Status FileScanner::_generate_parititon_columns() {
slot_desc->id());
}
const std::string& column_from_path = range.columns_from_path[it->second];
const char* data = column_from_path.c_str();
size_t size = column_from_path.size();
if (size == 4 && memcmp(data, "null", 4) == 0) {
data = const_cast<char*>("\\N");
}
_partition_col_descs.emplace(slot_desc->col_name(),
std::make_tuple(data, slot_desc));
std::make_tuple(column_from_path, slot_desc));
if (range.__isset.columns_from_path_is_null) {
_partition_value_is_null.emplace(slot_desc->col_name(),
range.columns_from_path_is_null[it->second]);
}
}
}
}
Expand Down Expand Up @@ -1540,10 +1570,25 @@ Status FileScanner::_init_expr_ctxes() {
_row_id_column_iterator_pair.second = _default_val_row_desc->get_column_id(slot_id);
continue;
}

if (slot_info.is_file_slot) {
_file_slot_descs.emplace_back(it->second);
_file_col_names.push_back(it->second->col_name());
} else {
}

if (partition_name_to_key_index_map.find(it->second->col_name()) !=
partition_name_to_key_index_map.end()) {
if (slot_info.is_file_slot) {
// If there is slot which is both a partition column and a file column,
// we should not fill the partition column from path.
_fill_partition_from_path = false;
} else if (!_fill_partition_from_path) {
// This should not happen
return Status::InternalError(
"Partition column {} is not a file column, but there is already a column "
"which is both a partition column and a file column.",
it->second->col_name());
}
_partition_slot_descs.emplace_back(it->second);
if (_is_load) {
auto iti = full_src_index_map.find(slot_id);
Expand Down
5 changes: 4 additions & 1 deletion be/src/vec/exec/scan/file_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,11 @@ class FileScanner : public Scanner {
std::unique_ptr<io::FileReaderStats> _file_reader_stats;
std::unique_ptr<io::IOContext> _io_ctx;

// Whether to fill partition columns from path, default is true.
bool _fill_partition_from_path = true;
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
_partition_col_descs;
std::unordered_map<std::string, bool> _partition_value_is_null;
std::unordered_map<std::string, VExprContextSPtr> _missing_col_descs;

// idx of skip_bitmap_col in _input_tuple_desc
Expand Down Expand Up @@ -241,7 +244,7 @@ class FileScanner : public Scanner {
Status _convert_to_output_block(Block* block);
Status _truncate_char_or_varchar_columns(Block* block);
void _truncate_char_or_varchar_column(Block* block, int idx, int len);
Status _generate_parititon_columns();
Status _generate_partition_columns();
Status _generate_missing_columns();
bool _check_partition_prune_expr(const VExprSPtr& expr);
void _init_runtime_filter_partition_prune_ctxs();
Expand Down
Loading
Loading