diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexPrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexPrimaryRelocationIT.java index 32a10451a0dd3..e9962706bcd39 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexPrimaryRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexPrimaryRelocationIT.java @@ -56,14 +56,16 @@ public class IndexPrimaryRelocationIT extends OpenSearchIntegTestCase { private static final int RELOCATION_COUNT = 15; + public void setup() {} + + public Settings indexSettings() { + return Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build(); + } + public void testPrimaryRelocationWhileIndexing() throws Exception { internalCluster().ensureAtLeastNumDataNodes(randomIntBetween(2, 3)); - client().admin() - .indices() - .prepareCreate("test") - .setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)) - .setMapping("field", "type=text") - .get(); + setup(); + client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get(); ensureGreen("test"); AtomicInteger numAutoGenDocs = new AtomicInteger(); final AtomicBoolean finished = new AtomicBoolean(false); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/IndexPrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/IndexPrimaryRelocationIT.java deleted file mode 100644 index 6d498c516ae32..0000000000000 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/IndexPrimaryRelocationIT.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. 
- */ - -package org.opensearch.remotestore; - -import org.junit.Before; -import org.opensearch.action.DocWriteResponse; -import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; -import org.opensearch.action.admin.cluster.node.hotthreads.NodeHotThreads; -import org.opensearch.action.delete.DeleteResponse; -import org.opensearch.action.index.IndexResponse; -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; -import org.opensearch.common.Priority; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.index.query.QueryBuilders; -import org.opensearch.test.OpenSearchIntegTestCase; -import org.opensearch.test.hamcrest.OpenSearchAssertions; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; - -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST) -public class IndexPrimaryRelocationIT extends RemoteStoreBaseIntegTestCase { - - private static final int RELOCATION_COUNT = 15; - - @Before - public void setup() { - setupRepo(); - } - - - public Settings indexSettings() { - return Settings.builder() - .put(super.indexSettings()) - .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) - .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .build(); - } - - public void testPrimaryRelocationWhileIndexing() throws Exception { - internalCluster().ensureAtLeastNumDataNodes(randomIntBetween(2, 3)); - client().admin() - .indices() - .prepareCreate("test") - .setSettings(indexSettings()) - .setMapping("field", "type=text") - .get(); - ensureGreen("test"); - AtomicInteger 
numAutoGenDocs = new AtomicInteger(); - final AtomicBoolean finished = new AtomicBoolean(false); - Thread indexingThread = new Thread() { - @Override - public void run() { - while (finished.get() == false && numAutoGenDocs.get() < 10_000) { - IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get(); - assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); - DeleteResponse deleteResponse = client().prepareDelete("test", "id").get(); - assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); - client().prepareIndex("test").setSource("auto", true).get(); - numAutoGenDocs.incrementAndGet(); - } - } - }; - indexingThread.start(); - - ClusterState initialState = client().admin().cluster().prepareState().get().getState(); - DiscoveryNode[] dataNodes = initialState.getNodes().getDataNodes().values().toArray(new DiscoveryNode[0]); - DiscoveryNode relocationSource = initialState.getNodes() - .getDataNodes() - .get(initialState.getRoutingTable().shardRoutingTable("test", 0).primaryShard().currentNodeId()); - for (int i = 0; i < RELOCATION_COUNT; i++) { - DiscoveryNode relocationTarget = randomFrom(dataNodes); - while (relocationTarget.equals(relocationSource)) { - relocationTarget = randomFrom(dataNodes); - } - logger.info("--> [iteration {}] relocating from {} to {} ", i, relocationSource.getName(), relocationTarget.getName()); - client().admin() - .cluster() - .prepareReroute() - .add(new MoveAllocationCommand("test", 0, relocationSource.getId(), relocationTarget.getId())) - .execute() - .actionGet(); - ClusterHealthResponse clusterHealthResponse = client().admin() - .cluster() - .prepareHealth() - .setTimeout(TimeValue.timeValueSeconds(60)) - .setWaitForEvents(Priority.LANGUID) - .setWaitForNoRelocatingShards(true) - .execute() - .actionGet(); - if (clusterHealthResponse.isTimedOut()) { - final String hotThreads = client().admin() - .cluster() - .prepareNodesHotThreads() - 
.setIgnoreIdleThreads(false) - .get() - .getNodes() - .stream() - .map(NodeHotThreads::getHotThreads) - .collect(Collectors.joining("\n")); - final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - logger.info( - "timed out for waiting for relocation iteration [{}] \ncluster state {} \nhot threads {}", - i, - clusterState, - hotThreads - ); - finished.set(true); - indexingThread.join(); - throw new AssertionError("timed out waiting for relocation iteration [" + i + "] "); - } - logger.info("--> [iteration {}] relocation complete", i); - relocationSource = relocationTarget; - // indexing process aborted early, no need for more relocations as test has already failed - if (indexingThread.isAlive() == false) { - break; - } - if (i > 0 && i % 5 == 0) { - logger.info("--> [iteration {}] flushing index", i); - client().admin().indices().prepareFlush("test").get(); - } - } - finished.set(true); - indexingThread.join(); - refresh("test"); - OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get()); - OpenSearchAssertions.assertHitCount( - client().prepareSearch("test") - .setTrackTotalHits(true)// extra paranoia ;) - .setQuery(QueryBuilders.termQuery("auto", true)) - .get(), - numAutoGenDocs.get() - ); - } - -}
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java
new file mode 100644
index 0000000000000..59c65a1895f06
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexPrimaryRelocationIT.java
@@ -0,0 +1,74 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.remotestore;
+
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.FeatureFlags;
+import org.opensearch.indices.recovery.IndexPrimaryRelocationIT;
+import org.opensearch.indices.replication.common.ReplicationType;
+import org.opensearch.test.OpenSearchIntegTestCase;
+
+import java.nio.file.Path;
+
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
+
+/**
+ * Runs the tests inherited from {@link IndexPrimaryRelocationIT} against an index created with
+ * segment replication, the remote store and the remote translog store enabled.
+ */
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
+public class RemoteIndexPrimaryRelocationIT extends IndexPrimaryRelocationIT {
+
+    /** Name of the {@code fs} repository referenced by the remote store and remote translog index settings. */
+    protected static final String REPOSITORY_NAME = "test-remore-store-repo";
+
+    /** Absolute path passed as the repository's {@code location} setting; assigned by {@link #setup()}. */
+    protected Path absolutePath;
+
+    /**
+     * Registers an {@code fs} repository named {@link #REPOSITORY_NAME} at a random repository path and
+     * asserts that the request is acknowledged.
+     * <p>
+     * Overrides the no-op hook in {@link IndexPrimaryRelocationIT}, which the inherited test invokes
+     * before it creates the index. The {@code @Override} makes the compiler reject this class if that
+     * hook is ever renamed, instead of silently leaving the repository unregistered.
+     */
+    @Override
+    public void setup() {
+        absolutePath = randomRepoPath().toAbsolutePath();
+        assertAcked(
+            clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
+        );
+    }
+
+    /** Adds the remote store and experimental segment replication feature flags to the inherited flags. */
+    @Override
+    protected Settings featureFlagSettings() {
+        return Settings.builder()
+            .put(super.featureFlagSettings())
+            .put(FeatureFlags.REMOTE_STORE, "true")
+            .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
+            .build();
+    }
+
+    /**
+     * Returns the base class index settings extended with segment replication, the remote store and the
+     * remote translog store, the latter two pointing at {@link #REPOSITORY_NAME}. Shard and replica
+     * counts are pinned to 1 and 0.
+     */
+    @Override
+    public Settings indexSettings() {
+        return Settings.builder()
+            .put(super.indexSettings())
+            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
+            .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
+            .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
+            .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
+            .build();
+    }
+}
diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index
b45f4121f2fc3..dfdfe07647293 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -335,12 +335,16 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception { public void testInvalidRepo() { internalCluster().startDataOnlyNodes(3); - createIndex(INDEX_NAME,Settings.builder() - .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) - .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) - .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME + "invalid") - .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s") - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build()); + createIndex( + INDEX_NAME, + Settings.builder() + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME + "invalid") + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s") + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build() + ); ensureRed(INDEX_NAME); client().admin() .cluster() diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java index d6ac3ae67b432..b8229a2117b02 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java @@ -134,35 +134,51 @@ public void testFailoverWhileIndexing() throws Exception { shard_count = scaledRandomIntBetween(1, 5); createIndex(indexName); ensureGreen(indexName); + int docCount = 10_000; + final int indexDocAfterFailover = 10; AtomicInteger numAutoGenDocs = new AtomicInteger(); 
CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean finished = new AtomicBoolean(false); Thread indexingThread = new Thread(() -> { - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - while (finished.get() == false && numAutoGenDocs.get() < 10 ) { - logger.info("Keeping indexing"); - IndexResponse indexResponse = internalCluster().clusterManagerClient().prepareIndex(indexName).setId("id").setSource("field", numAutoGenDocs.get()).get(); - if(indexResponse.status() == RestStatus.CREATED) { + int docsAfterFailover = 0; + while (finished.get() == false && numAutoGenDocs.get() < docCount) { + IndexResponse indexResponse = internalCluster().clusterManagerClient() + .prepareIndex(indexName) + .setSource("field", numAutoGenDocs.get()) + .get(); + if (indexResponse.status() == RestStatus.CREATED) { numAutoGenDocs.incrementAndGet(); + if (numAutoGenDocs.get() == docCount / 2) { + // Node is killed on this + if (random().nextInt(3) == 0) { + refresh(indexName); + } else if (random().nextInt(2) == 0) { + flush(indexName); + } + latch.countDown(); + } else if (numAutoGenDocs.get() > docCount / 2) { + docsAfterFailover++; + if (docsAfterFailover == indexDocAfterFailover) { + finished.set(true); + } + } } } - logger.info("Done indexing"); + logger.debug("Done indexing"); }); indexingThread.start(); + latch.await(); ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); final int numShards = state.metadata().index(indexName).getNumberOfShards(); final ShardRouting primaryShard = state.routingTable().index(indexName).shard(randomIntBetween(0, numShards - 1)).primaryShard(); final DiscoveryNode randomNode = state.nodes().resolveNode(primaryShard.currentNodeId()); - latch.countDown(); + // stop the random data node, all remaining shards are promoted to primaries internalCluster().stopRandomNode(InternalTestCluster.nameFilter(randomNode.getName())); 
ensureYellowAndNoInitializingShards(indexName); indexingThread.join(); - assertHitCount(client().prepareSearch(indexName).setSize(0).get(), numAutoGenDocs.get()); + refresh(indexName); + assertHitCount(client(internalCluster().getClusterManagerName()).prepareSearch(indexName).setSize(0).get(), numAutoGenDocs.get()); } }