Skip to content

Commit

Permalink
[AMORO-1953][AMS] Improve the triggering conditions for segment file …
Browse files Browse the repository at this point in the history
…rewriting (apache#1954)

* [AMORO-1953][AMS] Improve the triggering conditions for segment file rewriting

* [AMORO-1953][AMS] Fix test error

* [AMORO-1953][AMS] Add comments
  • Loading branch information
zhongqishang authored and ShawHee committed Dec 29, 2023
1 parent f1be21a commit bd8eef9
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,13 @@ public boolean fileShouldRewrite(DataFile dataFile, List<ContentFile<?>> deletes
if (isFragmentFile(dataFile)) {
return true;
}
return getRecordCount(deletes) > dataFile.recordCount() * config.getMajorDuplicateRatio();
// When Upsert writing is enabled in the Flink engine, both INSERT and UPDATE_AFTER will generate deletes files
// (Most are eq-delete), and eq-delete file will be associated with the data file before the current snapshot.
// The eq-delete does not accurately reflect how much data has been deleted in the current segment file
// (That is, whether the segment file needs to be rewritten).
// And the eq-delete file will be converted to pos-delete during minor optimizing, so only pos-delete record
// count is calculated here.
return getPosDeletesRecordCount(deletes) > dataFile.recordCount() * config.getMajorDuplicateRatio();
}

public boolean segmentFileShouldRewritePos(DataFile dataFile, List<ContentFile<?>> deletes) {
Expand All @@ -182,8 +188,9 @@ protected boolean isFullOptimizing() {
return reachFullInterval();
}

private long getRecordCount(List<ContentFile<?>> files) {
return files.stream().mapToLong(ContentFile::recordCount).sum();
private long getPosDeletesRecordCount(List<ContentFile<?>> files) {
return files.stream().filter(file -> file.content() == FileContent.POSITION_DELETES)
.mapToLong(ContentFile::recordCount).sum();
}

private void addDelete(ContentFile<?> delete) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.netease.arctic.server.optimizing.flow.checker.DataConcurrencyChecker;
import com.netease.arctic.server.optimizing.flow.checker.FullOptimizingMove2HiveChecker;
import com.netease.arctic.server.optimizing.flow.checker.FullOptimizingWrite2HiveChecker;
import com.netease.arctic.server.optimizing.flow.checker.MajorOptimizingChecker;
import com.netease.arctic.server.optimizing.flow.checker.MinorOptimizingCheck;
import com.netease.arctic.server.optimizing.flow.checker.OptimizingCountChecker;
import com.netease.arctic.server.optimizing.flow.view.KeyedTableDataView;
Expand All @@ -55,7 +54,7 @@ public TestKeyedContinuousOptimizing(CatalogTestHelper catalogTestHelper, TableT
super(catalogTestHelper, tableTestHelper);
}

@Parameterized.Parameters(name = "{1}.{2}")
@Parameterized.Parameters(name = "{0}, {1}")
public static Object[] parameters() {
return new Object[][] {
{
Expand Down Expand Up @@ -114,7 +113,6 @@ public void run() throws Exception {
FullOptimizingWrite2HiveChecker fullOptimizingWrite2HiveChecker = new FullOptimizingWrite2HiveChecker(view);
FullOptimizingMove2HiveChecker fullOptimizingMove2HiveChecker = new FullOptimizingMove2HiveChecker(view);
MinorOptimizingCheck minorOptimizingCheck = new MinorOptimizingCheck();
MajorOptimizingChecker majorOptimizingChecker = new MajorOptimizingChecker();

CompleteOptimizingFlow.Builder builder = CompleteOptimizingFlow
.builder(table, availableCore)
Expand All @@ -124,8 +122,7 @@ public void run() throws Exception {
.setMinorTriggerFileCount(minorTriggerCount)
.addChecker(dataConcurrencyChecker)
.addChecker(optimizingCountChecker)
.addChecker(minorOptimizingCheck)
.addChecker(majorOptimizingChecker);
.addChecker(minorOptimizingCheck);

if (table.format() == TableFormat.MIXED_HIVE) {
builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public TestUnKeyedContinuousOptimizing(CatalogTestHelper catalogTestHelper, Tabl
super(catalogTestHelper, tableTestHelper);
}

@Parameterized.Parameters(name = "{1}.{2}")
@Parameterized.Parameters(name = "{0}, {1}")
public static Object[] parameters() {
return new Object[][] {
{
Expand Down

0 comments on commit bd8eef9

Please sign in to comment.