diff --git a/be/src/vec/exec/scan/file_scanner.cpp b/be/src/vec/exec/scan/file_scanner.cpp
index 715a71974d0e59..a21243f3d1dfd5 100644
--- a/be/src/vec/exec/scan/file_scanner.cpp
+++ b/be/src/vec/exec/scan/file_scanner.cpp
@@ -250,17 +250,39 @@ Status FileScanner::_process_runtime_filters_partition_prune(bool& can_filter_al
     size_t partition_value_column_size = 1;
     // 1. Get partition key values to string columns.
-    std::unordered_map parititon_slot_id_to_column;
+    std::unordered_map partition_slot_id_to_column;
     for (auto const& partition_col_desc : _partition_col_descs) {
         const auto& [partition_value, partition_slot_desc] = partition_col_desc.second;
-        auto test_serde = partition_slot_desc->get_data_type_ptr()->get_serde();
-        auto partition_value_column = partition_slot_desc->get_data_type_ptr()->create_column();
+        auto data_type = partition_slot_desc->get_data_type_ptr();
+        auto test_serde = data_type->get_serde();
+        auto partition_value_column = data_type->create_column();
         auto* col_ptr = static_cast(partition_value_column.get());
         Slice slice(partition_value.data(), partition_value.size());
         uint64_t num_deserialized = 0;
-        RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
-                *col_ptr, slice, partition_value_column_size, &num_deserialized, {}));
-        parititon_slot_id_to_column[partition_slot_desc->id()] = std::move(partition_value_column);
+        DataTypeSerDe::FormatOptions options {};
+        if (_partition_value_is_null.contains(partition_slot_desc->col_name())) {
+            // for iceberg/paimon tables
+            // NOTICE: the column is always nullable for iceberg/paimon tables now
+            DCHECK(data_type->is_nullable());
+            test_serde = test_serde->get_nested_serdes()[0];
+            auto* null_column = assert_cast(col_ptr);
+            if (_partition_value_is_null[partition_slot_desc->col_name()]) {
+                null_column->insert_many_defaults(partition_value_column_size);
+            } else {
+                // If the partition value is not null, we set the null map to 0 and deserialize it normally.
+                null_column->get_null_map_column().insert_many_vals(0, partition_value_column_size);
+                RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
+                        null_column->get_nested_column(), slice, partition_value_column_size,
+                        &num_deserialized, options));
+            }
+        } else {
+            // for hive/hudi tables, the null value is set as "\\N"
+            // TODO: this will be unified with the iceberg/paimon behavior in the future
+            RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
+                    *col_ptr, slice, partition_value_column_size, &num_deserialized, options));
+        }
+
+        partition_slot_id_to_column[partition_slot_desc->id()] = std::move(partition_value_column);
     }
 
     // 2. Fill _runtime_filter_partition_prune_block from the partition column, then execute conjuncts and filter block.
@@ -272,10 +294,10 @@ Status FileScanner::_process_runtime_filters_partition_prune(bool& can_filter_al
             // should be ignored from reading
             continue;
         }
-        if (parititon_slot_id_to_column.find(slot_desc->id()) !=
-            parititon_slot_id_to_column.end()) {
+        if (partition_slot_id_to_column.find(slot_desc->id()) !=
+            partition_slot_id_to_column.end()) {
             auto data_type = slot_desc->get_data_type_ptr();
-            auto partition_value_column = std::move(parititon_slot_id_to_column[slot_desc->id()]);
+            auto partition_value_column = std::move(partition_slot_id_to_column[slot_desc->id()]);
             if (data_type->is_nullable()) {
                 _runtime_filter_partition_prune_block.insert(
                         index, ColumnWithTypeAndName(
@@ -602,6 +624,9 @@ Status FileScanner::_cast_to_input_block(Block* block) {
 }
 
 Status FileScanner::_fill_columns_from_path(size_t rows) {
+    if (!_fill_partition_from_path) {
+        return Status::OK();
+    }
     DataTypeSerDe::FormatOptions _text_formatOptions;
     for (auto& kv : _partition_col_descs) {
         auto doris_column = _src_block_ptr->get_by_name(kv.first).column;
@@ -915,7 +940,7 @@ Status FileScanner::_get_next_reader() {
     if (!_partition_slot_descs.empty()) {
         // we need get partition columns first for runtime filter partition pruning
-        RETURN_IF_ERROR(_generate_parititon_columns());
+        RETURN_IF_ERROR(_generate_partition_columns());
         if (_state->query_options().enable_runtime_filter_partition_prune) {
             // if enable_runtime_filter_partition_prune is true, we need to check whether this range can be filtered out
@@ -1332,7 +1357,12 @@ Status FileScanner::_set_fill_or_truncate_columns(bool need_to_get_parsed_schema
     }
     RETURN_IF_ERROR(_generate_missing_columns());
-    RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs));
+    if (_fill_partition_from_path) {
+        RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs));
+    } else {
+        // If the partition columns are not from path, we only fill the missing columns.
+ RETURN_IF_ERROR(_cur_reader->set_fill_columns({}, _missing_col_descs)); + } if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) { fmt::memory_buffer col_buf; for (auto& col : _missing_cols) { @@ -1393,7 +1423,7 @@ Status FileScanner::read_lines_from_range(const TFileRangeDesc& range, const ExternalFileMappingInfo& external_info, int64_t* init_reader_ms, int64_t* get_block_ms) { _current_range = range; - RETURN_IF_ERROR(_generate_parititon_columns()); + RETURN_IF_ERROR(_generate_partition_columns()); TFileFormatType::type format_type = _get_current_format_type(); Status init_status = Status::OK(); @@ -1455,8 +1485,9 @@ Status FileScanner::read_lines_from_range(const TFileRangeDesc& range, return Status::OK(); } -Status FileScanner::_generate_parititon_columns() { +Status FileScanner::_generate_partition_columns() { _partition_col_descs.clear(); + _partition_value_is_null.clear(); const TFileRangeDesc& range = _current_range; if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) { for (const auto& slot_desc : _partition_slot_descs) { @@ -1467,13 +1498,12 @@ Status FileScanner::_generate_parititon_columns() { slot_desc->id()); } const std::string& column_from_path = range.columns_from_path[it->second]; - const char* data = column_from_path.c_str(); - size_t size = column_from_path.size(); - if (size == 4 && memcmp(data, "null", 4) == 0) { - data = const_cast("\\N"); - } _partition_col_descs.emplace(slot_desc->col_name(), - std::make_tuple(data, slot_desc)); + std::make_tuple(column_from_path, slot_desc)); + if (range.__isset.columns_from_path_is_null) { + _partition_value_is_null.emplace(slot_desc->col_name(), + range.columns_from_path_is_null[it->second]); + } } } } @@ -1540,10 +1570,25 @@ Status FileScanner::_init_expr_ctxes() { _row_id_column_iterator_pair.second = _default_val_row_desc->get_column_id(slot_id); continue; } + if (slot_info.is_file_slot) { _file_slot_descs.emplace_back(it->second); _file_col_names.push_back(it->second->col_name()); - } else { + } + + if (partition_name_to_key_index_map.find(it->second->col_name()) != + partition_name_to_key_index_map.end()) { + if (slot_info.is_file_slot) { + // If there is slot which is both a partition column and a file column, + // we should not fill the partition column from path. + _fill_partition_from_path = false; + } else if (!_fill_partition_from_path) { + // This should not happen + return Status::InternalError( + "Partition column {} is not a file column, but there is already a column " + "which is both a partition column and a file column.", + it->second->col_name()); + } _partition_slot_descs.emplace_back(it->second); if (_is_load) { auto iti = full_src_index_map.find(slot_id); diff --git a/be/src/vec/exec/scan/file_scanner.h b/be/src/vec/exec/scan/file_scanner.h index 99585585c20984..290d2ff2f2ea28 100644 --- a/be/src/vec/exec/scan/file_scanner.h +++ b/be/src/vec/exec/scan/file_scanner.h @@ -188,8 +188,11 @@ class FileScanner : public Scanner { std::unique_ptr _file_reader_stats; std::unique_ptr _io_ctx; + // Whether to fill partition columns from path, default is true. 
+ bool _fill_partition_from_path = true; std::unordered_map> _partition_col_descs; + std::unordered_map _partition_value_is_null; std::unordered_map _missing_col_descs; // idx of skip_bitmap_col in _input_tuple_desc @@ -241,7 +244,7 @@ class FileScanner : public Scanner { Status _convert_to_output_block(Block* block); Status _truncate_char_or_varchar_columns(Block* block); void _truncate_char_or_varchar_column(Block* block, int idx, int len); - Status _generate_parititon_columns(); + Status _generate_partition_columns(); Status _generate_missing_columns(); bool _check_partition_prune_expr(const VExprSPtr& expr); void _init_runtime_filter_partition_prune_ctxs(); diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run18.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run18.sql new file mode 100644 index 00000000000000..84c00e12781f71 --- /dev/null +++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run18.sql @@ -0,0 +1,250 @@ +create database if not exists demo.partition_db; + +use demo.partition_db; + +-- set time zone to shanghai +SET TIME ZONE '+08:00'; +-- Partition by date type +CREATE TABLE date_partitioned ( + id BIGINT, + name STRING, + partition_key DATE +) USING ICEBERG PARTITIONED BY (partition_key); +-- Insert data into date_partitioned table +INSERT INTO + date_partitioned +VALUES (1, 'Alice', DATE '2024-01-01'), + (2, 'Bob', DATE '2024-01-01'), + ( + 3, + 'Charlie', + DATE '2024-02-01' + ), + (4, 'David', DATE '2024-02-01'), + (5, 'Eve', DATE '2024-03-01'), + (6, 'Null Date', NULL); + +-- Partition by integer type +CREATE TABLE int_partitioned ( + id BIGINT, + name STRING, + partition_key INT +) USING ICEBERG PARTITIONED BY (partition_key); + +-- Insert data into int_partitioned table +INSERT INTO + int_partitioned +VALUES (1, 'Product A', 1), + (2, 'Product B', 1), + (3, 'Product C', 2), + (4, 'Product D', 2), + (5, 'Product E', 3), + (6, 'Null Int', NULL); + +-- Partition by float type +CREATE TABLE float_partitioned ( + id BIGINT, + name STRING, + partition_key FLOAT +) USING ICEBERG PARTITIONED BY (partition_key); + +-- Insert data into float_partitioned table +INSERT INTO + float_partitioned +VALUES (1, 'Item 1', 10.5), + (2, 'Item 2', 20.75), + (3, 'Item 3', 30.0), + (4, 'Item 4', 40.25), + (5, 'Item 5', 50.5), + (6, 'Null Float', NULL); + +-- Partition by string type +CREATE TABLE string_partitioned ( + id BIGINT, + name STRING, + partition_key STRING +) USING ICEBERG PARTITIONED BY (partition_key); + +-- Insert data into string_partitioned table +INSERT INTO + string_partitioned +VALUES (1, 'User1', 'North America'), + (2, 'User2', 'North America'), + (3, 'User3', 'Europe'), + (4, 'User4', 'Europe'), + (5, 'User5', 'Asia'), + (6, 'User6', 'Asia'), + (7, 'Null String', NULL); + +-- Partition by timestamp type +CREATE TABLE timestamp_partitioned ( + id BIGINT, + name STRING, + partition_key TIMESTAMP +) USING ICEBERG PARTITIONED BY (partition_key); + +-- Insert data into timestamp_partitioned table +INSERT INTO + timestamp_partitioned +VALUES ( + 1, + 'Event1', + TIMESTAMP '2024-01-15 08:00:00' + ), + ( + 2, + 'Event2', + TIMESTAMP '2024-01-15 09:00:00' + ), + ( + 3, + 'Event3', + TIMESTAMP '2024-01-15 14:00:00' + ), + ( + 4, + 'Event4', + TIMESTAMP '2024-01-16 10:00:00' + ), + ( + 5, + 'Event5', + TIMESTAMP '2024-01-16 16:00:00' + ), + ( + 6, + 'Null Timestamp', + NULL + ); + +-- Partition by timestamp_ntz type +CREATE TABLE 
timestamp_ntz_partitioned ( + id BIGINT, + name STRING, + partition_key TIMESTAMP_NTZ +) USING ICEBERG PARTITIONED BY (partition_key); + +-- INSERT INTO timestamp_ntz_partitioned +INSERT INTO + timestamp_ntz_partitioned +VALUES ( + 1, + 'Event1', + TIMESTAMP_NTZ '2024-01-15 08:00:00' + ), + ( + 2, + 'Event2', + TIMESTAMP_NTZ '2024-01-15 09:00:00' + ), + ( + 3, + 'Event3', + TIMESTAMP_NTZ '2024-01-15 14:00:00' + ), + ( + 4, + 'Event4', + TIMESTAMP_NTZ '2024-01-16 10:00:00' + ), + ( + 5, + 'Event5', + TIMESTAMP_NTZ '2024-01-16 16:00:00' + ), + ( + 6, + 'Null Timestamp NTZ', + NULL + ); + +-- Partition by boolean type +CREATE TABLE boolean_partitioned ( + id BIGINT, + name STRING, + partition_key BOOLEAN +) USING ICEBERG PARTITIONED BY (partition_key); + +-- Insert data into boolean_partitioned table +INSERT INTO + boolean_partitioned +VALUES (1, 'Active User', true), + (2, 'Active Admin', true), + (3, 'Inactive User', false), + (4, 'Inactive Guest', false), + (5, 'Active Manager', true), + (6, 'Null Boolean', NULL); + +-- Partition by decimal type +CREATE TABLE decimal_partitioned ( + id BIGINT, + name STRING, + value FLOAT, + partition_key DECIMAL(10, 2) +) USING ICEBERG PARTITIONED BY (partition_key); + +-- Insert data into decimal_partitioned table +INSERT INTO + decimal_partitioned +VALUES (1, 'Item A', 125.50, 10.50), + (2, 'Item B', 200.75, 10.50), + (3, 'Item C', 89.99, 25.25), + (4, 'Item D', 156.80, 25.25), + (5, 'Item E', 299.95, 50.00), + (6, 'Item F', 399.99, 50.00), + (7, 'Null Decimal', 0.0, NULL); +-- Partition by binary type +CREATE TABLE binary_partitioned ( + id BIGINT, + name STRING, + partition_key BINARY +) USING ICEBERG PARTITIONED BY (partition_key); + +-- Insert data into binary_partitioned table +INSERT INTO + binary_partitioned +VALUES ( + 1, + 'Binary Data 1', + CAST('01010101' AS BINARY) + ), + ( + 2, + 'Binary Data 2', + CAST('01010101' AS BINARY) + ), + ( + 3, + 'Binary Data 3', + CAST('11001100' AS BINARY) + ), + ( + 4, + 'Binary Data 4', + CAST('11001100' AS BINARY) + ), + ( + 5, + 'Binary Data 5', + CAST('11110000' AS BINARY) + ), + ( + 6, + 'Null Binary', + NULL + ); + +-- Partition by string type with null values +CREATE TABLE null_str_partition_table ( + id BIGINT, + category STRING, + value DOUBLE +) USING iceberg PARTITIONED BY (category); + +INSERT INTO + null_str_partition_table +VALUES (1, NULL, 100.0), + (2, 'NULL', 200.0), + (3, '\\N', 300.0), + (4, 'null', 400.0), + (5, 'A', 500.0); \ No newline at end of file diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run06.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run06.sql new file mode 100644 index 00000000000000..eb60255a08e965 --- /dev/null +++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run06.sql @@ -0,0 +1,231 @@ +use paimon; + +create database if not exists partition_db; + +use partition_db; + +-- set time zone to shanghai +SET TIME ZONE '+08:00'; + +-- Partition by date type +CREATE TABLE date_partitioned ( + id BIGINT, + name STRING, + partition_key DATE +) PARTITIONED BY (partition_key); + +-- Insert data into date_partitioned table +INSERT INTO + date_partitioned +VALUES (1, 'Alice', DATE '2024-01-01'), + (2, 'Bob', DATE '2024-01-01'), + ( + 3, + 'Charlie', + DATE '2024-02-01' + ), + (4, 'David', DATE '2024-02-01'), + (5, 'Eve', DATE '2024-03-01'); + -- TODO: add this after fix paimon null date partition issue + -- (6, 'Null Date', NULL); + +-- Partition 
by timestamp type +CREATE TABLE timestamp_partitioned ( + id BIGINT, + name STRING, + partition_key TIMESTAMP +) PARTITIONED BY (partition_key); + +-- Insert data into timestamp_partitioned table +INSERT INTO + timestamp_partitioned +VALUES ( + 1, + 'Event1', + TIMESTAMP '2024-01-15 08:00:00' + ), + ( + 2, + 'Event2', + TIMESTAMP '2024-01-15 09:00:00' + ), + ( + 3, + 'Event3', + TIMESTAMP '2024-01-15 14:00:00' + ), + ( + 4, + 'Event4', + TIMESTAMP '2024-01-16 10:00:00' + ), + ( + 5, + 'Event5', + TIMESTAMP '2024-01-16 16:00:00' + ), + ( + 6, + 'Null Timestamp', + NULL + ); + +-- Partition by integer type +CREATE TABLE int_partitioned ( + id BIGINT, + name STRING, + partition_key INT +) PARTITIONED BY (partition_key); + +-- Insert data into int_partitioned table +INSERT INTO + int_partitioned +VALUES (1, 'Product A', 1), + (2, 'Product B', 1), + (3, 'Product C', 2), + (4, 'Product D', 2), + (5, 'Product E', 3), + (6, 'Null Int', NULL); + +-- Partition by bigint type +CREATE TABLE bigint_partitioned ( + id BIGINT, + name STRING, + partition_key BIGINT +) PARTITIONED BY (partition_key); + +-- Insert data into bigint_partitioned table +INSERT INTO + bigint_partitioned +VALUES (1, 'Item 1', 100), + (2, 'Item 2', 100), + (3, 'Item 3', 200), + (4, 'Item 4', 200), + (5, 'Item 5', 300), + (6, 'Null Bigint', NULL); + +-- Partition by string type +CREATE TABLE string_partitioned ( + id BIGINT, + name STRING, + partition_key STRING +) PARTITIONED BY (partition_key); + +-- Insert data into string_partitioned table +INSERT INTO + string_partitioned +VALUES (1, 'User1', 'North America'), + (2, 'User2', 'North America'), + (3, 'User3', 'Europe'), + (4, 'User4', 'Europe'), + (5, 'User5', 'Asia'), + (6, 'User6', 'Asia'), + (7, 'Null String', NULL); + +-- Partition by boolean type +CREATE TABLE boolean_partitioned ( + id BIGINT, + name STRING, + partition_key BOOLEAN +) PARTITIONED BY (partition_key); + +-- Insert data into boolean_partitioned table +INSERT INTO + boolean_partitioned +VALUES (1, 'Active User', true), + (2, 'Active Admin', true), + (3, 'Inactive User', false), + (4, 'Inactive Guest', false), + (5, 'Active Manager', true), + (6, 'Null Boolean', NULL); + +-- Partition by decimal type +CREATE TABLE decimal_partitioned ( + id BIGINT, + name STRING, + value DOUBLE, + partition_key DECIMAL(10, 2) +) PARTITIONED BY (partition_key); + +-- Insert data into decimal_partitioned table +INSERT INTO + decimal_partitioned +VALUES (1, 'Item A', 125.50, 10.50), + (2, 'Item B', 200.75, 10.50), + (3, 'Item C', 89.99, 25.25), + (4, 'Item D', 156.80, 25.25), + (5, 'Item E', 299.95, 50.00), + (6, 'Item F', 399.99, 50.00), + (7, 'Null Decimal', 0.0, NULL); + +-- Partition by binary type +CREATE TABLE binary_partitioned ( + id BIGINT, + name STRING, + partition_key BINARY +) PARTITIONED BY (partition_key); + +-- Insert data into binary_partitioned table +INSERT INTO + binary_partitioned +VALUES ( + 1, + 'Binary Data 1', + CAST('binary1' AS BINARY) + ), + ( + 2, + 'Binary Data 2', + CAST('binary1' AS BINARY) + ), + ( + 3, + 'Binary Data 3', + CAST('binary2' AS BINARY) + ), + ( + 4, + 'Binary Data 4', + CAST('binary2' AS BINARY) + ), + ( + 5, + 'Binary Data 5', + CAST('binary3' AS BINARY) + ), + ( + 6, + 'Null Binary', + NULL + ); + +-- Partition by float type +CREATE TABLE float_partitioned ( + id BIGINT, + name STRING, + partition_key FLOAT +) PARTITIONED BY (partition_key); +-- Insert data into float_partitioned table +INSERT INTO + float_partitioned +VALUES (1, 'Float Data 1', 1.5), + (2, 'Float Data 2', 1.5), + 
(3, 'Float Data 3', 2.5), + (4, 'Float Data 4', 2.5), + (5, 'Float Data 5', 3.5), + (6, 'Null Float', NULL); + +-- Partition by string type with null values +CREATE TABLE null_str_partition_table ( + id BIGINT, + category STRING, + value DOUBLE +) PARTITIONED BY (category); + +INSERT INTO + null_str_partition_table +VALUES (1, NULL, 100.0), + (2, 'NULL', 200.0), + (3, '\\N', 300.0), + (4, 'null', 400.0), + (5, 'A', 500.0); \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java index b12b30e518e41a..d153d62de7ac27 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java @@ -32,6 +32,7 @@ import org.apache.doris.datasource.TablePartitionValues; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper; +import org.apache.doris.datasource.hive.HivePartition; import org.apache.doris.datasource.hudi.source.HudiCachedPartitionProcessor; import org.apache.doris.thrift.TColumnType; import org.apache.doris.thrift.TPrimitiveType; @@ -59,7 +60,9 @@ import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -402,4 +405,14 @@ public static TSchema getSchemaInfo(InternalSchema hudiInternalSchema) { return tschema; } + public static Map getPartitionInfoMap(HMSExternalTable table, HivePartition partition) { + Map partitionInfoMap = new HashMap<>(); + List partitionColumns = table.getPartitionColumns(); + for (int i = 0; i < partitionColumns.size(); i++) { + String partitionName = partitionColumns.get(i).getName(); + String partitionValue = partition.getPartitionValues().get(i); + partitionInfoMap.put(partitionName, partitionValue); + } + return partitionInfoMap; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index 5cc100d6a4cb86..c30cffad98244d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -307,6 +307,17 @@ private void setHudiParams(TFileRangeDesc rangeDesc, HudiSplit hudiSplit) { } } tableFormatFileDesc.setHudiParams(fileDesc); + Map partitionValues = hudiSplit.getHudiPartitionValues(); + if (partitionValues != null) { + List formPathKeys = new ArrayList<>(); + List formPathValues = new ArrayList<>(); + for (Map.Entry entry : partitionValues.entrySet()) { + formPathKeys.add(entry.getKey()); + formPathValues.add(entry.getValue()); + } + rangeDesc.setColumnsFromPathKeys(formPathKeys); + rangeDesc.setColumnsFromPath(formPathValues); + } rangeDesc.setTableFormatParams(tableFormatFileDesc); } @@ -362,7 +373,6 @@ private List getIncrementalSplits() { incrementalRelation.getEndTs())).collect(Collectors.toList()); } - private void getPartitionSplits(HivePartition partition, List splits) throws IOException { String partitionName; @@ -373,6 +383,10 @@ private void getPartitionSplits(HivePartition partition, List splits) thr new StoragePath(partition.getPath())); } + final Map partitionValues = sessionVariable.isEnableRuntimeFilterPartitionPrune() + ? 
HudiUtils.getPartitionInfoMap(hmsTable, partition) + : null; + if (canUseNativeReader()) { fsView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> { noLogsSplitNum.incrementAndGet(); @@ -384,6 +398,9 @@ private void getPartitionSplits(HivePartition partition, List splits) thr HudiSplit hudiSplit = new HudiSplit(locationPath, 0, fileSize, fileSize, new String[0], partition.getPartitionValues()); hudiSplit.setTableFormatType(TableFormatType.HUDI); + if (partitionValues != null) { + hudiSplit.setHudiPartitionValues(partitionValues); + } splits.add(hudiSplit); }); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java index 2c3cbdb7fbac5c..9235bdde7a8836 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiSplit.java @@ -23,6 +23,7 @@ import lombok.Data; import java.util.List; +import java.util.Map; @Data public class HudiSplit extends FileSplit { @@ -40,4 +41,5 @@ public HudiSplit(LocationPath file, long start, long length, long fileLength, St private List hudiColumnNames; private List hudiColumnTypes; private List nestedFields; + private Map hudiPartitionValues; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java index 4c96eb91a20096..90155157a3e610 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java @@ -78,6 +78,7 @@ import org.apache.iceberg.ManifestFile; import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.MetadataTableUtils; +import org.apache.iceberg.PartitionData; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionsTable; @@ -101,6 +102,7 @@ import org.apache.iceberg.types.Type.TypeID; import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.TimestampType; import org.apache.iceberg.util.LocationUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.StructProjection; @@ -111,14 +113,19 @@ import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.time.DateTimeException; import java.time.Instant; +import java.time.LocalDate; import java.time.LocalDateTime; +import java.time.LocalTime; import java.time.Month; import java.time.ZoneId; +import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -602,6 +609,83 @@ public static Type icebergTypeToDorisType(org.apache.iceberg.types.Type type) { } } + public static Map getPartitionInfoMap(PartitionData partitionData, String timeZone) { + Map partitionInfoMap = new HashMap<>(); + List fields = partitionData.getPartitionType().asNestedType().fields(); + for (int i = 0; i < fields.size(); i++) { + NestedField field = fields.get(i); + Object value = partitionData.get(i); + try { + String partitionString = serializePartitionValue(field.type(), value, timeZone); + partitionInfoMap.put(field.name(), 
partitionString); + } catch (UnsupportedOperationException e) { + LOG.warn("Failed to serialize Iceberg table partition value for field {}: {}", field.name(), + e.getMessage()); + return null; + } + } + return partitionInfoMap; + } + + private static String serializePartitionValue(org.apache.iceberg.types.Type type, Object value, String timeZone) { + switch (type.typeId()) { + case BOOLEAN: + case INTEGER: + case LONG: + case STRING: + case UUID: + case DECIMAL: + if (value == null) { + return null; + } + return value.toString(); + case FIXED: + case BINARY: + if (value == null) { + return null; + } + // Fixed and binary types are stored as ByteBuffer + ByteBuffer buffer = (ByteBuffer) value; + byte[] res = new byte[buffer.limit()]; + buffer.get(res); + return new String(res, StandardCharsets.UTF_8); + case DATE: + if (value == null) { + return null; + } + // Iceberg date is stored as days since epoch (1970-01-01) + LocalDate date = LocalDate.ofEpochDay((Integer) value); + return date.format(DateTimeFormatter.ISO_LOCAL_DATE); + case TIME: + if (value == null) { + return null; + } + // Iceberg time is stored as microseconds since midnight + long micros = (Long) value; + LocalTime time = LocalTime.ofNanoOfDay(micros * 1000); + return time.format(DateTimeFormatter.ISO_LOCAL_TIME); + case TIMESTAMP: + if (value == null) { + return null; + } + // Iceberg timestamp is stored as microseconds since epoch + // (1970-01-01T00:00:00) + long timestampMicros = (Long) value; + TimestampType timestampType = (TimestampType) type; + LocalDateTime timestamp = LocalDateTime.ofEpochSecond( + timestampMicros / 1_000_000, (int) (timestampMicros % 1_000_000) * 1000, + ZoneOffset.UTC); + // type is timestamptz if timestampType.shouldAdjustToUTC() is true + if (timestampType.shouldAdjustToUTC()) { + timestamp = timestamp.atZone(ZoneId.of("UTC")).withZoneSameInstant(ZoneId.of(timeZone)) + .toLocalDateTime(); + } + return timestamp.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME); + default: + throw new UnsupportedOperationException("Unsupported type for serializePartitionValue: " + type); + } + } + public static Table getIcebergTable(ExternalTable dorisTable) { return Env.getCurrentEnv() .getExtMetaCacheMgr() diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index bc9f1ee8842694..badac3ba6c058c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -60,8 +60,8 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionData; import org.apache.iceberg.Snapshot; -import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; import org.apache.iceberg.expressions.Expression; @@ -74,12 +74,12 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; public class IcebergScanNode extends FileQueryScanNode { @@ -100,7 +100,8 @@ public class IcebergScanNode extends FileQueryScanNode { private long countFromSnapshot; private 
static final long COUNT_WITH_PARALLEL_SPLITS = 10000; private long targetSplitSize; - private ConcurrentHashMap.KeySetView partitionPathSet; + // This is used to avoid repeatedly calculating partition info map for the same partition data. + private Map> partitionMapInfos; private boolean isPartitionedTable; private int formatVersion; private ExecutionAuthenticator preExecutionAuthenticator; @@ -147,7 +148,7 @@ public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckCol protected void doInitialize() throws UserException { icebergTable = source.getIcebergTable(); targetSplitSize = getRealFileSplitSize(0); - partitionPathSet = ConcurrentHashMap.newKeySet(); + partitionMapInfos = new HashMap<>(); isPartitionedTable = icebergTable.spec().isPartitioned(); formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion(); preExecutionAuthenticator = source.getCatalog().getExecutionAuthenticator(); @@ -201,6 +202,20 @@ private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSpli } } tableFormatFileDesc.setIcebergParams(fileDesc); + Map partitionValues = icebergSplit.getIcebergPartitionValues(); + if (partitionValues != null) { + List fromPathKeys = new ArrayList<>(); + List fromPathValues = new ArrayList<>(); + List fromPathIsNull = new ArrayList<>(); + for (Map.Entry entry : partitionValues.entrySet()) { + fromPathKeys.add(entry.getKey()); + fromPathValues.add(entry.getValue() != null ? entry.getValue() : ""); + fromPathIsNull.add(entry.getValue() == null); + } + rangeDesc.setColumnsFromPathKeys(fromPathKeys); + rangeDesc.setColumnsFromPath(fromPathValues); + rangeDesc.setColumnsFromPathIsNull(fromPathIsNull); + } rangeDesc.setTableFormatParams(tableFormatFileDesc); } @@ -312,11 +327,6 @@ private CloseableIterable planFileScanTask(TableScan scan) { } private Split createIcebergSplit(FileScanTask fileScanTask) { - if (isPartitionedTable) { - StructLike structLike = fileScanTask.file().partition(); - // Counts the number of partitions read - partitionPathSet.add(structLike.toString()); - } String originalPath = fileScanTask.file().path().toString(); LocationPath locationPath = LocationPath.of(originalPath, source.getCatalog().getCatalogProperty().getStoragePropertiesMap()); @@ -335,6 +345,20 @@ private Split createIcebergSplit(FileScanTask fileScanTask) { } split.setTableFormatType(TableFormatType.ICEBERG); split.setTargetSplitSize(targetSplitSize); + if (isPartitionedTable) { + PartitionData partitionData = (PartitionData) fileScanTask.file().partition(); + if (sessionVariable.isEnableRuntimeFilterPartitionPrune()) { + // If the partition data is not in the map, we need to calculate the partition + Map partitionInfoMap = partitionMapInfos.computeIfAbsent(partitionData, k -> { + return IcebergUtils.getPartitionInfoMap(partitionData, sessionVariable.getTimeZone()); + }); + if (partitionInfoMap != null) { + split.setIcebergPartitionValues(partitionInfoMap); + } + } else { + partitionMapInfos.put(partitionData, null); + } + } return split; } @@ -363,7 +387,7 @@ private List doGetSplits(int numBackends) throws UserException { throw new UserException(e.getMessage(), e.getCause()); } - selectedPartitionNum = partitionPathSet.size(); + selectedPartitionNum = partitionMapInfos.size(); return splits; } @@ -549,7 +573,7 @@ private void assignCountToSplits(List splits, long totalCount) { @Override public int numApproximateSplits() { - return NUM_SPLITS_PER_PARTITION * partitionPathSet.size() > 0 ? 
partitionPathSet.size() : 1; + return NUM_SPLITS_PER_PARTITION * partitionMapInfos.size() > 0 ? partitionMapInfos.size() : 1; } private Optional checkNotSupportedException(Exception e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java index 67a3b3d37ff357..0d08f53b3e8525 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java @@ -41,6 +41,8 @@ public class IcebergSplit extends FileSplit { private Map config; // tableLevelRowCount will be set only table-level count push down opt is available. private long tableLevelRowCount = -1; + // Partition values are used to do runtime filter partition pruning. + private Map icebergPartitionValues = null; // File path will be changed if the file is modified, so there's no need to get modification time. public IcebergSplit(LocationPath file, long start, long length, long fileLength, String[] hosts, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java index 8f82f3c3aa0695..4d6fd91f23adbb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -42,7 +42,9 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.options.ConfigOption; import org.apache.paimon.partition.Partition; @@ -63,8 +65,14 @@ import org.apache.paimon.utils.InstantiationUtil; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Projection; +import org.apache.paimon.utils.RowDataToObjectArrayConverter; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Base64; import java.util.HashMap; @@ -380,4 +388,82 @@ public static String encodeObjectToString(T t) { } } + public static Map getPartitionInfoMap(Table table, BinaryRow partitionValues, String timeZone) { + Map partitionInfoMap = new HashMap<>(); + List partitionKeys = table.partitionKeys(); + RowType partitionType = table.rowType().project(partitionKeys); + RowDataToObjectArrayConverter toObjectArrayConverter = new RowDataToObjectArrayConverter( + partitionType); + Object[] partitionValuesArray = toObjectArrayConverter.convert(partitionValues); + for (int i = 0; i < partitionKeys.size(); i++) { + try { + String partitionValue = serializePartitionValue(partitionType.getFields().get(i).type(), + partitionValuesArray[i], timeZone); + partitionInfoMap.put(partitionKeys.get(i), partitionValue); + } catch (UnsupportedOperationException e) { + LOG.warn("Failed to serialize table {} partition value for key {}: {}", table.name(), + partitionKeys.get(i), e.getMessage()); + return null; + } + } + return partitionInfoMap; + } + + private static String serializePartitionValue(org.apache.paimon.types.DataType type, Object value, + String timeZone) { + 
switch (type.getTypeRoot()) { + case BOOLEAN: + case INTEGER: + case BIGINT: + case SMALLINT: + case TINYINT: + case DECIMAL: + case VARCHAR: + case CHAR: + if (value == null) { + return null; + } + return value.toString(); + case BINARY: + case VARBINARY: + if (value == null) { + return null; + } + return new String((byte[]) value, StandardCharsets.UTF_8); + case DATE: + if (value == null) { + return null; + } + // Paimon date is stored as days since epoch + LocalDate date = LocalDate.ofEpochDay((Integer) value); + return date.format(DateTimeFormatter.ISO_LOCAL_DATE); + case TIME_WITHOUT_TIME_ZONE: + if (value == null) { + return null; + } + // Paimon time is stored as microseconds since midnight in utc + long micros = (Long) value; + LocalTime time = LocalTime.ofNanoOfDay(micros * 1000); + return time.format(DateTimeFormatter.ISO_LOCAL_TIME); + case TIMESTAMP_WITHOUT_TIME_ZONE: + if (value == null) { + return null; + } + // Paimon timestamp is stored as Timestamp type in utc + return ((Timestamp) value).toLocalDateTime().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + if (value == null) { + return null; + } + // Paimon timestamp with local time zone is stored as Timestamp type in utc + Timestamp timestamp = (Timestamp) value; + return timestamp.toLocalDateTime() + .atZone(ZoneId.of("UTC")) + .withZoneSameInstant(ZoneId.of(timeZone)) + .toLocalDateTime() + .format(DateTimeFormatter.ISO_LOCAL_DATE_TIME); + default: + throw new UnsupportedOperationException("Unsupported type for serializePartitionValue: " + type); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 9eecaa20d35071..ff125f0d501f9b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -44,7 +44,6 @@ import org.apache.doris.thrift.TTableFormatFileDesc; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.paimon.CoreOptions; @@ -64,7 +63,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -243,6 +241,20 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) tableFormatFileDesc.setTableLevelRowCount(paimonSplit.getRowCount().get()); } tableFormatFileDesc.setPaimonParams(fileDesc); + Map partitionValues = paimonSplit.getPaimonPartitionValues(); + if (partitionValues != null) { + List fromPathKeys = new ArrayList<>(); + List fromPathValues = new ArrayList<>(); + List fromPathIsNull = new ArrayList<>(); + for (Map.Entry entry : partitionValues.entrySet()) { + fromPathKeys.add(entry.getKey()); + fromPathValues.add(entry.getValue() != null ? 
entry.getValue() : ""); + fromPathIsNull.add(entry.getValue() == null); + } + rangeDesc.setColumnsFromPathKeys(fromPathKeys); + rangeDesc.setColumnsFromPath(fromPathValues); + rangeDesc.setColumnsFromPathIsNull(fromPathIsNull); + } rangeDesc.setTableFormatParams(tableFormatFileDesc); } @@ -265,8 +277,10 @@ public List getSplits(int numBackends) throws UserException { } boolean applyCountPushdown = getPushDownAggNoGroupingOp() == TPushAggOp.COUNT; - // Just for counting the number of selected partitions for this paimon table - Set selectedPartitionValues = Sets.newHashSet(); + // Used to avoid repeatedly calculating partition info map for the same + // partition data. + // And for counting the number of selected partitions for this paimon table. + Map> partitionInfoMaps = new HashMap<>(); // if applyCountPushdown is true, we can't split the DataSplit long realFileSplitSize = getRealFileSplitSize(applyCountPushdown ? Long.MAX_VALUE : 0); for (DataSplit dataSplit : dataSplits) { @@ -274,13 +288,26 @@ public List getSplits(int numBackends) throws UserException { splitStat.setRowCount(dataSplit.rowCount()); BinaryRow partitionValue = dataSplit.partition(); - selectedPartitionValues.add(partitionValue); + Map partitionInfoMap = null; + if (sessionVariable.isEnableRuntimeFilterPartitionPrune()) { + // If the partition value is not in the map, we need to calculate the partition + // info map and store it in the map. + partitionInfoMap = partitionInfoMaps.computeIfAbsent(partitionValue, k -> { + return PaimonUtil.getPartitionInfoMap( + source.getPaimonTable(), partitionValue, sessionVariable.getTimeZone()); + }); + } else { + partitionInfoMaps.put(partitionValue, null); + } Optional> optRawFiles = dataSplit.convertToRawFiles(); Optional> optDeletionFiles = dataSplit.deletionFiles(); if (applyCountPushdown && dataSplit.mergedRowCountAvailable()) { splitStat.setMergedRowCount(dataSplit.mergedRowCount()); PaimonSplit split = new PaimonSplit(dataSplit); split.setRowCount(dataSplit.mergedRowCount()); + if (partitionInfoMap != null) { + split.setPaimonPartitionValues(partitionInfoMap); + } pushDownCountSplits.add(split); pushDownCountSum += dataSplit.mergedRowCount(); } else if (!forceJniScanner && supportNativeReader(optRawFiles)) { @@ -305,10 +332,12 @@ public List getSplits(int numBackends) throws UserException { null, PaimonSplit.PaimonSplitCreator.DEFAULT); for (Split dorisSplit : dorisSplits) { - ((PaimonSplit) dorisSplit).setSchemaId(file.schemaId()); + PaimonSplit paimonSplit = (PaimonSplit) dorisSplit; + paimonSplit.setSchemaId(file.schemaId()); + paimonSplit.setPaimonPartitionValues(partitionInfoMap); // try to set deletion file if (optDeletionFiles.isPresent() && optDeletionFiles.get().get(i) != null) { - ((PaimonSplit) dorisSplit).setDeletionFile(optDeletionFiles.get().get(i)); + paimonSplit.setDeletionFile(optDeletionFiles.get().get(i)); splitStat.setHasDeletionVector(true); } } @@ -346,7 +375,7 @@ public List getSplits(int numBackends) throws UserException { // proportion of each split later. 
splits.forEach(s -> s.setTargetSplitSize(realFileSplitSize)); - this.selectedPartitionNum = selectedPartitionValues.size(); + this.selectedPartitionNum = partitionInfoMaps.size(); return splits; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java index 1bb0a2cd3dadbd..00700c3827f458 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java @@ -27,6 +27,7 @@ import org.apache.paimon.table.source.DeletionFile; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -37,6 +38,7 @@ public class PaimonSplit extends FileSplit { private Optional optDeletionFile = Optional.empty(); private Optional optRowCount = Optional.empty(); private Optional schemaId = Optional.empty(); + private Map paimonPartitionValues = null; public PaimonSplit(DataSplit split) { super(DUMMY_PATH, 0, 0, 0, 0, null, null); @@ -101,6 +103,14 @@ public Long getSchemaId() { return schemaId.orElse(null); } + public void setPaimonPartitionValues(Map paimonPartitionValues) { + this.paimonPartitionValues = paimonPartitionValues; + } + + public Map getPaimonPartitionValues() { + return paimonPartitionValues; + } + public static class PaimonSplitCreator implements SplitCreator { static final PaimonSplitCreator DEFAULT = new PaimonSplitCreator(); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 247b7dd43be9b2..18736a0774b6e6 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -463,7 +463,7 @@ struct TFileScanRangeParams { // This is used to represent the latest id. 25: optional i64 current_schema_id; // All schema information used in the current query process - 26: optional list history_schema_info + 26: optional list history_schema_info } struct TFileRangeDesc { @@ -493,6 +493,8 @@ struct TFileRangeDesc { 12: optional string fs_name 13: optional TFileFormatType format_type; 14: optional i64 self_split_weight + // whether the value of columns_from_path is null + 15: optional list columns_from_path_is_null; } struct TSplitSource { diff --git a/regression-test/data/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning.out b/regression-test/data/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning.out new file mode 100644 index 00000000000000..530973b08276ea --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning.out @@ -0,0 +1,205 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !runtime_filter_partition_pruning_decimal1 -- +2 + +-- !runtime_filter_partition_pruning_decimal2 -- +4 + +-- !runtime_filter_partition_pruning_decimal3 -- +2 + +-- !runtime_filter_partition_pruning_decimal_in_null -- +2 + +-- !runtime_filter_partition_pruning_int1 -- +1 + +-- !runtime_filter_partition_pruning_int2 -- +3 + +-- !runtime_filter_partition_pruning_int3 -- +1 + +-- !runtime_filter_partition_pruning_int_in_null -- +1 + +-- !runtime_filter_partition_pruning_string1 -- +2 + +-- !runtime_filter_partition_pruning_string2 -- +4 + +-- !runtime_filter_partition_pruning_string_in_null -- +2 + +-- !runtime_filter_partition_pruning_date1 -- +1 + +-- !runtime_filter_partition_pruning_date2 -- +3 + +-- !runtime_filter_partition_pruning_date_in_null -- +1 + +-- !runtime_filter_partition_pruning_timestamp1 -- +1 + +-- !runtime_filter_partition_pruning_timestamp2 -- +2 + +-- !runtime_filter_partition_pruning_timestamp_in_null -- +1 + +-- !runtime_filter_partition_pruning_boolean1 -- +3 + +-- !runtime_filter_partition_pruning_boolean2 -- +5 + +-- !runtime_filter_partition_pruning_boolean_in_null -- +3 + +-- !runtime_filter_partition_pruning_float1 -- +1 + +-- !runtime_filter_partition_pruning_float2 -- +2 + +-- !runtime_filter_partition_pruning_float3 -- +1 + +-- !runtime_filter_partition_pruning_float_in_null -- +1 + +-- !runtime_filter_partition_pruning_timestamp_ntz1 -- +1 + +-- !runtime_filter_partition_pruning_timestamp_ntz2 -- +2 + +-- !runtime_filter_partition_pruning_timestamp_ntz_in_null -- +1 + +-- !runtime_filter_partition_pruning_binary1 -- +1 + +-- !runtime_filter_partition_pruning_binary2 -- +3 + +-- !runtime_filter_partition_pruning_binary_in_null -- +1 + +-- !null_partition_1 -- +4 null 400.0 + +-- !null_partition_2 -- +2 NULL 200.0 + +-- !null_partition_3 -- +1 + +-- !null_partition_4 -- +1 \N 100.0 + +-- !runtime_filter_partition_pruning_decimal1 -- +2 + +-- !runtime_filter_partition_pruning_decimal2 -- +4 + +-- !runtime_filter_partition_pruning_decimal3 -- +2 + +-- !runtime_filter_partition_pruning_decimal_in_null -- +2 + +-- !runtime_filter_partition_pruning_int1 -- +1 + +-- !runtime_filter_partition_pruning_int2 -- +3 + +-- !runtime_filter_partition_pruning_int3 -- +1 + +-- !runtime_filter_partition_pruning_int_in_null -- +1 + +-- !runtime_filter_partition_pruning_string1 -- +2 + +-- !runtime_filter_partition_pruning_string2 -- +4 + +-- !runtime_filter_partition_pruning_string_in_null -- +2 + +-- !runtime_filter_partition_pruning_date1 -- +1 + +-- !runtime_filter_partition_pruning_date2 -- +3 + +-- !runtime_filter_partition_pruning_date_in_null -- +1 + +-- !runtime_filter_partition_pruning_timestamp1 -- +1 + +-- !runtime_filter_partition_pruning_timestamp2 -- +2 + +-- !runtime_filter_partition_pruning_timestamp_in_null -- +1 + +-- !runtime_filter_partition_pruning_boolean1 -- +3 + +-- !runtime_filter_partition_pruning_boolean2 -- +5 + +-- !runtime_filter_partition_pruning_boolean_in_null -- +3 + +-- !runtime_filter_partition_pruning_float1 -- +1 + +-- !runtime_filter_partition_pruning_float2 -- +2 + +-- !runtime_filter_partition_pruning_float3 -- +1 + +-- !runtime_filter_partition_pruning_float_in_null -- +1 + +-- !runtime_filter_partition_pruning_timestamp_ntz1 -- +1 + +-- !runtime_filter_partition_pruning_timestamp_ntz2 -- +2 + +-- !runtime_filter_partition_pruning_timestamp_ntz_in_null -- +1 + +-- !runtime_filter_partition_pruning_binary1 -- +1 + +-- !runtime_filter_partition_pruning_binary2 -- +3 + +-- 
!runtime_filter_partition_pruning_binary_in_null -- +1 + +-- !null_partition_1 -- +4 null 400.0 + +-- !null_partition_2 -- +2 NULL 200.0 + +-- !null_partition_3 -- +1 + +-- !null_partition_4 -- +1 \N 100.0 + diff --git a/regression-test/data/external_table_p0/paimon/test_paimon_runtime_filter_partition_pruning.out b/regression-test/data/external_table_p0/paimon/test_paimon_runtime_filter_partition_pruning.out new file mode 100644 index 00000000000000..738d40fabe0498 --- /dev/null +++ b/regression-test/data/external_table_p0/paimon/test_paimon_runtime_filter_partition_pruning.out @@ -0,0 +1,193 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !runtime_filter_partition_pruning_decimal1 -- +2 + +-- !runtime_filter_partition_pruning_decimal2 -- +4 + +-- !runtime_filter_partition_pruning_decimal3 -- +2 + +-- !runtime_filter_partition_pruning_decimal_in_null -- +2 + +-- !runtime_filter_partition_pruning_int1 -- +1 + +-- !runtime_filter_partition_pruning_int2 -- +3 + +-- !runtime_filter_partition_pruning_int3 -- +1 + +-- !runtime_filter_partition_pruning_int_in_null -- +1 + +-- !runtime_filter_partition_pruning_string1 -- +2 + +-- !runtime_filter_partition_pruning_string2 -- +4 + +-- !runtime_filter_partition_pruning_string_in_null -- +2 + +-- !runtime_filter_partition_pruning_date1 -- +1 + +-- !runtime_filter_partition_pruning_date2 -- +3 + +-- !runtime_filter_partition_pruning_date_in_null -- +3 + +-- !runtime_filter_partition_pruning_timestamp1 -- +1 + +-- !runtime_filter_partition_pruning_timestamp2 -- +2 + +-- !runtime_filter_partition_pruning_timestamp_in_null -- +1 + +-- !runtime_filter_partition_pruning_bigint1 -- +1 + +-- !runtime_filter_partition_pruning_bigint2 -- +3 + +-- !runtime_filter_partition_pruning_bigint3 -- +1 + +-- !runtime_filter_partition_pruning_bigint_in_null -- +1 + +-- !runtime_filter_partition_pruning_boolean1 -- +3 + +-- !runtime_filter_partition_pruning_boolean2 -- +5 + +-- !runtime_filter_partition_pruning_boolean_in_null -- +3 + +-- !runtime_filter_partition_pruning_float1 -- +1 + +-- !runtime_filter_partition_pruning_float2 -- +3 + +-- !runtime_filter_partition_pruning_float3 -- +1 + +-- !runtime_filter_partition_pruning_float_in_null -- +1 + +-- !null_partition_1 -- +4 null 400.0 + +-- !null_partition_2 -- +2 NULL 200.0 + +-- !null_partition_3 -- +1 + +-- !null_partition_4 -- +1 \N 100.0 + +-- !runtime_filter_partition_pruning_decimal1 -- +2 + +-- !runtime_filter_partition_pruning_decimal2 -- +4 + +-- !runtime_filter_partition_pruning_decimal3 -- +2 + +-- !runtime_filter_partition_pruning_decimal_in_null -- +2 + +-- !runtime_filter_partition_pruning_int1 -- +1 + +-- !runtime_filter_partition_pruning_int2 -- +3 + +-- !runtime_filter_partition_pruning_int3 -- +1 + +-- !runtime_filter_partition_pruning_int_in_null -- +1 + +-- !runtime_filter_partition_pruning_string1 -- +2 + +-- !runtime_filter_partition_pruning_string2 -- +4 + +-- !runtime_filter_partition_pruning_string_in_null -- +2 + +-- !runtime_filter_partition_pruning_date1 -- +1 + +-- !runtime_filter_partition_pruning_date2 -- +3 + +-- !runtime_filter_partition_pruning_date_in_null -- +3 + +-- !runtime_filter_partition_pruning_timestamp1 -- +1 + +-- !runtime_filter_partition_pruning_timestamp2 -- +2 + +-- !runtime_filter_partition_pruning_timestamp_in_null -- +1 + +-- !runtime_filter_partition_pruning_bigint1 -- +1 + +-- !runtime_filter_partition_pruning_bigint2 -- +3 + +-- !runtime_filter_partition_pruning_bigint3 -- +1 + +-- 
!runtime_filter_partition_pruning_bigint_in_null -- +1 + +-- !runtime_filter_partition_pruning_boolean1 -- +3 + +-- !runtime_filter_partition_pruning_boolean2 -- +5 + +-- !runtime_filter_partition_pruning_boolean_in_null -- +3 + +-- !runtime_filter_partition_pruning_float1 -- +1 + +-- !runtime_filter_partition_pruning_float2 -- +3 + +-- !runtime_filter_partition_pruning_float3 -- +1 + +-- !runtime_filter_partition_pruning_float_in_null -- +1 + +-- !null_partition_1 -- +4 null 400.0 + +-- !null_partition_2 -- +2 NULL 200.0 + +-- !null_partition_3 -- +1 + +-- !null_partition_4 -- +1 \N 100.0 + diff --git a/regression-test/data/external_table_p2/hudi/test_hudi_runtime_filter_partition_pruning.out b/regression-test/data/external_table_p2/hudi/test_hudi_runtime_filter_partition_pruning.out new file mode 100644 index 00000000000000..abf14662aba9ad --- /dev/null +++ b/regression-test/data/external_table_p2/hudi/test_hudi_runtime_filter_partition_pruning.out @@ -0,0 +1,115 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !runtime_filter_partition_pruning_boolean_1 -- +2 + +-- !runtime_filter_partition_pruning_boolean_2 -- +4 + +-- !runtime_filter_partition_pruning_tinyint_1 -- +2 + +-- !runtime_filter_partition_pruning_tinyint_2 -- +4 + +-- !runtime_filter_partition_pruning_smallint_1 -- +2 + +-- !runtime_filter_partition_pruning_smallint_2 -- +4 + +-- !runtime_filter_partition_pruning_int_1 -- +2 + +-- !runtime_filter_partition_pruning_int_2 -- +4 + +-- !runtime_filter_partition_pruning_int_3 -- +2 + +-- !runtime_filter_partition_pruning_bigint_1 -- +2 + +-- !runtime_filter_partition_pruning_bigint_2 -- +4 + +-- !runtime_filter_partition_pruning_string_1 -- +2 + +-- !runtime_filter_partition_pruning_string_2 -- +4 + +-- !runtime_filter_partition_pruning_date_1 -- +2 + +-- !runtime_filter_partition_pruning_date_2 -- +4 + +-- !runtime_filter_partition_pruning_timestamp_1 -- +2 + +-- !runtime_filter_partition_pruning_timestamp_2 -- +4 + +-- !runtime_filter_partition_pruning_complex_1 -- +7 + +-- !runtime_filter_partition_pruning_complex_2 -- +5 + +-- !runtime_filter_partition_pruning_boolean_1 -- +2 + +-- !runtime_filter_partition_pruning_boolean_2 -- +4 + +-- !runtime_filter_partition_pruning_tinyint_1 -- +2 + +-- !runtime_filter_partition_pruning_tinyint_2 -- +4 + +-- !runtime_filter_partition_pruning_smallint_1 -- +2 + +-- !runtime_filter_partition_pruning_smallint_2 -- +4 + +-- !runtime_filter_partition_pruning_int_1 -- +2 + +-- !runtime_filter_partition_pruning_int_2 -- +4 + +-- !runtime_filter_partition_pruning_int_3 -- +2 + +-- !runtime_filter_partition_pruning_bigint_1 -- +2 + +-- !runtime_filter_partition_pruning_bigint_2 -- +4 + +-- !runtime_filter_partition_pruning_string_1 -- +2 + +-- !runtime_filter_partition_pruning_string_2 -- +4 + +-- !runtime_filter_partition_pruning_date_1 -- +2 + +-- !runtime_filter_partition_pruning_date_2 -- +4 + +-- !runtime_filter_partition_pruning_timestamp_1 -- +2 + +-- !runtime_filter_partition_pruning_timestamp_2 -- +4 + +-- !runtime_filter_partition_pruning_complex_1 -- +7 + +-- !runtime_filter_partition_pruning_complex_2 -- +5 + diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning.groovy new file mode 100644 index 00000000000000..442a8a4c1218a1 --- /dev/null +++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning.groovy @@ -0,0 +1,242 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_iceberg_runtime_filter_partition_pruning", "p0,external,doris,external_docker,external_docker_doris") { + + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable iceberg test.") + return + } + + String catalog_name = "test_iceberg_runtime_filter_partition_pruning" + String db_name = "partition_db" + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + sql """drop catalog if exists ${catalog_name}""" + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + sql """switch ${catalog_name}""" + sql """use ${db_name}""" + + def test_runtime_filter_partition_pruning = { + qt_runtime_filter_partition_pruning_decimal1 """ + select count(*) from decimal_partitioned where partition_key = + (select partition_key from decimal_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_decimal2 """ + select count(*) from decimal_partitioned where partition_key in + (select partition_key from decimal_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 2); + """ + qt_runtime_filter_partition_pruning_decimal3 """ + select count(*) from decimal_partitioned where abs(partition_key) = + (select partition_key from decimal_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_decimal_in_null """ + select count(*) from decimal_partitioned where partition_key in + (select partition_key from decimal_partitioned + order by id desc limit 2); + """ + qt_runtime_filter_partition_pruning_int1 """ + select count(*) from int_partitioned where partition_key = + (select partition_key from int_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_int2 """ + select count(*) from int_partitioned where partition_key in + (select partition_key from int_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 2); + """ + 
qt_runtime_filter_partition_pruning_int3 """ + select count(*) from int_partitioned where abs(partition_key) = + (select partition_key from int_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_int_in_null """ + select count(*) from int_partitioned where partition_key in + (select partition_key from int_partitioned + order by id desc limit 2); + """ + qt_runtime_filter_partition_pruning_string1 """ + select count(*) from string_partitioned where partition_key = + (select partition_key from string_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_string2 """ + select count(*) from string_partitioned where partition_key in + (select partition_key from string_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 2); + """ + qt_runtime_filter_partition_pruning_string_in_null """ + select count(*) from string_partitioned where partition_key in + (select partition_key from string_partitioned + order by id desc limit 2); + """ + qt_runtime_filter_partition_pruning_date1 """ + select count(*) from date_partitioned where partition_key = + (select partition_key from date_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_date2 """ + select count(*) from date_partitioned where partition_key in + (select partition_key from date_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 2); + """ + qt_runtime_filter_partition_pruning_date_in_null """ + select count(*) from date_partitioned where partition_key in + (select partition_key from date_partitioned + order by id desc limit 2); + """ + qt_runtime_filter_partition_pruning_timestamp1 """ + select count(*) from timestamp_partitioned where partition_key = + (select partition_key from timestamp_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_timestamp2 """ + select count(*) from timestamp_partitioned where partition_key in + (select partition_key from timestamp_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 2); + """ + qt_runtime_filter_partition_pruning_timestamp_in_null """ + select count(*) from timestamp_partitioned where partition_key in + (select partition_key from timestamp_partitioned + order by id desc limit 2); + """ + qt_runtime_filter_partition_pruning_boolean1 """ + select count(*) from boolean_partitioned where partition_key = + (select partition_key from boolean_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_boolean2 """ + select count(*) from boolean_partitioned where partition_key in + (select partition_key from boolean_partitioned + group by partition_key having count(*) > 0); + """ + qt_runtime_filter_partition_pruning_boolean_in_null """ + select count(*) from boolean_partitioned where partition_key in + (select partition_key from boolean_partitioned + order by id desc limit 2); + """ + qt_runtime_filter_partition_pruning_float1 """ + select count(*) from float_partitioned where partition_key = + (select partition_key from float_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + 
qt_runtime_filter_partition_pruning_float2 """ + select count(*) from float_partitioned where partition_key in + (select partition_key from float_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 2); + """ + qt_runtime_filter_partition_pruning_float3 """ + select count(*) from float_partitioned where abs(partition_key) = + (select partition_key from float_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_float_in_null """ + select count(*) from float_partitioned where partition_key in + (select partition_key from float_partitioned + order by id desc limit 2); + """ + qt_runtime_filter_partition_pruning_timestamp_ntz1 """ + select count(*) from timestamp_ntz_partitioned where partition_key = + (select partition_key from timestamp_ntz_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_timestamp_ntz2 """ + select count(*) from timestamp_ntz_partitioned where partition_key in + (select partition_key from timestamp_ntz_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 2); + """ + qt_runtime_filter_partition_pruning_timestamp_ntz_in_null """ + select count(*) from timestamp_ntz_partitioned where partition_key in + (select partition_key from timestamp_ntz_partitioned + order by id desc limit 2); + """ + qt_runtime_filter_partition_pruning_binary1 """ + select count(*) from binary_partitioned where partition_key = + (select partition_key from binary_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_binary2 """ + select count(*) from binary_partitioned where partition_key in + (select partition_key from binary_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 2); + """ + qt_runtime_filter_partition_pruning_binary_in_null """ + select count(*) from binary_partitioned where partition_key in + (select partition_key from binary_partitioned + order by id desc limit 2); + """ + qt_null_partition_1 """ + select * from null_str_partition_table where category = 'null'; + """ + qt_null_partition_2 """ + select * from null_str_partition_table where category = 'NULL'; + """ + qt_null_partition_3 """ + select count(*) from null_str_partition_table where category = '\\\\N'; + """ + qt_null_partition_4 """ + select * from null_str_partition_table where category is null; + """ + } + try { + sql """ set time_zone = 'Asia/Shanghai'; """ + sql """ set enable_runtime_filter_partition_prune = false; """ + test_runtime_filter_partition_pruning() + sql """ set enable_runtime_filter_partition_prune = true; """ + test_runtime_filter_partition_pruning() + + } finally { + sql """ unset variable time_zone; """ + sql """ set enable_runtime_filter_partition_prune = true; """ + } + +} diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_runtime_filter_partition_pruning.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_runtime_filter_partition_pruning.groovy new file mode 100644 index 00000000000000..f7a666d2c8328f --- /dev/null +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_runtime_filter_partition_pruning.groovy @@ -0,0 +1,242 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_paimon_runtime_filter_partition_pruning", "p0,external,doris,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enablePaimonTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String catalog_name = "test_paimon_runtime_filter_partition_pruning" + String db_name = "partition_db" + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + + sql """drop catalog if exists ${catalog_name}""" + + sql """ + CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type' = 'paimon', + 'warehouse' = 's3://warehouse/wh', + 's3.endpoint' = 'http://${externalEnvIp}:${minio_port}', + 's3.access_key' = 'admin', + 's3.secret_key' = 'password', + 's3.path.style.access' = 'true' + ); + """ + sql """use `${catalog_name}`.`${db_name}`;""" + + def test_runtime_filter_partition_pruning = { + qt_runtime_filter_partition_pruning_decimal1 """ + select count(*) from decimal_partitioned where partition_key = + (select partition_key from decimal_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_decimal2 """ + select count(*) from decimal_partitioned where partition_key in + (select partition_key from decimal_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 2); + """ + qt_runtime_filter_partition_pruning_decimal3 """ + select count(*) from decimal_partitioned where abs(partition_key) = + (select partition_key from decimal_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_decimal_in_null """ + select count(*) from decimal_partitioned where partition_key in + (select partition_key from decimal_partitioned + order by id desc limit 2); + """ + qt_runtime_filter_partition_pruning_int1 """ + select count(*) from int_partitioned where partition_key = + (select partition_key from int_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_int2 """ + select count(*) from int_partitioned where partition_key in + (select partition_key from int_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 2); + """ + qt_runtime_filter_partition_pruning_int3 """ + select count(*) from int_partitioned where abs(partition_key) = + (select partition_key from int_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_int_in_null """ + select count(*) from int_partitioned where partition_key in 
+ (select partition_key from int_partitioned + order by id desc limit 2); + """ + qt_runtime_filter_partition_pruning_string1 """ + select count(*) from string_partitioned where partition_key = + (select partition_key from string_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_string2 """ + select count(*) from string_partitioned where partition_key in + (select partition_key from string_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 2); + """ + qt_runtime_filter_partition_pruning_string_in_null """ + select count(*) from string_partitioned where partition_key in + (select partition_key from string_partitioned + order by id desc limit 2); + """ + qt_runtime_filter_partition_pruning_date1 """ + select count(*) from date_partitioned where partition_key = + (select partition_key from date_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_date2 """ + select count(*) from date_partitioned where partition_key in + (select partition_key from date_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 2); + """ + qt_runtime_filter_partition_pruning_date_in_null """ + select count(*) from date_partitioned where partition_key in + (select partition_key from date_partitioned + order by id desc limit 2); + """ + qt_runtime_filter_partition_pruning_timestamp1 """ + select count(*) from timestamp_partitioned where partition_key = + (select partition_key from timestamp_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_timestamp2 """ + select count(*) from timestamp_partitioned where partition_key in + (select partition_key from timestamp_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 2); + """ + qt_runtime_filter_partition_pruning_timestamp_in_null """ + select count(*) from timestamp_partitioned where partition_key in + (select partition_key from timestamp_partitioned + order by id desc limit 2); + """ + qt_runtime_filter_partition_pruning_bigint1 """ + select count(*) from bigint_partitioned where partition_key = + (select partition_key from bigint_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_bigint2 """ + select count(*) from bigint_partitioned where partition_key in + (select partition_key from bigint_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 2); + """ + qt_runtime_filter_partition_pruning_bigint3 """ + select count(*) from bigint_partitioned where abs(partition_key) = + (select partition_key from bigint_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_bigint_in_null """ + select count(*) from bigint_partitioned where partition_key in + (select partition_key from bigint_partitioned + order by id desc limit 2); + """ + qt_runtime_filter_partition_pruning_boolean1 """ + select count(*) from boolean_partitioned where partition_key = + (select partition_key from boolean_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_boolean2 """ + select count(*) from boolean_partitioned where 
partition_key in + (select partition_key from boolean_partitioned + group by partition_key having count(*) > 0); + """ + qt_runtime_filter_partition_pruning_boolean_in_null """ + select count(*) from boolean_partitioned where partition_key in + (select partition_key from boolean_partitioned + order by id desc limit 2); + """ + // A binary type as the partition key causes issues in Paimon, so these tests are skipped. + // qt_runtime_filter_partition_pruning_binary1 """ + // select count(*) from binary_partitioned where partition_key = + // (select partition_key from binary_partitioned + // group by partition_key having count(*) > 0 + // order by partition_key desc limit 1); + // """ + // qt_runtime_filter_partition_pruning_binary2 """ + // select count(*) from binary_partitioned where partition_key in + // (select partition_key from binary_partitioned + // group by partition_key having count(*) > 0 + // order by partition_key desc limit 2); + // """ + qt_runtime_filter_partition_pruning_float1 """ + select count(*) from float_partitioned where partition_key = + (select partition_key from float_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_float2 """ + select count(*) from float_partitioned where partition_key in + (select partition_key from float_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 2); + """ + qt_runtime_filter_partition_pruning_float3 """ + select count(*) from float_partitioned where abs(partition_key) = + (select partition_key from float_partitioned + group by partition_key having count(*) > 0 + order by partition_key desc limit 1); + """ + qt_runtime_filter_partition_pruning_float_in_null """ + select count(*) from float_partitioned where partition_key in + (select partition_key from float_partitioned + order by id desc limit 2); + """ + qt_null_partition_1 """ + select * from null_str_partition_table where category = 'null'; + """ + qt_null_partition_2 """ + select * from null_str_partition_table where category = 'NULL'; + """ + qt_null_partition_3 """ + select count(*) from null_str_partition_table where category = '\\\\N'; + """ + qt_null_partition_4 """ + select * from null_str_partition_table where category is null; + """ + } + + try { + sql """ set time_zone = 'Asia/Shanghai'; """ + sql """ set enable_runtime_filter_partition_prune = false; """ + test_runtime_filter_partition_pruning() + sql """ set enable_runtime_filter_partition_prune = true; """ + test_runtime_filter_partition_pruning() + + } finally { + sql """ unset variable time_zone; """ + sql """ set enable_runtime_filter_partition_prune = true; """ + } + } +} + + diff --git a/regression-test/suites/external_table_p2/hudi/test_hudi_runtime_filter_partition_pruning.groovy b/regression-test/suites/external_table_p2/hudi/test_hudi_runtime_filter_partition_pruning.groovy new file mode 100644 index 00000000000000..de4757f7c6f062 --- /dev/null +++ b/regression-test/suites/external_table_p2/hudi/test_hudi_runtime_filter_partition_pruning.groovy @@ -0,0 +1,198 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hudi_runtime_filter_partition_pruning", "p2,external,hudi,external_remote,external_remote_hudi") { + String enabled = context.config.otherConfigs.get("enableExternalHudiTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable hudi test") + return + } + + String catalog_name = "test_hudi_runtime_filter_partition_pruning" + String props = context.config.otherConfigs.get("hudiEmrCatalog") + sql """drop catalog if exists ${catalog_name};""" + sql """ + create catalog if not exists ${catalog_name} properties ( + ${props} + ); + """ + + sql """ switch ${catalog_name};""" + sql """ use regression_hudi;""" + sql """ set enable_fallback_to_original_planner=false """ + + def test_runtime_filter_partition_pruning = { + // Test BOOLEAN partition + qt_runtime_filter_partition_pruning_boolean_1 """ + select count(*) from boolean_partition_tb where part1 = + (select part1 from boolean_partition_tb + group by part1 having count(*) > 0 + order by part1 desc limit 1); + """ + + qt_runtime_filter_partition_pruning_boolean_2 """ + select count(*) from boolean_partition_tb where part1 in + (select part1 from boolean_partition_tb + group by part1 having count(*) > 0); + """ + + // Test TINYINT partition + qt_runtime_filter_partition_pruning_tinyint_1 """ + select count(*) from tinyint_partition_tb where part1 = + (select part1 from tinyint_partition_tb + group by part1 having count(*) > 0 + order by part1 desc limit 1); + """ + + qt_runtime_filter_partition_pruning_tinyint_2 """ + select count(*) from tinyint_partition_tb where part1 in + (select part1 from tinyint_partition_tb + group by part1 having count(*) > 0 + order by part1 desc limit 2); + """ + + // Test SMALLINT partition + qt_runtime_filter_partition_pruning_smallint_1 """ + select count(*) from smallint_partition_tb where part1 = + (select part1 from smallint_partition_tb + group by part1 having count(*) > 0 + order by part1 desc limit 1); + """ + + qt_runtime_filter_partition_pruning_smallint_2 """ + select count(*) from smallint_partition_tb where part1 in + (select part1 from smallint_partition_tb + group by part1 having count(*) > 0 + order by part1 desc limit 2); + """ + + // Test INT partition + qt_runtime_filter_partition_pruning_int_1 """ + select count(*) from int_partition_tb where part1 = + (select part1 from int_partition_tb + group by part1 having count(*) > 0 + order by part1 desc limit 1); + """ + + qt_runtime_filter_partition_pruning_int_2 """ + select count(*) from int_partition_tb where part1 in + (select part1 from int_partition_tb + group by part1 having count(*) > 0 + order by part1 desc limit 2); + """ + + qt_runtime_filter_partition_pruning_int_3 """ + select count(*) from int_partition_tb where abs(part1) = + (select part1 from int_partition_tb + group by part1 having count(*) > 0 + order by part1 desc limit 1); + """ + + // Test BIGINT partition + qt_runtime_filter_partition_pruning_bigint_1 """ + select count(*) from bigint_partition_tb where part1 = + (select part1 from bigint_partition_tb + group by part1 having count(*) > 0 + order by part1 desc limit 1); 
+ """ + + qt_runtime_filter_partition_pruning_bigint_2 """ + select count(*) from bigint_partition_tb where part1 in + (select part1 from bigint_partition_tb + group by part1 having count(*) > 0 + order by part1 desc limit 2); + """ + + // Test STRING partition + qt_runtime_filter_partition_pruning_string_1 """ + select count(*) from string_partition_tb where part1 = + (select part1 from string_partition_tb + group by part1 having count(*) > 0 + order by part1 desc limit 1); + """ + + qt_runtime_filter_partition_pruning_string_2 """ + select count(*) from string_partition_tb where part1 in + (select part1 from string_partition_tb + group by part1 having count(*) > 0 + order by part1 desc limit 2); + """ + + // Test DATE partition + qt_runtime_filter_partition_pruning_date_1 """ + select count(*) from date_partition_tb where part1 = + (select part1 from date_partition_tb + group by part1 having count(*) > 0 + order by part1 desc limit 1); + """ + + qt_runtime_filter_partition_pruning_date_2 """ + select count(*) from date_partition_tb where part1 in + (select part1 from date_partition_tb + group by part1 having count(*) > 0 + order by part1 desc limit 2); + """ + + // Test TIMESTAMP partition + qt_runtime_filter_partition_pruning_timestamp_1 """ + select count(*) from timestamp_partition_tb where part1 = + (select part1 from timestamp_partition_tb + group by part1 having count(*) > 0 + order by part1 desc limit 1); + """ + + qt_runtime_filter_partition_pruning_timestamp_2 """ + select count(*) from timestamp_partition_tb where part1 in + (select part1 from timestamp_partition_tb + group by part1 having count(*) > 0 + order by part1 desc limit 2); + """ + + // Additional complex scenarios with multiple filters + qt_runtime_filter_partition_pruning_complex_1 """ + select count(*) from three_partition_tb t1 + where t1.part1 in ( + select t2.part1 from three_partition_tb t2 + where t2.part2 = 2024 + group by t2.part1 having count(*) > 2 + ); + """ + + qt_runtime_filter_partition_pruning_complex_2 """ + select count(*) from two_partition_tb t1 + where t1.part1 = 'US' and t1.part2 in ( + select t2.part2 from two_partition_tb t2 + where t2.part1 = 'US' + group by t2.part2 having count(*) > 1 + ); + """ + } + + try { + // Test with runtime filter partition pruning disabled + sql """ set enable_runtime_filter_partition_prune = false; """ + test_runtime_filter_partition_pruning() + + // Test with runtime filter partition pruning enabled + sql """ set enable_runtime_filter_partition_prune = true; """ + test_runtime_filter_partition_pruning() + + } finally { + // Restore default setting + sql """ set enable_runtime_filter_partition_prune = true; """ + } +}