Skip to content

Commit

Permalink
Added UT for checking starting seq no in replica recovery
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Sep 5, 2022
1 parent 8b589e8 commit c46be7d
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,22 @@
package org.opensearch.index.shard;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.engine.DocIdSeqNoAndSource;
import org.opensearch.index.engine.NRTReplicationEngine;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase;
import org.opensearch.index.translog.WriteOnlyTranslogManager;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.common.ReplicationType;

import java.util.List;

import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting;

public class ReplicaRecoveryWithRemoteTranslogOnPrimaryTests extends OpenSearchIndexLevelReplicationTestCase {

private static final Settings settings = Settings.builder()
Expand Down Expand Up @@ -92,4 +98,64 @@ public void testNoTranslogHistoryTransferred() throws Exception {
shards.assertAllEqual(numDocs + moreDocs);
}
}

public void testStartSequenceForReplicaRecovery() throws Exception {
try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) {

shards.startPrimary();
final IndexShard primary = shards.getPrimary();
int numDocs = shards.indexDocs(randomIntBetween(10, 100));
shards.flush();

final IndexShard replica = shards.addReplica();
shards.startAll();

allowShardFailures();
replica.failShard("test", null);

final ShardRouting replicaRouting = replica.routingEntry();
final IndexMetadata newIndexMetadata = IndexMetadata.builder(replica.indexSettings().getIndexMetadata())
.primaryTerm(replicaRouting.shardId().id(), replica.getOperationPrimaryTerm() + 1)
.build();
closeShards(replica);
shards.removeReplica(replica);

int moreDocs = shards.indexDocs(randomIntBetween(20, 100));
shards.flush();

IndexShard newReplicaShard = newShard(
newShardRouting(
replicaRouting.shardId(),
replicaRouting.currentNodeId(),
false,
ShardRoutingState.INITIALIZING,
RecoverySource.PeerRecoverySource.INSTANCE
),
replica.shardPath(),
newIndexMetadata,
null,
null,
replica.getEngineFactory(),
replica.getEngineConfigFactory(),
replica.getGlobalCheckpointSyncer(),
replica.getRetentionLeaseSyncer(),
EMPTY_EVENT_LISTENER,
null
);
shards.addReplica(newReplicaShard);
shards.recoverReplica(newReplicaShard, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener) {
@Override
public IndexShard indexShard() {
IndexShard idxShard = super.indexShard();
// verify the starting sequence number while recovering a failed shard which has a valid last commit
assertEquals(numDocs - 1, idxShard.fetchStartSeqNoFromLastCommit());
return idxShard;
}
});

shards.flush();
replicateSegments(primary, shards.getReplicas());
shards.assertAllEqual(numDocs + moreDocs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,12 @@ protected final void recoverUnstartedReplica(
}
replica.prepareForIndexRecovery();
final RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode);
final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint();
IndexShard indexShard = recoveryTarget.indexShard();
boolean isRecoveringReplicaWithRemoteTxLogEnabledIndex = recoveryTarget.state().getPrimary() == false
&& indexShard.isRemoteTranslogEnabledOnPrimary();
final long startingSeqNo = isRecoveringReplicaWithRemoteTxLogEnabledIndex
? indexShard.fetchStartSeqNoFromLastCommit()
: indexShard.recoverLocallyUpToGlobalCheckpoint();
final StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(
logger,
rNode,
Expand Down

0 comments on commit c46be7d

Please sign in to comment.