diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index a2fb9a707eec..14e776a92d6a 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -192,11 +192,6 @@ protected void cleanUncommitted(Set committed) { } } - @Override - protected boolean cleanupAfterCommit() { - return commitMetrics().attempts().value() > 1; - } - private List writeNewManifests() throws IOException { if (hasNewFiles && newManifests != null) { newManifests.forEach(file -> deleteFile(file.path())); diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index d920e36483ae..85747dc71174 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -41,7 +41,7 @@ import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptingFileIO; @@ -104,6 +104,7 @@ public void accept(String file) { private ExecutorService workerPool = ThreadPools.getWorkerPool(); private String targetBranch = SnapshotRef.MAIN_BRANCH; private CommitMetrics commitMetrics; + private List snapshotManifests; protected SnapshotProducer(TableOperations ops) { this.ops = ops; @@ -237,6 +238,7 @@ public Snapshot apply() { OutputFile manifestList = manifestListPath(); + List filesToWrite; try (ManifestListWriter writer = ManifestLists.write( ops.current().formatVersion(), @@ -256,10 +258,12 @@ public Snapshot apply() { .executeWith(workerPool) .run(index -> manifestFiles[index] = manifestsWithMetadata.get(manifests.get(index))); - writer.addAll(Arrays.asList(manifestFiles)); + filesToWrite = Arrays.asList(manifestFiles); + writer.addAll(filesToWrite); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to write manifest list file"); } + snapshotManifests = filesToWrite; return new BaseSnapshot( sequenceNumber, @@ -368,8 +372,8 @@ protected TableMetadata refresh() { @Override @SuppressWarnings("checkstyle:CyclomaticComplexity") public void commit() { - // this is always set to the latest commit attempt's snapshot id. - AtomicLong newSnapshotId = new AtomicLong(-1L); + // set to the successfully committed snapshot + AtomicReference committedSnapshot = new AtomicReference<>(); Timed totalDuration = commitMetrics().totalDuration().start(); try { Tasks.foreach(ops) @@ -384,7 +388,6 @@ public void commit() { .run( taskOps -> { Snapshot newSnapshot = apply(); - newSnapshotId.set(newSnapshot.snapshotId()); TableMetadata.Builder update = TableMetadata.buildFrom(base); if (base.snapshot(newSnapshot.snapshotId()) != null) { // this is a rollback operation @@ -408,6 +411,7 @@ public void commit() { // to ensure that if a concurrent operation assigns the UUID, this operation will // not fail. taskOps.commit(base, updated.withUUID()); + committedSnapshot.set(newSnapshot); }); } catch (CommitStateUnknownException commitStateUnknownException) { @@ -421,27 +425,16 @@ public void commit() { } try { - LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), getClass().getSimpleName()); - - if (cleanupAfterCommit()) { - // at this point, the commit must have succeeded. after a refresh, the snapshot is loaded by - // id in case another commit was added between this commit and the refresh. - Snapshot saved = ops.refresh().snapshot(newSnapshotId.get()); - if (saved != null) { - cleanUncommitted(Sets.newHashSet(saved.allManifests(ops.io()))); - // also clean up unused manifest lists created by multiple attempts - for (String manifestList : manifestLists) { - if (!saved.manifestListLocation().equals(manifestList)) { - deleteFile(manifestList); - } - } - } else { - // saved may not be present if the latest metadata couldn't be loaded due to eventual - // consistency problems in refresh. in that case, don't clean up. - LOG.warn("Failed to load committed snapshot, skipping manifest clean-up"); + Snapshot committed = committedSnapshot.get(); + LOG.info("Committed snapshot {} ({})", committed.snapshotId(), getClass().getSimpleName()); + // since the snapshot was committed, the snapshotManifests were committed + cleanUncommitted(Sets.newHashSet(snapshotManifests)); + // also clean up unused manifest lists created by multiple attempts + for (String manifestList : manifestLists) { + if (!committed.manifestListLocation().equals(manifestList)) { + deleteFile(manifestList); } } - } catch (Throwable e) { LOG.warn( "Failed to load committed table metadata or during cleanup, skipping further cleanup", e); @@ -565,10 +558,6 @@ protected boolean canInheritSnapshotId() { return canInheritSnapshotId; } - protected boolean cleanupAfterCommit() { - return true; - } - private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) { try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {