[HUDI-7988] ListingBasedRollbackStrategy support log compact (apache#…
KnightChess authored Jul 17, 2024
1 parent e97aedc commit 103b822
Showing 3 changed files with 103 additions and 5 deletions.
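
At its core, the change teaches ListingBasedRollbackStrategy to recognize a log-compaction commit from its write operation type and to override the instant's action before rollback requests are built, so LOG_COMPACTION_ACTION can fall through to the existing delta-commit handling. The following is a minimal sketch that consolidates the pieces added across the hunks below; the helper class and method are illustrative only, and the Hudi types referenced are the same ones the diff already uses.

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;

// Sketch only, not part of the commit: shows how the rollback action is resolved
// when the instant being rolled back was produced by a (log) compaction.
final class RollbackActionResolutionSketch {
  static String resolveAction(String action, Option<HoodieCommitMetadata> commitMetadataOptional) {
    AtomicBoolean isCompaction = new AtomicBoolean(false);
    AtomicBoolean isLogCompaction = new AtomicBoolean(false);
    if (commitMetadataOptional.isPresent()) {
      WriteOperationType operationType = commitMetadataOptional.get().getOperationType();
      isCompaction.set(operationType == WriteOperationType.COMPACT);
      isLogCompaction.set(operationType == WriteOperationType.LOG_COMPACT);
    }
    // A completed compaction is recorded as a commit and a completed log compaction as a
    // delta commit, so the action read from the instant has to be overridden here; the
    // switch in getRollbackRequests then routes LOG_COMPACTION_ACTION through the
    // DELTA_COMMIT_ACTION branch and targets the log files written by the failed instant.
    if (isCompaction.get()) {
      return HoodieTimeline.COMPACTION_ACTION;
    }
    if (isLogCompaction.get()) {
      return HoodieTimeline.LOG_COMPACTION_ACTION;
    }
    return action;
  }
}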
@@ -103,6 +103,10 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
if (commitMetadataOptional.isPresent()) {
isCompaction.set(commitMetadataOptional.get().getOperationType() == WriteOperationType.COMPACT);
}
AtomicBoolean isLogCompaction = new AtomicBoolean(false);
if (commitMetadataOptional.isPresent()) {
isLogCompaction.set(commitMetadataOptional.get().getOperationType() == WriteOperationType.LOG_COMPACT);
}

return context.flatMap(partitionPaths, partitionPath -> {
List<HoodieRollbackRequest> hoodieRollbackRequests = new ArrayList<>(partitionPaths.size());
@@ -125,6 +129,9 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
if (isCompaction.get()) { // compaction's action in hoodie instant will be "commit". So, we might need to override.
action = HoodieTimeline.COMPACTION_ACTION;
}
if (isLogCompaction.get()) {
action = HoodieTimeline.LOG_COMPACTION_ACTION;
}
switch (action) {
case HoodieTimeline.COMMIT_ACTION:
case HoodieTimeline.REPLACE_COMMIT_ACTION:
@@ -152,6 +159,7 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
}
break;
case HoodieTimeline.DELTA_COMMIT_ACTION:
case HoodieTimeline.LOG_COMPACTION_ACTION:

// In case all data was inserts and the commit failed, delete the file belonging to that commit
// We do not know fileIds for inserts (first inserts are either log files or base files),
@@ -18,9 +18,11 @@

package org.apache.hudi.table.action.rollback;

import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.FileSlice;
@@ -36,12 +38,14 @@
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
@@ -152,14 +156,104 @@ public void testMergeOnReadRollbackActionExecutor(boolean isUsingMarkers) throws
assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, "002").doesMarkerDirExist());
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testMergeOnReadRollbackLogCompactActionExecutorWithListingStrategy(boolean isComplete) throws IOException {
//1. prepare data and assert data result
List<FileSlice> firstPartitionCommit2FileSlices = new ArrayList<>();
List<FileSlice> secondPartitionCommit2FileSlices = new ArrayList<>();
HoodieWriteConfig cfg = getConfigBuilder()
.withRollbackUsingMarkers(false).withAutoCommit(false)
.withMetadataConfig(
HoodieMetadataConfig.newBuilder().enable(true).build())
.build();
twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices, secondPartitionCommit2FileSlices, cfg, true);
List<HoodieLogFile> firstPartitionCommit2LogFiles = new ArrayList<>();
List<HoodieLogFile> secondPartitionCommit2LogFiles = new ArrayList<>();
firstPartitionCommit2FileSlices.get(0).getLogFiles().collect(Collectors.toList()).forEach(logFile -> firstPartitionCommit2LogFiles.add(logFile));
assertEquals(1, firstPartitionCommit2LogFiles.size());
secondPartitionCommit2FileSlices.get(0).getLogFiles().collect(Collectors.toList()).forEach(logFile -> secondPartitionCommit2LogFiles.add(logFile));
assertEquals(1, secondPartitionCommit2LogFiles.size());

//2. log compact
cfg = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withLogCompactionBlocksThreshold(1)
.withMaxNumDeltaCommitsBeforeCompaction(1)
.withInlineCompactionTriggerStrategy(CompactionTriggerStrategy.NUM_COMMITS).build())
.withRollbackUsingMarkers(false).withAutoCommit(false)
.withMetadataConfig(
HoodieMetadataConfig.newBuilder().enable(false).build())
.build();

String action = HoodieTimeline.LOG_COMPACTION_ACTION;
if (isComplete) {
cfg.setValue(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
action = HoodieTimeline.DELTA_COMMIT_ACTION;
}
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
client.scheduleLogCompactionAtInstant("003", Option.empty());
client.logCompact("003");

//3. rollback log compact
metaClient.reloadActiveTimeline();
HoodieInstant rollBackInstant = new HoodieInstant(!isComplete, action, "003");
HoodieTable table = this.getHoodieTable(metaClient, cfg);
BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
new BaseRollbackPlanActionExecutor(context, cfg, table, "004", rollBackInstant, false,
cfg.shouldRollbackUsingMarkers(), false);
mergeOnReadRollbackPlanActionExecutor.execute().get();
MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
context,
cfg,
table,
"004",
rollBackInstant,
true,
false);
//4. assert the rollback stat
final HoodieRollbackMetadata execute = mergeOnReadRollbackActionExecutor.execute();
Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = execute.getPartitionMetadata();
assertEquals(2, rollbackMetadata.size());

for (Map.Entry<String, HoodieRollbackPartitionMetadata> entry : rollbackMetadata.entrySet()) {
HoodieRollbackPartitionMetadata meta = entry.getValue();
assertEquals(0, meta.getFailedDeleteFiles().size());
assertEquals(1, meta.getSuccessDeleteFiles().size());
}

//5. assert file groups after rollback, and compare them to the rollback stats
// assert the first partition data and log file size
metaClient.reloadActiveTimeline();
table = this.getHoodieTable(metaClient, cfg);
List<HoodieFileGroup> firstPartitionRollBack1FileGroups = table.getFileSystemView().getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
assertEquals(1, firstPartitionRollBack1FileGroups.size());
List<FileSlice> firstPartitionRollBack1FileSlices = firstPartitionRollBack1FileGroups.get(0).getAllFileSlices().collect(Collectors.toList());
assertEquals(1, firstPartitionRollBack1FileSlices.size());
FileSlice firstPartitionRollBack1FileSlice = firstPartitionRollBack1FileSlices.get(0);
List<HoodieLogFile> firstPartitionRollBackLogFiles = firstPartitionRollBack1FileSlice.getLogFiles().collect(Collectors.toList());
assertEquals(1, firstPartitionRollBackLogFiles.size());

// assert the second partition data and log file size
List<HoodieFileGroup> secondPartitionRollBack1FileGroups = table.getFileSystemView().getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
assertEquals(1, secondPartitionRollBack1FileGroups.size());
List<FileSlice> secondPartitionRollBack1FileSlices = secondPartitionRollBack1FileGroups.get(0).getAllFileSlices().collect(Collectors.toList());
assertEquals(1, secondPartitionRollBack1FileSlices.size());
FileSlice secondPartitionRollBack1FileSlice = secondPartitionRollBack1FileSlices.get(0);
List<HoodieLogFile> secondPartitionRollBackLogFiles = secondPartitionRollBack1FileSlice.getLogFiles().collect(Collectors.toList());
assertEquals(1, secondPartitionRollBackLogFiles.size());

assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, "003").doesMarkerDirExist());
}

@Test
public void testMergeOnReadRestoreCompactionCommit() throws IOException {
boolean isUsingMarkers = false;
HoodieWriteConfig cfg = getConfigBuilder().withRollbackUsingMarkers(isUsingMarkers).withAutoCommit(false).build();

// 1. ingest data to partition 3.
//just generate two partitions
HoodieTestDataGenerator dataGenPartition3 = new HoodieTestDataGenerator(new String[]{DEFAULT_THIRD_PARTITION_PATH});
HoodieTestDataGenerator dataGenPartition3 = new HoodieTestDataGenerator(new String[] {DEFAULT_THIRD_PARTITION_PATH});
HoodieTestDataGenerator.writePartitionMetadataDeprecated(storage,
new String[] {DEFAULT_THIRD_PARTITION_PATH}, basePath);
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
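
In day-to-day use the same rollback path is reached through the write client rather than the action executors the test drives directly. A hypothetical usage sketch, reusing the test's cfg and getHoodieWriteClient helpers and a made-up instant time:

// Hypothetical usage only: the instant time is illustrative; cfg and getHoodieWriteClient
// come from the test harness above, and rollback(...) is the standard client entry point.
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
client.scheduleLogCompactionAtInstant("003", Option.empty());
client.logCompact("003");  // may stay pending when auto-commit is disabled
client.rollback("003");    // the listing-based strategy can now resolve this log-compaction instant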
@@ -29,7 +29,6 @@
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CompactionUtils;
@@ -262,9 +261,6 @@ public static <T extends SpecificRecordBase> T convertCommitMetadata(HoodieCommi
}
hoodieCommitMetadata.getPartitionToWriteStats().remove(null);
org.apache.hudi.avro.model.HoodieCommitMetadata avroMetaData = JsonUtils.getObjectMapper().convertValue(hoodieCommitMetadata, org.apache.hudi.avro.model.HoodieCommitMetadata.class);
if (hoodieCommitMetadata.getCompacted()) {
avroMetaData.setOperationType(WriteOperationType.COMPACT.name());
}
return (T) avroMetaData;
}

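
The last hunk removes the branch that forced the converted operation type to COMPACT whenever the commit metadata was flagged as compacted. A hypothetical illustration of the effect; the HoodieCommitMetadata constructor taking a compacted flag is assumed, and convertCommitMetadata is the utility method shown in the hunk above:

// Hypothetical illustration: with the getCompacted() override gone, the converted Avro
// metadata keeps the original operation type, so a log compaction is no longer
// relabelled as COMPACT during conversion.
HoodieCommitMetadata metadata = new HoodieCommitMetadata(true); // compacted flag; constructor assumed
metadata.setOperationType(WriteOperationType.LOG_COMPACT);
org.apache.hudi.avro.model.HoodieCommitMetadata avroMetadata = convertCommitMetadata(metadata);
// avroMetadata.getOperationType() should now report "LOG_COMPACT" rather than "COMPACT"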
