diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/IcebergCommit.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/IcebergCommit.java index a8b44a6b25..ba6aa4967f 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/IcebergCommit.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/IcebergCommit.java @@ -55,36 +55,66 @@ public class IcebergCommit { } public void commit() throws OptimizingCommitException { - try { - LOG.info("{} get tasks to commit {}", table.id(), tasks); + LOG.info("{} get tasks to commit {}", table.id(), tasks); - // collect files - Set addedDataFiles = Sets.newHashSet(); - Set removedDataFiles = Sets.newHashSet(); - Set addedDeleteFiles = Sets.newHashSet(); - Set removedDeleteFiles = Sets.newHashSet(); - for (TaskRuntime task : tasks) { - if (task.getOutput().getDataFiles() != null) { - addedDataFiles.addAll(Arrays.asList(task.getOutput().getDataFiles())); - } - if (task.getOutput().getDeleteFiles() != null) { - addedDeleteFiles.addAll(Arrays.asList(task.getOutput().getDeleteFiles())); - } - if (task.getInput().rewrittenDataFiles() != null) { - removedDataFiles.addAll(Arrays.asList(task.getInput().rewrittenDataFiles())); - } - if (task.getInput().deleteFiles() != null) { - removedDeleteFiles.addAll(Arrays.stream(task.getInput().deleteFiles()) - .map(IcebergContentFile::asDeleteFile).collect(Collectors.toSet())); - } + // collect files + Set addedDataFiles = Sets.newHashSet(); + Set removedDataFiles = Sets.newHashSet(); + Set addedDeleteFiles = Sets.newHashSet(); + Set removedDeleteFiles = Sets.newHashSet(); + for (TaskRuntime task : tasks) { + if (task.getOutput().getDataFiles() != null) { + addedDataFiles.addAll(Arrays.asList(task.getOutput().getDataFiles())); + } + if (task.getOutput().getDeleteFiles() != null) { + addedDeleteFiles.addAll(Arrays.asList(task.getOutput().getDeleteFiles())); + } + if (task.getInput().rewrittenDataFiles() != null) { + removedDataFiles.addAll(Arrays.asList(task.getInput().rewrittenDataFiles())); } + if (task.getInput().deleteFiles() != null) { + removedDeleteFiles.addAll(Arrays.stream(task.getInput().deleteFiles()) + .map(IcebergContentFile::asDeleteFile).collect(Collectors.toSet())); + } + } - UnkeyedTable icebergTable = table.asUnkeyedTable(); - Transaction transaction = icebergTable.newTransaction(); + UnkeyedTable icebergTable = table.asUnkeyedTable(); - replaceDataFiles(transaction, removedDataFiles, addedDataFiles); - replaceDeleteFiles(transaction, removedDeleteFiles, addedDeleteFiles); + replaceFiles(icebergTable, removedDataFiles, addedDataFiles, addedDeleteFiles); + removeOldDeleteFiles(icebergTable, removedDeleteFiles); + } + + protected void replaceFiles( + UnkeyedTable icebergTable, + Set removedDataFiles, + Set addedDataFiles, + Set addDeleteFiles) throws OptimizingCommitException { + try { + Transaction transaction = icebergTable.newTransaction(); + if (CollectionUtils.isNotEmpty(removedDataFiles) + || CollectionUtils.isNotEmpty(addedDataFiles)) { + RewriteFiles dataFileRewrite = transaction.newRewrite(); + if (targetSnapshotId != ArcticServiceConstants.INVALID_SNAPSHOT_ID) { + dataFileRewrite.validateFromSnapshot(targetSnapshotId); + long sequenceNumber = table.asUnkeyedTable().snapshot(targetSnapshotId).sequenceNumber(); + dataFileRewrite.rewriteFiles(removedDataFiles, addedDataFiles, sequenceNumber); + } else { + dataFileRewrite.rewriteFiles(removedDataFiles, addedDataFiles); + } + dataFileRewrite.set(SnapshotSummary.SNAPSHOT_PRODUCER, CommitMetaProducer.OPTIMIZE.name()); + dataFileRewrite.commit(); + } + if (CollectionUtils.isNotEmpty(addDeleteFiles)) { + RewriteFiles addDeleteFileRewrite = transaction.newRewrite(); + addDeleteFileRewrite.rewriteFiles( + Collections.emptySet(), + Collections.emptySet(), + Collections.emptySet(), + addDeleteFiles); + addDeleteFileRewrite.set(SnapshotSummary.SNAPSHOT_PRODUCER, CommitMetaProducer.OPTIMIZE.name()); + addDeleteFileRewrite.commit(); + } transaction.commitTransaction(); } catch (ValidationException e) { String missFileMessage = "Missing required files to delete"; @@ -104,42 +134,24 @@ public void commit() throws OptimizingCommitException { } } - protected void replaceDataFiles( - Transaction transaction, - Set removedDataFiles, - Set addedDataFiles) { - if (CollectionUtils.isNotEmpty(removedDataFiles) || CollectionUtils.isNotEmpty(addedDataFiles)) { - RewriteFiles dataFileRewrite = transaction.newRewrite(); - if (targetSnapshotId != ArcticServiceConstants.INVALID_SNAPSHOT_ID) { - dataFileRewrite.validateFromSnapshot(targetSnapshotId); - long sequenceNumber = table.asUnkeyedTable().snapshot(targetSnapshotId).sequenceNumber(); - dataFileRewrite.rewriteFiles(removedDataFiles, addedDataFiles, sequenceNumber); - } else { - dataFileRewrite.rewriteFiles(removedDataFiles, addedDataFiles); - } - dataFileRewrite.set(SnapshotSummary.SNAPSHOT_PRODUCER, CommitMetaProducer.OPTIMIZE.name()); - dataFileRewrite.commit(); + protected void removeOldDeleteFiles( + UnkeyedTable icebergTable, + Set removedDeleteFiles) { + if (CollectionUtils.isEmpty(removedDeleteFiles)) { + return; } - } - - protected void replaceDeleteFiles( - Transaction transaction, - Set removedDeleteFiles, - Set addedDeleteFiles) { - if (CollectionUtils.isNotEmpty(removedDeleteFiles) || CollectionUtils.isNotEmpty(addedDeleteFiles)) { - RewriteFiles deleteFileRewrite = transaction.newRewrite(); - deleteFileRewrite.rewriteFiles(Collections.emptySet(), - removedDeleteFiles, Collections.emptySet(), addedDeleteFiles); + RewriteFiles deleteFileRewrite = icebergTable.newRewrite(); + deleteFileRewrite.rewriteFiles(Collections.emptySet(), + removedDeleteFiles, Collections.emptySet(), Collections.emptySet()); + deleteFileRewrite.set(SnapshotSummary.SNAPSHOT_PRODUCER, CommitMetaProducer.OPTIMIZE.name()); - deleteFileRewrite.set(SnapshotSummary.SNAPSHOT_PRODUCER, CommitMetaProducer.OPTIMIZE.name()); - try { - deleteFileRewrite.commit(); - } catch (ValidationException e) { - // Iceberg will drop DeleteFiles that are older than the min Data sequence number. So some DeleteFiles - // maybe already dropped in the last commit, the exception can be ignored. - LOG.warn("Iceberg RewriteFiles commit failed, but ignore", e); - } + try { + deleteFileRewrite.commit(); + } catch (ValidationException e) { + // Iceberg will drop DeleteFiles that are older than the min Data sequence number. So some DeleteFiles + // maybe already dropped in the last commit, the exception can be ignored. + LOG.warn("Iceberg RewriteFiles commit failed, but ignore", e); } } } \ No newline at end of file