[feat](iceberg) Implement Iceberg rewrite_data_files action for table optimization and compaction#56413
Conversation
962ad3d to
d46f90b
Compare
a6525d4 to
08dd72f
Compare
rewrite_data_files action for table optimization and compaction
f79ed01 to
4074c6f
Compare
.../src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRewriteDataFilesAction.java
Show resolved
Hide resolved
5d24aa4 to
bf30fe1
Compare
|
run buildall |
Cloud UT Coverage ReportIncrement line coverage Increment coverage report
|
BE UT Coverage ReportIncrement line coverage Increment coverage report
|
|
run buildall |
Cloud UT Coverage ReportIncrement line coverage Increment coverage report
|
BE UT Coverage ReportIncrement line coverage Increment coverage report
|
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
|
run buildall |
Cloud UT Coverage ReportIncrement line coverage Increment coverage report
|
ClickBench: Total hot run time: 28.9 s |
BE UT Coverage ReportIncrement line coverage Increment coverage report
|
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
|
run buildall |
Cloud UT Coverage ReportIncrement line coverage Increment coverage report
|
ClickBench: Total hot run time: 28.87 s |
BE UT Coverage ReportIncrement line coverage Increment coverage report
|
|
run external |
ClickBench: Total hot run time: 28.16 s |
BE UT Coverage ReportIncrement line coverage Increment coverage report
|
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
FE Regression Coverage ReportIncrement line coverage |
Issue Number: apache/doris#56002 Related PR: apache/doris#55679 apache/doris#56638 apache/doris#56413
|
PR approved by at least one committer and no changes requested. |
|
PR approved by anyone and no changes requested. |
…le optimization and compaction (apache#56413) **Issue Number:** apache#56002 **Related PR:** apache#55679 apache#56638 This PR implements the `rewrite_data_files` action for Apache Iceberg tables in Doris, providing comprehensive table optimization and data file compaction capabilities. This feature allows users to reorganize data files to improve query performance, optimize storage efficiency, and maintain delete files according to Iceberg's official specification. --- This PR implements the `rewrite_data_files` operation for Iceberg tables, providing table optimization and data file compaction capabilities. The feature follows Iceberg's official `RewriteDataFiles` specification and provides the following core capabilities: 1. **Data File Compaction**: Merges multiple small files into larger files, reducing file count and improving query performance 2. **Storage Efficiency Optimization**: Reduces storage overhead through file reorganization and optimizes data distribution 3. **Delete File Management**: Properly handles and maintains delete files, reducing filtering overhead during queries 4. **WHERE Condition Support**: Supports rewriting specific data ranges through WHERE conditions, including various data types (BIGINT, STRING, INT, DOUBLE, BOOLEAN, DATE, TIMESTAMP, DECIMAL) and complex conditional expressions 5. **Concurrent Execution**: Supports concurrent execution of multiple rewrite tasks for improved processing efficiency After execution, detailed statistics are returned, including: - `rewritten_data_files_count`: Number of data files that were rewritten - `added_data_files_count`: Number of new data files generated - `rewritten_bytes_count`: Number of bytes rewritten - `removed_delete_files_count`: Number of delete files removed --- ```sql -- Rewrite data files with default parameters ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files(); ``` ```sql -- Specify target file size and minimum input files ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files( "target-file-size-bytes" = "104857600", "min-input-files" = "3" ); ``` ```sql -- Rewrite only data within specific date range ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files( "target-file-size-bytes" = "104857600", "min-input-files" = "3", "delete-ratio-threshold" = "0.2" ) WHERE created_date >= '2024-01-01' AND status = 'active'; -- Rewrite data satisfying complex conditions ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files( "target-file-size-bytes" = "536870912" ) WHERE age > 25 AND salary > 50000.0 AND is_active = true; ``` ```sql -- Ignore file size limits and rewrite all files ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files("rewrite-all" = "true"); ``` ```sql -- Trigger rewrite when delete file count or ratio exceeds threshold ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files( "delete-file-threshold" = "10", "delete-ratio-threshold" = "0.3" ); ``` --- | Parameter Name | Type | Default Value | Description | |----------------|------|---------------|-------------| | `target-file-size-bytes` | Long | 536870912 (512MB) | Target size in bytes for output files | | `min-file-size-bytes` | Long | 0 (auto-calculated as 75% of target) | Minimum file size in bytes for files to be rewritten | | `max-file-size-bytes` | Long | 0 (auto-calculated as 180% of target) | Maximum file size in bytes for files to be rewritten | | Parameter Name | Type | Default Value | Description | |----------------|------|---------------|-------------| | `min-input-files` | Int | 5 | Minimum number of input files to rewrite together | | `rewrite-all` | Boolean | false | Whether to rewrite all files regardless of size | | `max-file-group-size-bytes` | Long | 107374182400 (100GB) | Maximum size in bytes for a file group to be rewritten | | Parameter Name | Type | Default Value | Description | |----------------|------|---------------|-------------| | `delete-file-threshold` | Int | Integer.MAX_VALUE | Minimum number of delete files to trigger rewrite | | `delete-ratio-threshold` | Double | 0.3 | Minimum ratio of delete records to total records to trigger rewrite (0.0-1.0) | | Parameter Name | Type | Default Value | Description | |----------------|------|---------------|-------------| | `output-spec-id` | Long | 2 | Partition specification ID for output files | - If `min-file-size-bytes` is not specified, default value is `target-file-size-bytes * 0.75` - If `max-file-size-bytes` is not specified, default value is `target-file-size-bytes * 1.8` - File groups are only rewritten when they meet the `min-input-files` condition - `delete-file-threshold` and `delete-ratio-threshold` are used to determine if rewrite is needed to handle delete files --- ``` 1. Parameter Validation and Table Retrieval ├─ Validate rewrite parameters ├─ Get Iceberg table reference └─ Check if table has data snapshots 2. File Planning and Grouping ├─ Use RewriteDataFilePlanner to plan file scan tasks ├─ Filter file scan tasks based on WHERE conditions ├─ Organize file groups by partition and size constraints └─ Filter file groups that don't meet rewrite conditions 3. Concurrent Rewrite Execution ├─ Create RewriteDataFileExecutor ├─ Execute multiple file group rewrite tasks concurrently ├─ Each task executes INSERT-SELECT statements └─ Wait for all tasks to complete 4. Transaction Commit and Result Return ├─ Commit transaction and create new snapshot ├─ Update table metadata └─ Return detailed execution result statistics ``` - Validate all parameters for validity and value ranges - If table has no snapshots, return empty result directly - Calculate default values for `min-file-size-bytes` and `max-file-size-bytes` based on parameters - **File Scanning**: Build `TableScan` based on WHERE conditions to get qualified `FileScanTask` - **File Filtering**: Filter files based on `min-file-size-bytes`, `max-file-size-bytes`, and `rewrite-all` parameters - **Partition Grouping**: Group files into `RewriteDataGroup` by partition specification - **Size Constraints**: Ensure each file group doesn't exceed `max-file-group-size-bytes` - **Delete File Check**: Determine if rewrite is needed based on `delete-file-threshold` and `delete-ratio-threshold` - **Task Creation**: Create `RewriteGroupTask` for each `RewriteDataGroup` - **Concurrent Execution**: Use thread pool to execute multiple rewrite tasks concurrently - **Data Writing**: Each task executes `INSERT INTO ... SELECT FROM ...` statements to write data to new files - **Progress Tracking**: Use atomic counters and `CountDownLatch` to track task completion - **Transaction Management**: Use `IcebergTransaction` to manage transactions, ensuring atomicity - **Metadata Update**: Commit transaction to create new snapshot and update table metadata - **Result Statistics**: Aggregate execution results from all tasks and return statistics
…le optimization and compaction (apache#56413) **Issue Number:** apache#56002 **Related PR:** apache#55679 apache#56638 This PR implements the `rewrite_data_files` action for Apache Iceberg tables in Doris, providing comprehensive table optimization and data file compaction capabilities. This feature allows users to reorganize data files to improve query performance, optimize storage efficiency, and maintain delete files according to Iceberg's official specification. --- This PR implements the `rewrite_data_files` operation for Iceberg tables, providing table optimization and data file compaction capabilities. The feature follows Iceberg's official `RewriteDataFiles` specification and provides the following core capabilities: 1. **Data File Compaction**: Merges multiple small files into larger files, reducing file count and improving query performance 2. **Storage Efficiency Optimization**: Reduces storage overhead through file reorganization and optimizes data distribution 3. **Delete File Management**: Properly handles and maintains delete files, reducing filtering overhead during queries 4. **WHERE Condition Support**: Supports rewriting specific data ranges through WHERE conditions, including various data types (BIGINT, STRING, INT, DOUBLE, BOOLEAN, DATE, TIMESTAMP, DECIMAL) and complex conditional expressions 5. **Concurrent Execution**: Supports concurrent execution of multiple rewrite tasks for improved processing efficiency After execution, detailed statistics are returned, including: - `rewritten_data_files_count`: Number of data files that were rewritten - `added_data_files_count`: Number of new data files generated - `rewritten_bytes_count`: Number of bytes rewritten - `removed_delete_files_count`: Number of delete files removed --- ```sql -- Rewrite data files with default parameters ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files(); ``` ```sql -- Specify target file size and minimum input files ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files( "target-file-size-bytes" = "104857600", "min-input-files" = "3" ); ``` ```sql -- Rewrite only data within specific date range ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files( "target-file-size-bytes" = "104857600", "min-input-files" = "3", "delete-ratio-threshold" = "0.2" ) WHERE created_date >= '2024-01-01' AND status = 'active'; -- Rewrite data satisfying complex conditions ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files( "target-file-size-bytes" = "536870912" ) WHERE age > 25 AND salary > 50000.0 AND is_active = true; ``` ```sql -- Ignore file size limits and rewrite all files ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files("rewrite-all" = "true"); ``` ```sql -- Trigger rewrite when delete file count or ratio exceeds threshold ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files( "delete-file-threshold" = "10", "delete-ratio-threshold" = "0.3" ); ``` --- | Parameter Name | Type | Default Value | Description | |----------------|------|---------------|-------------| | `target-file-size-bytes` | Long | 536870912 (512MB) | Target size in bytes for output files | | `min-file-size-bytes` | Long | 0 (auto-calculated as 75% of target) | Minimum file size in bytes for files to be rewritten | | `max-file-size-bytes` | Long | 0 (auto-calculated as 180% of target) | Maximum file size in bytes for files to be rewritten | | Parameter Name | Type | Default Value | Description | |----------------|------|---------------|-------------| | `min-input-files` | Int | 5 | Minimum number of input files to rewrite together | | `rewrite-all` | Boolean | false | Whether to rewrite all files regardless of size | | `max-file-group-size-bytes` | Long | 107374182400 (100GB) | Maximum size in bytes for a file group to be rewritten | | Parameter Name | Type | Default Value | Description | |----------------|------|---------------|-------------| | `delete-file-threshold` | Int | Integer.MAX_VALUE | Minimum number of delete files to trigger rewrite | | `delete-ratio-threshold` | Double | 0.3 | Minimum ratio of delete records to total records to trigger rewrite (0.0-1.0) | | Parameter Name | Type | Default Value | Description | |----------------|------|---------------|-------------| | `output-spec-id` | Long | 2 | Partition specification ID for output files | - If `min-file-size-bytes` is not specified, default value is `target-file-size-bytes * 0.75` - If `max-file-size-bytes` is not specified, default value is `target-file-size-bytes * 1.8` - File groups are only rewritten when they meet the `min-input-files` condition - `delete-file-threshold` and `delete-ratio-threshold` are used to determine if rewrite is needed to handle delete files --- ``` 1. Parameter Validation and Table Retrieval ├─ Validate rewrite parameters ├─ Get Iceberg table reference └─ Check if table has data snapshots 2. File Planning and Grouping ├─ Use RewriteDataFilePlanner to plan file scan tasks ├─ Filter file scan tasks based on WHERE conditions ├─ Organize file groups by partition and size constraints └─ Filter file groups that don't meet rewrite conditions 3. Concurrent Rewrite Execution ├─ Create RewriteDataFileExecutor ├─ Execute multiple file group rewrite tasks concurrently ├─ Each task executes INSERT-SELECT statements └─ Wait for all tasks to complete 4. Transaction Commit and Result Return ├─ Commit transaction and create new snapshot ├─ Update table metadata └─ Return detailed execution result statistics ``` - Validate all parameters for validity and value ranges - If table has no snapshots, return empty result directly - Calculate default values for `min-file-size-bytes` and `max-file-size-bytes` based on parameters - **File Scanning**: Build `TableScan` based on WHERE conditions to get qualified `FileScanTask` - **File Filtering**: Filter files based on `min-file-size-bytes`, `max-file-size-bytes`, and `rewrite-all` parameters - **Partition Grouping**: Group files into `RewriteDataGroup` by partition specification - **Size Constraints**: Ensure each file group doesn't exceed `max-file-group-size-bytes` - **Delete File Check**: Determine if rewrite is needed based on `delete-file-threshold` and `delete-ratio-threshold` - **Task Creation**: Create `RewriteGroupTask` for each `RewriteDataGroup` - **Concurrent Execution**: Use thread pool to execute multiple rewrite tasks concurrently - **Data Writing**: Each task executes `INSERT INTO ... SELECT FROM ...` statements to write data to new files - **Progress Tracking**: Use atomic counters and `CountDownLatch` to track task completion - **Transaction Management**: Use `IcebergTransaction` to manage transactions, ensuring atomicity - **Metadata Update**: Commit transaction to create new snapshot and update table metadata - **Result Statistics**: Aggregate execution results from all tasks and return statistics
…le optimization and compaction (apache#56413) **Issue Number:** apache#56002 **Related PR:** apache#55679 apache#56638 This PR implements the `rewrite_data_files` action for Apache Iceberg tables in Doris, providing comprehensive table optimization and data file compaction capabilities. This feature allows users to reorganize data files to improve query performance, optimize storage efficiency, and maintain delete files according to Iceberg's official specification. --- This PR implements the `rewrite_data_files` operation for Iceberg tables, providing table optimization and data file compaction capabilities. The feature follows Iceberg's official `RewriteDataFiles` specification and provides the following core capabilities: 1. **Data File Compaction**: Merges multiple small files into larger files, reducing file count and improving query performance 2. **Storage Efficiency Optimization**: Reduces storage overhead through file reorganization and optimizes data distribution 3. **Delete File Management**: Properly handles and maintains delete files, reducing filtering overhead during queries 4. **WHERE Condition Support**: Supports rewriting specific data ranges through WHERE conditions, including various data types (BIGINT, STRING, INT, DOUBLE, BOOLEAN, DATE, TIMESTAMP, DECIMAL) and complex conditional expressions 5. **Concurrent Execution**: Supports concurrent execution of multiple rewrite tasks for improved processing efficiency After execution, detailed statistics are returned, including: - `rewritten_data_files_count`: Number of data files that were rewritten - `added_data_files_count`: Number of new data files generated - `rewritten_bytes_count`: Number of bytes rewritten - `removed_delete_files_count`: Number of delete files removed --- ```sql -- Rewrite data files with default parameters ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files(); ``` ```sql -- Specify target file size and minimum input files ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files( "target-file-size-bytes" = "104857600", "min-input-files" = "3" ); ``` ```sql -- Rewrite only data within specific date range ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files( "target-file-size-bytes" = "104857600", "min-input-files" = "3", "delete-ratio-threshold" = "0.2" ) WHERE created_date >= '2024-01-01' AND status = 'active'; -- Rewrite data satisfying complex conditions ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files( "target-file-size-bytes" = "536870912" ) WHERE age > 25 AND salary > 50000.0 AND is_active = true; ``` ```sql -- Ignore file size limits and rewrite all files ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files("rewrite-all" = "true"); ``` ```sql -- Trigger rewrite when delete file count or ratio exceeds threshold ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files( "delete-file-threshold" = "10", "delete-ratio-threshold" = "0.3" ); ``` --- | Parameter Name | Type | Default Value | Description | |----------------|------|---------------|-------------| | `target-file-size-bytes` | Long | 536870912 (512MB) | Target size in bytes for output files | | `min-file-size-bytes` | Long | 0 (auto-calculated as 75% of target) | Minimum file size in bytes for files to be rewritten | | `max-file-size-bytes` | Long | 0 (auto-calculated as 180% of target) | Maximum file size in bytes for files to be rewritten | | Parameter Name | Type | Default Value | Description | |----------------|------|---------------|-------------| | `min-input-files` | Int | 5 | Minimum number of input files to rewrite together | | `rewrite-all` | Boolean | false | Whether to rewrite all files regardless of size | | `max-file-group-size-bytes` | Long | 107374182400 (100GB) | Maximum size in bytes for a file group to be rewritten | | Parameter Name | Type | Default Value | Description | |----------------|------|---------------|-------------| | `delete-file-threshold` | Int | Integer.MAX_VALUE | Minimum number of delete files to trigger rewrite | | `delete-ratio-threshold` | Double | 0.3 | Minimum ratio of delete records to total records to trigger rewrite (0.0-1.0) | | Parameter Name | Type | Default Value | Description | |----------------|------|---------------|-------------| | `output-spec-id` | Long | 2 | Partition specification ID for output files | - If `min-file-size-bytes` is not specified, default value is `target-file-size-bytes * 0.75` - If `max-file-size-bytes` is not specified, default value is `target-file-size-bytes * 1.8` - File groups are only rewritten when they meet the `min-input-files` condition - `delete-file-threshold` and `delete-ratio-threshold` are used to determine if rewrite is needed to handle delete files --- ``` 1. Parameter Validation and Table Retrieval ├─ Validate rewrite parameters ├─ Get Iceberg table reference └─ Check if table has data snapshots 2. File Planning and Grouping ├─ Use RewriteDataFilePlanner to plan file scan tasks ├─ Filter file scan tasks based on WHERE conditions ├─ Organize file groups by partition and size constraints └─ Filter file groups that don't meet rewrite conditions 3. Concurrent Rewrite Execution ├─ Create RewriteDataFileExecutor ├─ Execute multiple file group rewrite tasks concurrently ├─ Each task executes INSERT-SELECT statements └─ Wait for all tasks to complete 4. Transaction Commit and Result Return ├─ Commit transaction and create new snapshot ├─ Update table metadata └─ Return detailed execution result statistics ``` - Validate all parameters for validity and value ranges - If table has no snapshots, return empty result directly - Calculate default values for `min-file-size-bytes` and `max-file-size-bytes` based on parameters - **File Scanning**: Build `TableScan` based on WHERE conditions to get qualified `FileScanTask` - **File Filtering**: Filter files based on `min-file-size-bytes`, `max-file-size-bytes`, and `rewrite-all` parameters - **Partition Grouping**: Group files into `RewriteDataGroup` by partition specification - **Size Constraints**: Ensure each file group doesn't exceed `max-file-group-size-bytes` - **Delete File Check**: Determine if rewrite is needed based on `delete-file-threshold` and `delete-ratio-threshold` - **Task Creation**: Create `RewriteGroupTask` for each `RewriteDataGroup` - **Concurrent Execution**: Use thread pool to execute multiple rewrite tasks concurrently - **Data Writing**: Each task executes `INSERT INTO ... SELECT FROM ...` statements to write data to new files - **Progress Tracking**: Use atomic counters and `CountDownLatch` to track task completion - **Transaction Management**: Use `IcebergTransaction` to manage transactions, ensuring atomicity - **Metadata Update**: Commit transaction to create new snapshot and update table metadata - **Result Statistics**: Aggregate execution results from all tasks and return statistics
…le optimization and compaction (apache#56413) **Issue Number:** apache#56002 **Related PR:** apache#55679 apache#56638 This PR implements the `rewrite_data_files` action for Apache Iceberg tables in Doris, providing comprehensive table optimization and data file compaction capabilities. This feature allows users to reorganize data files to improve query performance, optimize storage efficiency, and maintain delete files according to Iceberg's official specification. --- This PR implements the `rewrite_data_files` operation for Iceberg tables, providing table optimization and data file compaction capabilities. The feature follows Iceberg's official `RewriteDataFiles` specification and provides the following core capabilities: 1. **Data File Compaction**: Merges multiple small files into larger files, reducing file count and improving query performance 2. **Storage Efficiency Optimization**: Reduces storage overhead through file reorganization and optimizes data distribution 3. **Delete File Management**: Properly handles and maintains delete files, reducing filtering overhead during queries 4. **WHERE Condition Support**: Supports rewriting specific data ranges through WHERE conditions, including various data types (BIGINT, STRING, INT, DOUBLE, BOOLEAN, DATE, TIMESTAMP, DECIMAL) and complex conditional expressions 5. **Concurrent Execution**: Supports concurrent execution of multiple rewrite tasks for improved processing efficiency After execution, detailed statistics are returned, including: - `rewritten_data_files_count`: Number of data files that were rewritten - `added_data_files_count`: Number of new data files generated - `rewritten_bytes_count`: Number of bytes rewritten - `removed_delete_files_count`: Number of delete files removed --- ```sql -- Rewrite data files with default parameters ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files(); ``` ```sql -- Specify target file size and minimum input files ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files( "target-file-size-bytes" = "104857600", "min-input-files" = "3" ); ``` ```sql -- Rewrite only data within specific date range ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files( "target-file-size-bytes" = "104857600", "min-input-files" = "3", "delete-ratio-threshold" = "0.2" ) WHERE created_date >= '2024-01-01' AND status = 'active'; -- Rewrite data satisfying complex conditions ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files( "target-file-size-bytes" = "536870912" ) WHERE age > 25 AND salary > 50000.0 AND is_active = true; ``` ```sql -- Ignore file size limits and rewrite all files ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files("rewrite-all" = "true"); ``` ```sql -- Trigger rewrite when delete file count or ratio exceeds threshold ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files( "delete-file-threshold" = "10", "delete-ratio-threshold" = "0.3" ); ``` --- | Parameter Name | Type | Default Value | Description | |----------------|------|---------------|-------------| | `target-file-size-bytes` | Long | 536870912 (512MB) | Target size in bytes for output files | | `min-file-size-bytes` | Long | 0 (auto-calculated as 75% of target) | Minimum file size in bytes for files to be rewritten | | `max-file-size-bytes` | Long | 0 (auto-calculated as 180% of target) | Maximum file size in bytes for files to be rewritten | | Parameter Name | Type | Default Value | Description | |----------------|------|---------------|-------------| | `min-input-files` | Int | 5 | Minimum number of input files to rewrite together | | `rewrite-all` | Boolean | false | Whether to rewrite all files regardless of size | | `max-file-group-size-bytes` | Long | 107374182400 (100GB) | Maximum size in bytes for a file group to be rewritten | | Parameter Name | Type | Default Value | Description | |----------------|------|---------------|-------------| | `delete-file-threshold` | Int | Integer.MAX_VALUE | Minimum number of delete files to trigger rewrite | | `delete-ratio-threshold` | Double | 0.3 | Minimum ratio of delete records to total records to trigger rewrite (0.0-1.0) | | Parameter Name | Type | Default Value | Description | |----------------|------|---------------|-------------| | `output-spec-id` | Long | 2 | Partition specification ID for output files | - If `min-file-size-bytes` is not specified, default value is `target-file-size-bytes * 0.75` - If `max-file-size-bytes` is not specified, default value is `target-file-size-bytes * 1.8` - File groups are only rewritten when they meet the `min-input-files` condition - `delete-file-threshold` and `delete-ratio-threshold` are used to determine if rewrite is needed to handle delete files --- ``` 1. Parameter Validation and Table Retrieval ├─ Validate rewrite parameters ├─ Get Iceberg table reference └─ Check if table has data snapshots 2. File Planning and Grouping ├─ Use RewriteDataFilePlanner to plan file scan tasks ├─ Filter file scan tasks based on WHERE conditions ├─ Organize file groups by partition and size constraints └─ Filter file groups that don't meet rewrite conditions 3. Concurrent Rewrite Execution ├─ Create RewriteDataFileExecutor ├─ Execute multiple file group rewrite tasks concurrently ├─ Each task executes INSERT-SELECT statements └─ Wait for all tasks to complete 4. Transaction Commit and Result Return ├─ Commit transaction and create new snapshot ├─ Update table metadata └─ Return detailed execution result statistics ``` - Validate all parameters for validity and value ranges - If table has no snapshots, return empty result directly - Calculate default values for `min-file-size-bytes` and `max-file-size-bytes` based on parameters - **File Scanning**: Build `TableScan` based on WHERE conditions to get qualified `FileScanTask` - **File Filtering**: Filter files based on `min-file-size-bytes`, `max-file-size-bytes`, and `rewrite-all` parameters - **Partition Grouping**: Group files into `RewriteDataGroup` by partition specification - **Size Constraints**: Ensure each file group doesn't exceed `max-file-group-size-bytes` - **Delete File Check**: Determine if rewrite is needed based on `delete-file-threshold` and `delete-ratio-threshold` - **Task Creation**: Create `RewriteGroupTask` for each `RewriteDataGroup` - **Concurrent Execution**: Use thread pool to execute multiple rewrite tasks concurrently - **Data Writing**: Each task executes `INSERT INTO ... SELECT FROM ...` statements to write data to new files - **Progress Tracking**: Use atomic counters and `CountDownLatch` to track task completion - **Transaction Management**: Use `IcebergTransaction` to manage transactions, ensuring atomicity - **Metadata Update**: Commit transaction to create new snapshot and update table metadata - **Result Statistics**: Aggregate execution results from all tasks and return statistics
…le optimization and compaction (apache#56413) ### What problem does this PR solve? **Issue Number:** apache#56002 **Related PR:** apache#55679 apache#56638 This PR implements the `rewrite_data_files` action for Apache Iceberg tables in Doris, providing comprehensive table optimization and data file compaction capabilities. This feature allows users to reorganize data files to improve query performance, optimize storage efficiency, and maintain delete files according to Iceberg's official specification. --- ## Feature Description This PR implements the `rewrite_data_files` operation for Iceberg tables, providing table optimization and data file compaction capabilities. The feature follows Iceberg's official `RewriteDataFiles` specification and provides the following core capabilities: 1. **Data File Compaction**: Merges multiple small files into larger files, reducing file count and improving query performance 2. **Storage Efficiency Optimization**: Reduces storage overhead through file reorganization and optimizes data distribution 3. **Delete File Management**: Properly handles and maintains delete files, reducing filtering overhead during queries 4. **WHERE Condition Support**: Supports rewriting specific data ranges through WHERE conditions, including various data types (BIGINT, STRING, INT, DOUBLE, BOOLEAN, DATE, TIMESTAMP, DECIMAL) and complex conditional expressions 5. **Concurrent Execution**: Supports concurrent execution of multiple rewrite tasks for improved processing efficiency After execution, detailed statistics are returned, including: - `rewritten_data_files_count`: Number of data files that were rewritten - `added_data_files_count`: Number of new data files generated - `rewritten_bytes_count`: Number of bytes rewritten - `removed_delete_files_count`: Number of delete files removed --- ## Usage Example ### Basic Usage ```sql -- Rewrite data files with default parameters ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files(); ``` ### Custom Parameters ```sql -- Specify target file size and minimum input files ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files( "target-file-size-bytes" = "104857600", "min-input-files" = "3" ); ``` ### Rewrite with WHERE Conditions ```sql -- Rewrite only data within specific date range ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files( "target-file-size-bytes" = "104857600", "min-input-files" = "3", "delete-ratio-threshold" = "0.2" ) WHERE created_date >= '2024-01-01' AND status = 'active'; -- Rewrite data satisfying complex conditions ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files( "target-file-size-bytes" = "536870912" ) WHERE age > 25 AND salary > 50000.0 AND is_active = true; ``` ### Rewrite All Files ```sql -- Ignore file size limits and rewrite all files ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files("rewrite-all" = "true"); ``` ### Handle Delete Files ```sql -- Trigger rewrite when delete file count or ratio exceeds threshold ALTER TABLE iceberg_catalog.db.table EXECUTE rewrite_data_files( "delete-file-threshold" = "10", "delete-ratio-threshold" = "0.3" ); ``` --- ## Parameter List ### File Size Parameters | Parameter Name | Type | Default Value | Description | |----------------|------|---------------|-------------| | `target-file-size-bytes` | Long | 536870912 (512MB) | Target size in bytes for output files | | `min-file-size-bytes` | Long | 0 (auto-calculated as 75% of target) | Minimum file size in bytes for files to be rewritten | | `max-file-size-bytes` | Long | 0 (auto-calculated as 180% of target) | Maximum file size in bytes for files to be rewritten | ### Input Files Parameters | Parameter Name | Type | Default Value | Description | |----------------|------|---------------|-------------| | `min-input-files` | Int | 5 | Minimum number of input files to rewrite together | | `rewrite-all` | Boolean | false | Whether to rewrite all files regardless of size | | `max-file-group-size-bytes` | Long | 107374182400 (100GB) | Maximum size in bytes for a file group to be rewritten | ### Delete Files Parameters | Parameter Name | Type | Default Value | Description | |----------------|------|---------------|-------------| | `delete-file-threshold` | Int | Integer.MAX_VALUE | Minimum number of delete files to trigger rewrite | | `delete-ratio-threshold` | Double | 0.3 | Minimum ratio of delete records to total records to trigger rewrite (0.0-1.0) | ### Output Specification Parameters | Parameter Name | Type | Default Value | Description | |----------------|------|---------------|-------------| | `output-spec-id` | Long | 2 | Partition specification ID for output files | ### Parameter Notes - If `min-file-size-bytes` is not specified, default value is `target-file-size-bytes * 0.75` - If `max-file-size-bytes` is not specified, default value is `target-file-size-bytes * 1.8` - File groups are only rewritten when they meet the `min-input-files` condition - `delete-file-threshold` and `delete-ratio-threshold` are used to determine if rewrite is needed to handle delete files --- ## Execution Flow ### Overall Process ``` 1. Parameter Validation and Table Retrieval ├─ Validate rewrite parameters ├─ Get Iceberg table reference └─ Check if table has data snapshots 2. File Planning and Grouping ├─ Use RewriteDataFilePlanner to plan file scan tasks ├─ Filter file scan tasks based on WHERE conditions ├─ Organize file groups by partition and size constraints └─ Filter file groups that don't meet rewrite conditions 3. Concurrent Rewrite Execution ├─ Create RewriteDataFileExecutor ├─ Execute multiple file group rewrite tasks concurrently ├─ Each task executes INSERT-SELECT statements └─ Wait for all tasks to complete 4. Transaction Commit and Result Return ├─ Commit transaction and create new snapshot ├─ Update table metadata └─ Return detailed execution result statistics ``` ### Detailed Steps #### Step 1: Parameter Validation and Table Retrieval - Validate all parameters for validity and value ranges - If table has no snapshots, return empty result directly - Calculate default values for `min-file-size-bytes` and `max-file-size-bytes` based on parameters #### Step 2: File Planning and Grouping (RewriteDataFilePlanner) - **File Scanning**: Build `TableScan` based on WHERE conditions to get qualified `FileScanTask` - **File Filtering**: Filter files based on `min-file-size-bytes`, `max-file-size-bytes`, and `rewrite-all` parameters - **Partition Grouping**: Group files into `RewriteDataGroup` by partition specification - **Size Constraints**: Ensure each file group doesn't exceed `max-file-group-size-bytes` - **Delete File Check**: Determine if rewrite is needed based on `delete-file-threshold` and `delete-ratio-threshold` #### Step 3: Concurrent Rewrite Execution (RewriteDataFileExecutor) - **Task Creation**: Create `RewriteGroupTask` for each `RewriteDataGroup` - **Concurrent Execution**: Use thread pool to execute multiple rewrite tasks concurrently - **Data Writing**: Each task executes `INSERT INTO ... SELECT FROM ...` statements to write data to new files - **Progress Tracking**: Use atomic counters and `CountDownLatch` to track task completion #### Step 4: Transaction Commit and Result Return - **Transaction Management**: Use `IcebergTransaction` to manage transactions, ensuring atomicity - **Metadata Update**: Commit transaction to create new snapshot and update table metadata - **Result Statistics**: Aggregate execution results from all tasks and return statistics
What problem does this PR solve?
Issue Number: #56002
Related PR: #55679 #56638
This PR implements the
rewrite_data_filesaction for Apache Iceberg tables in Doris, providing comprehensive table optimization and data file compaction capabilities. This feature allows users to reorganize data files to improve query performance, optimize storage efficiency, and maintain delete files according to Iceberg's official specification.Feature Description
This PR implements the
rewrite_data_filesoperation for Iceberg tables, providing table optimization and data file compaction capabilities. The feature follows Iceberg's officialRewriteDataFilesspecification and provides the following core capabilities:After execution, detailed statistics are returned, including:
rewritten_data_files_count: Number of data files that were rewrittenadded_data_files_count: Number of new data files generatedrewritten_bytes_count: Number of bytes rewrittenremoved_delete_files_count: Number of delete files removedUsage Example
Basic Usage
Custom Parameters
Rewrite with WHERE Conditions
Rewrite All Files
Handle Delete Files
Parameter List
File Size Parameters
target-file-size-bytesmin-file-size-bytesmax-file-size-bytesInput Files Parameters
min-input-filesrewrite-allmax-file-group-size-bytesDelete Files Parameters
delete-file-thresholddelete-ratio-thresholdOutput Specification Parameters
output-spec-idParameter Notes
min-file-size-bytesis not specified, default value istarget-file-size-bytes * 0.75max-file-size-bytesis not specified, default value istarget-file-size-bytes * 1.8min-input-filesconditiondelete-file-thresholdanddelete-ratio-thresholdare used to determine if rewrite is needed to handle delete filesExecution Flow
Overall Process
Detailed Steps
Step 1: Parameter Validation and Table Retrieval
min-file-size-bytesandmax-file-size-bytesbased on parametersStep 2: File Planning and Grouping (RewriteDataFilePlanner)
TableScanbased on WHERE conditions to get qualifiedFileScanTaskmin-file-size-bytes,max-file-size-bytes, andrewrite-allparametersRewriteDataGroupby partition specificationmax-file-group-size-bytesdelete-file-thresholdanddelete-ratio-thresholdStep 3: Concurrent Rewrite Execution (RewriteDataFileExecutor)
RewriteGroupTaskfor eachRewriteDataGroupINSERT INTO ... SELECT FROM ...statements to write data to new filesCountDownLatchto track task completionStep 4: Transaction Commit and Result Return
IcebergTransactionto manage transactions, ensuring atomicityCheck List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)