From b9e029b245ccc835d418900bf484a081ce9544ac Mon Sep 17 00:00:00 2001 From: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Date: Fri, 6 Sep 2024 11:02:59 +0530 Subject: [PATCH] Add wait in replica recovery for allocation id to propagate on source node (#15558) * Add wait for target allocation id to appear Signed-off-by: Gaurav Bafna * making waitForAssignment same Signed-off-by: Gaurav Bafna * Add more test Signed-off-by: Gaurav Bafna --------- Signed-off-by: Gaurav Bafna --- .../LocalStorePeerRecoverySourceHandler.java | 19 +- .../recovery/RecoverySourceHandler.java | 51 +++++ .../RemoteStorePeerRecoverySourceHandler.java | 6 +- ...alStorePeerRecoverySourceHandlerTests.java | 204 ++++++++++++++++++ ...teStorePeerRecoverySourceHandlerTests.java | 162 ++++++++++++++ 5 files changed, 423 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java index ac6b2e6b77d18..234e9ba66f340 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java @@ -12,15 +12,12 @@ import org.opensearch.action.StepListener; import org.opensearch.action.support.ThreadedActionListener; import org.opensearch.action.support.replication.ReplicationResponse; -import org.opensearch.cluster.routing.IndexShardRoutingTable; -import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.SetOnce; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.index.engine.RecoveryEngineException; -import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLease; import org.opensearch.index.seqno.RetentionLeaseNotFoundException; import org.opensearch.index.seqno.RetentionLeases; @@ -58,21 +55,7 @@ public LocalStorePeerRecoverySourceHandler( @Override protected void innerRecoveryToTarget(ActionListener listener, Consumer onFailure) throws IOException { final SetOnce retentionLeaseRef = new SetOnce<>(); - - RunUnderPrimaryPermit.run(() -> { - final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); - ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId()); - if (targetShardRouting == null) { - logger.debug( - "delaying recovery of {} as it is not listed as assigned to target node {}", - request.shardId(), - request.targetNode() - ); - throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); - } - assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; - retentionLeaseRef.set(shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting))); - }, shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger); + waitForAssignmentPropagate(retentionLeaseRef); final Closeable retentionLock = shard.acquireHistoryRetentionLock(); resources.add(retentionLock); final long startingSeqNo; diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 8966516d3ef7f..7aa90d2f9fa3e 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -41,10 +41,14 @@ import org.apache.lucene.util.ArrayUtil; import org.opensearch.action.ActionRunnable; import org.opensearch.action.StepListener; +import org.opensearch.action.bulk.BackoffPolicy; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.ThreadedActionListener; import org.opensearch.action.support.replication.ReplicationResponse; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.SetOnce; import org.opensearch.common.StopWatch; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; @@ -59,6 +63,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.index.engine.RecoveryEngineException; +import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLease; import org.opensearch.index.seqno.RetentionLeaseNotFoundException; import org.opensearch.index.seqno.RetentionLeases; @@ -79,12 +84,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.IntSupplier; import java.util.stream.StreamSupport; @@ -191,6 +198,50 @@ public void recoverToTarget(ActionListener listener) { protected abstract void innerRecoveryToTarget(ActionListener listener, Consumer onFailure) throws IOException; + /* + Waits for cluster state propagation of assignment of replica on the target node + */ + void waitForAssignmentPropagate(SetOnce retentionLeaseRef) { + BackoffPolicy EXPONENTIAL_BACKOFF_POLICY = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 5); + AtomicReference targetShardRouting = new AtomicReference<>(); + Iterator backoffDelayIterator = EXPONENTIAL_BACKOFF_POLICY.iterator(); + while (backoffDelayIterator.hasNext()) { + RunUnderPrimaryPermit.run(() -> { + final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); + targetShardRouting.set(routingTable.getByAllocationId(request.targetAllocationId())); + if (targetShardRouting.get() == null) { + logger.info( + "delaying recovery of {} as it is not listed as assigned to target node {}", + request.shardId(), + request.targetNode() + ); + Thread.sleep(backoffDelayIterator.next().millis()); + } + if (targetShardRouting.get() != null) { + assert targetShardRouting.get().initializing() : "expected recovery target to be initializing but was " + + targetShardRouting; + retentionLeaseRef.set( + shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting.get())) + ); + } + + }, + shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ", + shard, + cancellableThreads, + logger + ); + + if (targetShardRouting.get() != null) { + return; + } + } + if (targetShardRouting.get() != null) { + return; + } + throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); + } + protected void finalizeStepAndCompleteFuture( long startingSeqNo, StepListener sendSnapshotStep, diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandler.java index 66c7a3b48f28f..383ed92314165 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandler.java @@ -10,11 +10,13 @@ import org.apache.lucene.index.IndexCommit; import org.opensearch.action.StepListener; +import org.opensearch.common.SetOnce; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.index.engine.RecoveryEngineException; +import org.opensearch.index.seqno.RetentionLease; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.RunUnderPrimaryPermit; @@ -48,7 +50,8 @@ protected void innerRecoveryToTarget(ActionListener listener, // A replica of an index with remote translog does not require the translogs locally and keeps receiving the // updated segments file on refresh, flushes, and merges. In recovery, here, only file-based recovery is performed // and there is no translog replay done. - + final SetOnce retentionLeaseRef = new SetOnce<>(); + waitForAssignmentPropagate(retentionLeaseRef); final StepListener sendFileStep = new StepListener<>(); final StepListener prepareEngineStep = new StepListener<>(); final StepListener sendSnapshotStep = new StepListener<>(); @@ -102,4 +105,5 @@ protected void innerRecoveryToTarget(ActionListener listener, finalizeStepAndCompleteFuture(startingSeqNo, sendSnapshotStep, sendFileStep, prepareEngineStep, onFailure); } + } diff --git a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java index 4685a7196b85a..a4598c4294a33 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java @@ -52,6 +52,8 @@ import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.Numbers; import org.opensearch.common.Randomness; import org.opensearch.common.SetOnce; @@ -141,6 +143,8 @@ import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** @@ -746,6 +750,206 @@ void phase2( assertFalse(phase2Called.get()); } + /* + If the replica allocation id is not reflected in source nodes routing table even after retries, + recoveries should fail + */ + public void testThrowExceptionOnNoTargetInRouting() throws IOException { + final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service); + final StartRecoveryRequest request = getStartRecoveryRequest(); + final IndexShard shard = mock(IndexShard.class); + when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class)); + when(shard.segmentStats(anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class)); + when(shard.isRelocatedPrimary()).thenReturn(false); + final org.opensearch.index.shard.ReplicationGroup replicationGroup = mock(org.opensearch.index.shard.ReplicationGroup.class); + final IndexShardRoutingTable routingTable = mock(IndexShardRoutingTable.class); + when(routingTable.getByAllocationId(anyString())).thenReturn(null); + when(shard.getReplicationGroup()).thenReturn(replicationGroup); + when(replicationGroup.getRoutingTable()).thenReturn(routingTable); + when(shard.acquireSafeIndexCommit()).thenReturn(mock(GatedCloseable.class)); + doAnswer(invocation -> { + ((ActionListener) invocation.getArguments()[0]).onResponse(() -> {}); + return null; + }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), any()); + + final IndexMetadata.Builder indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 5)) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)) + .put(IndexMetadata.SETTING_VERSION_CREATED, VersionUtils.randomVersion(random())) + .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())) + ); + if (randomBoolean()) { + indexMetadata.state(IndexMetadata.State.CLOSE); + } + when(shard.indexSettings()).thenReturn(new IndexSettings(indexMetadata.build(), Settings.EMPTY)); + + final AtomicBoolean phase1Called = new AtomicBoolean(); + final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean(); + final AtomicBoolean phase2Called = new AtomicBoolean(); + final RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( + shard, + mock(RecoveryTargetHandler.class), + threadPool, + request, + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), + between(1, 8), + between(1, 8) + ) { + + @Override + void phase1( + IndexCommit snapshot, + long startingSeqNo, + IntSupplier translogOps, + ActionListener listener, + boolean skipCreateRetentionLeaseStep + ) { + phase1Called.set(true); + super.phase1(snapshot, startingSeqNo, translogOps, listener, skipCreateRetentionLeaseStep); + } + + @Override + void prepareTargetForTranslog(int totalTranslogOps, ActionListener listener) { + prepareTargetForTranslogCalled.set(true); + super.prepareTargetForTranslog(totalTranslogOps, listener); + } + + @Override + void phase2( + long startingSeqNo, + long endingSeqNo, + Translog.Snapshot snapshot, + long maxSeenAutoIdTimestamp, + long maxSeqNoOfUpdatesOrDeletes, + RetentionLeases retentionLeases, + long mappingVersion, + ActionListener listener + ) throws IOException { + phase2Called.set(true); + super.phase2( + startingSeqNo, + endingSeqNo, + snapshot, + maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + mappingVersion, + listener + ); + } + + }; + PlainActionFuture future = new PlainActionFuture<>(); + expectThrows(DelayRecoveryException.class, () -> { + handler.recoverToTarget(future); + future.actionGet(); + }); + verify(routingTable, times(5)).getByAllocationId(null); + assertFalse(phase1Called.get()); + assertFalse(prepareTargetForTranslogCalled.get()); + assertFalse(phase2Called.get()); + } + + /* + Tests when the replica allocation id is reflected in source nodes routing table even after 1 retry + */ + public void testTargetInRoutingInSecondAttempt() throws IOException { + final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service); + final StartRecoveryRequest request = getStartRecoveryRequest(); + final IndexShard shard = mock(IndexShard.class); + when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class)); + when(shard.segmentStats(anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class)); + when(shard.isRelocatedPrimary()).thenReturn(false); + when(shard.getRetentionLeases()).thenReturn(mock(RetentionLeases.class)); + final org.opensearch.index.shard.ReplicationGroup replicationGroup = mock(org.opensearch.index.shard.ReplicationGroup.class); + final IndexShardRoutingTable routingTable = mock(IndexShardRoutingTable.class); + final ShardRouting shardRouting = mock(ShardRouting.class); + when(shardRouting.initializing()).thenReturn(true); + when(shardRouting.currentNodeId()).thenReturn("node"); + when(routingTable.getByAllocationId(any())).thenReturn(null, shardRouting); + when(shard.getReplicationGroup()).thenReturn(replicationGroup); + when(replicationGroup.getRoutingTable()).thenReturn(routingTable); + when(shard.acquireSafeIndexCommit()).thenReturn(mock(GatedCloseable.class)); + doAnswer(invocation -> { + ((ActionListener) invocation.getArguments()[0]).onResponse(() -> {}); + return null; + }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), any()); + + final IndexMetadata.Builder indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 5)) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)) + .put(IndexMetadata.SETTING_VERSION_CREATED, VersionUtils.randomVersion(random())) + .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())) + ); + if (randomBoolean()) { + indexMetadata.state(IndexMetadata.State.CLOSE); + } + when(shard.indexSettings()).thenReturn(new IndexSettings(indexMetadata.build(), Settings.EMPTY)); + + final AtomicBoolean phase1Called = new AtomicBoolean(); + final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean(); + final AtomicBoolean phase2Called = new AtomicBoolean(); + final RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler( + shard, + mock(RecoveryTargetHandler.class), + threadPool, + request, + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), + between(1, 8), + between(1, 8) + ) { + + @Override + void phase1( + IndexCommit snapshot, + long startingSeqNo, + IntSupplier translogOps, + ActionListener listener, + boolean skipCreateRetentionLeaseStep + ) { + phase1Called.set(true); + super.phase1(snapshot, startingSeqNo, translogOps, listener, skipCreateRetentionLeaseStep); + } + + @Override + void prepareTargetForTranslog(int totalTranslogOps, ActionListener listener) { + prepareTargetForTranslogCalled.set(true); + super.prepareTargetForTranslog(totalTranslogOps, listener); + } + + @Override + void phase2( + long startingSeqNo, + long endingSeqNo, + Translog.Snapshot snapshot, + long maxSeenAutoIdTimestamp, + long maxSeqNoOfUpdatesOrDeletes, + RetentionLeases retentionLeases, + long mappingVersion, + ActionListener listener + ) throws IOException { + phase2Called.set(true); + super.phase2( + startingSeqNo, + endingSeqNo, + snapshot, + maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + mappingVersion, + listener + ); + } + + }; + handler.waitForAssignmentPropagate(new SetOnce<>()); + verify(routingTable, times(2)).getByAllocationId(null); + } + public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception { final CancellableThreads cancellableThreads = new CancellableThreads(); final IndexShard shard = mock(IndexShard.class); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java index 131514eb019b3..7f913f3c8596a 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java @@ -8,20 +8,64 @@ package org.opensearch.indices.recovery; +import org.apache.lucene.index.IndexCommit; +import org.opensearch.Version; +import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.common.UUIDs; +import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.engine.SegmentsStats; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; import org.opensearch.index.seqno.ReplicationTracker; +import org.opensearch.index.seqno.RetentionLeases; +import org.opensearch.index.seqno.SeqNoStats; +import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.Store; +import org.opensearch.index.translog.Translog; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.IndexSettingsModule; +import org.opensearch.test.VersionUtils; +import java.io.IOException; import java.nio.file.Path; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.IntSupplier; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class RemoteStorePeerRecoverySourceHandlerTests extends OpenSearchIndexLevelReplicationTestCase { + private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings( + "index", + Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build() + ); + private final ShardId shardId = new ShardId(INDEX_SETTINGS.getIndex(), 1); + + private final ClusterSettings service = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + private static final Settings settings = Settings.builder() .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") @@ -73,4 +117,122 @@ public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception { assertFalse(primary.getRetentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(replica2.routingEntry()))); } } + + public void testThrowExceptionOnNoTargetInRouting() throws IOException { + final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service); + final StartRecoveryRequest request = getStartRecoveryRequest(); + final IndexShard shard = mock(IndexShard.class); + when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class)); + when(shard.segmentStats(anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class)); + when(shard.isRelocatedPrimary()).thenReturn(false); + final org.opensearch.index.shard.ReplicationGroup replicationGroup = mock(org.opensearch.index.shard.ReplicationGroup.class); + final IndexShardRoutingTable routingTable = mock(IndexShardRoutingTable.class); + when(routingTable.getByAllocationId(anyString())).thenReturn(null); + when(shard.getReplicationGroup()).thenReturn(replicationGroup); + when(replicationGroup.getRoutingTable()).thenReturn(routingTable); + when(shard.acquireSafeIndexCommit()).thenReturn(mock(GatedCloseable.class)); + doAnswer(invocation -> { + ((ActionListener) invocation.getArguments()[0]).onResponse(() -> {}); + return null; + }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), any()); + + final IndexMetadata.Builder indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 5)) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)) + .put(IndexMetadata.SETTING_VERSION_CREATED, VersionUtils.randomVersion(random())) + .put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())) + ); + if (randomBoolean()) { + indexMetadata.state(IndexMetadata.State.CLOSE); + } + when(shard.indexSettings()).thenReturn(new IndexSettings(indexMetadata.build(), Settings.EMPTY)); + + final AtomicBoolean phase1Called = new AtomicBoolean(); + final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean(); + final AtomicBoolean phase2Called = new AtomicBoolean(); + final RecoverySourceHandler handler = new RemoteStorePeerRecoverySourceHandler( + shard, + mock(RecoveryTargetHandler.class), + threadPool, + request, + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), + between(1, 8), + between(1, 8) + ) { + + @Override + void phase1( + IndexCommit snapshot, + long startingSeqNo, + IntSupplier translogOps, + ActionListener listener, + boolean skipCreateRetentionLeaseStep + ) { + phase1Called.set(true); + super.phase1(snapshot, startingSeqNo, translogOps, listener, skipCreateRetentionLeaseStep); + } + + @Override + void prepareTargetForTranslog(int totalTranslogOps, ActionListener listener) { + prepareTargetForTranslogCalled.set(true); + super.prepareTargetForTranslog(totalTranslogOps, listener); + } + + @Override + void phase2( + long startingSeqNo, + long endingSeqNo, + Translog.Snapshot snapshot, + long maxSeenAutoIdTimestamp, + long maxSeqNoOfUpdatesOrDeletes, + RetentionLeases retentionLeases, + long mappingVersion, + ActionListener listener + ) throws IOException { + phase2Called.set(true); + super.phase2( + startingSeqNo, + endingSeqNo, + snapshot, + maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + mappingVersion, + listener + ); + } + + }; + PlainActionFuture future = new PlainActionFuture<>(); + expectThrows(DelayRecoveryException.class, () -> { + handler.recoverToTarget(future); + future.actionGet(); + }); + verify(routingTable, times(5)).getByAllocationId(null); + assertFalse(phase1Called.get()); + assertFalse(prepareTargetForTranslogCalled.get()); + assertFalse(phase2Called.get()); + } + + public StartRecoveryRequest getStartRecoveryRequest() throws IOException { + Store.MetadataSnapshot metadataSnapshot = randomBoolean() + ? Store.MetadataSnapshot.EMPTY + : new Store.MetadataSnapshot( + Collections.emptyMap(), + Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()), + randomIntBetween(0, 100) + ); + return new StartRecoveryRequest( + shardId, + null, + new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), + new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), + metadataSnapshot, + randomBoolean(), + randomNonNegativeLong(), + randomBoolean() || metadataSnapshot.getHistoryUUID() == null ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong() + ); + } }