[enhance](multi-catalog) Runtime Filter Partition Pruning for Data Lake Tables #53399
Conversation
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
Force-pushed from 31cfbd2 to abeecff

run buildall
Force-pushed from a28e5d9 to 28966e7

run buildall
Cloud UT Coverage Report: increment line coverage

run buildall
TPC-H: Total hot run time: 34111 ms
TPC-DS: Total hot run time: 187424 ms
ClickBench: Total hot run time: 33.38 s
FE UT Coverage Report: increment line coverage
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
```java
        new String[0], partition.getPartitionValues());
hudiSplit.setTableFormatType(TableFormatType.HUDI);
if (sessionVariable.isEnableRuntimeFilterPartitionPrune()) {
    hudiSplit.setHudiPartitionValues(HudiUtils.getPartitionInfoMap(hmsTable, partition));
```
`HudiUtils.getPartitionInfoMap(hmsTable, partition)` should only be called once per partition.
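A minimal sketch of the suggested fix, assuming the extraction result can be keyed by partition path. The class and method names here are hypothetical, not the actual Doris code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: memoize the per-partition info map so that a call
// like HudiUtils.getPartitionInfoMap(hmsTable, partition) runs once per
// partition, not once per split of that partition.
public class PartitionInfoCache {
    private final Map<String, Map<String, String>> cache = new HashMap<>();

    // 'loader' stands in for the actual extraction call,
    // e.g. p -> HudiUtils.getPartitionInfoMap(hmsTable, partition)
    public Map<String, String> get(String partitionPath,
                                   Function<String, Map<String, String>> loader) {
        // computeIfAbsent only invokes the loader on a cache miss
        return cache.computeIfAbsent(partitionPath, loader);
    }
}
```

With this shape, repeated lookups for the same partition path reuse the first result instead of re-extracting the map per split.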
This is unresolved?
run buildall

BE UT Coverage Report: increment line coverage
Force-pushed from ff87bdb to 511964b

run buildall
TPC-H: Total hot run time: 33900 ms
TPC-DS: Total hot run time: 186951 ms
ClickBench: Total hot run time: 33.55 s
run buildall

BE Regression && UT Coverage Report: increment line coverage
morningman left a comment
LGTM
PR approved by at least one committer and no changes requested.
…ke Tables (apache#53399)

follow: apache#47025

This PR implements dynamic partition pruning based on runtime filters for Iceberg, Paimon, and Hudi data lake tables, extending and enhancing the previous PR [apache#47025](apache#47025).

In PR [apache#47025](apache#47025), we implemented runtime filter-based dynamic partition pruning for Hive tables. However, due to significant differences in partition metadata formats between Iceberg, Paimon, Hudi, and traditional Hive tables, specialized adaptation and implementation are required for these data lake formats.

- During split generation in scan nodes, when `enable_runtime_filter_partition_prune` is enabled, call the corresponding partition value extraction functions
- Pass extracted partition values to the backend through the `TFileRangeDesc.data_lake_partition_values` field
- Store partition values in `Map<String, String>` format, with keys as partition column names and values as serialized partition values
- Process partition column information in `FileScanner::_generate_data_lake_partition_columns()`
- Runtime filters can then perform partition pruning based on this partition value information, avoiding scans of non-matching partition files

Dynamic partition pruning supports the following types of queries:

```sql
-- Equality queries
SELECT count(*) FROM iceberg_table
WHERE partition_col = (
    SELECT partition_col FROM iceberg_table
    GROUP BY partition_col HAVING count(*) > 0
    ORDER BY partition_col DESC LIMIT 1
);

-- IN queries
SELECT count(*) FROM paimon_table
WHERE partition_col IN (
    SELECT partition_col FROM paimon_table
    GROUP BY partition_col HAVING count(*) > 0
    ORDER BY partition_col DESC LIMIT 2
);

-- Function expression queries
SELECT count(*) FROM hudi_table
WHERE abs(partition_col) = (
    SELECT partition_col FROM hudi_table
    GROUP BY partition_col HAVING count(*) > 0
    ORDER BY partition_col DESC LIMIT 1
);
```

Partition data types supported by each format:

**Common Support**:
- **Numeric types**: INT, BIGINT, DECIMAL, FLOAT, DOUBLE, TINYINT, SMALLINT
- **String types**: STRING, VARCHAR, CHAR
- **Date/time types**: DATE, TIMESTAMP
- **Boolean type**: BOOLEAN
- **Binary types**: BINARY (except for Paimon)

**Format-specific Support**:
- **Iceberg**: additionally supports TIMESTAMP_NTZ for timezone-free timestamps
- **Paimon**: does not support BINARY as a partition key (binary partition keys currently cause issues in Spark)
- **Hudi**: based on the Hive partition format; supports all Hive-compatible types

**Notes**:
- TIME and UUID types are supported at the code level, but since Spark does not support them as partition keys, the test cases do not cover these scenarios
- If these types are used in production environments, dynamic partition pruning still works normally
… Data Lake Tables (apache#53399)" This reverts commit 8d98908.
### What problem does this PR solve?

Issue Number: close #xxx

Related PR: #53399
… type (apache#59564)

### What problem does this PR solve?

Related PR: apache#53399

Problem Summary: the `serializePartitionValue` function returns a String value, but encoding the BINARY type through a UTF-8 String corrupts the data, so the result no longer matches the original bytes.
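The corruption described above can be avoided with a byte-exact encoding. A hedged sketch of the idea, with hypothetical helper names (this is not the actual fix in apache#59564, just an illustration of the principle):

```java
import java.util.Base64;

// Sketch: arbitrary bytes cannot round-trip through new String(raw, UTF_8),
// because invalid UTF-8 sequences are replaced with U+FFFD. A binary
// partition value carried inside a Map<String, String> therefore needs a
// byte-exact text encoding such as Base64.
public class BinaryPartitionValue {
    public static String serialize(byte[] raw) {
        // Base64 maps every byte sequence to ASCII losslessly
        return Base64.getEncoder().encodeToString(raw);
    }

    public static byte[] deserialize(String encoded) {
        return Base64.getDecoder().decode(encoded);
    }
}
```

Round-tripping any byte array through these two helpers returns the original bytes, which is exactly the property a UTF-8 String conversion lacks.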
What problem does this PR solve?
follow: #47025
PR Overview
This PR implements dynamic partition pruning based on runtime filters for Iceberg, Paimon, and Hudi data lake tables, extending and enhancing the previous PR #47025.
Background
In PR #47025, we implemented runtime filter-based dynamic partition pruning for Hive tables. However, due to significant differences in partition metadata formats between Iceberg, Paimon, Hudi and traditional Hive tables, specialized adaptation and implementation are required for these data lake formats.
Main Features
1. Core Implementation
Frontend (FE) Changes
- During split generation in scan nodes, when `enable_runtime_filter_partition_prune` is enabled, call the corresponding partition value extraction functions
- Pass extracted partition values to the backend through the `TFileRangeDesc.data_lake_partition_values` field
- Store partition values in `Map<String, String>` format, with keys as partition column names and values as serialized partition values

Backend (BE) Changes

- Process partition column information in `FileScanner::_generate_data_lake_partition_columns()`
- Runtime filters can then perform partition pruning based on this partition value information, avoiding scans of non-matching partition files

2. Supported Query Types
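The `Map<String, String>` hand-off between FE and BE described above can be sketched as follows. The class and method here are illustrative, not the actual Doris implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch: render typed partition values into the
// Map<String, String> shape carried by TFileRangeDesc.data_lake_partition_values,
// keyed by partition column name. The BE side re-parses each string value
// against the column's declared type.
public class DataLakePartitionValues {
    public static Map<String, String> serialize(Map<String, Object> typed) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : typed.entrySet()) {
            // String.valueOf gives a stable textual form for simple types;
            // real serialization would dispatch on the column type.
            out.put(e.getKey(), String.valueOf(e.getValue()));
        }
        return out;
    }
}
```

For example, a `LocalDate` partition value serializes as `2024-01-01` and an `Integer` bucket as `3`, which the backend can re-parse as DATE and INT respectively.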
Dynamic partition pruning supports the following types of queries:
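The three supported query shapes, restated from the commit message:

```sql
-- Equality queries
SELECT count(*) FROM iceberg_table
WHERE partition_col = (
    SELECT partition_col FROM iceberg_table
    GROUP BY partition_col HAVING count(*) > 0
    ORDER BY partition_col DESC LIMIT 1
);

-- IN queries
SELECT count(*) FROM paimon_table
WHERE partition_col IN (
    SELECT partition_col FROM paimon_table
    GROUP BY partition_col HAVING count(*) > 0
    ORDER BY partition_col DESC LIMIT 2
);

-- Function expression queries
SELECT count(*) FROM hudi_table
WHERE abs(partition_col) = (
    SELECT partition_col FROM hudi_table
    GROUP BY partition_col HAVING count(*) > 0
    ORDER BY partition_col DESC LIMIT 1
);
```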
3. Supported Data Types
Partition data types supported by each format:
Common Support:

- Numeric types: INT, BIGINT, DECIMAL, FLOAT, DOUBLE, TINYINT, SMALLINT
- String types: STRING, VARCHAR, CHAR
- Date/time types: DATE, TIMESTAMP
- Boolean type: BOOLEAN
- Binary types: BINARY (except for Paimon)

Format-specific Support:

- Iceberg: additionally supports TIMESTAMP_NTZ for timezone-free timestamps
- Paimon: does not support BINARY as a partition key (binary partition keys currently cause issues in Spark)
- Hudi: based on the Hive partition format; supports all Hive-compatible types

Notes:

- TIME and UUID types are supported at the code level, but since Spark does not support them as partition keys, the test cases do not cover these scenarios
- If these types are used in production environments, dynamic partition pruning still works normally
Release note
Implement runtime filter partition pruning for Iceberg/Paimon/Hudi data lake tables
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)