From 0387b34a847504c7a611554e21d805dbf300dd4c Mon Sep 17 00:00:00 2001
From: luting
Date: Mon, 13 Feb 2023 14:04:18 +0800
Subject: [PATCH 1/3] fix #1095 Adding the sequence number in the plan when the major commit for the native iceberg table

---
 .../ams/server/optimize/IcebergOptimizeCommit.java | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/IcebergOptimizeCommit.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/IcebergOptimizeCommit.java
index 4e496edf97..867b0193ae 100644
--- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/IcebergOptimizeCommit.java
+++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/IcebergOptimizeCommit.java
@@ -213,13 +213,16 @@ private void majorCommit(ArcticTable arcticTable,
         return null;
       }).filter(Objects::nonNull).collect(Collectors.toSet());

-    // rewrite DataFiles
+    // rewrite DataFiles, the useless DeleteFiles will be deleted by the Iceberg MergingSnapshotProducer
     RewriteFiles rewriteFiles = baseArcticTable.newRewrite();
-    rewriteFiles.set(SnapshotSummary.SNAPSHOT_PRODUCER, CommitMetaProducer.OPTIMIZE.name());
     if (baseSnapshotId != TableOptimizeRuntime.INVALID_SNAPSHOT_ID) {
       rewriteFiles.validateFromSnapshot(baseSnapshotId);
+      long sequenceNumber = arcticTable.asUnkeyedTable().snapshot(baseSnapshotId).sequenceNumber();
+      rewriteFiles.rewriteFiles(deleteDataFiles, addDataFiles, sequenceNumber);
+    } else {
+      rewriteFiles.rewriteFiles(deleteDataFiles, addDataFiles);
     }
-    rewriteFiles.rewriteFiles(deleteDataFiles, deleteDeleteFiles, addDataFiles, Collections.emptySet());
+    rewriteFiles.set(SnapshotSummary.SNAPSHOT_PRODUCER, CommitMetaProducer.OPTIMIZE.name());
     rewriteFiles.commit();

     LOG.info("{} major optimize committed, delete {} files [{} Delete files], " +
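
Patch 1 replaces the four-argument RewriteFiles call with the overload that takes an explicit data sequence number, taken from the snapshot the optimize plan was based on, so the compacted files are not committed as if they were newer data. Per the Iceberg spec, equality deletes only apply to data files with a smaller data sequence number, so committing the rewritten files at the new snapshot's sequence number could hide deletes that land after the plan was made. The following is a minimal sketch of that pattern against a plain Iceberg Table, using only Iceberg's public API; the class name, method name, and parameters are illustrative and are not code from this pull request.

    import java.util.Set;

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.RewriteFiles;
    import org.apache.iceberg.Table;

    public class SequencePreservingRewriteSketch {

      /**
       * Replace a set of data files with their compacted equivalents while keeping the
       * data sequence number of the snapshot the rewrite was planned from.
       */
      public static void commitCompaction(Table table, long baseSnapshotId,
                                          Set<DataFile> filesToDelete, Set<DataFile> filesToAdd) {
        RewriteFiles rewrite = table.newRewrite();
        // Fail the commit if conflicting changes landed after the planning snapshot.
        rewrite.validateFromSnapshot(baseSnapshotId);
        // Reuse the planning snapshot's sequence number so the rewritten files are not
        // treated as newer data than the files they replace.
        long sequenceNumber = table.snapshot(baseSnapshotId).sequenceNumber();
        rewrite.rewriteFiles(filesToDelete, filesToAdd, sequenceNumber);
        rewrite.commit();
      }
    }
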
From c292199155e72b31cb5ecebe2184c2f99aec89f7 Mon Sep 17 00:00:00 2001
From: luting
Date: Mon, 13 Feb 2023 20:23:29 +0800
Subject: [PATCH 2/3] fix code review

---
 .../optimize/IcebergOptimizeCommit.java | 27 +++++++++++++-----
 .../TestIcebergFullOptimizeCommit.java  | 13 ++++-----
 .../TestIcebergMinorOptimizeCommit.java | 28 ++++++++++++++++---
 3 files changed, 49 insertions(+), 19 deletions(-)

diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/IcebergOptimizeCommit.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/IcebergOptimizeCommit.java
index 867b0193ae..4123cdfb19 100644
--- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/IcebergOptimizeCommit.java
+++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/IcebergOptimizeCommit.java
@@ -213,17 +213,30 @@ private void majorCommit(ArcticTable arcticTable,
         return null;
       }).filter(Objects::nonNull).collect(Collectors.toSet());

-    // rewrite DataFiles, the useless DeleteFiles will be deleted by the Iceberg MergingSnapshotProducer
-    RewriteFiles rewriteFiles = baseArcticTable.newRewrite();
+    // rewrite DataFiles
+    RewriteFiles rewriteDataFiles = baseArcticTable.newRewrite();
     if (baseSnapshotId != TableOptimizeRuntime.INVALID_SNAPSHOT_ID) {
-      rewriteFiles.validateFromSnapshot(baseSnapshotId);
+      rewriteDataFiles.validateFromSnapshot(baseSnapshotId);
       long sequenceNumber = arcticTable.asUnkeyedTable().snapshot(baseSnapshotId).sequenceNumber();
-      rewriteFiles.rewriteFiles(deleteDataFiles, addDataFiles, sequenceNumber);
+      rewriteDataFiles.rewriteFiles(deleteDataFiles, addDataFiles, sequenceNumber);
     } else {
-      rewriteFiles.rewriteFiles(deleteDataFiles, addDataFiles);
+      rewriteDataFiles.rewriteFiles(deleteDataFiles, addDataFiles);
+    }
+    rewriteDataFiles.set(SnapshotSummary.SNAPSHOT_PRODUCER, CommitMetaProducer.OPTIMIZE.name());
+    rewriteDataFiles.commit();
+
+    // remove DeleteFiles additional, because DeleteFiles maybe not existed
+    if (CollectionUtils.isNotEmpty(deleteDeleteFiles)) {
+      RewriteFiles rewriteDeleteFiles = baseArcticTable.newRewrite();
+      rewriteDeleteFiles.set(SnapshotSummary.SNAPSHOT_PRODUCER, CommitMetaProducer.OPTIMIZE.name());
+      rewriteDeleteFiles.rewriteFiles(Collections.emptySet(), deleteDeleteFiles,
+          Collections.emptySet(), Collections.emptySet());
+      try {
+        rewriteDeleteFiles.commit();
+      } catch (ValidationException e) {
+        LOG.warn("Iceberg RewriteFiles commit failed, but ignore", e);
+      }
     }
-    rewriteFiles.set(SnapshotSummary.SNAPSHOT_PRODUCER, CommitMetaProducer.OPTIMIZE.name());
-    rewriteFiles.commit();

     LOG.info("{} major optimize committed, delete {} files [{} Delete files], " +
       "add {} new files",
diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestIcebergFullOptimizeCommit.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestIcebergFullOptimizeCommit.java
index 3456be8e5b..41d344199b 100644
--- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestIcebergFullOptimizeCommit.java
+++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestIcebergFullOptimizeCommit.java
@@ -46,10 +46,8 @@ public void testNoPartitionTableMajorOptimizeCommit() throws Exception {
     try (CloseableIterable<FileScanTask> filesIterable = icebergNoPartitionTable.asUnkeyedTable().newScan()
         .planFiles()) {
       filesIterable.forEach(fileScanTask -> {
-        if (fileScanTask.file().fileSizeInBytes() <= 1000) {
-          oldDataFilesPath.add((String) fileScanTask.file().path());
-          fileScanTask.deletes().forEach(deleteFile -> oldDeleteFilesPath.add((String) deleteFile.path()));
-        }
+        oldDataFilesPath.add((String) fileScanTask.file().path());
+        fileScanTask.deletes().forEach(deleteFile -> oldDeleteFilesPath.add((String) deleteFile.path()));
       });
     }

@@ -108,6 +106,7 @@ public void testPartitionTableMajorOptimizeCommit() throws Exception {
         .set(TableProperties.SELF_OPTIMIZING_FRAGMENT_RATIO,
             TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT / 1000 + "")
         .set(TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_DUPLICATE_RATIO, "0")
+        .set(org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE, "full")
         .commit();
     List<DataFile> dataFiles = insertDataFiles(icebergPartitionTable.asUnkeyedTable(), 10);
     insertEqDeleteFiles(icebergPartitionTable.asUnkeyedTable(), 5);
@@ -116,10 +115,8 @@ public void testPartitionTableMajorOptimizeCommit() throws Exception {
     Set<String> oldDeleteFilesPath = new HashSet<>();
     try (CloseableIterable<FileScanTask> filesIterable = icebergPartitionTable.asUnkeyedTable().newScan().planFiles()) {
       filesIterable.forEach(fileScanTask -> {
-        if (fileScanTask.file().fileSizeInBytes() <= 1000) {
-          oldDataFilesPath.add((String) fileScanTask.file().path());
-          fileScanTask.deletes().forEach(deleteFile -> oldDeleteFilesPath.add((String) deleteFile.path()));
-        }
+        oldDataFilesPath.add((String) fileScanTask.file().path());
+        fileScanTask.deletes().forEach(deleteFile -> oldDeleteFilesPath.add((String) deleteFile.path()));
       });
     }

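Both test classes now collect the data-file and delete-file paths visible to a scan, before and after the optimize commit. A small helper could capture that repeated block; the sketch below is one way to write it, assuming the tests keep using Iceberg's planFiles() API. The class and method names are chosen here for illustration and do not appear in this pull request.

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.CloseableIterable;

    final class ScanFilePaths {
      final Set<String> dataFiles = new HashSet<>();
      final Set<String> deleteFiles = new HashSet<>();

      /** Collect the data-file and delete-file paths visible to a full table scan. */
      static ScanFilePaths of(Table table) {
        ScanFilePaths paths = new ScanFilePaths();
        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
          tasks.forEach(task -> {
            paths.dataFiles.add(task.file().path().toString());
            task.deletes().forEach(delete -> paths.deleteFiles.add(delete.path().toString()));
          });
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
        return paths;
      }
    }
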
diff --git a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestIcebergMinorOptimizeCommit.java b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestIcebergMinorOptimizeCommit.java
index 0f613d583c..467bb3c4c0 100644
--- a/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestIcebergMinorOptimizeCommit.java
+++ b/ams/ams-server/src/test/java/com/netease/arctic/ams/server/optimize/TestIcebergMinorOptimizeCommit.java
@@ -96,7 +96,18 @@ public void testNoPartitionTableMinorOptimizeCommit() throws Exception {
     optimizeCommit.commit(icebergNoPartitionTable.asUnkeyedTable().currentSnapshot().snapshotId());

     Set<String> newDataFilesPath = new HashSet<>();
+    Set<String> newDeleteFilesPath = new HashSet<>();
+    try (CloseableIterable<FileScanTask> filesIterable = icebergPartitionTable.asUnkeyedTable().newScan()
+        .planFiles()) {
+      filesIterable.forEach(fileScanTask -> {
+        if (fileScanTask.file().fileSizeInBytes() <= 1000) {
+          newDataFilesPath.add((String) fileScanTask.file().path());
+          fileScanTask.deletes().forEach(deleteFile -> newDeleteFilesPath.add((String) deleteFile.path()));
+        }
+      });
+    }
     Assert.assertNotEquals(oldDataFilesPath, newDataFilesPath);
+    Assert.assertNotEquals(oldDeleteFilesPath, newDeleteFilesPath);
   }

   @Test
@@ -113,10 +124,8 @@ public void testPartitionTableMinorOptimizeCommit() throws Exception {
     try (CloseableIterable<FileScanTask> filesIterable = icebergPartitionTable.asUnkeyedTable().newScan()
         .planFiles()) {
       filesIterable.forEach(fileScanTask -> {
-        if (fileScanTask.file().fileSizeInBytes() <= 1000) {
-          oldDataFilesPath.add((String) fileScanTask.file().path());
-          fileScanTask.deletes().forEach(deleteFile -> oldDeleteFilesPath.add((String) deleteFile.path()));
-        }
+        oldDataFilesPath.add((String) fileScanTask.file().path());
+        fileScanTask.deletes().forEach(deleteFile -> oldDeleteFilesPath.add((String) deleteFile.path()));
       });
     }

@@ -160,6 +169,17 @@ public void testPartitionTableMinorOptimizeCommit() throws Exception {
     optimizeCommit.commit(icebergPartitionTable.asUnkeyedTable().currentSnapshot().snapshotId());

     Set<String> newDataFilesPath = new HashSet<>();
+    Set<String> newDeleteFilesPath = new HashSet<>();
+    try (CloseableIterable<FileScanTask> filesIterable = icebergPartitionTable.asUnkeyedTable().newScan()
+        .planFiles()) {
+      filesIterable.forEach(fileScanTask -> {
+        if (fileScanTask.file().fileSizeInBytes() <= 1000) {
+          newDataFilesPath.add((String) fileScanTask.file().path());
+          fileScanTask.deletes().forEach(deleteFile -> newDeleteFilesPath.add((String) deleteFile.path()));
+        }
+      });
+    }
     Assert.assertNotEquals(oldDataFilesPath, newDataFilesPath);
+    Assert.assertNotEquals(oldDeleteFilesPath, newDeleteFilesPath);
   }
 }
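
Patch 2 splits the major-optimize commit in two: the data files are rewritten first, keeping the base sequence number, and the delete files that became useless are dropped in a second, best-effort RewriteFiles whose ValidationException is swallowed. The sketch below restates that second step in isolation, with the reasoning that patch 3 adds as a comment; the class and method names are illustrative only and do not come from this pull request.

    import java.util.Collections;
    import java.util.Set;

    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.RewriteFiles;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.exceptions.ValidationException;

    public class BestEffortDeleteFileRemovalSketch {

      /** Drop delete files that a preceding data-file rewrite has made obsolete. */
      public static void removeObsoleteDeleteFiles(Table table, Set<DeleteFile> obsoleteDeleteFiles) {
        if (obsoleteDeleteFiles.isEmpty()) {
          return;
        }
        RewriteFiles removeDeletes = table.newRewrite();
        // Replace the delete files with nothing; no data files are touched by this commit.
        removeDeletes.rewriteFiles(Collections.emptySet(), obsoleteDeleteFiles,
            Collections.emptySet(), Collections.emptySet());
        try {
          removeDeletes.commit();
        } catch (ValidationException e) {
          // The previous commit may already have caused Iceberg to drop delete files whose
          // sequence number fell below the minimum data sequence number, so a validation
          // failure here is treated as "already removed" rather than as an error.
        }
      }
    }
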
From 1c8347b15525a60eb3577ebcd6d43788c25d424d Mon Sep 17 00:00:00 2001
From: luting
Date: Tue, 14 Feb 2023 17:12:56 +0800
Subject: [PATCH 3/3] add comment

---
 .../arctic/ams/server/optimize/IcebergOptimizeCommit.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/IcebergOptimizeCommit.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/IcebergOptimizeCommit.java
index 4123cdfb19..9c8669125c 100644
--- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/IcebergOptimizeCommit.java
+++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/IcebergOptimizeCommit.java
@@ -225,7 +225,7 @@ private void majorCommit(ArcticTable arcticTable,
     rewriteDataFiles.set(SnapshotSummary.SNAPSHOT_PRODUCER, CommitMetaProducer.OPTIMIZE.name());
     rewriteDataFiles.commit();

-    // remove DeleteFiles additional, because DeleteFiles maybe not existed
+    // remove DeleteFiles additional
     if (CollectionUtils.isNotEmpty(deleteDeleteFiles)) {
       RewriteFiles rewriteDeleteFiles = baseArcticTable.newRewrite();
       rewriteDeleteFiles.set(SnapshotSummary.SNAPSHOT_PRODUCER, CommitMetaProducer.OPTIMIZE.name());
@@ -234,6 +234,8 @@ private void majorCommit(ArcticTable arcticTable,
       try {
         rewriteDeleteFiles.commit();
       } catch (ValidationException e) {
+        // Iceberg will drop DeleteFiles that are older than the min Data sequence number. So some DeleteFiles
+        // maybe already dropped in the last commit, the exception can be ignored.
         LOG.warn("Iceberg RewriteFiles commit failed, but ignore", e);
       }
     }
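
A test could also check the property this series is about: after the optimize commit, the files visible to a scan should not look newer than the snapshot the plan was based on. This sketch assumes an Iceberg version that exposes ContentFile#dataSequenceNumber() (which this pull request does not use), assumes no other writer touched the table after the base snapshot, and invents its class and method names for illustration.

    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.CloseableIterable;
    import org.junit.Assert;

    public class SequenceNumberAssertions {

      /**
       * After an optimize commit planned from baseSnapshotId, every data file still visible
       * to a scan should carry a data sequence number no greater than the base snapshot's
       * sequence number, i.e. the rewrite did not "age forward" the compacted files.
       */
      public static void assertRewritePreservedSequence(Table table, long baseSnapshotId) throws Exception {
        long baseSequenceNumber = table.snapshot(baseSnapshotId).sequenceNumber();
        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
          for (FileScanTask task : tasks) {
            // dataSequenceNumber() is only available on ContentFile in recent Iceberg releases.
            Long fileSequenceNumber = task.file().dataSequenceNumber();
            Assert.assertNotNull("file should expose a data sequence number", fileSequenceNumber);
            Assert.assertTrue("unexpectedly new file: " + task.file().path(),
                fileSequenceNumber <= baseSequenceNumber);
          }
        }
      }
    }
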