From 0f846753cdf2ac47447a1f83cf43022c54c45fad Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Mon, 23 Sep 2019 17:28:49 +0200
Subject: [PATCH 1/2] Track Shard Snapshot Generation in CS (#46864)
* Track Shard Snapshot Generation in CS
Adds communication of new shard generations from data nodes to the master
node and tracks those generations in the cluster state (CS).
This is a preliminary step towards #46250.
---
.../cluster/SnapshotsInProgress.java | 37 +++++++--
.../snapshots/IndexShardSnapshotStatus.java | 25 +++---
.../repositories/FilterRepository.java | 2 +-
.../repositories/Repository.java | 2 +-
.../blobstore/BlobStoreRepository.java | 77 +++++--------------
.../snapshots/SnapshotShardsService.java | 24 +++---
.../snapshots/SnapshotsService.java | 53 +++++++------
.../cluster/SnapshotsInProgressTests.java | 12 +--
.../MetaDataIndexStateServiceTests.java | 2 +-
.../RepositoriesServiceTests.java | 2 +-
.../BlobStoreRepositoryRestoreTests.java | 3 +-
.../repositories/fs/FsRepositoryTests.java | 9 ++-
...SnapshotsInProgressSerializationTests.java | 2 +-
.../index/shard/IndexShardTestCase.java | 25 +++---
.../index/shard/RestoreOnlyRepository.java | 2 +-
.../xpack/ccr/repository/CcrRepository.java | 2 +-
.../SourceOnlySnapshotRepository.java | 2 +-
.../SourceOnlySnapshotShardTests.java | 27 +++----
18 files changed, 159 insertions(+), 149 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
index d30f488c5d70d..da4a8d9cc6eea 100644
--- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
+++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
@@ -35,6 +35,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
+import org.elasticsearch.snapshots.SnapshotsService;
import java.io.IOException;
import java.util.ArrayList;
@@ -307,20 +308,26 @@ public static boolean completed(ObjectContainer shards) {
public static class ShardSnapshotStatus {
private final ShardState state;
private final String nodeId;
+
+ @Nullable
+ private final String generation;
+
+ @Nullable
private final String reason;
- public ShardSnapshotStatus(String nodeId) {
- this(nodeId, ShardState.INIT);
+ public ShardSnapshotStatus(String nodeId, String generation) {
+ this(nodeId, ShardState.INIT, generation);
}
- public ShardSnapshotStatus(String nodeId, ShardState state) {
- this(nodeId, state, null);
+ public ShardSnapshotStatus(String nodeId, ShardState state, String generation) {
+ this(nodeId, state, null, generation);
}
- public ShardSnapshotStatus(String nodeId, ShardState state, String reason) {
+ public ShardSnapshotStatus(String nodeId, ShardState state, String reason, String generation) {
this.nodeId = nodeId;
this.state = state;
this.reason = reason;
+ this.generation = generation;
// If the state is failed we have to have a reason for this failure
assert state.failed() == false || reason != null;
}
@@ -328,6 +335,12 @@ public ShardSnapshotStatus(String nodeId, ShardState state, String reason) {
public ShardSnapshotStatus(StreamInput in) throws IOException {
nodeId = in.readOptionalString();
state = ShardState.fromValue(in.readByte());
+ if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
+ generation = in.readOptionalString();
+ assert generation != null || state != ShardState.SUCCESS : "Received null generation for shard state [" + state + "]";
+ } else {
+ generation = null;
+ }
reason = in.readOptionalString();
}
@@ -339,6 +352,10 @@ public String nodeId() {
return nodeId;
}
+ public String generation() {
+ return this.generation;
+ }
+
public String reason() {
return reason;
}
@@ -346,6 +363,9 @@ public String reason() {
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(nodeId);
out.writeByte(state.value);
+ if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
+ out.writeOptionalString(generation);
+ }
out.writeOptionalString(reason);
}
@@ -354,8 +374,8 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShardSnapshotStatus status = (ShardSnapshotStatus) o;
- return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason) && state == status.state;
-
+ return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason)
+ && Objects.equals(generation, status.generation) && state == status.state;
}
@Override
@@ -363,12 +383,13 @@ public int hashCode() {
int result = state != null ? state.hashCode() : 0;
result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0);
result = 31 * result + (reason != null ? reason.hashCode() : 0);
+ result = 31 * result + (generation != null ? generation.hashCode() : 0);
return result;
}
@Override
public String toString() {
- return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + "]";
+ return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + ", generation=" + generation + "]";
}
}
diff --git a/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java b/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java
index 97cadce643097..129ce08ebd825 100644
--- a/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java
+++ b/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java
@@ -58,6 +58,7 @@ public enum Stage {
}
private final AtomicReference stage;
+ private final AtomicReference generation;
private long startTime;
private long totalTime;
private int incrementalFileCount;
@@ -71,9 +72,10 @@ public enum Stage {
private IndexShardSnapshotStatus(final Stage stage, final long startTime, final long totalTime,
final int incrementalFileCount, final int totalFileCount, final int processedFileCount,
- final long incrementalSize, final long totalSize, final long processedSize,
- final long indexVersion, final String failure) {
+ final long incrementalSize, final long totalSize, final long processedSize, final String failure,
+ final String generation) {
this.stage = new AtomicReference<>(Objects.requireNonNull(stage));
+ this.generation = new AtomicReference<>(generation);
this.startTime = startTime;
this.totalTime = totalTime;
this.incrementalFileCount = incrementalFileCount;
@@ -82,7 +84,6 @@ private IndexShardSnapshotStatus(final Stage stage, final long startTime, final
this.totalSize = totalSize;
this.processedSize = processedSize;
this.incrementalSize = incrementalSize;
- this.indexVersion = indexVersion;
this.failure = failure;
}
@@ -111,9 +112,11 @@ public synchronized Copy moveToFinalize(final long indexVersion) {
return asCopy();
}
- public synchronized void moveToDone(final long endTime) {
+ public synchronized void moveToDone(final long endTime, final String newGeneration) {
+ assert newGeneration != null;
if (stage.compareAndSet(Stage.FINALIZE, Stage.DONE)) {
this.totalTime = Math.max(0L, endTime - startTime);
+ this.generation.set(newGeneration);
} else {
throw new IllegalStateException("Unable to move the shard snapshot status to [DONE]: " +
"expecting [FINALIZE] but got [" + stage.get() + "]");
@@ -133,6 +136,10 @@ public synchronized void moveToFailed(final long endTime, final String failure)
}
}
+ public String generation() {
+ return generation.get();
+ }
+
public boolean isAborted() {
return stage.get() == Stage.ABORTED;
}
@@ -158,8 +165,8 @@ public synchronized IndexShardSnapshotStatus.Copy asCopy() {
indexVersion, failure);
}
- public static IndexShardSnapshotStatus newInitializing() {
- return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, 0, null);
+ public static IndexShardSnapshotStatus newInitializing(String generation) {
+ return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null, generation);
}
public static IndexShardSnapshotStatus newFailed(final String failure) {
@@ -167,15 +174,15 @@ public static IndexShardSnapshotStatus newFailed(final String failure) {
if (failure == null) {
throw new IllegalArgumentException("A failure description is required for a failed IndexShardSnapshotStatus");
}
- return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, 0, failure);
+ return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure, null);
}
public static IndexShardSnapshotStatus newDone(final long startTime, final long totalTime,
final int incrementalFileCount, final int fileCount,
- final long incrementalSize, final long size) {
+ final long incrementalSize, final long size, String generation) {
// The snapshot is done which means the number of processed files is the same as total
return new IndexShardSnapshotStatus(Stage.DONE, startTime, totalTime, incrementalFileCount, fileCount, incrementalFileCount,
- incrementalSize, size, incrementalSize, 0, null);
+ incrementalSize, size, incrementalSize, null, generation);
}
/**
diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
index 775fd46edd81a..45bbf1fd4ec66 100644
--- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
@@ -122,7 +122,7 @@ public boolean isReadOnly() {
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
- IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) {
+ IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) {
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, listener);
}
@Override
diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java
index 355920478ae4f..d249278c1fa40 100644
--- a/server/src/main/java/org/elasticsearch/repositories/Repository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java
@@ -208,7 +208,7 @@ void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTi
* @param listener listener invoked on completion
*/
void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
- IndexShardSnapshotStatus snapshotStatus, ActionListener listener);
+ IndexShardSnapshotStatus snapshotStatus, ActionListener listener);
/**
* Restores snapshot of the shard.
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 159f9df239e94..b1c065814a0f8 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -122,45 +122,9 @@
*
* This repository works with any {@link BlobStore} implementation. The blobStore could be (and preferred) lazy initialized in
* {@link #createBlobStore()}.
- *
- * BlobStoreRepository maintains the following structure in the blob store
- *
- * {@code
- * STORE_ROOT
- * |- index-N - JSON serialized {@link RepositoryData} containing a list of all snapshot ids and the indices belonging to
- * | each snapshot, N is the generation of the file
- * |- index.latest - contains the numeric value of the latest generation of the index file (i.e. N from above)
- * |- incompatible-snapshots - list of all snapshot ids that are no longer compatible with the current version of the cluster
- * |- snap-20131010.dat - SMILE serialized {@link SnapshotInfo} for snapshot "20131010"
- * |- meta-20131010.dat - SMILE serialized {@link MetaData} for snapshot "20131010" (includes only global metadata)
- * |- snap-20131011.dat - SMILE serialized {@link SnapshotInfo} for snapshot "20131011"
- * |- meta-20131011.dat - SMILE serialized {@link MetaData} for snapshot "20131011"
- * .....
- * |- indices/ - data for all indices
- * |- Ac1342-B_x/ - data for index "foo" which was assigned the unique id of Ac1342-B_x in the repository
- * | |- meta-20131010.dat - JSON Serialized {@link IndexMetaData} for index "foo"
- * | |- 0/ - data for shard "0" of index "foo"
- * | | |- __1 \ (files with numeric names were created by older ES versions)
- * | | |- __2 |
- * | | |- __VPO5oDMVT5y4Akv8T_AO_A |- files from different segments see snap-* for their mappings to real segment files
- * | | |- __1gbJy18wS_2kv1qI7FgKuQ |
- * | | |- __R8JvZAHlSMyMXyZc2SS8Zg /
- * | | .....
- * | | |- snap-20131010.dat - SMILE serialized {@link BlobStoreIndexShardSnapshot} for snapshot "20131010"
- * | | |- snap-20131011.dat - SMILE serialized {@link BlobStoreIndexShardSnapshot} for snapshot "20131011"
- * | | |- index-123 - SMILE serialized {@link BlobStoreIndexShardSnapshots} for the shard
- * | |
- * | |- 1/ - data for shard "1" of index "foo"
- * | | |- __1
- * | | .....
- * | |
- * | |-2/
- * | ......
- * |
- * |- 1xB0D8_B3y/ - data for index "bar" which was assigned the unique id of 1xB0D8_B3y in the repository
- * ......
- * }
- *
+ *
+ * For in-depth documentation on how exactly implementations of this class interact with the snapshot functionality, please refer to the
+ * documentation of the package {@link org.elasticsearch.repositories.blobstore}.
*/
public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository {
private static final Logger logger = LogManager.getLogger(BlobStoreRepository.class);
@@ -514,7 +478,7 @@ private void cleanupStaleBlobs(Map foundIndices, MapDeleting unreferenced root level blobs {@link #cleanupStaleRootFiles}
*
* @param repositoryStateId Current repository state id
- * @param listener Lister to complete when done
+ * @param listener Listener to complete when done
*/
public void cleanup(long repositoryStateId, ActionListener listener) {
try {
@@ -998,11 +962,10 @@ private void writeAtomic(final String blobName, final BytesReference bytesRef, b
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
- IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) {
+ IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) {
final ShardId shardId = store.shardId();
final long startTime = threadPool.absoluteTimeInMillis();
-
- final ActionListener snapshotDoneListener = ActionListener.wrap(listener::onResponse, e -> {
+ final ActionListener snapshotDoneListener = ActionListener.wrap(listener::onResponse, e -> {
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.stackTrace(e));
listener.onFailure(e instanceof IndexShardSnapshotFailedException ? e : new IndexShardSnapshotFailedException(shardId, e));
});
@@ -1010,14 +973,14 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, metadata.name());
final BlobContainer shardContainer = shardContainer(indexId, shardId);
- final Map blobs;
+ final Set blobs;
try {
- blobs = shardContainer.listBlobsByPrefix(INDEX_FILE_PREFIX);
+ blobs = shardContainer.listBlobsByPrefix(INDEX_FILE_PREFIX).keySet();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e);
}
- Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs.keySet(), shardContainer);
+ Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
BlobStoreIndexShardSnapshots snapshots = tuple.v1();
long fileListGeneration = tuple.v2();
@@ -1111,7 +1074,6 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
}
- // delete all files that are not referenced by any commit point
// build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
List newSnapshotsList = new ArrayList<>();
newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
@@ -1124,8 +1086,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration);
// Delete all previous index-N blobs
- blobsToDelete =
- blobs.keySet().stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList());
+ blobsToDelete = blobs.stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList());
assert blobsToDelete.stream().mapToLong(b -> Long.parseLong(b.replaceFirst(SNAPSHOT_INDEX_PREFIX, "")))
.max().orElse(-1L) < Long.parseLong(indexGeneration)
: "Tried to delete an index-N blob newer than the current generation [" + indexGeneration
@@ -1141,8 +1102,8 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete old index-N blobs during finalization",
snapshotId, shardId), e);
}
- snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis());
- snapshotDoneListener.onResponse(null);
+ snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), indexGeneration);
+ snapshotDoneListener.onResponse(indexGeneration);
}, snapshotDoneListener::onFailure);
if (indexIncrementalFileCount == 0) {
allFilesUploadedListener.onResponse(Collections.emptyList());
@@ -1246,7 +1207,7 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In
BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(shardContainer(indexId, shardId), snapshotId);
return IndexShardSnapshotStatus.newDone(snapshot.startTime(), snapshot.time(),
snapshot.incrementalFileCount(), snapshot.totalFileCount(),
- snapshot.incrementalSize(), snapshot.totalSize());
+ snapshot.incrementalSize(), snapshot.totalSize(), null); // Not adding a real generation here as it doesn't matter to callers
}
@Override
@@ -1301,14 +1262,14 @@ public String toString() {
private ShardSnapshotMetaDeleteResult deleteShardSnapshot(RepositoryData repositoryData, IndexId indexId, ShardId snapshotShardId,
SnapshotId snapshotId) throws IOException {
final BlobContainer shardContainer = shardContainer(indexId, snapshotShardId);
- final Map blobs;
+ final Set blobs;
try {
- blobs = shardContainer.listBlobs();
+ blobs = shardContainer.listBlobs().keySet();
} catch (IOException e) {
throw new IndexShardSnapshotException(snapshotShardId, "Failed to list content of shard directory", e);
}
- Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs.keySet(), shardContainer);
+ Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
BlobStoreIndexShardSnapshots snapshots = tuple.v1();
long fileListGeneration = tuple.v2();
@@ -1326,7 +1287,7 @@ private ShardSnapshotMetaDeleteResult deleteShardSnapshot(RepositoryData reposit
final List blobsToDelete;
if (newSnapshotsList.isEmpty()) {
// If we deleted all snapshots, we don't need to create a new index file and simply delete all the blobs we found
- blobsToDelete = new ArrayList<>(blobs.keySet());
+ blobsToDelete = new ArrayList<>(blobs);
} else {
final Set survivingSnapshotUUIDs = repositoryData.getSnapshots(indexId).stream().map(SnapshotId::getUUID)
.collect(Collectors.toSet());
@@ -1344,9 +1305,9 @@ private ShardSnapshotMetaDeleteResult deleteShardSnapshot(RepositoryData reposit
// Unused blobs are all previous index-, data- and meta-blobs and that are not referenced by the new index- as well as all
// temporary blobs
- private static List unusedBlobs(Map blobs, Set survivingSnapshotUUIDs,
+ private static List unusedBlobs(Set blobs, Set survivingSnapshotUUIDs,
BlobStoreIndexShardSnapshots updatedSnapshots) {
- return blobs.keySet().stream().filter(blob ->
+ return blobs.stream().filter(blob ->
blob.startsWith(SNAPSHOT_INDEX_PREFIX)
|| (blob.startsWith(SNAPSHOT_PREFIX) && blob.endsWith(".dat")
&& survivingSnapshotUUIDs.contains(
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
index 083382b74b03f..ec9d4d7394df0 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
@@ -243,7 +243,7 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) {
if (startedShards == null) {
startedShards = new HashMap<>();
}
- startedShards.put(shardId, IndexShardSnapshotStatus.newInitializing());
+ startedShards.put(shardId, IndexShardSnapshotStatus.newInitializing(shardSnapshotStatus.generation()));
}
}
if (startedShards != null && startedShards.isEmpty() == false) {
@@ -280,14 +280,17 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map() {
+ snapshot(shardId, snapshot, indexId, snapshotStatus, new ActionListener() {
@Override
- public void onResponse(final Void aVoid) {
+ public void onResponse(String newGeneration) {
+ assert newGeneration != null;
+ assert newGeneration.equals(snapshotStatus.generation());
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
- logger.debug("snapshot ({}) completed to {} with {}", snapshot, snapshot.getRepository(), lastSnapshotStatus);
+ logger.debug("snapshot [{}] completed to [{}] with [{}] at generation [{}]",
+ snapshot, snapshot.getRepository(), lastSnapshotStatus, snapshotStatus.generation());
}
- notifySuccessfulSnapshotShard(snapshot, shardId);
+ notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration);
}
@Override
@@ -307,7 +310,7 @@ public void onFailure(Exception e) {
* @param snapshotStatus snapshot status
*/
private void snapshot(final ShardId shardId, final Snapshot snapshot, final IndexId indexId,
- final IndexShardSnapshotStatus snapshotStatus, ActionListener listener) {
+ final IndexShardSnapshotStatus snapshotStatus, ActionListener listener) {
try {
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
if (indexShard.routingEntry().primary() == false) {
@@ -365,7 +368,7 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) {
// but we think the shard is done - we need to make new master know that the shard is done
logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, " +
"updating status on the master", snapshot.snapshot(), shardId);
- notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId);
+ notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId, localShard.getValue().generation());
} else if (stage == Stage.FAILURE) {
// but we think the shard failed - we need to make new master know that the shard failed
@@ -437,15 +440,16 @@ public String toString() {
}
/** Notify the master node that the given shard has been successfully snapshotted **/
- private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId) {
+ private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId, String generation) {
+ assert generation != null;
sendSnapshotShardUpdate(snapshot, shardId,
- new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.SUCCESS));
+ new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.SUCCESS, generation));
}
/** Notify the master node that the given shard failed to be snapshotted **/
private void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String failure) {
sendSnapshotShardUpdate(snapshot, shardId,
- new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.FAILED, failure));
+ new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.FAILED, failure, null));
}
/** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
index 1dbd4be144178..c94600e0e80af 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -123,6 +123,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
*/
public static final Version NO_REPO_INITIALIZE_VERSION = Version.V_7_5_0;
+ public static final Version SHARD_GEN_IN_REPO_DATA_VERSION = Version.V_7_6_0;
+
private static final Logger logger = LogManager.getLogger(SnapshotsService.class);
private final ClusterService clusterService;
@@ -351,7 +353,7 @@ public TimeValue timeout() {
* @param snapshotName snapshot name
* @param state current cluster state
*/
- private void validate(String repositoryName, String snapshotName, ClusterState state) {
+ private static void validate(String repositoryName, String snapshotName, ClusterState state) {
RepositoriesMetaData repositoriesMetaData = state.getMetaData().custom(RepositoriesMetaData.TYPE);
if (repositoriesMetaData == null || repositoriesMetaData.repository(repositoryName) == null) {
throw new RepositoryMissingException(repositoryName);
@@ -414,8 +416,9 @@ protected void doRun() {
throw new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository");
}
final String snapshotName = snapshot.snapshot().getSnapshotId().getName();
+ final RepositoryData repositoryData = repository.getRepositoryData();
// check if the snapshot name already exists in the repository
- if (repository.getRepositoryData().getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
+ if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
throw new InvalidSnapshotNameException(
repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
}
@@ -746,7 +749,7 @@ public void applyClusterState(ClusterChangedEvent event) {
).forEach(entry -> endSnapshot(entry, event.state().metaData()));
}
if (newMaster) {
- finalizeSnapshotDeletionFromPreviousMaster(event);
+ finalizeSnapshotDeletionFromPreviousMaster(event.state());
}
}
} catch (Exception e) {
@@ -765,8 +768,8 @@ public void applyClusterState(ClusterChangedEvent event) {
* the old master's snapshot deletion will just respond with an error but in actuality, the
* snapshot was deleted and a call to GET snapshots would reveal that the snapshot no longer exists.
*/
- private void finalizeSnapshotDeletionFromPreviousMaster(ClusterChangedEvent event) {
- SnapshotDeletionsInProgress deletionsInProgress = event.state().custom(SnapshotDeletionsInProgress.TYPE);
+ private void finalizeSnapshotDeletionFromPreviousMaster(ClusterState state) {
+ SnapshotDeletionsInProgress deletionsInProgress = state.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
assert deletionsInProgress.getEntries().size() == 1 : "only one in-progress deletion allowed per cluster";
SnapshotDeletionsInProgress.Entry entry = deletionsInProgress.getEntries().get(0);
@@ -805,7 +808,8 @@ public ClusterState execute(ClusterState currentState) {
logger.warn("failing snapshot of shard [{}] on closed node [{}]",
shardId, shardStatus.nodeId());
shards.put(shardId,
- new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "node shutdown"));
+ new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "node shutdown",
+ shardStatus.generation()));
}
} else {
shards.put(shardId, shardStatus);
@@ -914,7 +918,8 @@ private static ImmutableOpenMap processWaitingShar
// Shard that we were waiting for has started on a node, let's process it
snapshotChanged = true;
logger.trace("starting shard that we were waiting for [{}] on node [{}]", shardId, shardStatus.nodeId());
- shards.put(shardId, new ShardSnapshotStatus(shardRouting.primaryShard().currentNodeId()));
+ shards.put(shardId,
+ new ShardSnapshotStatus(shardRouting.primaryShard().currentNodeId(), shardStatus.generation()));
continue;
} else if (shardRouting.primaryShard().initializing() || shardRouting.primaryShard().relocating()) {
// Shard that we were waiting for hasn't started yet or still relocating - will continue to wait
@@ -926,7 +931,8 @@ private static ImmutableOpenMap processWaitingShar
// Shard that we were waiting for went into unassigned state or disappeared - giving up
snapshotChanged = true;
logger.warn("failing snapshot of shard [{}] on unassigned shard [{}]", shardId, shardStatus.nodeId());
- shards.put(shardId, new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "shard is unassigned"));
+ shards.put(shardId, new ShardSnapshotStatus(
+ shardStatus.nodeId(), ShardState.FAILED, "shard is unassigned", shardStatus.generation()));
} else {
shards.put(shardId, shardStatus);
}
@@ -971,7 +977,7 @@ private static boolean removedNodesCleanupNeeded(SnapshotsInProgress snapshotsIn
* @param shards list of shard statuses
* @return list of failed and closed indices
*/
- private Tuple, Set> indicesWithMissingShards(
+ private static Tuple, Set> indicesWithMissingShards(
ImmutableOpenMap shards, MetaData metaData) {
Set missing = new HashSet<>();
Set closed = new HashSet<>();
@@ -1164,13 +1170,14 @@ public void deleteSnapshot(final String repositoryName, final String snapshotNam
*/
private void deleteSnapshot(final Snapshot snapshot, final ActionListener listener, final long repositoryStateId,
final boolean immediatePriority) {
+ logger.info("deleting snapshot [{}]", snapshot);
Priority priority = immediatePriority ? Priority.IMMEDIATE : Priority.NORMAL;
clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) {
boolean waitForSnapshot = false;
@Override
- public ClusterState execute(ClusterState currentState) throws Exception {
+ public ClusterState execute(ClusterState currentState) {
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
throw new ConcurrentSnapshotExecutionException(snapshot,
@@ -1230,7 +1237,8 @@ public ClusterState execute(ClusterState currentState) throws Exception {
for (ObjectObjectCursor shardEntry : snapshotEntry.shards()) {
ShardSnapshotStatus status = shardEntry.value;
if (status.state().completed() == false) {
- status = new ShardSnapshotStatus(status.nodeId(), ShardState.ABORTED, "aborted by snapshot deletion");
+ status = new ShardSnapshotStatus(
+ status.nodeId(), ShardState.ABORTED, "aborted by snapshot deletion", status.generation());
}
shardsBuilder.put(shardEntry.key, status);
}
@@ -1416,8 +1424,7 @@ private static ImmutableOpenMap shards = ImmutableOpenMap.builder();
// test more than one waiting shard in an index
- shards.put(new ShardId(idx1Name, idx1UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING));
- shards.put(new ShardId(idx1Name, idx1UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING));
- shards.put(new ShardId(idx1Name, idx1UUID, 2), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
+ shards.put(new ShardId(idx1Name, idx1UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING, "1"));
+ shards.put(new ShardId(idx1Name, idx1UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING, "1"));
+ shards.put(new ShardId(idx1Name, idx1UUID, 2), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "", "1"));
// test exactly one waiting shard in an index
- shards.put(new ShardId(idx2Name, idx2UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING));
- shards.put(new ShardId(idx2Name, idx2UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
+ shards.put(new ShardId(idx2Name, idx2UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING, "1"));
+ shards.put(new ShardId(idx2Name, idx2UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "", "1"));
// test no waiting shards in an index
- shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
+ shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "", "1"));
Entry entry = new Entry(snapshot, randomBoolean(), randomBoolean(), State.INIT,
indices, System.currentTimeMillis(), randomLong(), shards.build(), SnapshotInfoTests.randomUserMetadata());
diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java
index a7ad9d79e7439..244bdcc21ac5b 100644
--- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java
@@ -465,7 +465,7 @@ private static ClusterState addSnapshotIndex(final String index, final int numSh
final ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder();
for (ShardRouting shardRouting : newState.routingTable().index(index).randomAllActiveShardsIt()) {
- shardsBuilder.put(shardRouting.shardId(), new SnapshotsInProgress.ShardSnapshotStatus(shardRouting.currentNodeId()));
+ shardsBuilder.put(shardRouting.shardId(), new SnapshotsInProgress.ShardSnapshotStatus(shardRouting.currentNodeId(), "1"));
}
final Snapshot snapshot = new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5)));
diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
index e25e342c596c8..0052cef1a3fe2 100644
--- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
@@ -203,7 +203,7 @@ public boolean isReadOnly() {
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit
- snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) {
+ snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) {
}
diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java
index a904879321d58..b2f21e89a7d61 100644
--- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java
@@ -159,7 +159,8 @@ public void testSnapshotWithConflictingName() throws IOException {
// snapshot the shard
final Repository repository = createRepository();
final Snapshot snapshot = new Snapshot(repository.getMetadata().name(), new SnapshotId(randomAlphaOfLength(10), "_uuid"));
- snapshotShard(shard, snapshot, repository);
+ final String shardGen = snapshotShard(shard, snapshot, repository);
+ assertNotNull(shardGen);
final Snapshot snapshotWithSameName = new Snapshot(repository.getMetadata().name(), new SnapshotId(
snapshot.getSnapshotId().getName(), "_uuid2"));
IndexShardSnapshotFailedException isfe = expectThrows(IndexShardSnapshotFailedException.class,
diff --git a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java
index e1c1f49dac8c7..c98a606402bff 100644
--- a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java
@@ -100,15 +100,16 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException {
IndexId indexId = new IndexId(idxSettings.getIndex().getName(), idxSettings.getUUID());
IndexCommit indexCommit = Lucene.getIndexCommit(Lucene.readSegmentInfos(store.directory()), store.directory());
- final PlainActionFuture future1 = PlainActionFuture.newFuture();
+ final PlainActionFuture future1 = PlainActionFuture.newFuture();
runGeneric(threadPool, () -> {
- IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing();
+ IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null);
repository.snapshotShard(store, null, snapshotId, indexId, indexCommit,
snapshotStatus, future1);
future1.actionGet();
IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy();
assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
});
+ final String shardGeneration = future1.actionGet();
Lucene.cleanLuceneIndex(directory);
expectThrows(org.apache.lucene.index.IndexNotFoundException.class, () -> Lucene.readSegmentInfos(directory));
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
@@ -127,9 +128,9 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException {
SnapshotId incSnapshotId = new SnapshotId("test1", "test1");
IndexCommit incIndexCommit = Lucene.getIndexCommit(Lucene.readSegmentInfos(store.directory()), store.directory());
Collection commitFileNames = incIndexCommit.getFileNames();
- final PlainActionFuture future2 = PlainActionFuture.newFuture();
+ final PlainActionFuture future2 = PlainActionFuture.newFuture();
runGeneric(threadPool, () -> {
- IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing();
+ IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration);
repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus, future2);
future2.actionGet();
IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy();
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java
index 03a1e96ea2a89..7d59c71ed18ea 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java
@@ -72,7 +72,7 @@ private Entry randomSnapshot() {
String nodeId = randomAlphaOfLength(10);
ShardState shardState = randomFrom(ShardState.values());
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(nodeId, shardState,
- shardState.failed() ? randomAlphaOfLength(10) : null));
+ shardState.failed() ? randomAlphaOfLength(10) : null, "1"));
}
}
ImmutableOpenMap shards = builder.build();
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
index 21d1d679d4f5a..c9b0ed3e0fb29 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
@@ -812,25 +812,30 @@ protected void recoverShardFromSnapshot(final IndexShard shard,
shard.recoveryState());
}
- /** Snapshot a shard using a given repository **/
- protected void snapshotShard(final IndexShard shard,
- final Snapshot snapshot,
- final Repository repository) throws IOException {
- final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing();
- final PlainActionFuture future = PlainActionFuture.newFuture();
+ /**
+ * Snapshot a shard using a given repository.
+ *
+ * @return new shard generation
+ */
+ protected String snapshotShard(final IndexShard shard,
+ final Snapshot snapshot,
+ final Repository repository) throws IOException {
+ final Index index = shard.shardId().getIndex();
+ final IndexId indexId = new IndexId(index.getName(), index.getUUID());
+ final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null);
+ final PlainActionFuture future = PlainActionFuture.newFuture();
+ final String shardGen;
try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) {
- Index index = shard.shardId().getIndex();
- IndexId indexId = new IndexId(index.getName(), index.getUUID());
-
repository.snapshotShard(shard.store(), shard.mapperService(), snapshot.getSnapshotId(), indexId,
indexCommitRef.getIndexCommit(), snapshotStatus, future);
- future.actionGet();
+ shardGen = future.actionGet();
}
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
assertEquals(IndexShardSnapshotStatus.Stage.DONE, lastSnapshotStatus.getStage());
assertEquals(shard.snapshotStoreMetadata().size(), lastSnapshotStatus.getTotalFileCount());
assertNull(lastSnapshotStatus.getFailure());
+ return shardGen;
}
/**
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java
index 072170afa956d..86b1fefa190e1 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java
@@ -135,7 +135,7 @@ public boolean isReadOnly() {
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
- IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) {
+ IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) {
}
@Override
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
index cd071e4c448ec..d6fcd8fa24863 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
@@ -292,7 +292,7 @@ public boolean isReadOnly() {
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
- IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) {
+ IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java
index 711abf781894c..c979856d4e951 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java
@@ -133,7 +133,7 @@ private static MetaData metadataToSnapshot(List indices, MetaData metaD
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
- IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) {
+ IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) {
if (mapperService.documentMapper() != null // if there is no mapping this is null
&& mapperService.documentMapper().sourceMapper().isComplete() == false) {
listener.onFailure(
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java
index 3a4151d5ba0c7..db290ea01e246 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java
@@ -96,8 +96,8 @@ public void testSourceIncomplete() throws IOException {
SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository());
repository.start();
try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
- IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
- final PlainActionFuture future = PlainActionFuture.newFuture();
+ IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing("-1");
+ final PlainActionFuture future = PlainActionFuture.newFuture();
runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future));
IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, future::actionGet);
@@ -118,14 +118,15 @@ public void testIncrementalSnapshot() throws IOException {
IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID());
SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository());
repository.start();
- int totalFileCount = -1;
+ int totalFileCount;
+ String shardGeneration;
try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
- IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
+ IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(null);
SnapshotId snapshotId = new SnapshotId("test", "test");
- final PlainActionFuture future = PlainActionFuture.newFuture();
+ final PlainActionFuture future = PlainActionFuture.newFuture();
runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future));
- future.actionGet();
+ shardGeneration = future.actionGet();
IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
totalFileCount = copy.getTotalFileCount();
@@ -137,11 +138,11 @@ public void testIncrementalSnapshot() throws IOException {
try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
SnapshotId snapshotId = new SnapshotId("test_1", "test_1");
- IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
- final PlainActionFuture future = PlainActionFuture.newFuture();
+ IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration);
+ final PlainActionFuture future = PlainActionFuture.newFuture();
runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future));
- future.actionGet();
+ shardGeneration = future.actionGet();
IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
// we processed the segments_N file plus _1.si, _1.fdx, _1.fnm, _1.fdt
assertEquals(5, copy.getIncrementalFileCount());
@@ -153,8 +154,8 @@ public void testIncrementalSnapshot() throws IOException {
try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
SnapshotId snapshotId = new SnapshotId("test_2", "test_2");
- IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
- final PlainActionFuture future = PlainActionFuture.newFuture();
+ IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration);
+ final PlainActionFuture future = PlainActionFuture.newFuture();
runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future));
future.actionGet();
@@ -200,8 +201,8 @@ public void testRestoreMinmal() throws IOException {
SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository());
repository.start();
try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
- IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
- final PlainActionFuture future = PlainActionFuture.newFuture();
+ IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(null);
+ final PlainActionFuture future = PlainActionFuture.newFuture();
runAsSnapshot(shard.getThreadPool(), () -> {
repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(),
indexShardSnapshotStatus, future);
From 0b6fe77d8c06381f8199aa06044a507c37947937 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Tue, 22 Oct 2019 19:26:28 +0100
Subject: [PATCH 2/2] Track Shard-Snapshot Index Generation at Repository Root
(#46250)
This change adds a new field `"shards"` to `RepositoryData` that contains a mapping of `IndexId` to a `String[]`. This string array can be accessed by shard id to get the generation of a shard's shard folder (i.e. the `N` in the name of the currently valid `/indices/${indexId}/${shardId}/index-${N}` for the shard in question).
This allows for creating a new snapshot in the shard without doing any LIST operations on the shard's folder. In the case of AWS S3, this saves about 1/3 of the cost for updating an empty shard (see #45736) and removes one out of two remaining potential issues with eventually consistent blob stores (see #38941 ... now only the root `index-${N}` is determined by listing).
Equally, if not more, important: a number of possible failure modes on eventually consistent blob stores like AWS S3 are eliminated by moving all delete operations to the `master` node and moving from incremental naming of shard level index-N to uuid suffixes for these blobs.
This change moves the deleting of the previous shard level `index-${uuid}` blob to the master node instead of the data node, allowing for a safe and consistent update of the shard's generation in the `RepositoryData` by first updating `RepositoryData` and then deleting the now unreferenced previous `index-${uuid}` blob.
__No deletes are executed on the data nodes at all for any operation with this change.__
Note also: Previous issues with hanging data nodes interfering with master nodes are completely impossible, even on S3 (see next section for details).
This change renames the shard level `index-${N}` blobs to use a uuid suffix: `index-${UUID}`. The reason for this is that writing a new shard-level `index-` generation blob is no longer atomic in its effect. Not only does the blob have to be written to have an effect, it must also be referenced by the root level `index-N` (`RepositoryData`) to become an effective part of the snapshot repository.
This leads to a problem if we were to use incrementing names like we did before. If a blob `index-${N+1}` is written but, due to a node/network/cluster/... crash, the root level `RepositoryData` has not been updated, then a future operation will determine the shard's generation to be `N` and try to write a new `index-${N+1}` to the already existing path. Updates like that are problematic on S3 for consistency reasons, but also create numerous issues when thinking about stuck data nodes.
Previously, stuck data nodes that were tasked to write `index-${N+1}` but got stuck and tried to do so after some other node had already written `index-${N+1}` were prevented from doing so (except on S3) by us not allowing overwrites for that blob, and thus no corruption could occur.
Were we to continue using incrementing names, we could not do this. The stuck node scenario would either allow for overwriting the `N+1` generation or force us to continue using a `LIST` operation to figure out the next `N` (which would make this change pointless).
With uuid naming and moving all deletes to `master` this becomes a non-issue. Data nodes write updated shard generation `index-${uuid}` and `master` makes those `index-${uuid}` part of the `RepositoryData` that it deems correct and cleans up all those `index-` that are unused.
Co-authored-by: Yannick Welsch
Co-authored-by: Tanguy Leroux
---
.../TransportCleanupRepositoryAction.java | 5 +-
.../cluster/SnapshotsInProgress.java | 39 +-
.../repositories/FilterRepository.java | 20 +-
.../repositories/Repository.java | 33 +-
.../repositories/RepositoryData.java | 89 +++-
.../repositories/ShardGenerations.java | 215 +++++++++
.../blobstore/BlobStoreRepository.java | 446 +++++++++++-------
.../repositories/blobstore/package-info.java | 34 +-
.../snapshots/SnapshotShardsService.java | 16 +-
.../snapshots/SnapshotsService.java | 108 +++--
.../cluster/ClusterStateDiffIT.java | 3 +-
.../cluster/SnapshotsInProgressTests.java | 2 +-
.../MetaDataDeleteIndexServiceTests.java | 2 +-
.../MetaDataIndexStateServiceTests.java | 2 +-
.../RepositoriesServiceTests.java | 9 +-
.../repositories/RepositoryDataTests.java | 39 +-
.../BlobStoreRepositoryRestoreTests.java | 12 +
.../blobstore/BlobStoreRepositoryTests.java | 27 +-
.../repositories/fs/FsRepositoryTests.java | 5 +-
.../SharedClusterSnapshotRestoreIT.java | 112 ++++-
...SnapshotsInProgressSerializationTests.java | 2 +-
...ckEventuallyConsistentRepositoryTests.java | 14 +-
.../index/shard/IndexShardTestCase.java | 5 +-
.../index/shard/RestoreOnlyRepository.java | 18 +-
.../blobstore/BlobStoreTestUtil.java | 51 +-
.../xpack/ccr/repository/CcrRepository.java | 14 +-
.../SourceOnlySnapshotRepository.java | 20 +-
.../SourceOnlySnapshotShardTests.java | 15 +-
.../xpack/slm/SnapshotRetentionTaskTests.java | 3 +-
29 files changed, 1002 insertions(+), 358 deletions(-)
create mode 100644 server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java
diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java
index 42b5160417114..15e58e9165029 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java
@@ -42,6 +42,7 @@
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryCleanupResult;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
+import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -200,7 +201,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
l -> blobStoreRepository.cleanup(
- repositoryStateId, ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
+ repositoryStateId,
+ newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION),
+ ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
}
private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) {
diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
index da4a8d9cc6eea..22bee83e3482c 100644
--- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
+++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
@@ -91,12 +91,14 @@ public static class Entry implements ToXContent {
private final ImmutableOpenMap> waitingIndices;
private final long startTime;
private final long repositoryStateId;
+ // see #useShardGenerations
+ private final boolean useShardGenerations;
@Nullable private final Map userMetadata;
@Nullable private final String failure;
public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices,
long startTime, long repositoryStateId, ImmutableOpenMap shards,
- String failure, Map userMetadata) {
+ String failure, Map userMetadata, boolean useShardGenerations) {
this.state = state;
this.snapshot = snapshot;
this.includeGlobalState = includeGlobalState;
@@ -114,6 +116,7 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta
this.repositoryStateId = repositoryStateId;
this.failure = failure;
this.userMetadata = userMetadata;
+ this.useShardGenerations = useShardGenerations;
}
private static boolean assertShardsConsistent(State state, List indices,
@@ -128,20 +131,22 @@ private static boolean assertShardsConsistent(State state, List indices
: "Indices in shards " + indexNamesInShards + " differ from expected indices " + indexNames + " for state [" + state + "]";
return true;
}
+
public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices,
long startTime, long repositoryStateId, ImmutableOpenMap shards,
- Map userMetadata) {
- this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null, userMetadata);
+ Map userMetadata, boolean useShardGenerations) {
+ this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null, userMetadata,
+ useShardGenerations);
}
public Entry(Entry entry, State state, ImmutableOpenMap shards) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
- entry.repositoryStateId, shards, entry.failure, entry.userMetadata);
+ entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.useShardGenerations);
}
public Entry(Entry entry, State state, ImmutableOpenMap shards, String failure) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
- entry.repositoryStateId, shards, failure, entry.userMetadata);
+ entry.repositoryStateId, shards, failure, entry.userMetadata, entry.useShardGenerations);
}
public Entry(Entry entry, ImmutableOpenMap shards) {
@@ -192,6 +197,16 @@ public String failure() {
return failure;
}
+ /**
+ * Whether to write to the repository in a format only understood by versions on or after
+ * {@link SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}.
+ *
+ * @return true if writing to repository in new format
+ */
+ public boolean useShardGenerations() {
+ return useShardGenerations;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -207,6 +222,7 @@ public boolean equals(Object o) {
if (!snapshot.equals(entry.snapshot)) return false;
if (state != entry.state) return false;
if (repositoryStateId != entry.repositoryStateId) return false;
+ if (useShardGenerations != entry.useShardGenerations) return false;
return true;
}
@@ -221,6 +237,7 @@ public int hashCode() {
result = 31 * result + indices.hashCode();
result = 31 * result + Long.hashCode(startTime);
result = 31 * result + Long.hashCode(repositoryStateId);
+ result = 31 * result + (useShardGenerations ? 1 : 0);
return result;
}
@@ -518,6 +535,12 @@ public SnapshotsInProgress(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
userMetadata = in.readMap();
}
+ final boolean useShardGenerations;
+ if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
+ useShardGenerations = in.readBoolean();
+ } else {
+ useShardGenerations = false;
+ }
entries[i] = new Entry(snapshot,
includeGlobalState,
partial,
@@ -527,7 +550,8 @@ public SnapshotsInProgress(StreamInput in) throws IOException {
repositoryStateId,
builder.build(),
failure,
- userMetadata
+ userMetadata,
+ useShardGenerations
);
}
this.entries = Arrays.asList(entries);
@@ -563,6 +587,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
out.writeMap(entry.userMetadata);
}
+ if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
+ out.writeBoolean(entry.useShardGenerations);
+ }
}
}
diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
index 45bbf1fd4ec66..0693cc17be80d 100644
--- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
@@ -78,16 +78,17 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met
}
@Override
- public void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards,
- List shardFailures, long repositoryStateId, boolean includeGlobalState,
- MetaData metaData, Map userMetadata, ActionListener listener) {
- in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId,
- includeGlobalState, metaData, userMetadata, listener);
+ public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
+ int totalShards, List shardFailures, long repositoryStateId,
+ boolean includeGlobalState, MetaData metaData, Map userMetadata,
+ boolean writeShardGens, ActionListener listener) {
+ in.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
+ includeGlobalState, metaData, userMetadata, writeShardGens, listener);
}
@Override
- public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) {
- in.deleteSnapshot(snapshotId, repositoryStateId, listener);
+ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener listener) {
+ in.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener);
}
@Override
@@ -122,8 +123,9 @@ public boolean isReadOnly() {
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
- IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) {
- in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, listener);
+ IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
+ ActionListener listener) {
+ in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, writeShardGens, listener);
}
@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java
index d249278c1fa40..8fc8f40270366 100644
--- a/server/src/main/java/org/elasticsearch/repositories/Repository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java
@@ -125,28 +125,33 @@ default Repository create(RepositoryMetaData metaData, Function
* This method is called on master after all shards are snapshotted.
*
- * @param snapshotId snapshot id
- * @param indices list of indices in the snapshot
- * @param startTime start time of the snapshot
- * @param failure global failure reason or null
- * @param totalShards total number of shards
- * @param shardFailures list of shard failures
- * @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
+ * @param snapshotId snapshot id
+ * @param shardGenerations updated shard generations
+ * @param startTime start time of the snapshot
+ * @param failure global failure reason or null
+ * @param totalShards total number of shards
+ * @param shardFailures list of shard failures
+ * @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
* @param includeGlobalState include cluster global state
+ * @param clusterMetaData cluster metadata
+ * @param userMetadata user metadata
+ * @param writeShardGens if shard generations should be written to the repository
* @param listener listener to be called on completion of the snapshot
*/
- void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards,
- List shardFailures, long repositoryStateId, boolean includeGlobalState,
- MetaData clusterMetaData, Map userMetadata, ActionListener listener);
+ void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
+ int totalShards, List shardFailures, long repositoryStateId,
+ boolean includeGlobalState, MetaData clusterMetaData, Map userMetadata,
+ boolean writeShardGens, ActionListener listener);
/**
* Deletes snapshot
*
- * @param snapshotId snapshot id
+ * @param snapshotId snapshot id
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began
- * @param listener completion listener
+ * @param writeShardGens if shard generations should be written to the repository
+ * @param listener completion listener
*/
- void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener);
+ void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener listener);
/**
* Returns snapshot throttle time in nanoseconds
@@ -208,7 +213,7 @@ void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTi
* @param listener listener invoked on completion
*/
void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
- IndexShardSnapshotStatus snapshotStatus, ActionListener listener);
+ IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, ActionListener listener);
/**
* Restores snapshot of the shard.
diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java
index cf57776fc50c2..8589d2efdfc6d 100644
--- a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java
+++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java
@@ -25,6 +25,7 @@
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotState;
@@ -55,7 +56,7 @@ public final class RepositoryData {
* An instance initialized for an empty repository.
*/
public static final RepositoryData EMPTY = new RepositoryData(EMPTY_REPO_GEN,
- Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
+ Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
/**
* The generational id of the index file from which the repository data was read.
@@ -78,19 +79,30 @@ public final class RepositoryData {
*/
private final Map> indexSnapshots;
+ /**
+ * Shard generations.
+ */
+ private final ShardGenerations shardGenerations;
public RepositoryData(long genId, Map snapshotIds, Map snapshotStates,
- Map> indexSnapshots) {
+ Map> indexSnapshots, ShardGenerations shardGenerations) {
this.genId = genId;
this.snapshotIds = Collections.unmodifiableMap(snapshotIds);
this.snapshotStates = Collections.unmodifiableMap(snapshotStates);
this.indices = Collections.unmodifiableMap(indexSnapshots.keySet().stream()
.collect(Collectors.toMap(IndexId::getName, Function.identity())));
this.indexSnapshots = Collections.unmodifiableMap(indexSnapshots);
+ this.shardGenerations = Objects.requireNonNull(shardGenerations);
+ assert indices.values().containsAll(shardGenerations.indices()) : "ShardGenerations contained indices "
+ + shardGenerations.indices() + " but snapshots only reference indices " + indices.values();
}
protected RepositoryData copy() {
- return new RepositoryData(genId, snapshotIds, snapshotStates, indexSnapshots);
+ return new RepositoryData(genId, snapshotIds, snapshotStates, indexSnapshots, shardGenerations);
+ }
+
+ public ShardGenerations shardGenerations() {
+ return shardGenerations;
}
/**
@@ -140,10 +152,15 @@ public List indicesToUpdateAfterRemovingSnapshot(SnapshotId snapshotId)
/**
* Add a snapshot and its indices to the repository; returns a new instance. If the snapshot
* already exists in the repository data, this method throws an IllegalArgumentException.
+ *
+ * @param snapshotId Id of the new snapshot
+ * @param snapshotState State of the new snapshot
+ * @param shardGenerations Updated shard generations in the new snapshot. For each index contained in the snapshot the new
+ * generations, indexed by the shard id they correspond to, must be supplied.
*/
public RepositoryData addSnapshot(final SnapshotId snapshotId,
final SnapshotState snapshotState,
- final List snapshottedIndices) {
+ final ShardGenerations shardGenerations) {
if (snapshotIds.containsKey(snapshotId.getUUID())) {
// if the snapshot id already exists in the repository data, it means an old master
// that is blocked from the cluster is trying to finalize a snapshot concurrently with
@@ -155,21 +172,11 @@ public RepositoryData addSnapshot(final SnapshotId snapshotId,
Map newSnapshotStates = new HashMap<>(snapshotStates);
newSnapshotStates.put(snapshotId.getUUID(), snapshotState);
Map> allIndexSnapshots = new HashMap<>(indexSnapshots);
- for (final IndexId indexId : snapshottedIndices) {
- if (allIndexSnapshots.containsKey(indexId)) {
- Set ids = allIndexSnapshots.get(indexId);
- if (ids == null) {
- ids = new LinkedHashSet<>();
- allIndexSnapshots.put(indexId, ids);
- }
- ids.add(snapshotId);
- } else {
- Set ids = new LinkedHashSet<>();
- ids.add(snapshotId);
- allIndexSnapshots.put(indexId, ids);
- }
+ for (final IndexId indexId : shardGenerations.indices()) {
+ allIndexSnapshots.computeIfAbsent(indexId, k -> new LinkedHashSet<>()).add(snapshotId);
}
- return new RepositoryData(genId, snapshots, newSnapshotStates, allIndexSnapshots);
+ return new RepositoryData(genId, snapshots, newSnapshotStates, allIndexSnapshots,
+ ShardGenerations.builder().putAll(this.shardGenerations).putAll(shardGenerations).build());
}
/**
@@ -182,13 +189,18 @@ public RepositoryData withGenId(long newGeneration) {
if (newGeneration == genId) {
return this;
}
- return new RepositoryData(newGeneration, this.snapshotIds, this.snapshotStates, this.indexSnapshots);
+ return new RepositoryData(newGeneration, this.snapshotIds, this.snapshotStates, this.indexSnapshots, this.shardGenerations);
}
/**
* Remove a snapshot and remove any indices that no longer exist in the repository due to the deletion of the snapshot.
+ *
+ * @param snapshotId Snapshot Id
+ * @param updatedShardGenerations Shard generations that changed as a result of removing the snapshot.
+ * For each {@link IndexId} it contains the new shard generation id of each
+ * changed shard, indexed by its shardId
*/
- public RepositoryData removeSnapshot(final SnapshotId snapshotId) {
+ public RepositoryData removeSnapshot(final SnapshotId snapshotId, final ShardGenerations updatedShardGenerations) {
Map newSnapshotIds = snapshotIds.values().stream()
.filter(id -> !snapshotId.equals(id))
.collect(Collectors.toMap(SnapshotId::getUUID, Function.identity()));
@@ -216,7 +228,10 @@ public RepositoryData removeSnapshot(final SnapshotId snapshotId) {
indexSnapshots.put(indexId, set);
}
- return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, indexSnapshots);
+ return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, indexSnapshots,
+ ShardGenerations.builder().putAll(shardGenerations).putAll(updatedShardGenerations)
+ .retainIndicesAndPruneDeletes(indexSnapshots.keySet()).build()
+ );
}
/**
@@ -242,12 +257,13 @@ public boolean equals(Object obj) {
return snapshotIds.equals(that.snapshotIds)
&& snapshotStates.equals(that.snapshotStates)
&& indices.equals(that.indices)
- && indexSnapshots.equals(that.indexSnapshots);
+ && indexSnapshots.equals(that.indexSnapshots)
+ && shardGenerations.equals(that.shardGenerations);
}
@Override
public int hashCode() {
- return Objects.hash(snapshotIds, snapshotStates, indices, indexSnapshots);
+ return Objects.hash(snapshotIds, snapshotStates, indices, indexSnapshots, shardGenerations);
}
/**
@@ -287,6 +303,7 @@ public List resolveNewIndices(final List indicesToResolve) {
return snapshotIndices;
}
+ private static final String SHARD_GENERATIONS = "shard_generations";
private static final String SNAPSHOTS = "snapshots";
private static final String INDICES = "indices";
private static final String INDEX_ID = "id";
@@ -297,7 +314,10 @@ public List resolveNewIndices(final List indicesToResolve) {
/**
* Writes the snapshots metadata and the related indices metadata to x-content.
*/
- public XContentBuilder snapshotsToXContent(final XContentBuilder builder) throws IOException {
+ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final boolean shouldWriteShardGens) throws IOException {
+ assert shouldWriteShardGens || shardGenerations.indices().isEmpty() :
+ "Should not build shard generations in BwC mode but saw generations [" + shardGenerations + "]";
+
builder.startObject();
// write the snapshots list
builder.startArray(SNAPSHOTS);
@@ -323,6 +343,13 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder) throws
builder.value(snapshotId.getUUID());
}
builder.endArray();
+ if (shouldWriteShardGens) {
+ builder.startArray(SHARD_GENERATIONS);
+ for (String gen : shardGenerations.getGens(indexId)) {
+ builder.value(gen);
+ }
+ builder.endArray();
+ }
builder.endObject();
}
builder.endObject();
@@ -337,6 +364,7 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser,
final Map snapshots = new HashMap<>();
final Map snapshotStates = new HashMap<>();
final Map> indexSnapshots = new HashMap<>();
+ final ShardGenerations.Builder shardGenerations = ShardGenerations.builder();
if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
while (parser.nextToken() == XContentParser.Token.FIELD_NAME) {
@@ -374,6 +402,7 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser,
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
final String indexName = parser.currentName();
final Set snapshotIds = new LinkedHashSet<>();
+ final List gens = new ArrayList<>();
IndexId indexId = null;
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
@@ -416,10 +445,20 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser,
+ " references an unknown snapshot uuid [" + uuid + "]");
}
}
+ } else if (SHARD_GENERATIONS.equals(indexMetaFieldName)) {
+ XContentParserUtils.ensureExpectedToken(
+ XContentParser.Token.START_ARRAY, parser.currentToken(), parser::getTokenLocation);
+ while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
+ gens.add(parser.textOrNull());
+ }
+
}
}
assert indexId != null;
indexSnapshots.put(indexId, snapshotIds);
+ for (int i = 0; i < gens.size(); i++) {
+ shardGenerations.put(indexId, i, gens.get(i));
+ }
}
} else {
throw new ElasticsearchParseException("unknown field name [" + field + "]");
@@ -428,7 +467,7 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser,
} else {
throw new ElasticsearchParseException("start object expected");
}
- return new RepositoryData(genId, snapshots, snapshotStates, indexSnapshots);
+ return new RepositoryData(genId, snapshots, snapshotStates, indexSnapshots, shardGenerations.build());
}
}
diff --git a/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java b/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java
new file mode 100644
index 0000000000000..6351d5e2f2bf0
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.repositories;
+
+import org.elasticsearch.common.Nullable;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public final class ShardGenerations {
+
+ public static final ShardGenerations EMPTY = new ShardGenerations(Collections.emptyMap());
+
+ /**
+ * Special generation that signifies that a shard is new and the repository does not yet contain a valid
+ * {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} blob for it.
+ */
+ public static final String NEW_SHARD_GEN = "_new";
+
+ /**
+ * Special generation that signifies that the shard has been deleted from the repository.
+ * This generation is only used during computations. It should never be written to disk.
+ */
+ public static final String DELETED_SHARD_GEN = "_deleted";
+
+ private final Map> shardGenerations;
+
+ private ShardGenerations(Map> shardGenerations) {
+ this.shardGenerations = shardGenerations;
+ }
+
+ /**
+ * Returns all indices for which shard generations are tracked.
+ *
+ * @return indices for which shard generations are tracked
+ */
+ public Collection indices() {
+ return Collections.unmodifiableSet(shardGenerations.keySet());
+ }
+
+ /**
+ * Computes the obsolete shard index generations that can be deleted once this instance has been written to the repository.
+ * Note: This method should only be used when finalizing a snapshot and we can safely assume that data has only been added but not
+ * removed from shard paths.
+ *
+ * @param previous Previous {@code ShardGenerations}
+ * @return Map of obsolete shard index generations in indices that are still tracked by this instance
+ */
+ public Map> obsoleteShardGenerations(ShardGenerations previous) {
+ final Map> result = new HashMap<>();
+ previous.shardGenerations.forEach(((indexId, oldGens) -> {
+ final List updatedGenerations = shardGenerations.get(indexId);
+ final Map obsoleteShardIndices = new HashMap<>();
+ assert updatedGenerations != null
+ : "Index [" + indexId + "] present in previous shard generations, but missing from updated generations";
+ for (int i = 0; i < Math.min(oldGens.size(), updatedGenerations.size()); i++) {
+ final String oldGeneration = oldGens.get(i);
+ final String updatedGeneration = updatedGenerations.get(i);
+ // If we had a previous generation that is different from an updated generation it's obsolete
+ // Since this method assumes only additions and no removals of shards, a null updated generation means no update
+ if (updatedGeneration != null && oldGeneration != null && oldGeneration.equals(updatedGeneration) == false) {
+ obsoleteShardIndices.put(i, oldGeneration);
+ }
+ }
+ result.put(indexId, Collections.unmodifiableMap(obsoleteShardIndices));
+ }));
+ return Collections.unmodifiableMap(result);
+ }
+
+ /**
+ * Get the generation of the {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} blob for a given index
+ * and shard.
+ * There are three special kinds of generations that can be returned here.
+ *
+ * - {@link #DELETED_SHARD_GEN} a deleted shard that isn't referenced by any snapshot in the repository any longer
+ * - {@link #NEW_SHARD_GEN} a new shard that we know doesn't hold any valid data yet in the repository
+ * - {@code null} unknown state. The shard either does not exist at all or it was created by a node older than
+ * {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}. If a caller expects a shard to exist in the
+ * repository but sees a {@code null} return, it should try to recover the generation by falling back to listing the contents
+ * of the respective shard directory.
+ *
+ *
+ * @param indexId IndexId
+ * @param shardId Shard Id
+ * @return generation of the {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} blob
+ */
+ @Nullable
+ public String getShardGen(IndexId indexId, int shardId) {
+ final List generations = shardGenerations.get(indexId);
+ if (generations == null || generations.size() < shardId + 1) {
+ return null;
+ }
+ return generations.get(shardId);
+ }
+
+ public List getGens(IndexId indexId) {
+ final List existing = shardGenerations.get(indexId);
+ return existing == null ? Collections.emptyList() : Collections.unmodifiableList(existing);
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final ShardGenerations that = (ShardGenerations) o;
+ return shardGenerations.equals(that.shardGenerations);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(shardGenerations);
+ }
+
+ @Override
+ public String toString() {
+ return "ShardGenerations{generations:" + this.shardGenerations + "}";
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static final class Builder {
+
+ private final Map> generations = new HashMap<>();
+
+ /**
+ * Filters out all generations that don't belong to any of the supplied {@code indices} and prunes all {@link #DELETED_SHARD_GEN}
+ * entries from the builder.
+ *
+ * @param indices indices to filter for
+ * @return builder that contains only the given {@code indices} and no {@link #DELETED_SHARD_GEN} entries
+ */
+ public Builder retainIndicesAndPruneDeletes(Set indices) {
+ generations.keySet().retainAll(indices);
+ for (IndexId index : indices) {
+ final Map shards = generations.getOrDefault(index, Collections.emptyMap());
+ final Iterator> iterator = shards.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry entry = iterator.next();
+ final String generation = entry.getValue();
+ if (generation.equals(DELETED_SHARD_GEN)) {
+ iterator.remove();
+ }
+ }
+ if (shards.isEmpty()) {
+ generations.remove(index);
+ }
+ }
+ return this;
+ }
+
+ public Builder putAll(ShardGenerations shardGenerations) {
+ shardGenerations.shardGenerations.forEach((indexId, gens) -> {
+ for (int i = 0; i < gens.size(); i++) {
+ final String gen = gens.get(i);
+ if (gen != null) {
+ put(indexId, i, gens.get(i));
+ }
+ }
+ });
+ return this;
+ }
+
+ public Builder put(IndexId indexId, int shardId, String generation) {
+ generations.computeIfAbsent(indexId, i -> new HashMap<>()).put(shardId, generation);
+ return this;
+ }
+
+ public ShardGenerations build() {
+ return new ShardGenerations(generations.entrySet().stream().collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> {
+ final Set shardIds = entry.getValue().keySet();
+ assert shardIds.isEmpty() == false;
+ final int size = shardIds.stream().mapToInt(i -> i).max().getAsInt() + 1;
+ // Create a list that can hold the highest shard id as index and leave null values for shards that don't have
+ // a map entry.
+ final String[] gens = new String[size];
+ entry.getValue().forEach((shardId, generation) -> gens[shardId] = generation);
+ return Arrays.asList(gens);
+ }
+ )));
+ }
+ }
+}
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index b1c065814a0f8..fa6794ab70bdb 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -91,11 +91,13 @@
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.snapshots.SnapshotCreationException;
+import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.snapshots.SnapshotException;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotMissingException;
import org.elasticsearch.snapshots.SnapshotShardFailure;
+import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.FilterInputStream;
@@ -365,7 +367,7 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met
}
@Override
- public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) {
+ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener listener) {
if (isReadOnly()) {
listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"));
} else {
@@ -375,7 +377,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
// delete an index that was created by another master node after writing this index-N blob.
final Map foundIndices = blobStore().blobContainer(indicesPath()).children();
- doDeleteShardSnapshots(snapshotId, repositoryStateId, foundIndices, rootBlobs, repositoryData, listener);
+ doDeleteShardSnapshots(snapshotId, repositoryStateId, foundIndices, rootBlobs, repositoryData, writeShardGens, listener);
} catch (Exception ex) {
listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex));
}
@@ -396,47 +398,170 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action
* @param listener Listener to invoke once finished
*/
private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateId, Map foundIndices,
- Map rootBlobs, RepositoryData repositoryData,
+ Map rootBlobs, RepositoryData repositoryData, boolean writeShardGens,
ActionListener listener) throws IOException {
- final RepositoryData updatedRepositoryData = repositoryData.removeSnapshot(snapshotId);
- writeIndexGen(updatedRepositoryData, repositoryStateId);
- final ActionListener afterCleanupsListener =
- new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
-
- // Run unreferenced blobs cleanup in parallel to snapshot deletion
- threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(afterCleanupsListener,
- l -> cleanupStaleBlobs(foundIndices, rootBlobs, updatedRepositoryData, ActionListener.map(l, ignored -> null))));
-
- deleteIndices(
- updatedRepositoryData,
- repositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotId),
- snapshotId,
- ActionListener.runAfter(
- ActionListener.wrap(
- deleteResults -> {
- // Now that all metadata (RepositoryData at the repo root as well as index-N blobs in all shard paths)
- // has been updated we can execute the delete operations for all blobs that have become unreferenced as a result
- final String basePath = basePath().buildAsString();
- final int basePathLen = basePath.length();
- blobContainer().deleteBlobsIgnoringIfNotExists(
- Stream.concat(
- deleteResults.stream().flatMap(shardResult -> {
- final String shardPath =
- shardContainer(shardResult.indexId, shardResult.shardId).path().buildAsString();
- return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob);
- }),
- deleteResults.stream().map(shardResult -> shardResult.indexId).distinct().map(indexId ->
- indexContainer(indexId).path().buildAsString() + globalMetaDataFormat.blobName(snapshotId.getUUID()))
- ).map(absolutePath -> {
- assert absolutePath.startsWith(basePath);
- return absolutePath.substring(basePathLen);
- }).collect(Collectors.toList()));
- },
- // Any exceptions after we have updated the root level RepositoryData are only logged but won't fail the delete request
- e -> logger.warn(
- () -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId), e)),
- () -> afterCleanupsListener.onResponse(null))
- );
+
+ if (writeShardGens) {
+ // First write the new shard state metadata (with the removed snapshot) and compute deletion targets
+ final StepListener> writeShardMetaDataAndComputeDeletesStep = new StepListener<>();
+ writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, true, writeShardMetaDataAndComputeDeletesStep);
+ // Once we have put the new shard-level metadata into place, we can update the repository metadata as follows:
+ // 1. Remove the snapshot from the list of existing snapshots
+ // 2. Update the index shard generations of all updated shard folders
+ //
+ // Note: If we fail updating any of the individual shard paths, none of them are changed since the newly created
+ // index-${gen_uuid} will not be referenced by the existing RepositoryData and new RepositoryData is only
+ // written if all shard paths have been successfully updated.
+ final StepListener writeUpdatedRepoDataStep = new StepListener<>();
+ writeShardMetaDataAndComputeDeletesStep.whenComplete(deleteResults -> {
+ final ShardGenerations.Builder builder = ShardGenerations.builder();
+ for (ShardSnapshotMetaDeleteResult newGen : deleteResults) {
+ builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration);
+ }
+ final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, builder.build());
+ writeIndexGen(updatedRepoData, repositoryStateId, true);
+ writeUpdatedRepoDataStep.onResponse(updatedRepoData);
+ }, listener::onFailure);
+ // Once we have updated the repository, run the clean-ups
+ writeUpdatedRepoDataStep.whenComplete(updatedRepoData -> {
+ // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
+ final ActionListener afterCleanupsListener =
+ new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
+ asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
+ asyncCleanupUnlinkedShardLevelBlobs(snapshotId, writeShardMetaDataAndComputeDeletesStep.result(), afterCleanupsListener);
+ }, listener::onFailure);
+ } else {
+ // Write the new repository data first (with the removed snapshot), using no shard generations
+ final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, ShardGenerations.EMPTY);
+ writeIndexGen(updatedRepoData, repositoryStateId, false);
+ // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
+ final ActionListener afterCleanupsListener =
+ new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
+ asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
+ final StepListener> writeMetaAndComputeDeletesStep = new StepListener<>();
+ writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, false, writeMetaAndComputeDeletesStep);
+ writeMetaAndComputeDeletesStep.whenComplete(deleteResults ->
+ asyncCleanupUnlinkedShardLevelBlobs(snapshotId, deleteResults, afterCleanupsListener), afterCleanupsListener::onFailure);
+ }
+ }
+
+ private void asyncCleanupUnlinkedRootAndIndicesBlobs(Map foundIndices, Map rootBlobs,
+ RepositoryData updatedRepoData, ActionListener listener) {
+ threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(
+ listener,
+ l -> cleanupStaleBlobs(foundIndices, rootBlobs, updatedRepoData, ActionListener.map(l, ignored -> null))));
+ }
+
+ private void asyncCleanupUnlinkedShardLevelBlobs(SnapshotId snapshotId, Collection deleteResults,
+ ActionListener listener) {
+ threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(
+ listener,
+ l -> {
+ try {
+ blobContainer().deleteBlobsIgnoringIfNotExists(resolveFilesToDelete(snapshotId, deleteResults));
+ l.onResponse(null);
+ } catch (Exception e) {
+ logger.warn(
+ () -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId),
+ e);
+ throw e;
+ }
+ }));
+ }
+
+ // updates the shard state metadata for shards of a snapshot that is to be deleted. Also computes the files to be cleaned up.
+ private void writeUpdatedShardMetaDataAndComputeDeletes(SnapshotId snapshotId, RepositoryData oldRepositoryData,
+ boolean useUUIDs, ActionListener> onAllShardsCompleted) {
+
+ final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
+ final List indices = oldRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotId);
+
+ if (indices.isEmpty()) {
+ onAllShardsCompleted.onResponse(Collections.emptyList());
+ return;
+ }
+
+ // Listener that flattens out the delete results for each index
+ final ActionListener> deleteIndexMetaDataListener = new GroupedActionListener<>(
+ ActionListener.map(onAllShardsCompleted,
+ res -> res.stream().flatMap(Collection::stream).collect(Collectors.toList())), indices.size());
+
+ for (IndexId indexId : indices) {
+ final Set survivingSnapshots = oldRepositoryData.getSnapshots(indexId).stream()
+ .filter(id -> id.equals(snapshotId) == false).collect(Collectors.toSet());
+ executor.execute(ActionRunnable.wrap(deleteIndexMetaDataListener, deleteIdxMetaListener -> {
+ final IndexMetaData indexMetaData;
+ try {
+ indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId);
+ } catch (Exception ex) {
+ logger.warn(() ->
+ new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, indexId.getName()), ex);
+ // Just invoke the listener without any shard generations to count it down, this index will be cleaned up
+ // by the stale data cleanup in the end.
+ // TODO: Getting here means repository corruption. We should find a way of dealing with this instead of just ignoring
+ // it and letting the cleanup deal with it.
+ deleteIdxMetaListener.onResponse(null);
+ return;
+ }
+ final int shardCount = indexMetaData.getNumberOfShards();
+ assert shardCount > 0 : "index did not have positive shard count, get [" + shardCount + "]";
+ // Listener for collecting the results of removing the snapshot from each shard's metadata in the current index
+ final ActionListener allShardsListener =
+ new GroupedActionListener<>(deleteIdxMetaListener, shardCount);
+ final Index index = indexMetaData.getIndex();
+ for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) {
+ final ShardId shard = new ShardId(index, shardId);
+ executor.execute(new AbstractRunnable() {
+ @Override
+ protected void doRun() throws Exception {
+ final BlobContainer shardContainer = shardContainer(indexId, shard);
+ final Set blobs = getShardBlobs(shard, shardContainer);
+ final BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots;
+ final String newGen;
+ if (useUUIDs) {
+ newGen = UUIDs.randomBase64UUID();
+ blobStoreIndexShardSnapshots = buildBlobStoreIndexShardSnapshots(blobs, shardContainer,
+ oldRepositoryData.shardGenerations().getShardGen(indexId, shard.getId())).v1();
+ } else {
+ Tuple tuple =
+ buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
+ newGen = Long.toString(tuple.v2() + 1);
+ blobStoreIndexShardSnapshots = tuple.v1();
+ }
+ allShardsListener.onResponse(deleteFromShardSnapshotMeta(survivingSnapshots, indexId, shard, snapshotId,
+ shardContainer, blobs, blobStoreIndexShardSnapshots, newGen));
+ }
+
+ @Override
+ public void onFailure(Exception ex) {
+ logger.warn(
+ () -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]",
+ snapshotId, indexId.getName(), shard.id()), ex);
+ // Just passing null here to count down the listener instead of failing it, the stale data left behind
+ // here will be retried in the next delete or repository cleanup
+ allShardsListener.onResponse(null);
+ }
+ });
+ }
+ }));
+ }
+ }
+
+ private List resolveFilesToDelete(SnapshotId snapshotId, Collection deleteResults) {
+ final String basePath = basePath().buildAsString();
+ final int basePathLen = basePath.length();
+ return Stream.concat(
+ deleteResults.stream().flatMap(shardResult -> {
+ final String shardPath =
+ shardContainer(shardResult.indexId, shardResult.shardId).path().buildAsString();
+ return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob);
+ }),
+ deleteResults.stream().map(shardResult -> shardResult.indexId).distinct().map(indexId ->
+ indexContainer(indexId).path().buildAsString() + globalMetaDataFormat.blobName(snapshotId.getUUID()))
+ ).map(absolutePath -> {
+ assert absolutePath.startsWith(basePath);
+ return absolutePath.substring(basePathLen);
+ }).collect(Collectors.toList());
}
/**
@@ -478,9 +603,10 @@ private void cleanupStaleBlobs(Map foundIndices, Map
 * - Deleting unreferenced root level blobs {@link #cleanupStaleRootFiles}
*
* @param repositoryStateId Current repository state id
+ * @param writeShardGens If shard generations should be written to the repository
* @param listener Listener to complete when done
*/
- public void cleanup(long repositoryStateId, ActionListener listener) {
+ public void cleanup(long repositoryStateId, boolean writeShardGens, ActionListener listener) {
try {
if (isReadOnly()) {
throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository");
@@ -502,7 +628,7 @@ public void cleanup(long repositoryStateId, ActionListener foundIndices
return deleteResult;
}
- /**
- * @param repositoryData RepositoryData with the snapshot removed
- * @param indices Indices to remove the snapshot from (should not contain indices that become completely unreferenced with the
- * removal of this snapshot as those are cleaned up afterwards by {@link #cleanupStaleBlobs})
- * @param snapshotId SnapshotId to remove from all the given indices
- * @param listener Listener to invoke when finished
- */
- private void deleteIndices(RepositoryData repositoryData, List indices, SnapshotId snapshotId,
- ActionListener> listener) {
-
- if (indices.isEmpty()) {
- listener.onResponse(Collections.emptyList());
- return;
- }
-
- // Listener that flattens out the delete results for each index
- final ActionListener> deleteIndexMetaDataListener = new GroupedActionListener<>(
- ActionListener.map(listener, res -> res.stream().flatMap(Collection::stream).collect(Collectors.toList())), indices.size());
- final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
- for (IndexId indexId : indices) {
- executor.execute(ActionRunnable.wrap(deleteIndexMetaDataListener,
- deleteIdxMetaListener -> {
- final IndexMetaData indexMetaData;
- try {
- indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId);
- } catch (Exception ex) {
- logger.warn(() ->
- new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, indexId.getName()), ex);
- // Just invoke the listener without any shard generations to count it down, this index will be cleaned up
- // by the stale data cleanup in the end.
- deleteIdxMetaListener.onResponse(null);
- return;
- }
- final int shardCount = indexMetaData.getNumberOfShards();
- assert shardCount > 0 : "index did not have positive shard count, get [" + shardCount + "]";
- // Listener for collecting the results of removing the snapshot from each shard's metadata in the current index
- final ActionListener allShardsListener =
- new GroupedActionListener<>(deleteIdxMetaListener, shardCount);
- final Index index = indexMetaData.getIndex();
- for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) {
- final ShardId shard = new ShardId(index, shardId);
- executor.execute(new AbstractRunnable() {
- @Override
- protected void doRun() throws Exception {
- allShardsListener.onResponse(
- deleteShardSnapshot(repositoryData, indexId, shard, snapshotId));
- }
-
- @Override
- public void onFailure(Exception ex) {
- logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]",
- snapshotId, indexId.getName(), shard.id()), ex);
- // Just passing null here to count down the listener instead of failing it, the stale data left behind
- // here will be retried in the next delete or repository cleanup
- allShardsListener.onResponse(null);
- }
- });
- }
- }));
- }
- }
-
@Override
public void finalizeSnapshot(final SnapshotId snapshotId,
- final List indices,
+ final ShardGenerations shardGenerations,
final long startTime,
final String failure,
final int totalShards,
@@ -659,15 +723,25 @@ public void finalizeSnapshot(final SnapshotId snapshotId,
final boolean includeGlobalState,
final MetaData clusterMetaData,
final Map userMetadata,
+ boolean writeShardGens,
final ActionListener listener) {
- // We upload one meta blob for each index, one for the cluster-state and one snap-${uuid}.dat blob
- // Once we're done writing all metadata, we update the index-N blob to finalize the snapshot
+ final Collection indices = shardGenerations.indices();
+ // Once we are done writing the updated index-N blob we remove the now unreferenced index-${uuid} blobs in each shard
+ // directory if all nodes are at least at version SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION.
+ // If there are older version nodes in the cluster, we don't need to run this cleanup as it will have already happened
+ // when writing the index-${N} to each shard directory.
final ActionListener allMetaListener = new GroupedActionListener<>(
ActionListener.wrap(snapshotInfos -> {
assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos;
final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next();
- writeIndexGen(getRepositoryData().addSnapshot(snapshotId, snapshotInfo.state(), indices), repositoryStateId);
+ final RepositoryData existingRepositoryData = getRepositoryData();
+ final RepositoryData updatedRepositoryData =
+ existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), shardGenerations);
+ writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens);
+ if (writeShardGens) {
+ cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
+ }
listener.onResponse(snapshotInfo);
},
e -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e))),
@@ -700,6 +774,20 @@ public void finalizeSnapshot(final SnapshotId snapshotId,
}));
}
+ // Delete all old shard gen blobs that are no longer referenced as a result of moving to the updated repository data
+ private void cleanupOldShardGens(RepositoryData existingRepositoryData, RepositoryData updatedRepositoryData) {
+ final List toDelete = new ArrayList<>();
+ final int prefixPathLen = basePath().buildAsString().length();
+ updatedRepositoryData.shardGenerations().obsoleteShardGenerations(existingRepositoryData.shardGenerations()).forEach(
+ (indexId, gens) -> gens.forEach((shardId, oldGen) -> toDelete.add(
+ shardContainer(indexId, shardId).path().buildAsString().substring(prefixPathLen) + INDEX_FILE_PREFIX + oldGen)));
+ try {
+ blobContainer().deleteBlobsIgnoringIfNotExists(toDelete);
+ } catch (Exception e) {
+ logger.warn("Failed to clean up old shard generation blobs", e);
+ }
+ }
+
@Override
public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) {
try {
@@ -860,7 +948,8 @@ public boolean isReadOnly() {
return readOnly;
}
- protected void writeIndexGen(final RepositoryData repositoryData, final long expectedGen) throws IOException {
+ protected void writeIndexGen(final RepositoryData repositoryData, final long expectedGen,
+ final boolean writeShardGens) throws IOException {
assert isReadOnly() == false; // can not write to a read only repository
final long currentGen = repositoryData.getGenId();
if (currentGen != expectedGen) {
@@ -874,7 +963,8 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long exp
// write the index file
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
- writeAtomic(indexBlob, BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder())), true);
+ writeAtomic(indexBlob,
+ BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
// write the current generation to the index-latest file
final BytesReference genBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
@@ -962,7 +1052,8 @@ private void writeAtomic(final String blobName, final BytesReference bytesRef, b
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
- IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) {
+ IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
+ ActionListener listener) {
final ShardId shardId = store.shardId();
final long startTime = threadPool.absoluteTimeInMillis();
final ActionListener snapshotDoneListener = ActionListener.wrap(listener::onResponse, e -> {
@@ -970,19 +1061,23 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
listener.onFailure(e instanceof IndexShardSnapshotFailedException ? e : new IndexShardSnapshotFailedException(shardId, e));
});
try {
- logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, metadata.name());
-
+ final String generation = snapshotStatus.generation();
+ logger.debug("[{}] [{}] snapshot to [{}] [{}] ...", shardId, snapshotId, metadata.name(), generation);
final BlobContainer shardContainer = shardContainer(indexId, shardId);
final Set blobs;
- try {
- blobs = shardContainer.listBlobsByPrefix(INDEX_FILE_PREFIX).keySet();
- } catch (IOException e) {
- throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e);
+ if (generation == null) {
+ try {
+ blobs = shardContainer.listBlobsByPrefix(INDEX_FILE_PREFIX).keySet();
+ } catch (IOException e) {
+ throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e);
+ }
+ } else {
+ blobs = Collections.singleton(INDEX_FILE_PREFIX + generation);
}
- Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
+ Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer, generation);
BlobStoreIndexShardSnapshots snapshots = tuple.v1();
- long fileListGeneration = tuple.v2();
+ String fileListGeneration = tuple.v2();
if (snapshots.snapshots().stream().anyMatch(sf -> sf.snapshot().equals(snapshotId.getName()))) {
throw new IndexShardSnapshotFailedException(shardId,
@@ -1080,27 +1175,34 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
for (SnapshotFiles point : snapshots) {
newSnapshotsList.add(point);
}
- final String indexGeneration = Long.toString(fileListGeneration + 1);
final List blobsToDelete;
- try {
- final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
- indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration);
+ final String indexGeneration;
+ if (writeShardGens) {
+ indexGeneration = UUIDs.randomBase64UUID();
+ blobsToDelete = Collections.emptyList();
+ } else {
+ indexGeneration = Long.toString(Long.parseLong(fileListGeneration) + 1);
// Delete all previous index-N blobs
blobsToDelete = blobs.stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList());
assert blobsToDelete.stream().mapToLong(b -> Long.parseLong(b.replaceFirst(SNAPSHOT_INDEX_PREFIX, "")))
.max().orElse(-1L) < Long.parseLong(indexGeneration)
: "Tried to delete an index-N blob newer than the current generation [" + indexGeneration
+ "] when deleting index-N blobs " + blobsToDelete;
+ }
+ try {
+ writeShardIndexBlob(shardContainer, indexGeneration, new BlobStoreIndexShardSnapshots(newSnapshotsList));
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId,
"Failed to finalize snapshot creation [" + snapshotId + "] with shard index ["
+ indexShardSnapshotsFormat.blobName(indexGeneration) + "]", e);
}
- try {
- shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete);
- } catch (IOException e) {
- logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete old index-N blobs during finalization",
- snapshotId, shardId), e);
+ if (writeShardGens == false) {
+ try {
+ shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete);
+ } catch (IOException e) {
+ logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete old index-N blobs during finalization",
+ snapshotId, shardId), e);
+ }
}
snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), indexGeneration);
snapshotDoneListener.onResponse(indexGeneration);
@@ -1257,45 +1359,30 @@ public String toString() {
}
/**
- * Delete shard snapshot
+ * Delete snapshot from shard level metadata.
*/
- private ShardSnapshotMetaDeleteResult deleteShardSnapshot(RepositoryData repositoryData, IndexId indexId, ShardId snapshotShardId,
- SnapshotId snapshotId) throws IOException {
- final BlobContainer shardContainer = shardContainer(indexId, snapshotShardId);
- final Set blobs;
- try {
- blobs = shardContainer.listBlobs().keySet();
- } catch (IOException e) {
- throw new IndexShardSnapshotException(snapshotShardId, "Failed to list content of shard directory", e);
- }
-
- Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
- BlobStoreIndexShardSnapshots snapshots = tuple.v1();
- long fileListGeneration = tuple.v2();
-
+ private ShardSnapshotMetaDeleteResult deleteFromShardSnapshotMeta(Set survivingSnapshots, IndexId indexId,
+ ShardId snapshotShardId, SnapshotId snapshotId,
+ BlobContainer shardContainer, Set blobs,
+ BlobStoreIndexShardSnapshots snapshots, String indexGeneration) {
// Build a list of snapshots that should be preserved
List newSnapshotsList = new ArrayList<>();
- final Set survivingSnapshotNames =
- repositoryData.getSnapshots(indexId).stream().map(SnapshotId::getName).collect(Collectors.toSet());
+ final Set survivingSnapshotNames = survivingSnapshots.stream().map(SnapshotId::getName).collect(Collectors.toSet());
for (SnapshotFiles point : snapshots) {
if (survivingSnapshotNames.contains(point.snapshot())) {
newSnapshotsList.add(point);
}
}
- final String indexGeneration = Long.toString(fileListGeneration + 1);
try {
- final List blobsToDelete;
if (newSnapshotsList.isEmpty()) {
- // If we deleted all snapshots, we don't need to create a new index file and simply delete all the blobs we found
- blobsToDelete = new ArrayList<>(blobs);
+ return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId.id(), ShardGenerations.DELETED_SHARD_GEN, blobs);
} else {
- final Set survivingSnapshotUUIDs = repositoryData.getSnapshots(indexId).stream().map(SnapshotId::getUUID)
- .collect(Collectors.toSet());
final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
- indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration);
- blobsToDelete = unusedBlobs(blobs, survivingSnapshotUUIDs, updatedSnapshots);
+ writeShardIndexBlob(shardContainer, indexGeneration, updatedSnapshots);
+ final Set survivingSnapshotUUIDs = survivingSnapshots.stream().map(SnapshotId::getUUID).collect(Collectors.toSet());
+ return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId.id(), indexGeneration,
+ unusedBlobs(blobs, survivingSnapshotUUIDs, updatedSnapshots));
}
- return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId.id(), blobsToDelete);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(snapshotShardId,
"Failed to finalize snapshot deletion [" + snapshotId + "] with shard index ["
@@ -1303,6 +1390,23 @@ private ShardSnapshotMetaDeleteResult deleteShardSnapshot(RepositoryData reposit
}
}
+ private void writeShardIndexBlob(BlobContainer shardContainer, String indexGeneration,
+ BlobStoreIndexShardSnapshots updatedSnapshots) throws IOException {
+ assert ShardGenerations.NEW_SHARD_GEN.equals(indexGeneration) == false;
+ assert ShardGenerations.DELETED_SHARD_GEN.equals(indexGeneration) == false;
+ indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration);
+ }
+
+ private static Set getShardBlobs(final ShardId snapshotShardId, final BlobContainer shardContainer) {
+ final Set blobs;
+ try {
+ blobs = shardContainer.listBlobs().keySet();
+ } catch (IOException e) {
+ throw new IndexShardSnapshotException(snapshotShardId, "Failed to list content of shard directory", e);
+ }
+ return blobs;
+ }
+
// Unused blobs are all previous index-, data- and meta-blobs and that are not referenced by the new index- as well as all
// temporary blobs
private static List unusedBlobs(Set blobs, Set survivingSnapshotUUIDs,
@@ -1316,7 +1420,6 @@ private static List unusedBlobs(Set blobs, Set surviving
|| FsBlobContainer.isTempBlobName(blob)).collect(Collectors.toList());
}
-
/**
* Loads information about shard snapshot
*/
@@ -1331,6 +1434,29 @@ private BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContain
}
}
+ /**
+ * Loads all available snapshots in the repository using the given {@code generation} or falling back to trying to determine it from
+ * the given list of blobs in the shard container.
+ *
+ * @param blobs list of blobs in the shard container
+ * @param generation shard generation or {@code null} in case there was no shard generation tracked in the {@link RepositoryData} for
+ * this shard because its snapshot was created in a version older than
+ * {@link SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}.
+ * @return tuple of BlobStoreIndexShardSnapshots and the last snapshot index generation
+ */
+ private Tuple buildBlobStoreIndexShardSnapshots(Set blobs,
+ BlobContainer shardContainer,
+ @Nullable String generation) throws IOException {
+ if (generation != null) {
+ if (generation.equals(ShardGenerations.NEW_SHARD_GEN)) {
+ return new Tuple<>(BlobStoreIndexShardSnapshots.EMPTY, ShardGenerations.NEW_SHARD_GEN);
+ }
+ return new Tuple<>(indexShardSnapshotsFormat.read(shardContainer, generation), generation);
+ }
+ final Tuple legacyIndex = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
+ return new Tuple<>(legacyIndex.v1(), String.valueOf(legacyIndex.v2()));
+ }
+
/**
* Loads all available snapshots in the repository
*
@@ -1419,12 +1545,16 @@ private static final class ShardSnapshotMetaDeleteResult {
// Shard id that the snapshot was removed from
private final int shardId;
+ // Id of the new index-${uuid} blob that does not include the snapshot any more
+ private final String newGeneration;
+
// Blob names in the shard directory that have become unreferenced in the new shard generation
private final Collection blobsToDelete;
- ShardSnapshotMetaDeleteResult(IndexId indexId, int shardId, Collection blobsToDelete) {
+ ShardSnapshotMetaDeleteResult(IndexId indexId, int shardId, String newGeneration, Collection blobsToDelete) {
this.indexId = indexId;
this.shardId = shardId;
+ this.newGeneration = newGeneration;
this.blobsToDelete = blobsToDelete;
}
}
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java
index 5cc98f6c3e99b..0b72670a9bc37 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java
@@ -70,10 +70,12 @@
* | | |- snap-20131011.dat - SMILE serialized {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot} for
* | | | snapshot "20131011"
* | | |- index-123 - SMILE serialized {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} for
- * | | | the shard
+ * | | | the shard (files with numeric suffixes were created by older versions, newer ES versions use a uuid
+ * | | | suffix instead)
* | |
* | |- 1/ - data for shard "1" of index "foo"
* | | |- __1
+ * | | |- index-Zc2SS8ZgR8JvZAHlSMyMXy - SMILE serialized {@code BlobStoreIndexShardSnapshots} for the shard
* | | .....
* | |
* | |-2/
@@ -146,8 +148,9 @@
*
*
* - Create the {@link org.apache.lucene.index.IndexCommit} for the shard to snapshot.
- * - List all blobs in the shard's path. Find the {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} blob
- * with name {@code index-${N}} for the highest possible value of {@code N} in the list to get the information of what segment files are
+ * - Get the {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} blob
+ * with name {@code index-${uuid}} with the {@code uuid} generation returned by
+ * {@link org.elasticsearch.repositories.ShardGenerations#getShardGen} to get the information of what segment files are
* already available in the blobstore.
* - By comparing the files in the {@code IndexCommit} and the available file list from the previous step, determine the segment files
* that need to be written to the blob store. For each segment that needs to be added to the blob store, generate a unique name by combining
@@ -157,7 +160,7 @@
* the shard's path and contains a list of all the files referenced by the snapshot as well as some metadata about the snapshot. See the
* documentation of {@code BlobStoreIndexShardSnapshot} for details on its contents.
* - Once all the segments and the {@code BlobStoreIndexShardSnapshot} blob have been written, an updated
- * {@code BlobStoreIndexShardSnapshots} blob is written to the shard's path with name {@code index-${N+1}}.
+ * {@code BlobStoreIndexShardSnapshots} blob is written to the shard's path with name {@code index-${newUUID}}.
*
*
* Finalizing the Snapshot
@@ -185,11 +188,6 @@
*
*
* - Get the current {@code RepositoryData} from the latest {@code index-N} blob at the repository root.
- * - Write an updated {@code RepositoryData} blob with the deleted snapshot removed to key {@code /index-${N+1}} directly under the
- * repository root.
- * - Write an updated {@code index.latest} blob containing {@code N + 1}.
- * - Delete the global {@code MetaData} blob {@code meta-${snapshot-uuid}.dat} stored directly under the repository root for the snapshot
- * as well as the {@code SnapshotInfo} blob at {@code /snap-${snapshot-uuid}.dat}.
* - For each index referenced by the snapshot:
*
* - Delete the snapshot's {@code IndexMetaData} at {@code /indices/${index-snapshot-uuid}/meta-${snapshot-uuid}}.
@@ -198,16 +196,22 @@
* - Remove the {@code BlobStoreIndexShardSnapshot} blob at {@code /indices/${index-snapshot-uuid}/${i}/snap-${snapshot-uuid}.dat}.
* - List all blobs in the shard path {@code /indices/${index-snapshot-uuid}} and build a new {@code BlobStoreIndexShardSnapshots} from
* the remaining {@code BlobStoreIndexShardSnapshot} blobs in the shard. Afterwards, write it to the next shard generation blob at
- * {@code /indices/${index-snapshot-uuid}/${i}/index-${N+1}} (The shard's generation is determined from the list of {@code index-N} blobs
- * in the shard directory).
- * - Delete all segment blobs (identified by having the data blob prefix {@code __}) in the shard directory which are not referenced by
- * the new {@code BlobStoreIndexShardSnapshots} that has been written in the previous step.
+ * {@code /indices/${index-snapshot-uuid}/${i}/index-${uuid}} (The shard's generation is determined from the map of shard generations in
+ * the {@link org.elasticsearch.repositories.RepositoryData} in the root {@code index-${N}} blob of the repository).
+ * - Collect all segment blobs (identified by having the data blob prefix {@code __}) in the shard directory which are not referenced by
+ * the new {@code BlobStoreIndexShardSnapshots} that has been written in the previous step as well as the previous {@code index-${uuid}}
+ * blob so that they can be deleted at the end of the snapshot delete process.
*
*
+ * - Write an updated {@code RepositoryData} blob with the deleted snapshot removed to key {@code /index-${N+1}} directly under the
+ * repository root and the repository generations that were changed in the affected shards adjusted.
+ * - Write an updated {@code index.latest} blob containing {@code N + 1}.
+ * - Delete the global {@code MetaData} blob {@code meta-${snapshot-uuid}.dat} stored directly under the repository root for the snapshot
+ * as well as the {@code SnapshotInfo} blob at {@code /snap-${snapshot-uuid}.dat}.
+ * - Delete all unreferenced blobs previously collected when updating the shard directories. Also, remove any index folders or blobs
+ * under the repository root that are not referenced by the new {@code RepositoryData} written in the previous step.
*
*
*
- * TODO: The above sequence of actions can lead to leaking files when an index completely goes out of scope. Adjust this documentation once
- * https://github.com/elastic/elasticsearch/issues/13159 is fixed.
*/
package org.elasticsearch.repositories.blobstore;
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
index ec9d4d7394df0..f940cdae01441 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
@@ -280,7 +280,9 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map() {
+ assert entry.useShardGenerations() || snapshotStatus.generation() == null :
+ "Found non-null shard generation [" + snapshotStatus.generation() + "] for snapshot with old-format compatibility";
+ snapshot(shardId, snapshot, indexId, snapshotStatus, entry.useShardGenerations(), new ActionListener() {
@Override
public void onResponse(String newGeneration) {
assert newGeneration != null;
@@ -296,7 +298,7 @@ public void onResponse(String newGeneration) {
@Override
public void onFailure(Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e);
- notifyFailedSnapshotShard(snapshot, shardId, ExceptionsHelper.detailedMessage(e));
+ notifyFailedSnapshotShard(snapshot, shardId, ExceptionsHelper.stackTrace(e));
}
});
}
@@ -310,7 +312,7 @@ public void onFailure(Exception e) {
* @param snapshotStatus snapshot status
*/
private void snapshot(final ShardId shardId, final Snapshot snapshot, final IndexId indexId,
- final IndexShardSnapshotStatus snapshotStatus, ActionListener listener) {
+ final IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, ActionListener listener) {
try {
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
if (indexShard.routingEntry().primary() == false) {
@@ -333,7 +335,7 @@ private void snapshot(final ShardId shardId, final Snapshot snapshot, final Inde
// we flush first to make sure we get the latest writes snapshotted
snapshotRef = indexShard.acquireLastIndexCommit(true);
repository.snapshotShard(indexShard.store(), indexShard.mapperService(), snapshot.getSnapshotId(), indexId,
- snapshotRef.getIndexCommit(), snapshotStatus, ActionListener.runBefore(listener, snapshotRef::close));
+ snapshotRef.getIndexCommit(), snapshotStatus, writeShardGens, ActionListener.runBefore(listener, snapshotRef::close));
} catch (Exception e) {
IOUtils.close(snapshotRef);
throw e;
@@ -391,8 +393,6 @@ public static class UpdateIndexShardSnapshotStatusRequest extends MasterNodeRequ
private ShardId shardId;
private ShardSnapshotStatus status;
- public UpdateIndexShardSnapshotStatusRequest() {}
-
public UpdateIndexShardSnapshotStatusRequest(StreamInput in) throws IOException {
super(in);
snapshot = new Snapshot(in);
@@ -453,7 +453,7 @@ private void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId sh
}
/** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */
- void sendSnapshotShardUpdate(final Snapshot snapshot, final ShardId shardId, final ShardSnapshotStatus status) {
+ private void sendSnapshotShardUpdate(final Snapshot snapshot, final ShardId shardId, final ShardSnapshotStatus status) {
remoteFailedRequestDeduplicator.executeOnce(
new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status),
new ActionListener() {
@@ -519,7 +519,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
});
}
- private class SnapshotStateExecutor implements ClusterStateTaskExecutor {
+ private static class SnapshotStateExecutor implements ClusterStateTaskExecutor {
@Override
public ClusterTasksResult
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
index c94600e0e80af..bfc3319ada15f 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -71,6 +71,7 @@
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryMissingException;
+import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
@@ -291,15 +292,16 @@ public ClusterState execute(ClusterState currentState) {
request.indicesOptions(), request.indices()));
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
List snapshotIndices = repositoryData.resolveNewIndices(indices);
- newSnapshot = new SnapshotsInProgress.Entry(new Snapshot(repositoryName, snapshotId),
- request.includeGlobalState(),
- request.partial(),
- State.INIT,
- snapshotIndices,
- threadPool.absoluteTimeInMillis(),
- repositoryData.getGenId(),
- null,
- request.userMetadata());
+ newSnapshot = new SnapshotsInProgress.Entry(
+ new Snapshot(repositoryName, snapshotId),
+ request.includeGlobalState(), request.partial(),
+ State.INIT,
+ snapshotIndices,
+ threadPool.absoluteTimeInMillis(),
+ repositoryData.getGenId(),
+ null,
+ request.userMetadata(),
+ clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION));
initializingSnapshots.add(newSnapshot.snapshot());
snapshots = new SnapshotsInProgress(newSnapshot);
} else {
@@ -456,8 +458,7 @@ public ClusterState execute(ClusterState currentState) {
hadAbortedInitializations = true;
} else {
// Replace the snapshot that was just initialized
- ImmutableOpenMap shards =
- shards(currentState, entry.indices());
+ ImmutableOpenMap shards = shards(currentState, entry, repositoryData);
if (!partial) {
Tuple, Set> indicesWithMissingShards = indicesWithMissingShards(shards,
currentState.metaData());
@@ -569,7 +570,7 @@ private void cleanupAfterError(Exception exception) {
if (snapshotCreated) {
repositoriesService.repository(snapshot.snapshot().getRepository())
.finalizeSnapshot(snapshot.snapshot().getSnapshotId(),
- snapshot.indices(),
+ buildGenerations(snapshot),
snapshot.startTime(),
ExceptionsHelper.stackTrace(exception),
0,
@@ -577,7 +578,9 @@ private void cleanupAfterError(Exception exception) {
snapshot.getRepositoryStateId(),
snapshot.includeGlobalState(),
metaDataForSnapshot(snapshot, clusterService.state().metaData()),
- snapshot.userMetadata(), ActionListener.runAfter(ActionListener.wrap(ignored -> {
+ snapshot.userMetadata(),
+ snapshot.useShardGenerations(),
+ ActionListener.runAfter(ActionListener.wrap(ignored -> {
}, inner -> {
inner.addSuppressed(exception);
logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot in repository",
@@ -590,6 +593,14 @@ private void cleanupAfterError(Exception exception) {
}
}
+ private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot) {
+ ShardGenerations.Builder builder = ShardGenerations.builder();
+ final Map indexLookup = new HashMap<>();
+ snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx));
+ snapshot.shards().forEach(c -> builder.put(indexLookup.get(c.key.getIndexName()), c.key.id(), c.value.generation()));
+ return builder.build();
+ }
+
private static MetaData metaDataForSnapshot(SnapshotsInProgress.Entry snapshot, MetaData metaData) {
if (snapshot.includeGlobalState() == false) {
// Remove global state from the cluster state
@@ -773,7 +784,8 @@ private void finalizeSnapshotDeletionFromPreviousMaster(ClusterState state) {
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
assert deletionsInProgress.getEntries().size() == 1 : "only one in-progress deletion allowed per cluster";
SnapshotDeletionsInProgress.Entry entry = deletionsInProgress.getEntries().get(0);
- deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.getRepositoryStateId());
+ deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.getRepositoryStateId(),
+ state.nodes().getMinNodeVersion());
}
}
@@ -1022,7 +1034,7 @@ protected void doRun() {
}
repository.finalizeSnapshot(
snapshot.getSnapshotId(),
- entry.indices(),
+ buildGenerations(entry),
entry.startTime(),
failure,
entry.shards().size(),
@@ -1030,7 +1042,9 @@ protected void doRun() {
entry.getRepositoryStateId(),
entry.includeGlobalState(),
metaDataForSnapshot(entry, metaData),
- entry.userMetadata(), ActionListener.wrap(snapshotInfo -> {
+ entry.userMetadata(),
+ entry.useShardGenerations(),
+ ActionListener.wrap(snapshotInfo -> {
removeSnapshotFromClusterState(snapshot, snapshotInfo, null);
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
}, this::onFailure));
@@ -1312,7 +1326,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
));
} else {
logger.debug("deleted snapshot is not running - deleting files");
- deleteSnapshotFromRepository(snapshot, listener, repositoryStateId);
+ deleteSnapshotFromRepository(snapshot, listener, repositoryStateId, newState.nodes().getMinNodeVersion());
}
}
});
@@ -1351,15 +1365,18 @@ public static boolean isRepositoryInUse(ClusterState clusterState, String reposi
* @param snapshot snapshot
* @param listener listener
* @param repositoryStateId the unique id representing the state of the repository at the time the deletion began
+ * @param version minimum ES version the repository should be readable by
*/
- private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener listener, long repositoryStateId) {
+ private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener listener, long repositoryStateId,
+ Version version) { // callers visible in this patch pass the cluster's minimum node version
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
Repository repository = repositoriesService.repository(snapshot.getRepository());
- repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId, ActionListener.wrap(v -> {
- logger.info("snapshot [{}] deleted", snapshot);
- removeSnapshotDeletionFromClusterState(snapshot, null, l);
- }, ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, l)
- ));
+ repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId, version.onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION),
+ ActionListener.wrap(v -> { // the boolean above is true only when version is on or after SHARD_GEN_IN_REPO_DATA_VERSION
+ logger.info("snapshot [{}] deleted", snapshot);
+ removeSnapshotDeletionFromClusterState(snapshot, null, l);
+ }, ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, l) // failure path: hands the exception to the same cleanup call
+ ));
}));
}
@@ -1412,38 +1429,59 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
* Calculates the list of shards that should be included into the current snapshot
*
* @param clusterState cluster state
- * @param indices list of indices to be snapshotted
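+ * @param snapshot SnapshotsInProgress Entry
+ * @param repositoryData repository data supplying the already known indices and their current shard generations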
* @return list of shard to be included into current snapshot
*/
private static ImmutableOpenMap shards(ClusterState clusterState,
- List indices) {
+ SnapshotsInProgress.Entry snapshot,
+ RepositoryData repositoryData) {
ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder();
MetaData metaData = clusterState.metaData();
- for (IndexId index : indices) {
+ final ShardGenerations shardGenerations = repositoryData.shardGenerations();
+ for (IndexId index : snapshot.indices()) {
final String indexName = index.getName();
+ final boolean isNewIndex = repositoryData.getIndices().containsKey(indexName) == false;
IndexMetaData indexMetaData = metaData.index(indexName);
if (indexMetaData == null) {
// The index was deleted before we managed to start the snapshot - mark it as missing.
- builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0), missingStatus(null, "missing index"));
+ builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0),
+ new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING, "missing index", null));
} else {
IndexRoutingTable indexRoutingTable = clusterState.getRoutingTable().index(indexName);
for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
ShardId shardId = new ShardId(indexMetaData.getIndex(), i);
+ final String shardRepoGeneration;
+ if (snapshot.useShardGenerations()) {
+ if (isNewIndex) {
+ assert shardGenerations.getShardGen(index, shardId.getId()) == null
+ : "Found shard generation for new index [" + index + "]";
+ shardRepoGeneration = ShardGenerations.NEW_SHARD_GEN;
+ } else {
+ shardRepoGeneration = shardGenerations.getShardGen(index, shardId.getId());
+ }
+ } else {
+ shardRepoGeneration = null;
+ }
if (indexRoutingTable != null) {
ShardRouting primary = indexRoutingTable.shard(i).primaryShard();
if (primary == null || !primary.assignedToNode()) {
- builder.put(shardId, missingStatus(null, "primary shard is not allocated"));
+ builder.put(shardId,
+ new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING, "primary shard is not allocated",
+ shardRepoGeneration));
} else if (primary.relocating() || primary.initializing()) {
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(
- primary.currentNodeId(), ShardState.WAITING, null));
+ primary.currentNodeId(), ShardState.WAITING, shardRepoGeneration));
} else if (!primary.started()) {
- builder.put(shardId, missingStatus(primary.currentNodeId(), "primary shard hasn't been started yet"));
+ builder.put(shardId,
+ new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), ShardState.MISSING,
+ "primary shard hasn't been started yet", shardRepoGeneration));
} else {
- builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(
- primary.currentNodeId(), null));
+ builder.put(shardId,
+ new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), shardRepoGeneration));
}
} else {
- builder.put(shardId, missingStatus(null, "missing routing table"));
+ builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING,
+ "missing routing table", shardRepoGeneration));
}
}
}
@@ -1452,10 +1490,6 @@ private static ImmutableOpenMap> waitingIndices = entry.waitingIndices();
assertEquals(2, waitingIndices.get(idx1Name).size());
diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java
index 70d1ef34b092b..b77653c34c769 100644
--- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java
@@ -62,7 +62,7 @@ public void testDeleteSnapshotting() {
SnapshotsInProgress snaps = new SnapshotsInProgress(new SnapshotsInProgress.Entry(snapshot, true, false,
SnapshotsInProgress.State.INIT, singletonList(new IndexId(index, "doesn't matter")),
System.currentTimeMillis(), (long) randomIntBetween(0, 1000), ImmutableOpenMap.of(),
- SnapshotInfoTests.randomUserMetadata()));
+ SnapshotInfoTests.randomUserMetadata(), randomBoolean()));
ClusterState state = ClusterState.builder(clusterState(index))
.putCustom(SnapshotsInProgress.TYPE, snaps)
.build();
diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java
index 244bdcc21ac5b..7eb58d80fecfd 100644
--- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java
@@ -472,7 +472,7 @@ private static ClusterState addSnapshotIndex(final String index, final int numSh
final SnapshotsInProgress.Entry entry =
new SnapshotsInProgress.Entry(snapshot, randomBoolean(), false, SnapshotsInProgress.State.INIT,
Collections.singletonList(new IndexId(index, index)), randomNonNegativeLong(), randomLong(), shardsBuilder.build(),
- SnapshotInfoTests.randomUserMetadata());
+ SnapshotInfoTests.randomUserMetadata(), randomBoolean());
return ClusterState.builder(newState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(entry)).build();
}
diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
index 0052cef1a3fe2..8c80f741dff96 100644
--- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
@@ -159,15 +159,15 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met
}
@Override
- public void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure,
+ public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations indices, long startTime, String failure,
int totalShards, List shardFailures, long repositoryStateId,
boolean includeGlobalState, MetaData metaData, Map userMetadata,
- ActionListener listener) {
+ boolean writeShardGens, ActionListener listener) { // test stub: ignores its inputs and completes the listener with null
listener.onResponse(null);
}
@Override
- public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) {
+ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener listener) { // test stub
listener.onResponse(null);
}
@@ -203,7 +203,8 @@ public boolean isReadOnly() {
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit
- snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) {
+ snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
+ ActionListener listener) { // test stub: empty body, the listener is never invoked here
}
diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java
index d938010fcb537..f0923b9804837 100644
--- a/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java
@@ -72,7 +72,7 @@ public void testIndicesToUpdateAfterRemovingSnapshot() {
public void testXContent() throws IOException {
RepositoryData repositoryData = generateRandomRepoData();
XContentBuilder builder = JsonXContent.contentBuilder();
- repositoryData.snapshotsToXContent(builder);
+ repositoryData.snapshotsToXContent(builder, true);
try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) {
long gen = (long) randomIntBetween(0, 500);
RepositoryData fromXContent = RepositoryData.snapshotsFromXContent(parser, gen);
@@ -90,18 +90,22 @@ public void testAddSnapshots() {
List indices = new ArrayList<>();
Set