Skip to content

Commit

Permalink
Support SnapshotProducer.updateEvent(Snapshot committedSnapshot)
Browse files Browse the repository at this point in the history
Avoids an unnecessary Iceberg metadata read of the just-committed snapshot.
  • Loading branch information
grantatspothero committed Jan 6, 2025
1 parent 5dad79b commit 3887b94
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,20 +134,18 @@ public CherryPickOperation cherrypick(long snapshotId) {
}

@Override
public Object updateEvent() {
public Object updateEvent(Snapshot committedSnapshot) {
if (cherrypickSnapshot == null) {
// NOOP operation, no snapshot created
return null;
}

TableMetadata tableMetadata = refresh();
long snapshotId = tableMetadata.currentSnapshot().snapshotId();
if (cherrypickSnapshot.snapshotId() == snapshotId) {
if (cherrypickSnapshot.snapshotId() == committedSnapshot.snapshotId()) {
// No new snapshot is created for fast-forward
return null;
} else {
// New snapshot created, we rely on super class to fire a CreateSnapshotEvent
return super.updateEvent();
return super.updateEvent(committedSnapshot);
}
}

Expand Down
11 changes: 6 additions & 5 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,13 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
}

@Override
public Object updateEvent() {
long snapshotId = snapshotId();
Snapshot snapshot = ops().current().snapshot(snapshotId);
long sequenceNumber = snapshot.sequenceNumber();
public Object updateEvent(Snapshot committedSnapshot) {
return new CreateSnapshotEvent(
tableName, operation(), snapshotId, sequenceNumber, snapshot.summary());
tableName,
operation(),
committedSnapshot.snapshotId(),
committedSnapshot.sequenceNumber(),
committedSnapshot.summary());
}

@Override
Expand Down
28 changes: 7 additions & 21 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,8 @@
import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
private static final Logger LOG = LoggerFactory.getLogger(MergingSnapshotProducer.class);

// data is only added in "append" and "overwrite" operations
private static final Set<String> VALIDATE_ADDED_FILES_OPERATIONS =
ImmutableSet.of(DataOperations.APPEND, DataOperations.OVERWRITE);
Expand Down Expand Up @@ -956,23 +952,13 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
}

@Override
public Object updateEvent() {
long snapshotId = snapshotId();
Snapshot justSaved = ops().refresh().snapshot(snapshotId);
long sequenceNumber = TableMetadata.INVALID_SEQUENCE_NUMBER;
Map<String, String> summary;
if (justSaved == null) {
// The snapshot just saved may not be present if the latest metadata couldn't be loaded due to
// eventual
// consistency problems in refresh.
LOG.warn("Failed to load committed snapshot: omitting sequence number from notifications");
summary = summary();
} else {
sequenceNumber = justSaved.sequenceNumber();
summary = justSaved.summary();
}

return new CreateSnapshotEvent(tableName, operation(), snapshotId, sequenceNumber, summary);
public Object updateEvent(Snapshot committedSnapshot) {
return new CreateSnapshotEvent(
tableName,
operation(),
committedSnapshot.snapshotId(),
committedSnapshot.sequenceNumber(),
committedSnapshot.summary());
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,14 @@ public void commit() {
}
}

/**
 * Builds the event object for a commit, given the snapshot that was committed.
 *
 * <p>This implementation does not use {@code committedSnapshot}; it simply delegates to the
 * no-argument {@code updateEvent()}.
 *
 * @param committedSnapshot the snapshot supplied by the caller; ignored by this implementation
 * @return whatever the no-argument {@code updateEvent()} returns
 */
Object updateEvent(Snapshot committedSnapshot) {
return updateEvent();
}

private void notifyListeners(Snapshot committedSnapshot) {
try {
if (committedSnapshot != null) {
Object event = updateEvent();
Object event = updateEvent(committedSnapshot);
if (event != null) {
Listeners.notifyAll(event);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;
import java.util.List;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Checks how many table metadata fetches a {@link PendingUpdate#commit()} performs beyond the
 * fetches that {@link PendingUpdate#apply()} alone accounts for.
 *
 * <p>Fetch counts are read from {@code table.ops().getMetadataFetchCount()}.
 */
@ExtendWith(ParameterizedTestExtension.class)
public class TestPendingUpdateUpdateEvents extends TestBase {
  @Parameters(name = "formatVersion = {0}")
  protected static List<Object> parameters() {
    return Arrays.asList(1, 2, 3);
  }

  /** A fast append commit is expected to cost exactly one metadata fetch beyond apply. */
  @TestTemplate
  public void testFastAppendDoesNotPerformExtraMetadataReads() {
    table.newFastAppend().appendFile(FILE_A).commit();
    long initialSnapshotId = table.currentSnapshot().snapshotId();

    AppendFiles pendingAppend = table.newFastAppend().appendFile(FILE_A);
    int fetchesForCommit = recordMetadataFetchesForCommitOnly(pendingAppend);

    // the commit must have produced a new current snapshot
    assertThat(table.currentSnapshot().snapshotId()).isNotEqualTo(initialSnapshotId);
    // fetch metadata once
    assertThat(fetchesForCommit).isEqualTo(1);
  }

  /** A cherry-pick commit is expected to cost exactly two metadata fetches beyond apply. */
  @TestTemplate
  public void testCherryPickOperationDoesNotPerformExtraMetadataReads() {
    table.newFastAppend().appendFile(FILE_A).commit();
    long initialSnapshotId = table.currentSnapshot().snapshotId();

    // stage a second snapshot without making it current, then look it up by position
    table.newAppend().appendFile(FILE_B).stageOnly().commit();
    long stagedSnapshotId = readMetadata().snapshots().get(1).snapshotId();

    ManageSnapshots pendingCherryPick = table.manageSnapshots().cherrypick(stagedSnapshotId);
    int fetchesForCommit = recordMetadataFetchesForCommitOnly(pendingCherryPick);

    // the commit must have produced a new current snapshot
    assertThat(table.currentSnapshot().snapshotId()).isNotEqualTo(initialSnapshotId);
    // fetch metadata twice, once in BaseTransaction#applyUpdates and once normally
    assertThat(fetchesForCommit).isEqualTo(2);
  }

  /**
   * Commits {@code pendingUpdate} and reports the metadata fetches attributable to the commit
   * itself.
   *
   * <p>The fetch counter is sampled around one {@code apply()} call and again after {@code
   * commit()}. The result is the number of fetches seen during {@code commit()} minus the number
   * seen during that single measured {@code apply()}.
   *
   * @param pendingUpdate the update to apply and then commit
   * @return fetches during commit, less the fetches of one apply
   */
  private <T> int recordMetadataFetchesForCommitOnly(PendingUpdate<T> pendingUpdate) {
    // unmeasured first apply; presumably keeps one-time work out of the measurement - TODO confirm
    pendingUpdate.apply();

    int countBeforeApply = table.ops().getMetadataFetchCount();
    pendingUpdate.apply();
    int countAfterApply = table.ops().getMetadataFetchCount();

    pendingUpdate.commit();
    int countAfterCommit = table.ops().getMetadataFetchCount();

    int fetchesDuringApply = countAfterApply - countBeforeApply;
    int fetchesDuringCommit = countAfterCommit - countAfterApply;
    return fetchesDuringCommit - fetchesDuringApply;
  }
}
8 changes: 8 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestTables.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.File;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
Expand Down Expand Up @@ -220,6 +221,7 @@ public static class TestTableOperations implements TableOperations {
private TableMetadata current = null;
private long lastSnapshotId = 0;
private int failCommits = 0;
private final AtomicInteger metadataFetchCount = new AtomicInteger();

public TestTableOperations(String tableName, File location) {
this.tableName = tableName;
Expand Down Expand Up @@ -255,8 +257,13 @@ void failCommits(int numFailures) {
this.failCommits = numFailures;
}

/**
 * Returns the current value of the {@code metadataFetchCount} counter.
 *
 * @return the counter value at the time of the call
 */
int getMetadataFetchCount() {
return metadataFetchCount.get();
}

/**
 * Returns the {@code current} field as-is and records the call in {@code metadataFetchCount}.
 *
 * @return the value of the {@code current} field
 */
@Override
public TableMetadata current() {
// every call is counted as a metadata fetch, even though nothing is reloaded here
metadataFetchCount.incrementAndGet();
return current;
}

Expand All @@ -265,6 +272,7 @@ public TableMetadata refresh() {
synchronized (METADATA) {
this.current = METADATA.get(tableName);
}
metadataFetchCount.incrementAndGet();
return current;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ public void testMetadataFileLocationsWithMissingFiles() {
// delete v3.metadata.json making v2.metadata.json and v1.metadata.json inaccessible
table.io().deleteFile(location);

Set<String> metadataFileLocations = ReachableFileUtil.metadataFileLocations(table, true);
// original hadoop table will not see the file deletion
Table refreshedTable = TABLES.load(tableDir.toURI().toString());
Set<String> metadataFileLocations =
ReachableFileUtil.metadataFileLocations(refreshedTable, true);
assertThat(metadataFileLocations).hasSize(2);
}

Expand Down

0 comments on commit 3887b94

Please sign in to comment.