Skip to content

Commit

Permalink
Try different approach, instead of skipping cleanup always cleanup
Browse files Browse the repository at this point in the history
But instead of reading metadata/manifest list in order to cleanup,
SnapshotProducer keeps track of committed files
  • Loading branch information
grantatspothero committed Jun 25, 2024
1 parent e510b3a commit fb61daf
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 33 deletions.
5 changes: 0 additions & 5 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,6 @@ protected void cleanUncommitted(Set<ManifestFile> committed) {
}
}

@Override
protected boolean cleanupAfterCommit() {
return commitMetrics().attempts().value() > 1;
}

private List<ManifestFile> writeNewManifests() throws IOException {
if (hasNewFiles && newManifests != null) {
newManifests.forEach(file -> deleteFile(file.path()));
Expand Down
45 changes: 17 additions & 28 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptingFileIO;
Expand Down Expand Up @@ -104,6 +104,7 @@ public void accept(String file) {
private ExecutorService workerPool = ThreadPools.getWorkerPool();
private String targetBranch = SnapshotRef.MAIN_BRANCH;
private CommitMetrics commitMetrics;
private List<ManifestFile> snapshotManifests;

protected SnapshotProducer(TableOperations ops) {
this.ops = ops;
Expand Down Expand Up @@ -237,6 +238,7 @@ public Snapshot apply() {

OutputFile manifestList = manifestListPath();

List<ManifestFile> filesToWrite;
try (ManifestListWriter writer =
ManifestLists.write(
ops.current().formatVersion(),
Expand All @@ -256,10 +258,12 @@ public Snapshot apply() {
.executeWith(workerPool)
.run(index -> manifestFiles[index] = manifestsWithMetadata.get(manifests.get(index)));

writer.addAll(Arrays.asList(manifestFiles));
filesToWrite = Arrays.asList(manifestFiles);
writer.addAll(filesToWrite);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to write manifest list file");
}
snapshotManifests = filesToWrite;

return new BaseSnapshot(
sequenceNumber,
Expand Down Expand Up @@ -368,8 +372,8 @@ protected TableMetadata refresh() {
@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public void commit() {
// this is always set to the latest commit attempt's snapshot id.
AtomicLong newSnapshotId = new AtomicLong(-1L);
// set to the successfully committed snapshot
AtomicReference<Snapshot> committedSnapshot = new AtomicReference<>();
Timed totalDuration = commitMetrics().totalDuration().start();
try {
Tasks.foreach(ops)
Expand All @@ -384,7 +388,6 @@ public void commit() {
.run(
taskOps -> {
Snapshot newSnapshot = apply();
newSnapshotId.set(newSnapshot.snapshotId());
TableMetadata.Builder update = TableMetadata.buildFrom(base);
if (base.snapshot(newSnapshot.snapshotId()) != null) {
// this is a rollback operation
Expand All @@ -408,6 +411,7 @@ public void commit() {
// to ensure that if a concurrent operation assigns the UUID, this operation will
// not fail.
taskOps.commit(base, updated.withUUID());
committedSnapshot.set(newSnapshot);
});

} catch (CommitStateUnknownException commitStateUnknownException) {
Expand All @@ -421,27 +425,16 @@ public void commit() {
}

try {
LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), getClass().getSimpleName());

if (cleanupAfterCommit()) {
// at this point, the commit must have succeeded. after a refresh, the snapshot is loaded by
// id in case another commit was added between this commit and the refresh.
Snapshot saved = ops.refresh().snapshot(newSnapshotId.get());
if (saved != null) {
cleanUncommitted(Sets.newHashSet(saved.allManifests(ops.io())));
// also clean up unused manifest lists created by multiple attempts
for (String manifestList : manifestLists) {
if (!saved.manifestListLocation().equals(manifestList)) {
deleteFile(manifestList);
}
}
} else {
// saved may not be present if the latest metadata couldn't be loaded due to eventual
// consistency problems in refresh. in that case, don't clean up.
LOG.warn("Failed to load committed snapshot, skipping manifest clean-up");
Snapshot committed = committedSnapshot.get();
LOG.info("Committed snapshot {} ({})", committed.snapshotId(), getClass().getSimpleName());
// since the snapshot was committed, the snapshotManifests were committed
cleanUncommitted(Sets.newHashSet(snapshotManifests));
// also clean up unused manifest lists created by multiple attempts
for (String manifestList : manifestLists) {
if (!committed.manifestListLocation().equals(manifestList)) {
deleteFile(manifestList);
}
}

} catch (Throwable e) {
LOG.warn(
"Failed to load committed table metadata or during cleanup, skipping further cleanup", e);
Expand Down Expand Up @@ -565,10 +558,6 @@ protected boolean canInheritSnapshotId() {
return canInheritSnapshotId;
}

protected boolean cleanupAfterCommit() {
return true;
}

private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) {
try (ManifestReader<DataFile> reader =
ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
Expand Down

0 comments on commit fb61daf

Please sign in to comment.