[enhance](runtime filter) impl partition pruning in runtime filter #47025
Conversation
Thank you for your contribution to Apache Doris. Please clearly describe your PR:

run buildall

TPC-H: Total hot run time: 32093 ms
TPC-DS: Total hot run time: 195472 ms
ClickBench: Total hot run time: 30.52 s

TeamCity be ut coverage result:

run buildall

TPC-H: Total hot run time: 32424 ms
TPC-DS: Total hot run time: 188758 ms
ClickBench: Total hot run time: 30.84 s

TeamCity be ut coverage result:

run buildall

TPC-H: Total hot run time: 32468 ms
TPC-DS: Total hot run time: 194362 ms
ClickBench: Total hot run time: 30.88 s

TeamCity be ut coverage result:

run buildall

TPC-H: Total hot run time: 32077 ms
TPC-DS: Total hot run time: 193645 ms
ClickBench: Total hot run time: 31.03 s

TeamCity be ut coverage result:

run buildall

TPC-H: Total hot run time: 32312 ms
TPC-DS: Total hot run time: 184063 ms
ClickBench: Total hot run time: 29.99 s

TeamCity be ut coverage result:

run buildall

TPC-H: Total hot run time: 31560 ms
TPC-DS: Total hot run time: 189219 ms
ClickBench: Total hot run time: 30.96 s

TeamCity cloud ut coverage result:
TeamCity be ut coverage result:

run p0
kaka11chen left a comment
LGTM
PR approved by at least one committer and no changes requested.
PR approved by anyone and no changes requested.
…7025)

This PR implements partition pruning through runtime filters. When executing a SQL query like:

```sql
SELECT count(*)
FROM int_partition_table
WHERE partition_col = (
    SELECT partition_col
    FROM int_partition_table
    GROUP BY partition_col
    HAVING count(*) > 0
    ORDER BY partition_col DESC
    LIMIT 1
)
```

During execution, the backend (BE) will receive a dynamic runtime filter condition `partition_col = xxx`. Since `partition_col` is a partitioning column, we can use its value to determine if the partition can be pruned. Additionally, this mechanism also supports filtering queries like:

```sql
SELECT count(*)
FROM int_partition_table
WHERE func(partition_col) = xxx
```

If `func` cannot be evaluated at the frontend (FE), the frontend will not perform partition pruning. However, since the backend can compute `func`, this mechanism allows us to handle pruning scenarios that are not possible at the frontend, providing a more efficient pruning process on the backend side.
…ke Tables (#53399)

### What problem does this PR solve?

follow: #47025

## PR Overview

This PR implements dynamic partition pruning based on runtime filters for Iceberg, Paimon, and Hudi data lake tables, extending and enhancing the previous PR [#47025](#47025).

## Background

In PR [#47025](#47025), we implemented runtime filter-based dynamic partition pruning for Hive tables. However, due to significant differences in partition metadata formats between Iceberg, Paimon, Hudi and traditional Hive tables, specialized adaptation and implementation are required for these data lake formats.

## Main Features

### 1. Core Implementation

#### Frontend (FE) Changes

- During split generation in scan nodes, when `enable_runtime_filter_partition_prune` is enabled, call corresponding partition value extraction functions
- Pass extracted partition values to backend through `TFileRangeDesc.data_lake_partition_values` field
- Store partition values in `Map<String, String>` format, with keys as partition column names and values as serialized partition values

#### Backend (BE) Changes

- Process partition column information in `FileScanner::_generate_data_lake_partition_columns()`
- Runtime filters can perform partition pruning based on this partition value information, avoiding scanning of non-matching partition files

### 2. Supported Query Types

Dynamic partition pruning supports the following types of queries:

```sql
-- Equality queries
SELECT count(*)
FROM iceberg_table
WHERE partition_col = (
    SELECT partition_col
    FROM iceberg_table
    GROUP BY partition_col
    HAVING count(*) > 0
    ORDER BY partition_col DESC
    LIMIT 1
);

-- IN queries
SELECT count(*)
FROM paimon_table
WHERE partition_col IN (
    SELECT partition_col
    FROM paimon_table
    GROUP BY partition_col
    HAVING count(*) > 0
    ORDER BY partition_col DESC
    LIMIT 2
);

-- Function expression queries
SELECT count(*)
FROM hudi_table
WHERE abs(partition_col) = (
    SELECT partition_col
    FROM hudi_table
    GROUP BY partition_col
    HAVING count(*) > 0
    ORDER BY partition_col DESC
    LIMIT 1
);
```

### 3. Supported Data Types

Partition data types supported by each format:

**Common Support**:
- **Numeric types**: INT, BIGINT, DECIMAL, FLOAT, DOUBLE, TINYINT, SMALLINT
- **String types**: STRING, VARCHAR, CHAR
- **Date/time types**: DATE, TIMESTAMP
- **Boolean type**: BOOLEAN
- **Binary types**: BINARY (except for Paimon)

**Format-specific Support**:
- **Iceberg**: Additionally supports TIMESTAMP_NTZ type for timezone-free timestamps
- **Paimon**: Does not support BINARY as partition key (currently binary as partition key causes issues in Spark)
- **Hudi**: Based on Hive partition format, supports all Hive-compatible types

**Notes**:
- TIME and UUID types are supported at the code level, but since Spark does not support these types as partition keys, test cases do not include related test scenarios
- In actual production environments, if these types are used, the dynamic partition pruning feature can still work normally
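To make the BE-side idea above more concrete, here is a minimal, self-contained C++ sketch of how a split's partition values, delivered as a map of column name to serialized value (conceptually what `TFileRangeDesc.data_lake_partition_values` carries), could be checked against an equality/IN runtime filter. The types `SplitPartitionValues` and `RuntimeInFilter` and the function `can_prune_split` are hypothetical illustrations, not Doris's actual `FileScanner` API.

```cpp
#include <map>
#include <set>
#include <string>

// Hypothetical stand-in for the per-split partition metadata the FE serializes:
// partition column name -> serialized partition value.
using SplitPartitionValues = std::map<std::string, std::string>;

// Hypothetical stand-in for an equality/IN runtime filter that has already
// arrived on the BE: target column name plus the set of admissible values.
struct RuntimeInFilter {
    std::string column;
    std::set<std::string> values;
};

// Returns true if the split can be skipped: the filter targets one of the
// split's partition columns and the split's value is not in the filter set.
bool can_prune_split(const SplitPartitionValues& partition_values,
                     const RuntimeInFilter& filter) {
    auto it = partition_values.find(filter.column);
    if (it == partition_values.end()) {
        // Filter is not on a partition column of this split: cannot prune.
        return false;
    }
    return filter.values.count(it->second) == 0;
}

int main() {
    SplitPartitionValues split{{"partition_col", "2024-01-01"}};
    RuntimeInFilter filter{"partition_col", {"2024-02-01", "2024-03-01"}};
    // The split's partition value does not match the runtime filter,
    // so the whole file range can be skipped without reading any data.
    return can_prune_split(split, filter) ? 0 : 1;
}
```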
This PR implements partition pruning through runtime filters. When executing a SQL query like:
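```sql
SELECT count(*)
FROM int_partition_table
WHERE partition_col = (
    SELECT partition_col
    FROM int_partition_table
    GROUP BY partition_col
    HAVING count(*) > 0
    ORDER BY partition_col DESC
    LIMIT 1
)
```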
During execution, the backend (BE) will receive a dynamic runtime filter condition `partition_col = xxx`. Since `partition_col` is a partitioning column, we can use its value to determine if the partition can be pruned. Additionally, this mechanism also supports filtering queries like:
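```sql
SELECT count(*)
FROM int_partition_table
WHERE func(partition_col) = xxx
```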
If `func` cannot be evaluated at the frontend (FE), the frontend will not perform partition pruning. However, since the backend can compute `func`, this mechanism allows us to handle pruning scenarios that are not possible at the frontend, providing a more efficient pruning process on the backend side.
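As a rough illustration of why the BE can prune in the `func(partition_col) = xxx` case even when the FE cannot, the sketch below (hypothetical names such as `prune_partitions`, not Doris code) evaluates an arbitrary function over each partition's value and keeps only the partitions whose result matches the value carried by the runtime filter.

```cpp
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <map>
#include <string>
#include <vector>

// Hypothetical partition list: partition id -> value of the partition column.
using PartitionValues = std::map<int64_t, std::string>;

// Keep only partitions where func(partition_value) equals the value delivered
// by the runtime filter. The FE cannot fold `func` at plan time, but the BE
// can evaluate it once per partition, which is far cheaper than scanning data.
std::vector<int64_t> prune_partitions(
        const PartitionValues& partitions,
        const std::function<int64_t(const std::string&)>& func,
        int64_t runtime_filter_value) {
    std::vector<int64_t> kept;
    for (const auto& [partition_id, value] : partitions) {
        if (func(value) == runtime_filter_value) {
            kept.push_back(partition_id);
        }
    }
    return kept;
}

int main() {
    PartitionValues partitions{{1, "-5"}, {2, "7"}, {3, "5"}};
    // Example: WHERE abs(partition_col) = 5, with 5 arriving via runtime filter.
    auto abs_of = [](const std::string& v) { return std::llabs(std::stoll(v)); };
    auto kept = prune_partitions(partitions, abs_of, 5);  // keeps partitions 1 and 3
    return kept.size() == 2 ? 0 : 1;
}
```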
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merges this PR)