Skip to content

Commit

Permalink
[ARCTIC-1095][AMS] Add the sequence number for the native iceberg table when performing the major optimizing commit (apache#1101)
Browse files Browse the repository at this point in the history

* fix apache#1095
Add the sequence number in the plan when performing the major commit for the native iceberg table

* address code review comments

* add comment

---------

Co-authored-by: luting <dylzlt93299@gmail.com>
  • Loading branch information
2 people authored and ShawHee committed Dec 29, 2023
1 parent aa34615 commit 1b6530b
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,31 @@ private void majorCommit(ArcticTable arcticTable,
}).filter(Objects::nonNull).collect(Collectors.toSet());

// rewrite DataFiles
RewriteFiles rewriteFiles = baseArcticTable.newRewrite();
rewriteFiles.set(SnapshotSummary.SNAPSHOT_PRODUCER, CommitMetaProducer.OPTIMIZE.name());
RewriteFiles rewriteDataFiles = baseArcticTable.newRewrite();
if (baseSnapshotId != TableOptimizeRuntime.INVALID_SNAPSHOT_ID) {
rewriteFiles.validateFromSnapshot(baseSnapshotId);
rewriteDataFiles.validateFromSnapshot(baseSnapshotId);
long sequenceNumber = arcticTable.asUnkeyedTable().snapshot(baseSnapshotId).sequenceNumber();
rewriteDataFiles.rewriteFiles(deleteDataFiles, addDataFiles, sequenceNumber);
} else {
rewriteDataFiles.rewriteFiles(deleteDataFiles, addDataFiles);
}
rewriteDataFiles.set(SnapshotSummary.SNAPSHOT_PRODUCER, CommitMetaProducer.OPTIMIZE.name());
rewriteDataFiles.commit();

// remove DeleteFiles additional
if (CollectionUtils.isNotEmpty(deleteDeleteFiles)) {
RewriteFiles rewriteDeleteFiles = baseArcticTable.newRewrite();
rewriteDeleteFiles.set(SnapshotSummary.SNAPSHOT_PRODUCER, CommitMetaProducer.OPTIMIZE.name());
rewriteDeleteFiles.rewriteFiles(Collections.emptySet(), deleteDeleteFiles,
Collections.emptySet(), Collections.emptySet());
try {
rewriteDeleteFiles.commit();
} catch (ValidationException e) {
// Iceberg will drop DeleteFiles that are older than the min Data sequence number. So some DeleteFiles
// may already have been dropped by the last commit; the exception can be safely ignored.
LOG.warn("Iceberg RewriteFiles commit failed, but ignore", e);
}
}
rewriteFiles.rewriteFiles(deleteDataFiles, deleteDeleteFiles, addDataFiles, Collections.emptySet());
rewriteFiles.commit();

LOG.info("{} major optimize committed, delete {} files [{} Delete files], " +
"add {} new files",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,8 @@ public void testNoPartitionTableMajorOptimizeCommit() throws Exception {
try (CloseableIterable<FileScanTask> filesIterable = icebergNoPartitionTable.asUnkeyedTable().newScan()
.planFiles()) {
filesIterable.forEach(fileScanTask -> {
if (fileScanTask.file().fileSizeInBytes() <= 1000) {
oldDataFilesPath.add((String) fileScanTask.file().path());
fileScanTask.deletes().forEach(deleteFile -> oldDeleteFilesPath.add((String) deleteFile.path()));
}
oldDataFilesPath.add((String) fileScanTask.file().path());
fileScanTask.deletes().forEach(deleteFile -> oldDeleteFilesPath.add((String) deleteFile.path()));
});
}

Expand Down Expand Up @@ -108,6 +106,7 @@ public void testPartitionTableMajorOptimizeCommit() throws Exception {
.set(TableProperties.SELF_OPTIMIZING_FRAGMENT_RATIO,
TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT / 1000 + "")
.set(TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_DUPLICATE_RATIO, "0")
.set(org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE, "full")
.commit();
List<DataFile> dataFiles = insertDataFiles(icebergPartitionTable.asUnkeyedTable(), 10);
insertEqDeleteFiles(icebergPartitionTable.asUnkeyedTable(), 5);
Expand All @@ -116,10 +115,8 @@ public void testPartitionTableMajorOptimizeCommit() throws Exception {
Set<String> oldDeleteFilesPath = new HashSet<>();
try (CloseableIterable<FileScanTask> filesIterable = icebergPartitionTable.asUnkeyedTable().newScan().planFiles()) {
filesIterable.forEach(fileScanTask -> {
if (fileScanTask.file().fileSizeInBytes() <= 1000) {
oldDataFilesPath.add((String) fileScanTask.file().path());
fileScanTask.deletes().forEach(deleteFile -> oldDeleteFilesPath.add((String) deleteFile.path()));
}
oldDataFilesPath.add((String) fileScanTask.file().path());
fileScanTask.deletes().forEach(deleteFile -> oldDeleteFilesPath.add((String) deleteFile.path()));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,18 @@ public void testNoPartitionTableMinorOptimizeCommit() throws Exception {
optimizeCommit.commit(icebergNoPartitionTable.asUnkeyedTable().currentSnapshot().snapshotId());

Set<String> newDataFilesPath = new HashSet<>();
Set<String> newDeleteFilesPath = new HashSet<>();
try (CloseableIterable<FileScanTask> filesIterable = icebergPartitionTable.asUnkeyedTable().newScan()
.planFiles()) {
filesIterable.forEach(fileScanTask -> {
if (fileScanTask.file().fileSizeInBytes() <= 1000) {
newDataFilesPath.add((String) fileScanTask.file().path());
fileScanTask.deletes().forEach(deleteFile -> newDeleteFilesPath.add((String) deleteFile.path()));
}
});
}
Assert.assertNotEquals(oldDataFilesPath, newDataFilesPath);
Assert.assertNotEquals(oldDeleteFilesPath, newDeleteFilesPath);
}

@Test
Expand All @@ -113,10 +124,8 @@ public void testPartitionTableMinorOptimizeCommit() throws Exception {
try (CloseableIterable<FileScanTask> filesIterable = icebergPartitionTable.asUnkeyedTable().newScan()
.planFiles()) {
filesIterable.forEach(fileScanTask -> {
if (fileScanTask.file().fileSizeInBytes() <= 1000) {
oldDataFilesPath.add((String) fileScanTask.file().path());
fileScanTask.deletes().forEach(deleteFile -> oldDeleteFilesPath.add((String) deleteFile.path()));
}
oldDataFilesPath.add((String) fileScanTask.file().path());
fileScanTask.deletes().forEach(deleteFile -> oldDeleteFilesPath.add((String) deleteFile.path()));
});
}

Expand Down Expand Up @@ -160,6 +169,17 @@ public void testPartitionTableMinorOptimizeCommit() throws Exception {
optimizeCommit.commit(icebergPartitionTable.asUnkeyedTable().currentSnapshot().snapshotId());

Set<String> newDataFilesPath = new HashSet<>();
Set<String> newDeleteFilesPath = new HashSet<>();
try (CloseableIterable<FileScanTask> filesIterable = icebergPartitionTable.asUnkeyedTable().newScan()
.planFiles()) {
filesIterable.forEach(fileScanTask -> {
if (fileScanTask.file().fileSizeInBytes() <= 1000) {
newDataFilesPath.add((String) fileScanTask.file().path());
fileScanTask.deletes().forEach(deleteFile -> newDeleteFilesPath.add((String) deleteFile.path()));
}
});
}
Assert.assertNotEquals(oldDataFilesPath, newDataFilesPath);
Assert.assertNotEquals(oldDeleteFilesPath, newDeleteFilesPath);
}
}

0 comments on commit 1b6530b

Please sign in to comment.