Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unneeded metadata read during update event generation #11829

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,20 +134,18 @@ public CherryPickOperation cherrypick(long snapshotId) {
}

@Override
public Object updateEvent() {
public Object updateEvent(Snapshot committedSnapshot) {
if (cherrypickSnapshot == null) {
// NOOP operation, no snapshot created
return null;
}

TableMetadata tableMetadata = refresh();
long snapshotId = tableMetadata.currentSnapshot().snapshotId();
if (cherrypickSnapshot.snapshotId() == snapshotId) {
if (cherrypickSnapshot.snapshotId() == committedSnapshot.snapshotId()) {
// No new snapshot is created for fast-forward
return null;
} else {
// New snapshot created, we rely on super class to fire a CreateSnapshotEvent
return super.updateEvent();
return super.updateEvent(committedSnapshot);
}
}

Expand Down
11 changes: 6 additions & 5 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,13 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
}

@Override
public Object updateEvent() {
long snapshotId = snapshotId();
Snapshot snapshot = ops().current().snapshot(snapshotId);
long sequenceNumber = snapshot.sequenceNumber();
public Object updateEvent(Snapshot committedSnapshot) {
return new CreateSnapshotEvent(
tableName, operation(), snapshotId, sequenceNumber, snapshot.summary());
tableName,
operation(),
committedSnapshot.snapshotId(),
committedSnapshot.sequenceNumber(),
committedSnapshot.summary());
}

@Override
Expand Down
28 changes: 7 additions & 21 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,8 @@
import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
private static final Logger LOG = LoggerFactory.getLogger(MergingSnapshotProducer.class);

// data is only added in "append" and "overwrite" operations
private static final Set<String> VALIDATE_ADDED_FILES_OPERATIONS =
ImmutableSet.of(DataOperations.APPEND, DataOperations.OVERWRITE);
Expand Down Expand Up @@ -956,23 +952,13 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
}

@Override
public Object updateEvent() {
long snapshotId = snapshotId();
Snapshot justSaved = ops().refresh().snapshot(snapshotId);
long sequenceNumber = TableMetadata.INVALID_SEQUENCE_NUMBER;
Map<String, String> summary;
if (justSaved == null) {
// The snapshot just saved may not be present if the latest metadata couldn't be loaded due to
// eventual
// consistency problems in refresh.
LOG.warn("Failed to load committed snapshot: omitting sequence number from notifications");
summary = summary();
} else {
sequenceNumber = justSaved.sequenceNumber();
summary = justSaved.summary();
}

return new CreateSnapshotEvent(tableName, operation(), snapshotId, sequenceNumber, summary);
public Object updateEvent(Snapshot committedSnapshot) {
return new CreateSnapshotEvent(
tableName,
operation(),
committedSnapshot.snapshotId(),
committedSnapshot.sequenceNumber(),
committedSnapshot.summary());
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
Expand Down
47 changes: 27 additions & 20 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ protected TableMetadata refresh() {
public void commit() {
// this is always set to the latest commit attempt's snapshot
AtomicReference<Snapshot> stagedSnapshot = new AtomicReference<>();
Snapshot committedSnapshot;
try (Timed ignore = commitMetrics().totalDuration().start()) {
try {
Tasks.foreach(ops)
Expand Down Expand Up @@ -444,7 +445,7 @@ public void commit() {
}

// at this point, the commit must have succeeded so the stagedSnapshot is committed
Snapshot committedSnapshot = stagedSnapshot.get();
committedSnapshot = stagedSnapshot.get();
try {
LOG.info(
"Committed snapshot {} ({})",
Expand All @@ -468,31 +469,37 @@ public void commit() {
}

try {
notifyListeners();
notifyListeners(committedSnapshot);
} catch (Throwable e) {
LOG.warn("Failed to notify event listeners", e);
}
}

private void notifyListeners() {
Object updateEvent(Snapshot committedSnapshot) {
Copy link
Contributor Author

@grantatspothero grantatspothero Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PendingUpdate.updateEvent only usage is in SnapshotProducer currently so could change the interface directly to updateEvent(Snapshot committedSnapshot), but did not want to make a backwards incompatible API change

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SnapshotProducer is package private, so I think we're OK in terms of backwards compatibility since it's not like a public API is being broken.

Copy link
Contributor Author

@grantatspothero grantatspothero Jan 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was trying to provide context why I didn't modify PendingUpdate.updateEvent directly (it is a public API)

But agree with you is safe to modify SnapshotProducer here.

return updateEvent();
}

private void notifyListeners(Snapshot committedSnapshot) {
try {
Object event = updateEvent();
if (event != null) {
Listeners.notifyAll(event);

if (event instanceof CreateSnapshotEvent) {
CreateSnapshotEvent createSnapshotEvent = (CreateSnapshotEvent) event;

reporter.report(
ImmutableCommitReport.builder()
.tableName(createSnapshotEvent.tableName())
.snapshotId(createSnapshotEvent.snapshotId())
.operation(createSnapshotEvent.operation())
.sequenceNumber(createSnapshotEvent.sequenceNumber())
.metadata(EnvironmentContext.get())
.commitMetrics(
CommitMetricsResult.from(commitMetrics(), createSnapshotEvent.summary()))
.build());
if (committedSnapshot != null) {
Object event = updateEvent(committedSnapshot);
if (event != null) {
Listeners.notifyAll(event);

if (event instanceof CreateSnapshotEvent) {
CreateSnapshotEvent createSnapshotEvent = (CreateSnapshotEvent) event;

reporter.report(
ImmutableCommitReport.builder()
.tableName(createSnapshotEvent.tableName())
.snapshotId(createSnapshotEvent.snapshotId())
.operation(createSnapshotEvent.operation())
.sequenceNumber(createSnapshotEvent.sequenceNumber())
.metadata(EnvironmentContext.get())
.commitMetrics(
CommitMetricsResult.from(commitMetrics(), createSnapshotEvent.summary()))
.build());
}
}
}
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;
import java.util.List;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(ParameterizedTestExtension.class)
public class TestPendingUpdateUpdateEvents extends TestBase {
@Parameters(name = "formatVersion = {0}")
protected static List<Object> parameters() {
return Arrays.asList(1, 2, 3);
}

@TestTemplate
public void testFastAppendDoesNotPerformExtraMetadataReads() {
table.newFastAppend().appendFile(FILE_A).commit();
long startSnapshotId = table.currentSnapshot().snapshotId();

AppendFiles appendFiles = table.newFastAppend().appendFile(FILE_A);
int commitOnlyFetchCount = recordMetadataFetchesForCommitOnly(appendFiles);

assertThat(table.currentSnapshot().snapshotId()).isNotEqualTo(startSnapshotId);
// fetch metadata once
assertThat(commitOnlyFetchCount).isEqualTo(1);
}

@TestTemplate
public void testCherryPickOperationDoesNotPerformExtraMetadataReads() {
table.newFastAppend().appendFile(FILE_A).commit();
long startSnapshotId = table.currentSnapshot().snapshotId();

table.newAppend().appendFile(FILE_B).stageOnly().commit();
Snapshot stagedSnapshot = readMetadata().snapshots().get(1);

ManageSnapshots manageSnapshots =
table.manageSnapshots().cherrypick(stagedSnapshot.snapshotId());
int commitOnlyFetchCount = recordMetadataFetchesForCommitOnly(manageSnapshots);

assertThat(table.currentSnapshot().snapshotId()).isNotEqualTo(startSnapshotId);
// fetch metadata twice, once in BaseTransaction#applyUpdates and once normally
assertThat(commitOnlyFetchCount).isEqualTo(2);
}

private <T> int recordMetadataFetchesForCommitOnly(PendingUpdate<T> pendingUpdate) {
pendingUpdate.apply();

// determine number of metadata fetches during apply
int beforeApplyFetchCount = table.ops().getMetadataFetchCount();
pendingUpdate.apply();
int afterApplyFetchCount = table.ops().getMetadataFetchCount();
int applyOnlyFetchCount = afterApplyFetchCount - beforeApplyFetchCount;

// determine number of metadata fetches during commit
pendingUpdate.commit();
int afterCommitFetchCount = table.ops().getMetadataFetchCount();
return afterCommitFetchCount - afterApplyFetchCount - applyOnlyFetchCount;
}
}
8 changes: 8 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestTables.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.File;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
Expand Down Expand Up @@ -220,6 +221,7 @@ public static class TestTableOperations implements TableOperations {
private TableMetadata current = null;
private long lastSnapshotId = 0;
private int failCommits = 0;
private final AtomicInteger metadataFetchCount = new AtomicInteger();

public TestTableOperations(String tableName, File location) {
this.tableName = tableName;
Expand Down Expand Up @@ -255,8 +257,13 @@ void failCommits(int numFailures) {
this.failCommits = numFailures;
}

int getMetadataFetchCount() {
Copy link
Contributor Author

@grantatspothero grantatspothero Jan 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decided to count calls to current()/refresh() as a proxy for reading metadata from objectstorage. For TestTables (which uses in memory metadata) that is the best we can do.

Is there some other test I could write that uses real file system metadata (not TestTables)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why current/refresh counting is not great: cached metadata fetches are free while uncached fetches are not, and this does not differentiate between the two.

Instrumenting at the fileio level would let us validate indeed only a single uncached metadata request is needed for the whole commit.

return metadataFetchCount.get();
}

@Override
public TableMetadata current() {
metadataFetchCount.incrementAndGet();
return current;
}

Expand All @@ -265,6 +272,7 @@ public TableMetadata refresh() {
synchronized (METADATA) {
this.current = METADATA.get(tableName);
}
metadataFetchCount.incrementAndGet();
return current;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ public void testMetadataFileLocationsWithMissingFiles() {
// delete v3.metadata.json making v2.metadata.json and v1.metadata.json inaccessible
table.io().deleteFile(location);

Set<String> metadataFileLocations = ReachableFileUtil.metadataFileLocations(table, true);
// original hadoop table will not see the file deletion
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is side effect of removing refresh() calls during update event generation. Code which relied upon table.refresh being called for correctness will now break.

This is mostly a problem for hadoop/filesystem tables and these are deprecated: https://iceberg.apache.org/spec/#file-system-tables

For metastore catalogs for example, table.refresh() can be manually called after commits. This does not work for filesystem tables hence the creating of a new table.

Table refreshedTable = TABLES.load(tableDir.toURI().toString());
Set<String> metadataFileLocations =
ReachableFileUtil.metadataFileLocations(refreshedTable, true);
assertThat(metadataFileLocations).hasSize(2);
}

Expand Down
Loading