From 4c5a980f266862a279067b44b7a391b4f4286865 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Date: Thu, 3 Aug 2023 16:47:20 +0530 Subject: [PATCH] Avoid duplicate indexing in case of SegRep enabled indices' translog replay (#8578) Signed-off-by: Gaurav Bafna --- .../recovery/IndexPrimaryRelocationIT.java | 14 +-- .../RemoteIndexPrimaryRelocationIT.java | 66 ++++++++++++++ .../opensearch/remotestore/RemoteStoreIT.java | 3 + .../ReplicaToPrimaryPromotionIT.java | 64 ++++++++++++++ .../index/engine/InternalEngine.java | 28 ++++-- .../index/shard/IndexShardTests.java | 27 +++--- ...dTests.java => RemoteIndexShardTests.java} | 39 ++++++++- ...overyWithRemoteTranslogOnPrimaryTests.java | 12 +-- .../SegmentReplicationIndexShardTests.java | 40 ++++++++- .../TranslogTransferManagerTests.java | 5 +- ...enSearchIndexLevelReplicationTestCase.java | 17 +--- .../index/shard/IndexShardTestCase.java | 86 ++++++++++++++----- 12 files changed, 325 insertions(+), 76 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java rename server/src/test/java/org/opensearch/index/shard/{SegmentReplicationWithRemoteIndexShardTests.java => RemoteIndexShardTests.java} (78%) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexPrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexPrimaryRelocationIT.java index 32a10451a0dd3..e9962706bcd39 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexPrimaryRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexPrimaryRelocationIT.java @@ -56,14 +56,16 @@ public class IndexPrimaryRelocationIT extends OpenSearchIntegTestCase { private static final int RELOCATION_COUNT = 15; + public void setup() {} + + public Settings indexSettings() { + return Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build(); + } + public void testPrimaryRelocationWhileIndexing() throws Exception { internalCluster().ensureAtLeastNumDataNodes(randomIntBetween(2, 3)); - client().admin() - .indices() - .prepareCreate("test") - .setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)) - .setMapping("field", "type=text") - .get(); + setup(); + client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get(); ensureGreen("test"); AtomicInteger numAutoGenDocs = new AtomicInteger(); final AtomicBoolean finished = new AtomicBoolean(false); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java new file mode 100644 index 0000000000000..a9482c8c19187 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.recovery.IndexPrimaryRelocationIT; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; + +import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST) +public class RemoteIndexPrimaryRelocationIT extends IndexPrimaryRelocationIT { + + protected static final String REPOSITORY_NAME = "test-remote-store-repo"; + + protected Path absolutePath; + + public void setup() { + absolutePath = randomRepoPath().toAbsolutePath(); + assertAcked( + clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath)) + ); + } + + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(remoteStoreClusterSettings(REPOSITORY_NAME, REPOSITORY_NAME, false)) + .build(); + } + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder() + .put(super.featureFlagSettings()) + .put(FeatureFlags.REMOTE_STORE, "true") + .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") + .build(); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 2da9f1bf1a35e..edb7cfbf01fb7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -37,6 +37,9 @@ import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.comparesEqualTo; +import static org.hamcrest.Matchers.comparesEqualTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.oneOf; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java index 549b4985894a7..c73e7f603b09b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java @@ -11,17 +11,22 @@ import com.carrotsearch.randomizedtesting.RandomizedTest; import org.junit.Before; import org.opensearch.action.admin.indices.close.CloseIndexResponse; +import org.opensearch.action.index.IndexResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.settings.Settings; +import org.opensearch.core.rest.RestStatus; import org.opensearch.test.BackgroundIndexer; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import java.util.Locale; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -116,4 +121,63 @@ public void testPromoteReplicaToPrimary() throws Exception { refresh(indexName); assertHitCount(client().prepareSearch(indexName).setSize(0).get(), numOfDocs); } + + public void testFailoverWhileIndexing() throws Exception { + internalCluster().startNode(); + internalCluster().startNode(); + final String indexName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT); + shard_count = scaledRandomIntBetween(1, 5); + createIndex(indexName); + ensureGreen(indexName); + int docCount = scaledRandomIntBetween(20, 50); + final int indexDocAfterFailover = scaledRandomIntBetween(20, 50); + AtomicInteger numAutoGenDocs = new AtomicInteger(); + CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean finished = new AtomicBoolean(false); + Thread indexingThread = new Thread(() -> { + int docsAfterFailover = 0; + while (finished.get() == false && numAutoGenDocs.get() < docCount) { + IndexResponse indexResponse = internalCluster().clusterManagerClient() + .prepareIndex(indexName) + .setSource("field", numAutoGenDocs.get()) + .get(); + + if (indexResponse.status() == RestStatus.CREATED || indexResponse.status() == RestStatus.ACCEPTED) { + numAutoGenDocs.incrementAndGet(); + if (numAutoGenDocs.get() == docCount / 2) { + if (random().nextInt(3) == 0) { + refresh(indexName); + } else if (random().nextInt(2) == 0) { + flush(indexName); + } + // Node is killed on this + latch.countDown(); + } else if (numAutoGenDocs.get() > docCount / 2) { + docsAfterFailover++; + if (docsAfterFailover == indexDocAfterFailover) { + finished.set(true); + } + } + } + } + logger.debug("Done indexing"); + }); + indexingThread.start(); + latch.await(); + + ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); + final int numShards = state.metadata().index(indexName).getNumberOfShards(); + final ShardRouting primaryShard = state.routingTable().index(indexName).shard(randomIntBetween(0, numShards - 1)).primaryShard(); + final DiscoveryNode randomNode = state.nodes().resolveNode(primaryShard.currentNodeId()); + + // stop the random data node, all remaining shards are promoted to primaries + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(randomNode.getName())); + ensureYellowAndNoInitializingShards(indexName); + indexingThread.join(); + refresh(indexName); + assertHitCount( + client(internalCluster().getClusterManagerName()).prepareSearch(indexName).setSize(0).setTrackTotalHits(true).get(), + numAutoGenDocs.get() + ); + } } diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 83a9c1faf8dbe..ab3790a8d2044 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -801,6 +801,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) final OpVsLuceneDocStatus status; VersionValue versionValue = getVersionFromMap(op.uid().bytes()); assert incrementVersionLookup(); + boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabled(); if (versionValue != null) { status = compareOpToVersionMapOnSeqNo(op.id(), op.seqNo(), op.primaryTerm(), versionValue); } else { @@ -813,10 +814,8 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) } else if (op.seqNo() > docAndSeqNo.seqNo) { status = OpVsLuceneDocStatus.OP_NEWER; } else if (op.seqNo() == docAndSeqNo.seqNo) { - assert localCheckpointTracker.hasProcessed(op.seqNo()) : "local checkpoint tracker is not updated seq_no=" - + op.seqNo() - + " id=" - + op.id(); + assert localCheckpointTracker.hasProcessed(op.seqNo()) || segRepEnabled + : "local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id(); status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; } else { status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; @@ -1018,6 +1017,7 @@ public IndexResult index(Index index) throws IOException { plan.currentNotFoundOrDeleted ); } + } if (index.origin().isFromTranslog() == false) { final Translog.Location location; @@ -1096,10 +1096,18 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : index.seqNo() + ">=" + maxSeqNoOfUpdatesOrDeletes; plan = IndexingStrategy.optimizedAppendOnly(index.version(), 0); } else { + boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabled(); versionMap.enforceSafeAccess(); final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - plan = IndexingStrategy.processAsStaleOp(index.version()); + if (segRepEnabled) { + // For segrep based indices, we can't completely rely on localCheckpointTracker + // as the preserved checkpoint may not have all the operations present in lucene + // we don't need to index it again as stale op as it would create multiple documents for same seq no + plan = IndexingStrategy.processButSkipLucene(false, index.version()); + } else { + plan = IndexingStrategy.processAsStaleOp(index.version()); + } } else { plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version(), 0); } @@ -1533,9 +1541,17 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws // See testRecoveryWithOutOfOrderDelete for an example of peer recovery plan = DeletionStrategy.processButSkipLucene(false, delete.version()); } else { + boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabled(); final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete); if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - plan = DeletionStrategy.processAsStaleOp(delete.version()); + if (segRepEnabled) { + // For segrep based indices, we can't completely rely on localCheckpointTracker + // as the preserved checkpoint may not have all the operations present in lucene + // we don't need to index it again as stale op as it would create multiple documents for same seq no + plan = DeletionStrategy.processButSkipLucene(false, delete.version()); + } else { + plan = DeletionStrategy.processAsStaleOp(delete.version()); + } } else { plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, delete.version(), 0); } diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 097d001bc3012..2abd98392cb7b 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -36,8 +36,8 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexableField; -import org.apache.lucene.index.Term; import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.Term; import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; @@ -50,8 +50,6 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Constants; import org.junit.Assert; -import org.opensearch.common.io.PathUtils; -import org.opensearch.core.Assertions; import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.ActionListener; @@ -77,11 +75,12 @@ import org.opensearch.common.Randomness; import org.opensearch.common.Strings; import org.opensearch.common.UUIDs; -import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.common.collect.Tuple; import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.common.io.PathUtils; import org.opensearch.common.io.stream.BytesStreamOutput; -import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.lease.Releasables; import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -90,8 +89,9 @@ import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.common.xcontent.XContentType; -import org.opensearch.common.lease.Releasable; -import org.opensearch.common.lease.Releasables; +import org.opensearch.core.Assertions; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentBuilder; @@ -106,8 +106,8 @@ import org.opensearch.index.engine.EngineTestCase; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.InternalEngineFactory; -import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.engine.NRTReplicationEngine; +import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.engine.ReadOnlyEngine; import org.opensearch.index.fielddata.FieldDataStats; import org.opensearch.index.fielddata.IndexFieldData; @@ -167,6 +167,7 @@ import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -191,7 +192,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; -import java.util.Collection; + import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -216,8 +217,8 @@ import static org.mockito.Mockito.mock; import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; import static org.opensearch.common.lucene.Lucene.cleanLuceneIndex; -import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.opensearch.test.hamcrest.RegexMatcher.matches; @@ -2824,13 +2825,14 @@ public void testCommitLevelRestoreShardFromRemoteStore() throws IOException { } public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOException { + String remoteStorePath = createTempDir().toString(); IndexShard target = newStartedShard( true, Settings.builder() .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) - .put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, "temp-fs") - .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "temp-fs") + .put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, remoteStorePath + "__test") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, remoteStorePath + "__test") .build(), new InternalEngineFactory() ); @@ -2895,7 +2897,6 @@ public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOExcep final PlainActionFuture future = PlainActionFuture.newFuture(); target.restoreFromRemoteStore(future); target.remoteStore().decRef(); - assertTrue(future.actionGet()); assertDocs(target, "1", "2"); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java similarity index 78% rename from server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java rename to server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index b15d8b66fca55..a01169480de0b 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -19,10 +19,12 @@ import java.io.IOException; import java.util.List; +import java.util.concurrent.CountDownLatch; import static org.hamcrest.Matchers.equalTo; +import static org.opensearch.index.engine.EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber; -public class SegmentReplicationWithRemoteIndexShardTests extends SegmentReplicationIndexShardTests { +public class RemoteIndexShardTests extends SegmentReplicationIndexShardTests { private static final String REPOSITORY_NAME = "temp-fs"; private static final Settings settings = Settings.builder() @@ -135,4 +137,39 @@ public void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlushF } } } + + public void testNoDuplicateSeqNo() throws Exception { + Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); + ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir()); + final IndexShard primaryShard = shards.getPrimary(); + final IndexShard replicaShard = shards.getReplicas().get(0); + shards.startPrimary(); + shards.startAll(); + shards.indexDocs(10); + replicateSegments(primaryShard, shards.getReplicas()); + + flushShard(primaryShard); + shards.indexDocs(10); + replicateSegments(primaryShard, shards.getReplicas()); + + shards.indexDocs(10); + primaryShard.refresh("test"); + replicateSegments(primaryShard, shards.getReplicas()); + + CountDownLatch latch = new CountDownLatch(1); + shards.promoteReplicaToPrimary(replicaShard, (shard, listener) -> { + try { + assertAtMostOneLuceneDocumentPerSequenceNumber(replicaShard.getEngine()); + } catch (IOException e) { + throw new RuntimeException(e); + } + latch.countDown(); + }); + latch.await(); + for (IndexShard shard : shards) { + if (shard != null) { + closeShard(shard, false); + } + } + } } diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java index a0ed28afcb673..9de1324d8727f 100644 --- a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java @@ -23,7 +23,6 @@ import org.opensearch.index.mapper.MapperService; import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; import org.opensearch.index.seqno.SequenceNumbers; -import org.opensearch.index.store.Store; import org.opensearch.index.translog.WriteOnlyTranslogManager; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.common.ReplicationType; @@ -76,15 +75,6 @@ public void testStartSequenceForReplicaRecovery() throws Exception { int moreDocs = shards.indexDocs(randomIntBetween(20, 100)); shards.flush(); - - final ShardRouting replicaRouting2 = newShardRouting( - replicaRouting.shardId(), - replicaRouting.currentNodeId(), - false, - ShardRoutingState.INITIALIZING, - RecoverySource.PeerRecoverySource.INSTANCE - ); - Store remoteStore = createRemoteStore(remoteDir, replicaRouting2, newIndexMetadata); IndexShard newReplicaShard = newShard( newShardRouting( replicaRouting.shardId(), @@ -102,7 +92,7 @@ public void testStartSequenceForReplicaRecovery() throws Exception { replica.getGlobalCheckpointSyncer(), replica.getRetentionLeaseSyncer(), EMPTY_EVENT_LISTENER, - remoteStore + remoteDir ); shards.addReplica(newReplicaShard); AtomicBoolean assertDone = new AtomicBoolean(false); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 57602d96745f6..58ae4b404e69c 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -24,12 +24,12 @@ import org.opensearch.cluster.routing.ShardRoutingHelper; import org.opensearch.common.collect.Tuple; import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.xcontent.XContentType; -import org.opensearch.common.lease.Releasable; import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.InternalEngineFactory; @@ -51,8 +51,8 @@ import org.opensearch.indices.replication.SegmentReplicationState; import org.opensearch.indices.replication.SegmentReplicationTarget; import org.opensearch.indices.replication.SegmentReplicationTargetService; -import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.CopyState; import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationListener; @@ -84,6 +84,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.index.engine.EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber; public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase { @@ -738,6 +739,41 @@ protected SegmentReplicationTargetService newTargetService(SegmentReplicationSou ); } + public void testNoDuplicateSeqNo() throws Exception { + Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); + ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir()); + final IndexShard primaryShard = shards.getPrimary(); + final IndexShard replicaShard = shards.getReplicas().get(0); + shards.startPrimary(); + shards.startAll(); + shards.indexDocs(10); + replicateSegments(primaryShard, shards.getReplicas()); + + flushShard(primaryShard); + shards.indexDocs(10); + replicateSegments(primaryShard, shards.getReplicas()); + + shards.indexDocs(10); + primaryShard.refresh("test"); + replicateSegments(primaryShard, shards.getReplicas()); + + CountDownLatch latch = new CountDownLatch(1); + shards.promoteReplicaToPrimary(replicaShard, (shard, listener) -> { + try { + assertAtMostOneLuceneDocumentPerSequenceNumber(replicaShard.getEngine()); + } catch (IOException e) { + throw new RuntimeException(e); + } + latch.countDown(); + }); + latch.await(); + for (IndexShard shard : shards) { + if (shard != null) { + closeShard(shard, false); + } + } + } + /** * Assert persisted and searchable doc counts. This method should not be used while docs are concurrently indexed because * it asserts point in time seqNos are relative to the doc counts. diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 3d622d6bdf8b8..99937faa18584 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -16,8 +16,8 @@ import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; -import org.opensearch.common.blobstore.support.PlainBlobMetadata; import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.blobstore.support.PlainBlobMetadata; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.translog.Translog; @@ -44,7 +44,6 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anySet; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -432,7 +431,7 @@ public void testDeleteStaleTranslogMetadata() { .listAllInSortedOrderAsync( eq(ThreadPool.Names.REMOTE_PURGE), any(BlobPath.class), - anyString(), + eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class) ); diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index 278847e56e65f..e8c5203129374 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -75,12 +75,12 @@ import org.opensearch.common.collect.Iterators; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.lease.Releasables; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentType; -import org.opensearch.common.lease.Releasable; -import org.opensearch.common.lease.Releasables; import org.opensearch.core.index.Index; import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.DocIdSeqNoAndSource; @@ -98,7 +98,6 @@ import org.opensearch.index.shard.PrimaryReplicaSyncer; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.shard.ShardPath; -import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; @@ -253,10 +252,6 @@ protected ReplicationGroup(final IndexMetadata indexMetadata) throws IOException protected ReplicationGroup(final IndexMetadata indexMetadata, Path remotePath) throws IOException { final ShardRouting primaryRouting = this.createShardRouting("s0", true); - Store remoteStore = null; - if (remotePath != null) { - remoteStore = createRemoteStore(remotePath, primaryRouting, indexMetadata); - } primary = newShard( primaryRouting, indexMetadata, @@ -264,7 +259,7 @@ protected ReplicationGroup(final IndexMetadata indexMetadata, Path remotePath) t getEngineFactory(primaryRouting), () -> {}, retentionLeaseSyncer, - remoteStore + remotePath ); replicas = new CopyOnWriteArrayList<>(); this.indexMetadata = indexMetadata; @@ -390,10 +385,6 @@ public IndexShard addReplica() throws IOException { public IndexShard addReplica(Path remotePath) throws IOException { final ShardRouting replicaRouting = createShardRouting("s" + replicaId.incrementAndGet(), false); - Store remoteStore = null; - if (remotePath != null) { - remoteStore = createRemoteStore(remotePath, replicaRouting, indexMetadata); - } final IndexShard replica = newShard( replicaRouting, indexMetadata, @@ -401,7 +392,7 @@ public IndexShard addReplica(Path remotePath) throws IOException { getEngineFactory(replicaRouting), () -> {}, retentionLeaseSyncer, - remoteStore + remotePath ); addReplica(replica); return replica; diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 9dc114eb923d3..7ef460ebbfb50 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -48,6 +48,7 @@ import org.opensearch.action.support.replication.TransportReplicationAction; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.MappingMetadata; +import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.routing.IndexShardRoutingTable; @@ -66,7 +67,6 @@ import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.fs.FsBlobContainer; import org.opensearch.common.blobstore.fs.FsBlobStore; -import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.io.PathUtils; import org.opensearch.common.lease.Releasable; @@ -77,9 +77,12 @@ import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.io.IOUtils; import org.opensearch.common.xcontent.XContentType; -import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; +import org.opensearch.env.TestEnvironment; import org.opensearch.index.IndexSettings; import org.opensearch.index.MapperTestUtils; import org.opensearch.index.VersionType; @@ -144,7 +147,9 @@ import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.repositories.blobstore.BlobStoreTestUtil; import org.opensearch.repositories.blobstore.OpenSearchBlobStoreRepositoryIntegTestCase; +import org.opensearch.repositories.fs.FsRepository; import org.opensearch.snapshots.Snapshot; import org.opensearch.test.DummyShardLock; import org.opensearch.test.OpenSearchTestCase; @@ -462,7 +467,7 @@ protected IndexShard newShard( @Nullable EngineFactory engineFactory, Runnable globalCheckpointSyncer, RetentionLeaseSyncer retentionLeaseSyncer, - Store remoteStore, + Path path, IndexingOperationListener... listeners ) throws IOException { // add node id as name to settings for proper logging @@ -480,7 +485,7 @@ protected IndexShard newShard( globalCheckpointSyncer, retentionLeaseSyncer, EMPTY_EVENT_LISTENER, - remoteStore, + path, listeners ); } @@ -508,7 +513,7 @@ protected IndexShard newShard( Runnable globalCheckpointSyncer, RetentionLeaseSyncer retentionLeaseSyncer, IndexEventListener indexEventListener, - Store remoteStore, + Path remotePath, IndexingOperationListener... listeners ) throws IOException { return newShard( @@ -523,7 +528,7 @@ protected IndexShard newShard( retentionLeaseSyncer, indexEventListener, SegmentReplicationCheckpointPublisher.EMPTY, - remoteStore, + remotePath, listeners ); } @@ -604,7 +609,7 @@ protected IndexShard newShard( RetentionLeaseSyncer retentionLeaseSyncer, IndexEventListener indexEventListener, SegmentReplicationCheckpointPublisher checkpointPublisher, - @Nullable Store remoteStore, + @Nullable Path remotePath, IndexingOperationListener... listeners ) throws IOException { final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build(); @@ -632,26 +637,32 @@ protected IndexShard newShard( Collections.emptyList(), clusterSettings ); - + Store remoteStore = null; RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService = null; + RepositoriesService mockRepoSvc = mock(RepositoriesService.class); + if (indexSettings.isRemoteStoreEnabled()) { - if (remoteStore == null) { - Path remoteStorePath; - String remoteStoreRepository = indexSettings.getRemoteStoreRepository(); - if (remoteStoreRepository != null && remoteStoreRepository.endsWith("__test")) { - remoteStorePath = PathUtils.get(remoteStoreRepository.replace("__test", "")); - } else { - remoteStorePath = createTempDir(); - } - remoteStore = createRemoteStore(remoteStorePath, routing, indexMetadata); + String remoteStoreRepository = indexSettings.getRemoteStoreRepository(); + // remote path via setting a repository . This is a hack used for shards are created using reset . + // since we can't get remote path from IndexShard directly, we are using repository to store it . + if (remoteStoreRepository != null && remoteStoreRepository.endsWith("__test")) { + remotePath = PathUtils.get(remoteStoreRepository.replace("__test", "")); + } else if (remotePath == null) { + remotePath = createTempDir(); } + + remoteStore = createRemoteStore(remotePath, routing, indexMetadata); + remoteRefreshSegmentPressureService = new RemoteRefreshSegmentPressureService(clusterService, indexSettings.getSettings()); + BlobStoreRepository repo = createRepository(remotePath); + when(mockRepoSvc.repository(any())).thenAnswer(invocationOnMock -> repo); } final BiFunction translogFactorySupplier = (settings, shardRouting) -> { if (settings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { + return new RemoteBlobStoreInternalTranslogFactory( - this::createRepositoriesService, + () -> mockRepoSvc, threadPool, settings.getRemoteStoreTranslogRepository() ); @@ -697,6 +708,39 @@ protected IndexShard newShard( return indexShard; } + private BlobStoreRepository createRepository(Path path) { + Settings settings = Settings.builder().put("location", path).build(); + RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings); + final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); + final FsRepository repository = new FsRepository( + repositoryMetadata, + createEnvironment(path), + xContentRegistry(), + clusterService, + new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) + ) { + @Override + protected void assertSnapshotOrGenericThread() { + // eliminate thread name check as we create repo manually + } + }; + clusterService.addStateApplier(event -> repository.updateState(event.state())); + // Apply state once to initialize repo properly like RepositoriesService would + repository.updateState(clusterService.state()); + repository.start(); + return repository; + } + + private Environment createEnvironment(Path path) { + Path home = createTempDir(); + return TestEnvironment.newEnvironment( + Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), home.toAbsolutePath()) + .put(Environment.PATH_REPO_SETTING.getKey(), path.toAbsolutePath()) + .build() + ); + } + protected RepositoriesService createRepositoriesService() { RepositoriesService repositoriesService = Mockito.mock(RepositoriesService.class); BlobStoreRepository repository = Mockito.mock(BlobStoreRepository.class); @@ -778,7 +822,7 @@ protected IndexShard reinitShard(IndexShard current, ShardRouting routing, Index current.indexSettings.getIndexMetadata(), current.engineFactory, current.engineConfigFactory, - current.remoteStore(), + null, listeners ); } @@ -797,7 +841,7 @@ protected IndexShard reinitShard( IndexMetadata indexMetadata, EngineFactory engineFactory, EngineConfigFactory engineConfigFactory, - Store remoteStore, + Path remotePath, IndexingOperationListener... listeners ) throws IOException { closeShards(current); @@ -812,7 +856,7 @@ protected IndexShard reinitShard( current.getGlobalCheckpointSyncer(), current.getRetentionLeaseSyncer(), EMPTY_EVENT_LISTENER, - remoteStore, + remotePath, listeners ); }