Speed up Snapshot Finalization (#47283) #47309

Merged · 1 commit · Sep 30, 2019
@@ -79,11 +79,11 @@ public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, Met
     }

     @Override
-    public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
-                                         List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
-                                         MetaData metaData, Map<String, Object> userMetadata) {
-        return in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId,
-            includeGlobalState, metaData, userMetadata);
+    public void finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
+                                 List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
+                                 MetaData metaData, Map<String, Object> userMetadata, ActionListener<SnapshotInfo> listener) {
+        in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId,
+            includeGlobalState, metaData, userMetadata, listener);
     }

     @Override
@@ -135,11 +135,11 @@ default Repository create(RepositoryMetaData metaData, Function<String, Reposito
      * @param shardFailures list of shard failures
      * @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
      * @param includeGlobalState include cluster global state
-     * @return snapshot description
+     * @param listener listener to be called on completion of the snapshot
      */
-    SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
-                                  List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
-                                  MetaData clusterMetaData, Map<String, Object> userMetadata);
+    void finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
+                          List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
+                          MetaData clusterMetaData, Map<String, Object> userMetadata, ActionListener<SnapshotInfo> listener);

     /**
      * Deletes snapshot
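The interface change above is the heart of the PR: `Repository#finalizeSnapshot` no longer returns a `SnapshotInfo` but reports completion through an `ActionListener<SnapshotInfo>`. As a rough illustration of what that shift means for callers, here is a minimal sketch of a value-returning method versus its listener-based equivalent. The `Callback` interface and the `finalizeSync`/`finalizeAsync` names are made up for the sketch and are not part of the Elasticsearch code in this diff.

```java
// Toy stand-in for org.elasticsearch.action.ActionListener used by the real code.
interface Callback<T> {
    void onResponse(T result);   // called once on success
    void onFailure(Exception e); // called once on error
}

class FinalizeShapeSketch {
    // Old shape: the caller blocks until the value is ready and handles exceptions in place.
    static String finalizeSync() {
        return "snapshot-info";
    }

    // New shape: the method may hand the work to another thread and return immediately;
    // the result or the failure arrives later through the callback.
    static void finalizeAsync(Callback<String> listener) {
        String info;
        try {
            info = "snapshot-info"; // the work that could fail goes here
        } catch (Exception e) {
            listener.onFailure(e);
            return;
        }
        listener.onResponse(info);
    }

    public static void main(String[] args) {
        finalizeAsync(new Callback<String>() {
            @Override public void onResponse(String info) { System.out.println("finalized: " + info); }
            @Override public void onFailure(Exception e) { e.printStackTrace(); }
        });
    }
}
```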
@@ -97,7 +97,6 @@
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -665,53 +664,60 @@ private void deleteIndexMetaDataBlobIgnoringErrors(SnapshotId snapshotId, IndexI
     }

     @Override
-    public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId,
-                                         final List<IndexId> indices,
-                                         final long startTime,
-                                         final String failure,
-                                         final int totalShards,
-                                         final List<SnapshotShardFailure> shardFailures,
-                                         final long repositoryStateId,
-                                         final boolean includeGlobalState,
-                                         final MetaData clusterMetaData,
-                                         final Map<String, Object> userMetadata) {
-        SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId,
-            indices.stream().map(IndexId::getName).collect(Collectors.toList()),
-            startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures,
-            includeGlobalState, userMetadata);
+    public void finalizeSnapshot(final SnapshotId snapshotId,
+                                 final List<IndexId> indices,
+                                 final long startTime,
+                                 final String failure,
+                                 final int totalShards,
+                                 final List<SnapshotShardFailure> shardFailures,
+                                 final long repositoryStateId,
+                                 final boolean includeGlobalState,
+                                 final MetaData clusterMetaData,
+                                 final Map<String, Object> userMetadata,
+                                 final ActionListener<SnapshotInfo> listener) {
+
+        // Once we're done writing all metadata, we update the index-N blob to finalize the snapshot
+        final ActionListener<SnapshotInfo> afterMetaWrites = ActionListener.wrap(snapshotInfo -> {
+            writeIndexGen(getRepositoryData().addSnapshot(snapshotId, snapshotInfo.state(), indices), repositoryStateId);
+            listener.onResponse(snapshotInfo);
+        }, ex -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", ex)));
+
+        // We upload one meta blob for each index, one for the cluster-state and one snap-${uuid}.dat blob
+        final GroupedActionListener<SnapshotInfo> allMetaListener =
+            new GroupedActionListener<>(ActionListener.map(afterMetaWrites, snapshotInfos -> {
+                assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos;
+                return snapshotInfos.iterator().next();
+            }), 2 + indices.size());
+        final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);

-        try {
-            // We ignore all FileAlreadyExistsException here since otherwise a master failover while in this method will
-            // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the
-            // index or global metadata will be compatible with the segments written in this snapshot as well.
-            // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way that
-            // decrements the generation it points at
+        // We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method will
+        // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the
+        // index or global metadata will be compatible with the segments written in this snapshot as well.
+        // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way
+        // that decrements the generation it points at

-            // Write Global MetaData
+        // Write Global MetaData
+        executor.execute(ActionRunnable.wrap(allMetaListener, l -> {
             globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), false);
+            l.onResponse(null);
+        }));

-            // write the index metadata for each index in the snapshot
-            for (IndexId index : indices) {
+        // write the index metadata for each index in the snapshot
+        for (IndexId index : indices) {
+            executor.execute(ActionRunnable.wrap(allMetaListener, l -> {
                 indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false);
-            }
-        } catch (IOException ex) {
-            throw new SnapshotException(metadata.name(), snapshotId, "failed to write metadata for snapshot", ex);
+                l.onResponse(null);
+            }));
         }

-        try {
-            final RepositoryData updatedRepositoryData = getRepositoryData().addSnapshot(snapshotId, blobStoreSnapshot.state(), indices);
-            snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID(), false);
-            writeIndexGen(updatedRepositoryData, repositoryStateId);
-        } catch (FileAlreadyExistsException ex) {
-            // if another master was elected and took over finalizing the snapshot, it is possible
-            // that both nodes try to finalize the snapshot and write to the same blobs, so we just
-            // log a warning here and carry on
-            throw new RepositoryException(metadata.name(), "Blob already exists while " +
-                "finalizing snapshot, assume the snapshot has already been saved", ex);
-        } catch (IOException ex) {
-            throw new RepositoryException(metadata.name(), "failed to update snapshot in repository", ex);
-        }
-        return blobStoreSnapshot;
+        executor.execute(ActionRunnable.wrap(afterMetaWrites, afterMetaListener -> {
+            final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId,
+                indices.stream().map(IndexId::getName).collect(Collectors.toList()),
+                startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures,
+                includeGlobalState, userMetadata);
+            snapshotFormat.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), false);
+            afterMetaListener.onResponse(snapshotInfo);
+        }));
     }

     @Override
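For readability, here is the idea behind the rewritten method. The new implementation fans the metadata writes out to the snapshot thread pool: one task for the global metadata blob, one per index metadata blob, and one for the snap-${uuid}.dat blob, which is why the grouped listener waits for `2 + indices.size()` results before the index-N blob is updated and the caller's listener is completed. The sketch below is a toy version of that "N parallel uploads, one finalization step" pattern; it uses a hand-rolled countdown instead of Elasticsearch's `GroupedActionListener` and `ActionRunnable`, and every name in it is invented for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Toy stand-in for GroupedActionListener: count completed parts, finalize once.
class CountDownListener {
    private final AtomicInteger remaining;
    private final AtomicBoolean failed = new AtomicBoolean();
    private final Runnable onAllDone;
    private final Consumer<Exception> onError;

    CountDownListener(int parts, Runnable onAllDone, Consumer<Exception> onError) {
        this.remaining = new AtomicInteger(parts);
        this.onAllDone = onAllDone;
        this.onError = onError;
    }

    void onPartDone() {
        // The final step runs exactly once, only after every part has succeeded.
        if (remaining.decrementAndGet() == 0) {
            onAllDone.run(); // e.g. write the updated index-N blob and complete the listener
        }
    }

    void onPartFailed(Exception e) {
        // A failed part never decrements the counter, so the final step never runs;
        // only the first failure is reported to the caller.
        if (failed.compareAndSet(false, true)) {
            onError.accept(e);
        }
    }
}

class ParallelFinalizeSketch {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4); // stands in for the SNAPSHOT pool
        int indexCount = 3;
        // 2 + indices.size(): global metadata + snap-${uuid}.dat + one blob per index
        CountDownListener allMeta = new CountDownListener(2 + indexCount,
            () -> System.out.println("all metadata written, updating index-N"),
            e -> System.err.println("finalization failed: " + e));
        for (int i = 0; i < 2 + indexCount; i++) {
            executor.execute(() -> {
                try {
                    // a metadata blob would be written here
                    allMeta.onPartDone();
                } catch (Exception e) {
                    allMeta.onPartFailed(e);
                }
            });
        }
        executor.shutdown();
    }
}
```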
@@ -561,25 +561,25 @@ public void onNoLongerMaster() {
         private void cleanupAfterError(Exception exception) {
             threadPool.generic().execute(() -> {
                 if (snapshotCreated) {
-                    try {
-                        repositoriesService.repository(snapshot.snapshot().getRepository())
-                            .finalizeSnapshot(snapshot.snapshot().getSnapshotId(),
-                                snapshot.indices(),
-                                snapshot.startTime(),
-                                ExceptionsHelper.detailedMessage(exception),
-                                0,
-                                Collections.emptyList(),
-                                snapshot.getRepositoryStateId(),
-                                snapshot.includeGlobalState(),
-                                metaDataForSnapshot(snapshot, clusterService.state().metaData()),
-                                snapshot.userMetadata());
-                    } catch (Exception inner) {
-                        inner.addSuppressed(exception);
-                        logger.warn(() -> new ParameterizedMessage("[{}] failed to close snapshot in repository",
-                            snapshot.snapshot()), inner);
-                    }
+                    repositoriesService.repository(snapshot.snapshot().getRepository())
+                        .finalizeSnapshot(snapshot.snapshot().getSnapshotId(),
+                            snapshot.indices(),
+                            snapshot.startTime(),
+                            ExceptionsHelper.stackTrace(exception),
+                            0,
+                            Collections.emptyList(),
+                            snapshot.getRepositoryStateId(),
+                            snapshot.includeGlobalState(),
+                            metaDataForSnapshot(snapshot, clusterService.state().metaData()),
+                            snapshot.userMetadata(), ActionListener.runAfter(ActionListener.wrap(ignored -> {
+                            }, inner -> {
+                                inner.addSuppressed(exception);
+                                logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot in repository",
+                                    snapshot.snapshot()), inner);
+                            }), () -> userCreateSnapshotListener.onFailure(e)));
+                } else {
+                    userCreateSnapshotListener.onFailure(e);
                 }
-                userCreateSnapshotListener.onFailure(e);
             });
         }
     }
@@ -1007,7 +1007,7 @@ protected void doRun() {
                         shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, status.reason()));
                     }
                 }
-                SnapshotInfo snapshotInfo = repository.finalizeSnapshot(
+                repository.finalizeSnapshot(
                     snapshot.getSnapshotId(),
                     entry.indices(),
                     entry.startTime(),
@@ -1017,9 +1017,10 @@ protected void doRun() {
                     entry.getRepositoryStateId(),
                     entry.includeGlobalState(),
                     metaDataForSnapshot(entry, metaData),
-                    entry.userMetadata());
-                removeSnapshotFromClusterState(snapshot, snapshotInfo, null);
-                logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
+                    entry.userMetadata(), ActionListener.wrap(snapshotInfo -> {
+                        removeSnapshotFromClusterState(snapshot, snapshotInfo, null);
+                        logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
+                    }, this::onFailure));
             }

             @Override
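One detail worth calling out in the `cleanupAfterError` change above: because repository finalization is now asynchronous, the user-facing failure callback is chained with `ActionListener.runAfter(...)` so that it still fires once the cleanup attempt completes, whether that attempt succeeds or fails. Below is a minimal sketch of that wrapping idea; the `Listener` interface and `runAfter` helper here are simplified stand-ins, not the Elasticsearch classes.

```java
// Toy model of ActionListener.runAfter: run a final action exactly once,
// after the wrapped listener has been notified of either outcome.
class RunAfterSketch {

    interface Listener<T> {
        void onResponse(T value);
        void onFailure(Exception e);
    }

    static <T> Listener<T> runAfter(Listener<T> delegate, Runnable always) {
        return new Listener<T>() {
            @Override
            public void onResponse(T value) {
                try { delegate.onResponse(value); } finally { always.run(); }
            }
            @Override
            public void onFailure(Exception e) {
                try { delegate.onFailure(e); } finally { always.run(); }
            }
        };
    }

    public static void main(String[] args) {
        Listener<Void> cleanupListener = runAfter(new Listener<Void>() {
            @Override public void onResponse(Void ignored) { /* nothing extra on success */ }
            @Override public void onFailure(Exception inner) { System.err.println("failed to finalize snapshot: " + inner); }
        }, () -> System.out.println("notify the user listener of the original failure"));
        cleanupListener.onResponse(null); // the final notification runs either way
    }
}
```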
@@ -159,10 +159,11 @@ public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, Met
     }

     @Override
-    public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
-                                         int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
-                                         boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata) {
-        return null;
+    public void finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
+                                 int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
+                                 boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata,
+                                 ActionListener<SnapshotInfo> listener) {
+        listener.onResponse(null);
     }

     @Override
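A small but important point in this stub: with a callback-based contract, even a do-nothing implementation must complete the listener, otherwise callers waiting on it would hang forever, which is why the stub calls `listener.onResponse(null)` rather than simply returning. A minimal sketch of that rule, using a toy `Listener` type rather than the Elasticsearch one:

```java
class NoopStubSketch {
    interface Listener<T> {
        void onResponse(T value);
        void onFailure(Exception e);
    }

    // Wrong: silently dropping the listener leaves the caller waiting forever.
    static void finalizeSnapshotBroken(Listener<String> listener) {
        // no-op, listener never completed
    }

    // Right: a no-op implementation still completes the callback, here with null,
    // mirroring listener.onResponse(null) in the stub above.
    static void finalizeSnapshotNoop(Listener<String> listener) {
        listener.onResponse(null);
    }
}
```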
@@ -18,48 +18,33 @@
  */
 package org.elasticsearch.snapshots.mockstore;

+import org.apache.lucene.util.SameThreadExecutorService;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.env.Environment;
-import org.elasticsearch.env.TestEnvironment;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.snapshots.SnapshotId;
+import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;

 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.NoSuchFileException;
-import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collections;

-import static org.elasticsearch.env.Environment.PATH_HOME_SETTING;
-import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.startsWith;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;

 public class MockEventuallyConsistentRepositoryTests extends ESTestCase {

-    private Environment environment;
-
-    @Override
-    public void setUp() throws Exception {
-        super.setUp();
-        final Path tempDir = createTempDir();
-        final String nodeName = "testNode";
-        environment = TestEnvironment.newEnvironment(Settings.builder()
-            .put(NODE_NAME_SETTING.getKey(), nodeName)
-            .put(PATH_HOME_SETTING.getKey(), tempDir.resolve(nodeName).toAbsolutePath())
-            .put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo").toAbsolutePath())
-            .build());
-    }
-
     public void testReadAfterWriteConsistently() throws IOException {
         MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
         try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
@@ -151,27 +136,37 @@ public void testOverwriteShardSnapBlobFails() throws IOException {

     public void testOverwriteSnapshotInfoBlob() {
         MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
+        final ThreadPool threadPool = mock(ThreadPool.class);
+        when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService());
         try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
             new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
-            xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) {
+            xContentRegistry(), threadPool, blobStoreContext)) {
             repository.start();

             // We create a snap- blob for snapshot "foo" in the first generation
+            final PlainActionFuture<SnapshotInfo> future = PlainActionFuture.newFuture();
             final SnapshotId snapshotId = new SnapshotId("foo", UUIDs.randomBase64UUID());
             repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(),
-                -1L, false, MetaData.EMPTY_META_DATA, Collections.emptyMap());
+                -1L, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), future);
+            future.actionGet();

             // We try to write another snap- blob for "foo" in the next generation. It fails because the content differs.
             final AssertionError assertionError = expectThrows(AssertionError.class,
-                () -> repository.finalizeSnapshot(
-                    snapshotId, Collections.emptyList(), 1L, null, 6, Collections.emptyList(),
-                    0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap()));
+                () -> {
+                    final PlainActionFuture<SnapshotInfo> fut = PlainActionFuture.newFuture();
+                    repository.finalizeSnapshot(
+                        snapshotId, Collections.emptyList(), 1L, null, 6, Collections.emptyList(),
+                        0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), fut);
+                    fut.actionGet();
+                });
             assertThat(assertionError.getMessage(), equalTo("\nExpected: <6>\n but: was <5>"));

             // We try to write yet another snap- blob for "foo" in the next generation.
             // It passes cleanly because the content of the blob except for the timestamps.
+            final PlainActionFuture<SnapshotInfo> future2 = PlainActionFuture.newFuture();
             repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(),
-                0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap());
+                0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), future2);
+            future2.actionGet();
         }
     }

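Since the production code now reports results through a listener, the test adapts the call back into a blocking one with `PlainActionFuture`: it passes the future as the listener and then calls `actionGet()`, while the mocked thread pool returns a `SameThreadExecutorService` so everything completes on the calling thread. A rough sketch of that adapter idea, built on `CompletableFuture` rather than the real `PlainActionFuture`, looks like this; the `finalizeAsync` method is a made-up stand-in for the repository call.

```java
import java.util.concurrent.CompletableFuture;

// Listener-to-blocking adapter in the spirit of PlainActionFuture: hand the
// async API a listener that completes a future, then block on that future.
class ListenerFuture<T> {
    private final CompletableFuture<T> future = new CompletableFuture<>();

    public void onResponse(T value) { future.complete(value); }
    public void onFailure(Exception e) { future.completeExceptionally(e); }

    public T actionGet() {
        return future.join(); // wait for whichever callback fires first
    }
}

class BlockingTestSketch {
    // Made-up async method standing in for repository.finalizeSnapshot(..., listener)
    static void finalizeAsync(ListenerFuture<String> listener) {
        listener.onResponse("snapshot-info");
    }

    public static void main(String[] args) {
        ListenerFuture<String> future = new ListenerFuture<>();
        finalizeAsync(future);
        System.out.println("blocked until the callback: " + future.actionGet());
    }
}
```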