From 128809cfed530f45b11f80d08bb16bd99335c2f1 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 19 Sep 2019 13:08:06 +0200 Subject: [PATCH 1/2] Track Shard Snapshot Generation in CS Adds communication of new shard generations from datanodes to master and tracking of those generations in the CS. This is a preliminary to #46250 --- .../cluster/SnapshotsInProgress.java | 37 +++++++++++++++---- .../snapshots/IndexShardSnapshotStatus.java | 23 ++++++++---- .../repositories/FilterRepository.java | 2 +- .../repositories/Repository.java | 2 +- .../blobstore/BlobStoreRepository.java | 12 +++--- .../snapshots/SnapshotShardsService.java | 24 +++++++----- .../snapshots/SnapshotsService.java | 29 ++++++++++----- .../cluster/SnapshotsInProgressTests.java | 12 +++--- .../MetaDataIndexStateServiceTests.java | 2 +- .../RepositoriesServiceTests.java | 2 +- .../BlobStoreRepositoryRestoreTests.java | 3 +- .../repositories/fs/FsRepositoryTests.java | 9 +++-- .../SharedClusterSnapshotRestoreIT.java | 6 +-- ...SnapshotsInProgressSerializationTests.java | 2 +- .../index/shard/IndexShardTestCase.java | 19 +++++----- .../index/shard/RestoreOnlyRepository.java | 2 +- .../xpack/ccr/repository/CcrRepository.java | 2 +- .../SourceOnlySnapshotRepository.java | 2 +- .../SourceOnlySnapshotShardTests.java | 27 +++++++------- 19 files changed, 132 insertions(+), 85 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 3ce8615c47630..8a112950204c6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.snapshots.SnapshotsService; import java.io.IOException; import java.util.ArrayList; @@ -249,20 +250,26 @@ public static boolean completed(ObjectContainer shards) { public static class ShardSnapshotStatus { private final ShardState state; private final String nodeId; + + @Nullable + private final String generation; + + @Nullable private final String reason; - public ShardSnapshotStatus(String nodeId) { - this(nodeId, ShardState.INIT); + public ShardSnapshotStatus(String nodeId, String generation) { + this(nodeId, ShardState.INIT, generation); } - public ShardSnapshotStatus(String nodeId, ShardState state) { - this(nodeId, state, null); + public ShardSnapshotStatus(String nodeId, ShardState state, String generation) { + this(nodeId, state, null, generation); } - public ShardSnapshotStatus(String nodeId, ShardState state, String reason) { + public ShardSnapshotStatus(String nodeId, ShardState state, String reason, String generation) { this.nodeId = nodeId; this.state = state; this.reason = reason; + this.generation = generation; // If the state is failed we have to have a reason for this failure assert state.failed() == false || reason != null; } @@ -270,6 +277,12 @@ public ShardSnapshotStatus(String nodeId, ShardState state, String reason) { public ShardSnapshotStatus(StreamInput in) throws IOException { nodeId = in.readOptionalString(); state = ShardState.fromValue(in.readByte()); + if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) { + generation = in.readOptionalString(); + assert generation != null || state != ShardState.SUCCESS : "Received null generation for shard state [" + state + "]"; + } else { + generation = null; + } reason = in.readOptionalString(); } @@ -281,6 +294,10 @@ public String nodeId() { return nodeId; } + public String generation() { + return this.generation; + } + public String reason() { return reason; } @@ -288,6 +305,9 @@ public String reason() { public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(nodeId); out.writeByte(state.value); + if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) { + out.writeOptionalString(generation); + } out.writeOptionalString(reason); } @@ -296,8 +316,8 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ShardSnapshotStatus status = (ShardSnapshotStatus) o; - return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason) && state == status.state; - + return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason) + && Objects.equals(generation, status.generation) && state == status.state; } @Override @@ -305,12 +325,13 @@ public int hashCode() { int result = state != null ? state.hashCode() : 0; result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0); result = 31 * result + (reason != null ? reason.hashCode() : 0); + result = 31 * result + (generation != null ? generation.hashCode() : 0); return result; } @Override public String toString() { - return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + "]"; + return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + ", generation=" + generation + "]"; } } diff --git a/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java b/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java index a558051c58d7e..fef11fae69588 100644 --- a/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java +++ b/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java @@ -58,6 +58,7 @@ public enum Stage { } private final AtomicReference stage; + private final AtomicReference generation; private long startTime; private long totalTime; private int incrementalFileCount; @@ -71,8 +72,10 @@ public enum Stage { private IndexShardSnapshotStatus(final Stage stage, final long startTime, final long totalTime, final int incrementalFileCount, final int totalFileCount, final int processedFileCount, - final long incrementalSize, final long totalSize, final long processedSize, final String failure) { + final long incrementalSize, final long totalSize, final long processedSize, final String failure, + final String generation) { this.stage = new AtomicReference<>(Objects.requireNonNull(stage)); + this.generation = new AtomicReference<>(generation); this.startTime = startTime; this.totalTime = totalTime; this.incrementalFileCount = incrementalFileCount; @@ -109,13 +112,15 @@ public synchronized Copy moveToFinalize(final long indexVersion) { return asCopy(); } - public synchronized Copy moveToDone(final long endTime) { + public synchronized Copy moveToDone(final long endTime, final String newGeneration) { + assert newGeneration != null; if (stage.compareAndSet(Stage.FINALIZE, Stage.DONE)) { this.totalTime = Math.max(0L, endTime - startTime); } else { throw new IllegalStateException("Unable to move the shard snapshot status to [DONE]: " + "expecting [FINALIZE] but got [" + stage.get() + "]"); } + this.generation.set(newGeneration); return asCopy(); } @@ -133,6 +138,10 @@ public synchronized void moveToFailed(final long endTime, final String failure) } } + public String generation() { + return generation.get(); + } + public boolean isAborted() { return stage.get() == Stage.ABORTED; } @@ -158,8 +167,8 @@ public synchronized IndexShardSnapshotStatus.Copy asCopy() { indexVersion, failure); } - public static IndexShardSnapshotStatus newInitializing() { - return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null); + public static IndexShardSnapshotStatus newInitializing(String generation) { + return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, null, generation); } public static IndexShardSnapshotStatus newFailed(final String failure) { @@ -167,15 +176,15 @@ public static IndexShardSnapshotStatus newFailed(final String failure) { if (failure == null) { throw new IllegalArgumentException("A failure description is required for a failed IndexShardSnapshotStatus"); } - return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure); + return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, failure, null); } public static IndexShardSnapshotStatus newDone(final long startTime, final long totalTime, final int incrementalFileCount, final int fileCount, - final long incrementalSize, final long size) { + final long incrementalSize, final long size, String generation) { // The snapshot is done which means the number of processed files is the same as total return new IndexShardSnapshotStatus(Stage.DONE, startTime, totalTime, incrementalFileCount, fileCount, incrementalFileCount, - incrementalSize, size, incrementalSize, null); + incrementalSize, size, incrementalSize, null, generation); } /** diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 6d9cba05748ea..07acd55cea03c 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -123,7 +123,7 @@ public boolean isReadOnly() { @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index a02975e120b37..7eb2196884bb2 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -209,7 +209,7 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long * @param listener listener invoked on completion */ void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, - IndexShardSnapshotStatus snapshotStatus, ActionListener listener); + IndexShardSnapshotStatus snapshotStatus, ActionListener listener); /** * Restores snapshot of the shard. diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index f8bdca75c122c..796959b4fdbaa 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -477,7 +477,7 @@ private void cleanupStaleBlobs(Map foundIndices, MapDeleting unreferenced root level blobs {@link #cleanupStaleRootFiles} * * @param repositoryStateId Current repository state id - * @param listener Lister to complete when done + * @param listener Listener to complete when done */ public void cleanup(long repositoryStateId, ActionListener listener) { try { @@ -942,10 +942,10 @@ private void writeAtomic(final String blobName, final BytesReference bytesRef, b @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { final ShardId shardId = store.shardId(); final long startTime = threadPool.absoluteTimeInMillis(); - final StepListener snapshotDoneListener = new StepListener<>(); + final StepListener snapshotDoneListener = new StepListener<>(); snapshotDoneListener.whenComplete(listener::onResponse, e -> { snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.stackTrace(e)); listener.onFailure(e instanceof IndexShardSnapshotFailedException ? (IndexShardSnapshotFailedException) e @@ -1084,8 +1084,8 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete old index-N blobs during finalization", snapshotId, shardId), e); } - snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis()); - snapshotDoneListener.onResponse(null); + snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), indexGeneration); + snapshotDoneListener.onResponse(indexGeneration); }, snapshotDoneListener::onFailure); if (indexIncrementalFileCount == 0) { allFilesUploadedListener.onResponse(Collections.emptyList()); @@ -1153,7 +1153,7 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(shardContainer(indexId, shardId), snapshotId); return IndexShardSnapshotStatus.newDone(snapshot.startTime(), snapshot.time(), snapshot.incrementalFileCount(), snapshot.totalFileCount(), - snapshot.incrementalSize(), snapshot.totalSize()); + snapshot.incrementalSize(), snapshot.totalSize(), null); // Not adding a real generation here as it doesn't matter to callers } @Override diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index c6f63130b7f7f..b9d0fdd599649 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -244,7 +244,7 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) { if (startedShards == null) { startedShards = new HashMap<>(); } - startedShards.put(shardId, IndexShardSnapshotStatus.newInitializing()); + startedShards.put(shardId, IndexShardSnapshotStatus.newInitializing(shardSnapshotStatus.generation())); } } if (startedShards != null && startedShards.isEmpty() == false) { @@ -267,7 +267,7 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) { } else if (stage == Stage.DONE) { logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, " + "updating status on the master", snapshot, shard.key); - notifySuccessfulSnapshotShard(snapshot, shard.key); + notifySuccessfulSnapshotShard(snapshot, shard.key, snapshotStatus.generation()); } else if (stage == Stage.FAILURE) { logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, " + "updating status on the master", snapshot, shard.key); @@ -297,12 +297,15 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map() { @Override - public void onResponse(final Void aVoid) { + public void onResponse(String newGeneration) { + assert newGeneration != null; + assert newGeneration.equals(snapshotStatus.generation()); if (logger.isDebugEnabled()) { final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); - logger.debug("snapshot ({}) completed to {} with {}", snapshot, snapshot.getRepository(), lastSnapshotStatus); + logger.debug("snapshot [{}] completed to [{}] with [{}] at generation [{}]", + snapshot, snapshot.getRepository(), lastSnapshotStatus, snapshotStatus.generation()); } - notifySuccessfulSnapshotShard(snapshot, shardId); + notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration); } @Override @@ -322,7 +325,7 @@ public void onFailure(Exception e) { * @param snapshotStatus snapshot status */ private void snapshot(final ShardId shardId, final Snapshot snapshot, final IndexId indexId, - final IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { + final IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { try { final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); if (indexShard.routingEntry().primary() == false) { @@ -380,7 +383,7 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { // but we think the shard is done - we need to make new master know that the shard is done logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, " + "updating status on the master", snapshot.snapshot(), shardId); - notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId); + notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId, localShard.getValue().generation()); } else if (stage == Stage.FAILURE) { // but we think the shard failed - we need to make new master know that the shard failed @@ -450,15 +453,16 @@ public String toString() { } /** Notify the master node that the given shard has been successfully snapshotted **/ - private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId) { + private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId, String generation) { + assert generation != null; sendSnapshotShardUpdate(snapshot, shardId, - new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.SUCCESS)); + new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.SUCCESS, generation)); } /** Notify the master node that the given shard failed to be snapshotted **/ private void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String failure) { sendSnapshotShardUpdate(snapshot, shardId, - new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.FAILED, failure)); + new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.FAILED, failure, null)); } /** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */ diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index ecce50093ceae..be61b710d65cb 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -123,6 +123,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus */ public static final Version NO_REPO_INITIALIZE_VERSION = Version.V_7_5_0; + public static final Version SHARD_GEN_IN_REPO_DATA_VERSION = Version.V_8_0_0; + private static final Logger logger = LogManager.getLogger(SnapshotsService.class); private final ClusterService clusterService; @@ -803,7 +805,8 @@ public ClusterState execute(ClusterState currentState) { logger.warn("failing snapshot of shard [{}] on closed node [{}]", shardEntry.key, shardStatus.nodeId()); shards.put(shardEntry.key, - new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "node shutdown")); + new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "node shutdown", + shardStatus.generation())); } } } @@ -908,7 +911,8 @@ private static ImmutableOpenMap processWaitingShar // Shard that we were waiting for has started on a node, let's process it snapshotChanged = true; logger.trace("starting shard that we were waiting for [{}] on node [{}]", shardId, shardStatus.nodeId()); - shards.put(shardId, new ShardSnapshotStatus(shardRouting.primaryShard().currentNodeId())); + shards.put(shardId, + new ShardSnapshotStatus(shardRouting.primaryShard().currentNodeId(), shardStatus.generation())); continue; } else if (shardRouting.primaryShard().initializing() || shardRouting.primaryShard().relocating()) { // Shard that we were waiting for hasn't started yet or still relocating - will continue to wait @@ -920,7 +924,8 @@ private static ImmutableOpenMap processWaitingShar // Shard that we were waiting for went into unassigned state or disappeared - giving up snapshotChanged = true; logger.warn("failing snapshot of shard [{}] on unassigned shard [{}]", shardId, shardStatus.nodeId()); - shards.put(shardId, new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "shard is unassigned")); + shards.put(shardId, new ShardSnapshotStatus( + shardStatus.nodeId(), ShardState.FAILED, "shard is unassigned", shardStatus.generation())); } else { shards.put(shardId, shardStatus); } @@ -1224,7 +1229,8 @@ public ClusterState execute(ClusterState currentState) { for (ObjectObjectCursor shardEntry : snapshotEntry.shards()) { ShardSnapshotStatus status = shardEntry.value; if (status.state().completed() == false) { - status = new ShardSnapshotStatus(status.nodeId(), ShardState.ABORTED, "aborted by snapshot deletion"); + status = new ShardSnapshotStatus( + status.nodeId(), ShardState.ABORTED, "aborted by snapshot deletion", status.generation()); } shardsBuilder.put(shardEntry.key, status); } @@ -1411,7 +1417,7 @@ private static ImmutableOpenMap shards = ImmutableOpenMap.builder(); // test more than one waiting shard in an index - shards.put(new ShardId(idx1Name, idx1UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING)); - shards.put(new ShardId(idx1Name, idx1UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING)); - shards.put(new ShardId(idx1Name, idx1UUID, 2), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "")); + shards.put(new ShardId(idx1Name, idx1UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING, "1")); + shards.put(new ShardId(idx1Name, idx1UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING, "1")); + shards.put(new ShardId(idx1Name, idx1UUID, 2), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "", "1")); // test exactly one waiting shard in an index - shards.put(new ShardId(idx2Name, idx2UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING)); - shards.put(new ShardId(idx2Name, idx2UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "")); + shards.put(new ShardId(idx2Name, idx2UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING, "1")); + shards.put(new ShardId(idx2Name, idx2UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "", "1")); // test no waiting shards in an index - shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "")); + shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "", "1")); Entry entry = new Entry(snapshot, randomBoolean(), randomBoolean(), State.INIT, indices, System.currentTimeMillis(), randomLong(), shards.build(), SnapshotInfoTests.randomUserMetadata()); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java index e128c7bc7192c..ac180d9e80ae8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java @@ -465,7 +465,7 @@ private static ClusterState addSnapshotIndex(final String index, final int numSh final ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(); for (ShardRouting shardRouting : newState.routingTable().index(index).randomAllActiveShardsIt()) { - shardsBuilder.put(shardRouting.shardId(), new SnapshotsInProgress.ShardSnapshotStatus(shardRouting.currentNodeId())); + shardsBuilder.put(shardRouting.shardId(), new SnapshotsInProgress.ShardSnapshotStatus(shardRouting.currentNodeId(), "1")); } final Snapshot snapshot = new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5))); diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 7a1bcefea9d95..6004e7fff4520 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -202,7 +202,7 @@ public boolean isReadOnly() { @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit - snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { + snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { } diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index a904879321d58..b2f21e89a7d61 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -159,7 +159,8 @@ public void testSnapshotWithConflictingName() throws IOException { // snapshot the shard final Repository repository = createRepository(); final Snapshot snapshot = new Snapshot(repository.getMetadata().name(), new SnapshotId(randomAlphaOfLength(10), "_uuid")); - snapshotShard(shard, snapshot, repository); + final String shardGen = snapshotShard(shard, snapshot, repository); + assertNotNull(shardGen); final Snapshot snapshotWithSameName = new Snapshot(repository.getMetadata().name(), new SnapshotId( snapshot.getSnapshotId().getName(), "_uuid2")); IndexShardSnapshotFailedException isfe = expectThrows(IndexShardSnapshotFailedException.class, diff --git a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java index 6c48a19cbb5e6..3c2d59564deac 100644 --- a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java @@ -100,15 +100,16 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException { IndexId indexId = new IndexId(idxSettings.getIndex().getName(), idxSettings.getUUID()); IndexCommit indexCommit = Lucene.getIndexCommit(Lucene.readSegmentInfos(store.directory()), store.directory()); - final PlainActionFuture future1 = PlainActionFuture.newFuture(); + final PlainActionFuture future1 = PlainActionFuture.newFuture(); runGeneric(threadPool, () -> { - IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(); + IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null); repository.snapshotShard(store, null, snapshotId, indexId, indexCommit, snapshotStatus, future1); future1.actionGet(); IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy(); assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount()); }); + final String shardGeneration = future1.actionGet(); Lucene.cleanLuceneIndex(directory); expectThrows(org.apache.lucene.index.IndexNotFoundException.class, () -> Lucene.readSegmentInfos(directory)); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); @@ -127,9 +128,9 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException { SnapshotId incSnapshotId = new SnapshotId("test1", "test1"); IndexCommit incIndexCommit = Lucene.getIndexCommit(Lucene.readSegmentInfos(store.directory()), store.directory()); Collection commitFileNames = incIndexCommit.getFileNames(); - final PlainActionFuture future2 = PlainActionFuture.newFuture(); + final PlainActionFuture future2 = PlainActionFuture.newFuture(); runGeneric(threadPool, () -> { - IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(); + IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration); repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus, future2); future2.actionGet(); IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy(); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index dc8058b366a6f..1e640e9f3625d 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -2774,9 +2774,9 @@ public void testDeleteOrphanSnapshot() throws Exception { public ClusterState execute(ClusterState currentState) { // Simulate orphan snapshot ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); - shards.put(new ShardId(idxName, "_na_", 0), new ShardSnapshotStatus("unknown-node", ShardState.ABORTED, "aborted")); - shards.put(new ShardId(idxName, "_na_", 1), new ShardSnapshotStatus("unknown-node", ShardState.ABORTED, "aborted")); - shards.put(new ShardId(idxName, "_na_", 2), new ShardSnapshotStatus("unknown-node", ShardState.ABORTED, "aborted")); + shards.put(new ShardId(idxName, "_na_", 0), new ShardSnapshotStatus("unknown-node", ShardState.ABORTED, "aborted", null)); + shards.put(new ShardId(idxName, "_na_", 1), new ShardSnapshotStatus("unknown-node", ShardState.ABORTED, "aborted", null)); + shards.put(new ShardId(idxName, "_na_", 2), new ShardSnapshotStatus("unknown-node", ShardState.ABORTED, "aborted", null)); return ClusterState.builder(currentState) .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(List.of(new Entry( new Snapshot(repositoryName, createSnapshotResponse.getSnapshotInfo().snapshotId()), diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java index 46404af9f4037..5d927249804b9 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java @@ -68,7 +68,7 @@ private Entry randomSnapshot() { String nodeId = randomAlphaOfLength(10); ShardState shardState = randomFrom(ShardState.values()); builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(nodeId, shardState, - shardState.failed() ? randomAlphaOfLength(10) : null)); + shardState.failed() ? randomAlphaOfLength(10) : null, "1")); } ImmutableOpenMap shards = builder.build(); return new Entry(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index cce9780b09223..e24c540e4f786 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -828,24 +828,25 @@ protected void recoverShardFromSnapshot(final IndexShard shard, } /** Snapshot a shard using a given repository **/ - protected void snapshotShard(final IndexShard shard, - final Snapshot snapshot, - final Repository repository) throws IOException { - final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(); - final PlainActionFuture future = PlainActionFuture.newFuture(); + protected String snapshotShard(final IndexShard shard, + final Snapshot snapshot, + final Repository repository) throws IOException { + final Index index = shard.shardId().getIndex(); + final IndexId indexId = new IndexId(index.getName(), index.getUUID()); + final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null); + final PlainActionFuture future = PlainActionFuture.newFuture(); + final String shardGen; try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) { - Index index = shard.shardId().getIndex(); - IndexId indexId = new IndexId(index.getName(), index.getUUID()); - repository.snapshotShard(shard.store(), shard.mapperService(), snapshot.getSnapshotId(), indexId, indexCommitRef.getIndexCommit(), snapshotStatus, future); - future.actionGet(); + shardGen = future.actionGet(); } final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); assertEquals(IndexShardSnapshotStatus.Stage.DONE, lastSnapshotStatus.getStage()); assertEquals(shard.snapshotStoreMetadata().size(), lastSnapshotStatus.getTotalFileCount()); assertNull(lastSnapshotStatus.getFailure()); + return shardGen; } /** diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 417e4e98649af..d68004eff4ad3 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -135,7 +135,7 @@ public boolean isReadOnly() { @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 293fc04989f98..ddf977cd6a70f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -296,7 +296,7 @@ public boolean isReadOnly() { @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java index 280e4a4344575..df422aec73cd4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java @@ -133,7 +133,7 @@ private static MetaData metadataToSnapshot(List indices, MetaData metaD @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { if (mapperService.documentMapper() != null // if there is no mapping this is null && mapperService.documentMapper().sourceMapper().isComplete() == false) { listener.onFailure( diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index 34acf179c3fbf..1a124d6b1c7cc 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -95,8 +95,8 @@ public void testSourceIncomplete() throws IOException { SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository()); repository.start(); try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) { - IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(); - final PlainActionFuture future = PlainActionFuture.newFuture(); + IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing("-1"); + final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future)); IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, future::actionGet); @@ -117,14 +117,15 @@ public void testIncrementalSnapshot() throws IOException { IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID()); SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository()); repository.start(); - int totalFileCount = -1; + int totalFileCount; + String shardGeneration; try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) { - IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(); + IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(null); SnapshotId snapshotId = new SnapshotId("test", "test"); - final PlainActionFuture future = PlainActionFuture.newFuture(); + final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future)); - future.actionGet(); + shardGeneration = future.actionGet(); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount()); totalFileCount = copy.getTotalFileCount(); @@ -136,11 +137,11 @@ public void testIncrementalSnapshot() throws IOException { try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) { SnapshotId snapshotId = new SnapshotId("test_1", "test_1"); - IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(); - final PlainActionFuture future = PlainActionFuture.newFuture(); + IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration); + final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future)); - future.actionGet(); + shardGeneration = future.actionGet(); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); // we processed the segments_N file plus _1.si, _1.fdx, _1.fnm, _1.fdt assertEquals(5, copy.getIncrementalFileCount()); @@ -152,8 +153,8 @@ public void testIncrementalSnapshot() throws IOException { try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) { SnapshotId snapshotId = new SnapshotId("test_2", "test_2"); - IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(); - final PlainActionFuture future = PlainActionFuture.newFuture(); + IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration); + final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future)); future.actionGet(); @@ -199,8 +200,8 @@ public void testRestoreMinmal() throws IOException { SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository()); repository.start(); try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) { - IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(); - final PlainActionFuture future = PlainActionFuture.newFuture(); + IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(null); + final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> { repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future); From 121e2f6a7ba2a906624d908fb0129716fe348f0a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 23 Sep 2019 15:28:37 +0200 Subject: [PATCH 2/2] CR comments --- .../snapshots/IndexShardSnapshotStatus.java | 2 +- .../snapshots/SnapshotsService.java | 18 ++++++++---------- .../index/shard/IndexShardTestCase.java | 6 +++++- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java b/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java index dd3c8c05b22ff..129ce08ebd825 100644 --- a/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java +++ b/server/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java @@ -116,11 +116,11 @@ public synchronized void moveToDone(final long endTime, final String newGenerati assert newGeneration != null; if (stage.compareAndSet(Stage.FINALIZE, Stage.DONE)) { this.totalTime = Math.max(0L, endTime - startTime); + this.generation.set(newGeneration); } else { throw new IllegalStateException("Unable to move the shard snapshot status to [DONE]: " + "expecting [FINALIZE] but got [" + stage.get() + "]"); } - this.generation.set(newGeneration); } public synchronized void abortIfNotCompleted(final String failure) { diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index be61b710d65cb..aa8193c92f22a 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1416,8 +1416,7 @@ private static ImmutableOpenMap