Skip to content

Commit 5dad79b

Browse files
Only fire updateEvent if snapshot has committed
1 parent dbfefb0 commit 5dad79b

File tree

1 file changed

+23
-20
lines changed

1 file changed

+23
-20
lines changed

core/src/main/java/org/apache/iceberg/SnapshotProducer.java

+23-20
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,7 @@ protected TableMetadata refresh() {
391391
public void commit() {
392392
// this is always set to the latest commit attempt's snapshot
393393
AtomicReference<Snapshot> stagedSnapshot = new AtomicReference<>();
394+
Snapshot committedSnapshot;
394395
try (Timed ignore = commitMetrics().totalDuration().start()) {
395396
try {
396397
Tasks.foreach(ops)
@@ -444,7 +445,7 @@ public void commit() {
444445
}
445446

446447
// at this point, the commit must have succeeded so the stagedSnapshot is committed
447-
Snapshot committedSnapshot = stagedSnapshot.get();
448+
committedSnapshot = stagedSnapshot.get();
448449
try {
449450
LOG.info(
450451
"Committed snapshot {} ({})",
@@ -468,31 +469,33 @@ public void commit() {
468469
}
469470

470471
try {
471-
notifyListeners();
472+
notifyListeners(committedSnapshot);
472473
} catch (Throwable e) {
473474
LOG.warn("Failed to notify event listeners", e);
474475
}
475476
}
476477

477-
private void notifyListeners() {
478+
private void notifyListeners(Snapshot committedSnapshot) {
478479
try {
479-
Object event = updateEvent();
480-
if (event != null) {
481-
Listeners.notifyAll(event);
482-
483-
if (event instanceof CreateSnapshotEvent) {
484-
CreateSnapshotEvent createSnapshotEvent = (CreateSnapshotEvent) event;
485-
486-
reporter.report(
487-
ImmutableCommitReport.builder()
488-
.tableName(createSnapshotEvent.tableName())
489-
.snapshotId(createSnapshotEvent.snapshotId())
490-
.operation(createSnapshotEvent.operation())
491-
.sequenceNumber(createSnapshotEvent.sequenceNumber())
492-
.metadata(EnvironmentContext.get())
493-
.commitMetrics(
494-
CommitMetricsResult.from(commitMetrics(), createSnapshotEvent.summary()))
495-
.build());
480+
if (committedSnapshot != null) {
481+
Object event = updateEvent();
482+
if (event != null) {
483+
Listeners.notifyAll(event);
484+
485+
if (event instanceof CreateSnapshotEvent) {
486+
CreateSnapshotEvent createSnapshotEvent = (CreateSnapshotEvent) event;
487+
488+
reporter.report(
489+
ImmutableCommitReport.builder()
490+
.tableName(createSnapshotEvent.tableName())
491+
.snapshotId(createSnapshotEvent.snapshotId())
492+
.operation(createSnapshotEvent.operation())
493+
.sequenceNumber(createSnapshotEvent.sequenceNumber())
494+
.metadata(EnvironmentContext.get())
495+
.commitMetrics(
496+
CommitMetricsResult.from(commitMetrics(), createSnapshotEvent.summary()))
497+
.build());
498+
}
496499
}
497500
}
498501
} catch (RuntimeException e) {

0 commit comments

Comments
 (0)