[Feature Request] Merge performance improvements #1827
Comments
This is fantastic. That's a pretty good bump in performance!
Great! Related: #1812
## Description

This change is a plain refactor that prepares future work to improve overall merge performance, see #1827. It creates a base merge class that gathers functionality shared by the current insert-only and classic merge code paths, so the two paths can be split apart in a following PR. Shared code to collect merge statistics is already added there.

This is a non-functional refactor covered by extensive merge tests, e.g. the MergeIntoMetricsBase tests.

Closes #1834

GitOrigin-RevId: caf346b4136e6738e30bd15219eaaeabbd833bd5
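As a rough sketch of the shape this introduces (an assumption about the layout, not Delta's actual class; all names here are hypothetical):

```scala
// Hypothetical base trait gathering functionality shared by the
// insert-only and classic merge code paths, starting with statistics.
trait MergeExecutorBase {
  import scala.collection.mutable

  // Named counters shared by both code paths (e.g. numTargetRowsUpdated).
  protected val metrics: mutable.Map[String, Long] =
    mutable.Map.empty[String, Long].withDefaultValue(0L)

  protected def incrementMetric(name: String, by: Long = 1L): Unit =
    metrics(name) = metrics(name) + by

  // Snapshot of the collected merge statistics for reporting.
  protected def collectMergeStats(): Map[String, Long] = metrics.toMap
}
```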
This change is part of a larger effort to improve merge performance, see #1827.

## Description

This change rewrites the way modified data is written out in merge to improve performance. `writeAllChanges` now generates a dataframe containing all the updated and copied rows to write out, by building a large expression that selectively applies the right merge action to each row. This replaces the previous method that relied on applying a function to individual rows.

Changes:
- Move `findTouchedFiles` and `writeAllChanges` to a dedicated new trait `ClassicMergeExecutor` implementing the regular merge path when `InsertOnlyMergeExecutor` is not used.
- Introduce methods in `MergeOutputGeneration` to transform the merge clauses into expressions that can be applied to generate the output of the merge operation (both main data and CDC data).

This change fully preserves the behavior of merge, which is extensively tested in `MergeIntoSuiteBase`, `MergeIntoSQLSuite`, `MergeIntoScalaSuite`, `MergeCDCSuite`, `MergeIntoMetricsBase`, and `MergeIntoNotMatchedBySourceSuite`.

Closes #1854

GitOrigin-RevId: d8c8a0e9439c6710978f2ec345cb94b2b9b19e0e
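To make the approach concrete, here is a minimal sketch assuming a toy schema (`key`, `targetValue`, `sourceValue`); it is not Delta's actual code, only an illustration of folding merge actions into one expression applied in a single pass:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object WriteAllChangesSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
  import spark.implicits._

  // Joined target/source rows; sourceValue is null when the target row
  // had no match in the source.
  val joined = Seq(
    (1, 10, Option(100)), // matched: apply the UPDATE action
    (2, 20, None)         // not matched: copy the row unchanged
  ).toDF("key", "targetValue", "sourceValue")

  // One expression encodes both actions, so Spark can process the whole
  // output generation at once instead of calling a function per row.
  val output = joined.select(
    $"key",
    when($"sourceValue".isNotNull, $"sourceValue") // UPDATE branch
      .otherwise($"targetValue")                   // COPY branch
      .alias("value")
  )
  output.show()
}
```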
Closed by #1854!
@johanl-db @scottsand-db, this issue is closed; are you still proceeding with #1851?
#1852 was stacked on top of #1851 but was merged first, picking up the changes from #1851 in the process, which wasn't intentional. That means the changes from #1851 are effectively merged and we can close that PR, though we end up with a single commit for both #1851 and #1852.
This PR extends the existing benchmarks with new test cases dedicated to the MERGE INTO command, with two scale factors: 1GB & 3TB. The following types of test cases are created and can be extended in the future:
- `SingleInsertOnlyTestCase`
- `MultipleInsertOnlyTestCase`
- `DeleteOnlyTestCase`
- `UpsertTestCase`

Each test case uses the same (cloned) target table and defines its source table using the following parameters (a sketch of how a source table could be derived from them follows this description):
- `fileMatchedFraction`: The fraction of target files sampled to create the source table.
- `rowMatchedFraction`: The fraction of rows sampled in each selected target file to form the rows that match the `ON` condition.
- `rowNotMatchedFraction`: The fraction of rows sampled in each selected target file to form the rows that don't match the `ON` condition.

The target and source tables are created using the `merge-1gb-delta-load`/`merge-3tb-delta-load` workloads, which collect all the source table configurations used in the merge test cases and create the required source tables.

This benchmark is added to measure the impact of a series of changes to the merge command, see #1827. I followed the instructions in https://github.com/delta-io/delta/tree/master/benchmarks to create an EMR cluster and run the new benchmarks. Here are the results comparing the impact of #1827:

Test case | Base duration (s) | Test duration (s) | Improvement ratio
-- | -- | -- | --
delete_only_fileMatchedFraction_0.05_rowMatchedFraction_0.05 | 26.1 | 20.5 | 1.27
multiple_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_0.05 | 8.8 | 15.2 | 0.58
multiple_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_0.5 | 27.7 | 17.5 | 1.58
multiple_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_1.0 | 36.3 | 21.2 | 1.71
single_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_0.05 | 14.9 | 14.8 | 1.01
single_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_0.5 | 17.5 | 17.3 | 1.01
single_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_1.0 | 20.3 | 20.7 | 0.98
upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.01_rowNotMatchedFraction_0.1 | 39.5 | 28.8 | 1.37
upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.0_rowNotMatchedFraction_0.1 | 19.9 | 19.3 | 1.03
upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.1_rowNotMatchedFraction_0.0 | 39.1 | 29.9 | 1.31
upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.1_rowNotMatchedFraction_0.01 | 39.1 | 31 | 1.26
upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.5_rowNotMatchedFraction_0.001 | 41.9 | 32.5 | 1.29
upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.99_rowNotMatchedFraction_0.001 | 43.3 | 33.8 | 1.28
upsert_fileMatchedFraction_0.05_rowMatchedFraction_1.0_rowNotMatchedFraction_0.001 | 43.8 | 34.1 | 1.28
upsert_fileMatchedFraction_0.5_rowMatchedFraction_0.01_rowNotMatchedFraction_0.001 | 147.9 | 84.8 | 1.74
upsert_fileMatchedFraction_1.0_rowMatchedFraction_0.01_rowNotMatchedFraction_0.001 | 266.9 | 142.5 | 1.87

Closes #1835

GitOrigin-RevId: 443099e8a02b98fffe5e5a9ec2cecb5d3b8f9537
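For illustration, here is a minimal sketch of how a source table could be derived from the target using the three fractions above. This is an assumption, not the benchmark's actual code; the `id` key column and the helper name are hypothetical:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Hypothetical helper: sample a fraction of the target's files, then sample
// matching and non-matching rows from the selected files.
def buildSource(
    target: DataFrame,
    fileMatchedFraction: Double,
    rowMatchedFraction: Double,
    rowNotMatchedFraction: Double): DataFrame = {
  // Approximate "fraction of target files" by sampling distinct file names.
  val files = target
    .select(input_file_name().alias("file"))
    .distinct()
    .sample(fileMatchedFraction)
  val sampled = target
    .withColumn("file", input_file_name())
    .join(files, "file")
    .drop("file")

  // Rows that will match the ON condition...
  val matched = sampled.sample(rowMatchedFraction)
  // ...and rows that won't: shift the key out of the target's range
  // (assumes a bigint `id` key column).
  val notMatched = sampled
    .sample(rowNotMatchedFraction)
    .withColumn("id", col("id") + lit(1000000000L))
  matched.union(notMatched)
}
```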
What setup did you use to produce the mentioned gains, e.g. your instance sizes and any configuration overrides?
Feature request
Overview
This feature request covers a number of performance improvements for the MERGE INTO command.
Motivation
I have a prototype for improvements to the MERGE command that shows up to 2x faster execution, with the biggest improvements on delete-only merges and merges touching a lot of files. The results were gathered using new benchmark test cases for merge that can be added to the existing benchmarks.
Performance improvement: (benchmark chart omitted; see the results table from #1835 above)
Further details
The following improvements contribute to better overall performance:
Data skipping for matched-only merges using the matched condition: In the case where the merge statement only includes MATCHED clauses, we can benefit from data skipping using the target-only MATCHED conditions when scanning the target for matches. For example, with a statement along the lines of this hypothetical one (table and column names are illustrative):
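```scala
// Hypothetical merge with only MATCHED clauses, consistent with the
// skipping predicate shown below.
spark.sql("""
  MERGE INTO target
  USING source
  ON source.key = target.key
  WHEN MATCHED AND target.value = 5 THEN UPDATE SET target.value = source.value
  WHEN MATCHED AND target.value = 7 THEN DELETE
""")
```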
we can skip scanning files using the predicate:

`target.value = 5 OR target.value = 7`
Dedicated path for insert-only merges with more than one insert clause: The current insert-only code path only supports a single NOT MATCHED clause. We can extend it to support any number of NOT MATCHED clauses, for example a statement shaped like the hypothetical one below.
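```scala
// Hypothetical insert-only merge with two NOT MATCHED clauses
// (table and column names are illustrative).
spark.sql("""
  MERGE INTO target
  USING source
  ON source.key = target.key
  WHEN NOT MATCHED AND source.category = 'a' THEN
    INSERT (key, value) VALUES (source.key, source.value)
  WHEN NOT MATCHED THEN
    INSERT (key, value) VALUES (source.key, 0)
""")
```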
More efficient classic merge execution: We can improve overall merge performance by optimizing the way we write out modified rows: instead of applying a function to individual rows, we can build a single large expression to process all rows at once.
Native counters for metrics instead of UDFs: We currently use UDFs to increment metrics across MERGE/UPDATE/DELETE. We can instead introduce a dedicated native expression that can be codegened. A minimal sketch of the UDF-based pattern being replaced follows.
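This sketch is an assumption, simplified from the pattern described above rather than Delta's actual code: the UDF bumps an accumulator as a side effect, but UDFs are opaque to Spark's codegen in a way a native expression would not be.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object MetricUdfSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
  val rowsProcessed = spark.sparkContext.longAccumulator("rowsProcessed")

  // Always-true predicate whose only purpose is to count the rows it sees.
  val incrMetric = udf { () => rowsProcessed.add(1); true }

  spark.range(100).filter(incrMetric()).count()
  println(rowsProcessed.value) // 100
}
```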
Project Plan
Willingness to contribute
The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?