From 74504c2e7d01d2c825ef39c653d211ca0adbf575 Mon Sep 17 00:00:00 2001 From: Qishang Zhong Date: Tue, 12 Sep 2023 19:41:44 +0800 Subject: [PATCH] [AMORO-1953][AMS] Improve the triggering conditions for segment file rewriting (#1954) * [AMORO-1953][AMS] Improve the triggering conditions for segment file rewriting * [AMORO-1953][AMS] Fix test error * [AMORO-1953][AMS] Add comments --- .../optimizing/plan/CommonPartitionEvaluator.java | 13 ++++++++++--- .../flow/TestKeyedContinuousOptimizing.java | 7 ++----- .../flow/TestUnKeyedContinuousOptimizing.java | 2 +- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java index 60d969578c..9b015f8311 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/CommonPartitionEvaluator.java @@ -159,7 +159,13 @@ public boolean fileShouldRewrite(DataFile dataFile, List<ContentFile<?>> deletes if (isFragmentFile(dataFile)) { return true; } - return getRecordCount(deletes) > dataFile.recordCount() * config.getMajorDuplicateRatio(); + // When Upsert writing is enabled in the Flink engine, both INSERT and UPDATE_AFTER will generate deletes files + // (Most are eq-delete), and eq-delete file will be associated with the data file before the current snapshot. + // The eq-delete does not accurately reflect how much data has been deleted in the current segment file + // (That is, whether the segment file needs to be rewritten). + // And the eq-delete file will be converted to pos-delete during minor optimizing, so only pos-delete record + // count is calculated here. 
+ return getPosDeletesRecordCount(deletes) > dataFile.recordCount() * config.getMajorDuplicateRatio(); } public boolean segmentFileShouldRewritePos(DataFile dataFile, List<ContentFile<?>> deletes) { @@ -182,8 +188,9 @@ protected boolean isFullOptimizing() { return reachFullInterval(); } - private long getRecordCount(List<ContentFile<?>> files) { - return files.stream().mapToLong(ContentFile::recordCount).sum(); + private long getPosDeletesRecordCount(List<ContentFile<?>> files) { + return files.stream().filter(file -> file.content() == FileContent.POSITION_DELETES) + .mapToLong(ContentFile::recordCount).sum(); } private void addDelete(ContentFile<?> delete) { diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/flow/TestKeyedContinuousOptimizing.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/flow/TestKeyedContinuousOptimizing.java index 1ee15c5825..7dd5084dce 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/flow/TestKeyedContinuousOptimizing.java +++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/flow/TestKeyedContinuousOptimizing.java @@ -30,7 +30,6 @@ import com.netease.arctic.server.optimizing.flow.checker.DataConcurrencyChecker; import com.netease.arctic.server.optimizing.flow.checker.FullOptimizingMove2HiveChecker; import com.netease.arctic.server.optimizing.flow.checker.FullOptimizingWrite2HiveChecker; -import com.netease.arctic.server.optimizing.flow.checker.MajorOptimizingChecker; import com.netease.arctic.server.optimizing.flow.checker.MinorOptimizingCheck; import com.netease.arctic.server.optimizing.flow.checker.OptimizingCountChecker; import com.netease.arctic.server.optimizing.flow.view.KeyedTableDataView; @@ -55,7 +54,7 @@ public TestKeyedContinuousOptimizing(CatalogTestHelper catalogTestHelper, TableT super(catalogTestHelper, tableTestHelper); } - @Parameterized.Parameters(name = "{1}.{2}") + @Parameterized.Parameters(name = "{0}, {1}") public static Object[] parameters() { return new Object[][] { { @@ -114,7 
+113,6 @@ public void run() throws Exception { FullOptimizingWrite2HiveChecker fullOptimizingWrite2HiveChecker = new FullOptimizingWrite2HiveChecker(view); FullOptimizingMove2HiveChecker fullOptimizingMove2HiveChecker = new FullOptimizingMove2HiveChecker(view); MinorOptimizingCheck minorOptimizingCheck = new MinorOptimizingCheck(); - MajorOptimizingChecker majorOptimizingChecker = new MajorOptimizingChecker(); CompleteOptimizingFlow.Builder builder = CompleteOptimizingFlow .builder(table, availableCore) @@ -124,8 +122,7 @@ public void run() throws Exception { .setMinorTriggerFileCount(minorTriggerCount) .addChecker(dataConcurrencyChecker) .addChecker(optimizingCountChecker) - .addChecker(minorOptimizingCheck) - .addChecker(majorOptimizingChecker); + .addChecker(minorOptimizingCheck); if (table.format() == TableFormat.MIXED_HIVE) { builder diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/flow/TestUnKeyedContinuousOptimizing.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/flow/TestUnKeyedContinuousOptimizing.java index 987657e4e2..4535831343 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/flow/TestUnKeyedContinuousOptimizing.java +++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/flow/TestUnKeyedContinuousOptimizing.java @@ -55,7 +55,7 @@ public TestUnKeyedContinuousOptimizing(CatalogTestHelper catalogTestHelper, Tabl super(catalogTestHelper, tableTestHelper); } - @Parameterized.Parameters(name = "{1}.{2}") + @Parameterized.Parameters(name = "{0}, {1}") public static Object[] parameters() { return new Object[][] { {