Make BlobStoreRepository#writeIndexGen API Async #49584

Merged
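
In brief, the change turns BlobStoreRepository#writeIndexGen from a synchronous method that throws IOException into one that reports completion or failure through an ActionListener, and threads that listener through every caller (snapshot deletion, repository cleanup, and snapshot finalization). The following is a minimal, self-contained sketch of that conversion pattern; Listener and CheckedSupplier are local stand-ins for illustration, not the Elasticsearch classes themselves.

import java.util.concurrent.CompletableFuture;

// Local stand-in for org.elasticsearch.action.ActionListener, so this sketch
// compiles on its own.
interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);

    // Mirrors ActionListener.completeWith: run the supplier and route its
    // result or exception to the listener instead of throwing to the caller.
    static <T> void completeWith(Listener<T> listener, CheckedSupplier<T> supplier) {
        final T result;
        try {
            result = supplier.get();
        } catch (Exception e) {
            listener.onFailure(e);
            return;
        }
        listener.onResponse(result);
    }
}

interface CheckedSupplier<T> {
    T get() throws Exception;
}

public class AsyncConversionSketch {

    // Before: blocking, throws to the caller.
    static long writeIndexGenSync(long expectedGen) throws Exception {
        if (expectedGen < 0) {
            throw new IllegalStateException("stale repository generation");
        }
        return expectedGen + 1; // pretend we wrote index-(N+1)
    }

    // After: identical work, but completion flows through the listener, so
    // callers no longer assume the write finishes on the calling thread.
    static void writeIndexGenAsync(long expectedGen, Listener<Long> listener) {
        Listener.completeWith(listener, () -> writeIndexGenSync(expectedGen));
    }

    public static void main(String[] args) throws Exception {
        // Tests can adapt back to a blocking call with a future, as the PR's
        // PlainActionFuture helper does at the bottom of this diff.
        CompletableFuture<Long> future = new CompletableFuture<>();
        writeIndexGenAsync(41, new Listener<Long>() {
            @Override public void onResponse(Long gen) { future.complete(gen); }
            @Override public void onFailure(Exception e) { future.completeExceptionally(e); }
        });
        System.out.println("new generation: " + future.get()); // 42
    }
}

The immediate change keeps the work synchronous inside ActionListener.completeWith, but because callers now chain follow-up steps through listeners, the write can later be made genuinely asynchronous without touching any call site.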
@@ -427,7 +427,7 @@ private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, Bl
*/
private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateId, Map<String, BlobContainer> foundIndices,
Map<String, BlobMetaData> rootBlobs, RepositoryData repositoryData, boolean writeShardGens,
ActionListener<Void> listener) throws IOException {
ActionListener<Void> listener) {

if (writeShardGens) {
// First write the new shard state metadata (with the removed snapshot) and compute deletion targets
@@ -442,14 +442,14 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI
// written if all shard paths have been successfully updated.
final StepListener<RepositoryData> writeUpdatedRepoDataStep = new StepListener<>();
writeShardMetaDataAndComputeDeletesStep.whenComplete(deleteResults -> {
final ShardGenerations.Builder builder = ShardGenerations.builder();
for (ShardSnapshotMetaDeleteResult newGen : deleteResults) {
builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration);
}
final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, builder.build());
writeIndexGen(updatedRepoData, repositoryStateId, true);
writeUpdatedRepoDataStep.onResponse(updatedRepoData);
}, listener::onFailure);
final ShardGenerations.Builder builder = ShardGenerations.builder();
Member Author:
Formatting was just off here (double indent) before, and I fixed it in this change.

for (ShardSnapshotMetaDeleteResult newGen : deleteResults) {
builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration);
}
final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, builder.build());
writeIndexGen(updatedRepoData, repositoryStateId, true,
ActionListener.wrap(v -> writeUpdatedRepoDataStep.onResponse(updatedRepoData), listener::onFailure));
}, listener::onFailure);
// Once we have updated the repository, run the clean-ups
writeUpdatedRepoDataStep.whenComplete(updatedRepoData -> {
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
@@ -461,15 +461,17 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI
} else {
// Write the new repository data first (with the removed snapshot), using no shard generations
final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, ShardGenerations.EMPTY);
writeIndexGen(updatedRepoData, repositoryStateId, false);
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
final ActionListener<Void> afterCleanupsListener =
new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeMetaAndComputeDeletesStep = new StepListener<>();
writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, false, writeMetaAndComputeDeletesStep);
writeMetaAndComputeDeletesStep.whenComplete(deleteResults ->
asyncCleanupUnlinkedShardLevelBlobs(snapshotId, deleteResults, afterCleanupsListener), afterCleanupsListener::onFailure);
writeIndexGen(updatedRepoData, repositoryStateId, false, ActionListener.wrap(v -> {
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
final ActionListener<Void> afterCleanupsListener =
new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeMetaAndComputeDeletesStep = new StepListener<>();
writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, false, writeMetaAndComputeDeletesStep);
writeMetaAndComputeDeletesStep.whenComplete(deleteResults ->
asyncCleanupUnlinkedShardLevelBlobs(snapshotId, deleteResults, afterCleanupsListener),
afterCleanupsListener::onFailure);
}, listener::onFailure));
}
}
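
The rewritten doDeleteShardSnapshots above sequences the repository-data write ahead of the clean-ups with a StepListener, then fans in two parallel clean-ups (unreferenced root/indices blobs and shard-level blobs) through a GroupedActionListener of size 2, which completes the caller's listener only once both finish. Below is a rough, self-contained sketch of that counting fan-in with illustrative names; the real GroupedActionListener additionally collects the sub-results and delivers the first failure to its delegate.

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class FanInSketch {

    // Runs whenAllDone once `size` sub-operations have succeeded; the first
    // failure wins and suppresses completion.
    static final class Grouped {
        private final AtomicInteger remaining;
        private final AtomicBoolean failed = new AtomicBoolean();
        private final Runnable whenAllDone;
        private final Consumer<Exception> onFirstFailure;

        Grouped(int size, Runnable whenAllDone, Consumer<Exception> onFirstFailure) {
            this.remaining = new AtomicInteger(size);
            this.whenAllDone = whenAllDone;
            this.onFirstFailure = onFirstFailure;
        }

        void onResponse() {
            if (remaining.decrementAndGet() == 0 && failed.get() == false) {
                whenAllDone.run(); // the last sub-operation completes the group
            }
        }

        void onFailure(Exception e) {
            if (failed.compareAndSet(false, true)) {
                onFirstFailure.accept(e);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Grouped cleanups = new Grouped(2,
            () -> System.out.println("snapshot delete done: both cleanups finished"),
            e -> System.err.println("snapshot delete failed: " + e));
        Thread rootBlobs = new Thread(cleanups::onResponse);  // unreferenced root/indices blobs
        Thread shardBlobs = new Thread(cleanups::onResponse); // shard-level snapshot blobs
        rootBlobs.start(); shardBlobs.start();
        rootBlobs.join(); shardBlobs.join();
    }
}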

@@ -650,8 +652,9 @@ public void cleanup(long repositoryStateId, boolean writeShardGens, ActionListen
listener.onResponse(new RepositoryCleanupResult(DeleteResult.ZERO));
} else {
// write new index-N blob to ensure concurrent operations will fail
writeIndexGen(repositoryData, repositoryStateId, writeShardGens);
cleanupStaleBlobs(foundIndices, rootBlobs, repositoryData, ActionListener.map(listener, RepositoryCleanupResult::new));
writeIndexGen(repositoryData, repositoryStateId, writeShardGens,
ActionListener.wrap(v -> cleanupStaleBlobs(foundIndices, rootBlobs, repositoryData,
ActionListener.map(listener, RepositoryCleanupResult::new)), listener::onFailure));
}
} catch (Exception e) {
listener.onFailure(e);
@@ -762,11 +765,12 @@ public void finalizeSnapshot(final SnapshotId snapshotId,
getRepositoryData(ActionListener.wrap(existingRepositoryData -> {
final RepositoryData updatedRepositoryData =
existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), shardGenerations);
writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens);
if (writeShardGens) {
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
}
listener.onResponse(snapshotInfo);
writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, ActionListener.wrap(v -> {
if (writeShardGens) {
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
}
listener.onResponse(snapshotInfo);
}, onUpdateFailure));
}, onUpdateFailure));
}, onUpdateFailure), 2 + indices.size());
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
@@ -995,50 +999,58 @@ public boolean isReadOnly() {
return readOnly;
}

protected void writeIndexGen(final RepositoryData repositoryData, final long expectedGen,
final boolean writeShardGens) throws IOException {
assert isReadOnly() == false; // can not write to a read only repository
final long currentGen = repositoryData.getGenId();
if (currentGen != expectedGen) {
// the index file was updated by a concurrent operation, so we were operating on stale
// repository data
throw new RepositoryException(metadata.name(), "concurrent modification of the index-N file, expected current generation [" +
expectedGen + "], actual current generation [" + currentGen +
"] - possibly due to simultaneous snapshot deletion requests");
}
final long newGen = currentGen + 1;
if (latestKnownRepoGen.get() >= newGen) {
throw new IllegalArgumentException(
"Tried writing generation [" + newGen + "] but repository is at least at generation [" + newGen + "] already");
}
// write the index file
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
writeAtomic(indexBlob,
BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
final long latestKnownGen = latestKnownRepoGen.updateAndGet(known -> Math.max(known, newGen));
if (newGen < latestKnownGen) {
// Don't mess up the index.latest blob
throw new IllegalStateException(
"Wrote generation [" + newGen + "] but latest known repo gen concurrently changed to [" + latestKnownGen + "]");
}
// write the current generation to the index-latest file
final BytesReference genBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
bStream.writeLong(newGen);
genBytes = bStream.bytes();
}
logger.debug("Repository [{}] updating index.latest with generation [{}]", metadata.name(), newGen);
writeAtomic(INDEX_LATEST_BLOB, genBytes, false);
// delete the N-2 index file if it exists, keep the previous one around as a backup
if (newGen - 2 >= 0) {
final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(newGen - 2);
try {
blobContainer().deleteBlobIgnoringIfNotExists(oldSnapshotIndexFile);
} catch (IOException e) {
logger.warn("Failed to clean up old index blob [{}]", oldSnapshotIndexFile);
/**
* @param repositoryData RepositoryData to write
* @param expectedGen expected repository generation at the start of the operation
* @param writeShardGens whether to write {@link ShardGenerations} to the new {@link RepositoryData} blob
* @param listener completion listener
*/
protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, boolean writeShardGens, ActionListener<Void> listener) {
ActionListener.completeWith(listener, () -> {
assert isReadOnly() == false; // can not write to a read only repository
final long currentGen = repositoryData.getGenId();
if (currentGen != expectedGen) {
// the index file was updated by a concurrent operation, so we were operating on stale
// repository data
throw new RepositoryException(metadata.name(),
"concurrent modification of the index-N file, expected current generation [" + expectedGen +
"], actual current generation [" + currentGen + "] - possibly due to simultaneous snapshot deletion requests");
}
}
final long newGen = currentGen + 1;
if (latestKnownRepoGen.get() >= newGen) {
throw new IllegalArgumentException(
"Tried writing generation [" + newGen + "] but repository is at least at generation [" + newGen + "] already");
}
// write the index file
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
writeAtomic(indexBlob,
BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
final long latestKnownGen = latestKnownRepoGen.updateAndGet(known -> Math.max(known, newGen));
if (newGen < latestKnownGen) {
// Don't mess up the index.latest blob
throw new IllegalStateException(
"Wrote generation [" + newGen + "] but latest known repo gen concurrently changed to [" + latestKnownGen + "]");
}
// write the current generation to the index-latest file
final BytesReference genBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
bStream.writeLong(newGen);
genBytes = bStream.bytes();
}
logger.debug("Repository [{}] updating index.latest with generation [{}]", metadata.name(), newGen);
writeAtomic(INDEX_LATEST_BLOB, genBytes, false);
// delete the N-2 index file if it exists, keep the previous one around as a backup
if (newGen - 2 >= 0) {
final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(newGen - 2);
try {
blobContainer().deleteBlobIgnoringIfNotExists(oldSnapshotIndexFile);
} catch (IOException e) {
logger.warn("Failed to clean up old index blob [{}]", oldSnapshotIndexFile);
}
}
return null;
});
}

/**
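
The new writeIndexGen body above follows a small generational protocol: write the index-N blob for the next generation, check that no newer generation was published concurrently, re-point index.latest at the new generation, and delete the blob two generations back so that the immediately previous one survives as a backup. Here is a filesystem sketch of that ordering under stated simplifications: a temp directory stands in for the blob container, and index.latest holds text here rather than the binary long the real method writes.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class IndexGenProtocolSketch {
    public static void main(String[] args) throws IOException {
        Path repo = Files.createTempDirectory("repo"); // stand-in blob container
        long currentGen = 5;
        long newGen = currentGen + 1;
        // 1. Write the new generation blob first; readers keep resolving the
        //    old generation until index.latest is re-pointed.
        Files.write(repo.resolve("index-" + newGen),
            "{\"snapshots\":[]}".getBytes(StandardCharsets.UTF_8));
        // 2. Update index.latest so new readers resolve generation 6.
        Files.write(repo.resolve("index.latest"),
            Long.toString(newGen).getBytes(StandardCharsets.UTF_8));
        // 3. Delete index-4 (N-2) but keep index-5 as a backup of the previous state.
        Files.deleteIfExists(repo.resolve("index-" + (newGen - 2)));
        System.out.println("repository advanced to generation " + newGen);
    }
}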
@@ -1432,7 +1444,7 @@ public void verify(String seed, DiscoveryNode localNode) {
public String toString() {
return "BlobStoreRepository[" +
"[" + metadata.name() +
"], [" + blobStore() + ']' +
"], [" + blobStore.get() + ']' +
Member Author:
Not related to the other changes, but this one was weird as it created a side effect (lazily initializing the blobStore) in toString, and I added that fix here.

']';
}
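
As a toy illustration of the side effect the comment describes (all names here are hypothetical): calling a lazily-initializing getter such as blobStore() from toString would create the store merely by logging the object, whereas reading the underlying AtomicReference reports the current state without triggering initialization.

import java.util.concurrent.atomic.AtomicReference;

public class LazyToStringSketch {
    private final AtomicReference<String> blobStore = new AtomicReference<>();

    // Lazy getter: the first call creates the store, a side effect we do not
    // want triggered from toString (for example via a log statement).
    String blobStore() {
        return blobStore.updateAndGet(current -> current != null ? current : create());
    }

    private static String create() {
        System.out.println("initializing blob store"); // expensive in real life
        return "fs-blob-store";
    }

    @Override
    public String toString() {
        // Read the current value without initializing anything.
        return "LazyToStringSketch[" + blobStore.get() + ']';
    }

    public static void main(String[] args) {
        LazyToStringSketch repo = new LazyToStringSketch();
        System.out.println(repo);  // LazyToStringSketch[null]; no initialization
        repo.blobStore();          // prints "initializing blob store"
        System.out.println(repo);  // LazyToStringSketch[fs-blob-store]
    }
}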

@@ -20,6 +20,7 @@
package org.elasticsearch.repositories.blobstore;

import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.UUIDs;
@@ -42,7 +43,6 @@
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
@@ -141,7 +141,7 @@ public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception {
// write to and read from an index file with no entries
assertThat(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getSnapshotIds().size(), equalTo(0));
final RepositoryData emptyData = RepositoryData.EMPTY;
repository.writeIndexGen(emptyData, emptyData.getGenId(), true);
writeIndexGen(repository, emptyData, emptyData.getGenId());
RepositoryData repoData = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository);
assertEquals(repoData, emptyData);
assertEquals(repoData.getIndices().size(), 0);
@@ -150,53 +150,55 @@

// write to and read from an index file with snapshots but no indices
repoData = addRandomSnapshotsToRepoData(repoData, false);
repository.writeIndexGen(repoData, repoData.getGenId(), true);
writeIndexGen(repository, repoData, repoData.getGenId());
assertEquals(repoData, ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository));

// write to and read from an index file with random repository data
repoData = addRandomSnapshotsToRepoData(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), true);
repository.writeIndexGen(repoData, repoData.getGenId(), true);
writeIndexGen(repository, repoData, repoData.getGenId());
assertEquals(repoData, ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository));
}

public void testIndexGenerationalFiles() throws Exception {
final BlobStoreRepository repository = setupRepo();
assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), RepositoryData.EMPTY);

// write to index generational file
RepositoryData repositoryData = generateRandomRepoData();
repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true);
writeIndexGen(repository, repositoryData, RepositoryData.EMPTY_REPO_GEN);
assertThat(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), equalTo(repositoryData));
assertThat(repository.latestIndexBlobId(), equalTo(0L));
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(0L));

// adding more and writing to a new index generational file
repositoryData = addRandomSnapshotsToRepoData(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), true);
repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true);
writeIndexGen(repository, repositoryData, repositoryData.getGenId());
assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), repositoryData);
assertThat(repository.latestIndexBlobId(), equalTo(1L));
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(1L));

// removing a snapshot and writing to a new index generational file
repositoryData = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).removeSnapshot(
repositoryData.getSnapshotIds().iterator().next(), ShardGenerations.EMPTY);
repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true);
writeIndexGen(repository, repositoryData, repositoryData.getGenId());
assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), repositoryData);
assertThat(repository.latestIndexBlobId(), equalTo(2L));
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(2L));
}

public void testRepositoryDataConcurrentModificationNotAllowed() throws IOException {
public void testRepositoryDataConcurrentModificationNotAllowed() {
final BlobStoreRepository repository = setupRepo();

// write to index generational file
RepositoryData repositoryData = generateRandomRepoData();
final long startingGeneration = repositoryData.getGenId();
repository.writeIndexGen(repositoryData, startingGeneration, true);
final PlainActionFuture<Void> future1 = PlainActionFuture.newFuture();
repository.writeIndexGen(repositoryData, startingGeneration, true, future1);

// write repo data again to index generational file, errors because we already wrote to the
// N+1 generation from which this repository data instance was created
expectThrows(RepositoryException.class, () -> repository.writeIndexGen(
repositoryData.withGenId(startingGeneration + 1), repositoryData.getGenId(), true));
expectThrows(RepositoryException.class,
() -> writeIndexGen(repository, repositoryData.withGenId(startingGeneration + 1), repositoryData.getGenId()));
}

public void testBadChunksize() throws Exception {
@@ -213,6 +215,12 @@ public void testBadChunksize() throws Exception {
.get());
}

private static void writeIndexGen(BlobStoreRepository repository, RepositoryData repositoryData, long generation) {
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
repository.writeIndexGen(repositoryData, generation, true, future);
future.actionGet();
}

private BlobStoreRepository setupRepo() {
final Client client = client();
final Path location = ESIntegTestCase.randomRepoPath(node().settings());