Skip to content

Commit

Permalink
modifying the test
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
  • Loading branch information
gbbafna committed Jul 11, 2023
1 parent 6835178 commit 21f3d35
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,16 @@ public class IndexPrimaryRelocationIT extends OpenSearchIntegTestCase {

private static final int RELOCATION_COUNT = 15;

public void setup() {}

public Settings indexSettings() {
return Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build();
}

public void testPrimaryRelocationWhileIndexing() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(randomIntBetween(2, 3));
client().admin()
.indices()
.prepareCreate("test")
.setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0))
.setMapping("field", "type=text")
.get();
setup();
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
ensureGreen("test");
AtomicInteger numAutoGenDocs = new AtomicInteger();
final AtomicBoolean finished = new AtomicBoolean(false);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.recovery.IndexPrimaryRelocationIT;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
public class RemoteIndexPrimaryRelocationIT extends IndexPrimaryRelocationIT {

protected static final String REPOSITORY_NAME = "test-remore-store-repo";

protected Path absolutePath;

public void setup() {
absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.REMOTE_STORE, "true")
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
.build();
}

public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -335,12 +335,16 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {

public void testInvalidRepo() {
internalCluster().startDataOnlyNodes(3);
createIndex(INDEX_NAME,Settings.builder()
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME + "invalid")
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s")
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build());
createIndex(
INDEX_NAME,
Settings.builder()
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME + "invalid")
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s")
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build()
);
ensureRed(INDEX_NAME);
client().admin()
.cluster()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,35 +134,51 @@ public void testFailoverWhileIndexing() throws Exception {
shard_count = scaledRandomIntBetween(1, 5);
createIndex(indexName);
ensureGreen(indexName);
int docCount = 10_000;
final int indexDocAfterFailover = 10;
AtomicInteger numAutoGenDocs = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean finished = new AtomicBoolean(false);
Thread indexingThread = new Thread(() -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
while (finished.get() == false && numAutoGenDocs.get() < 10 ) {
logger.info("Keeping indexing");
IndexResponse indexResponse = internalCluster().clusterManagerClient().prepareIndex(indexName).setId("id").setSource("field", numAutoGenDocs.get()).get();
if(indexResponse.status() == RestStatus.CREATED) {
int docsAfterFailover = 0;
while (finished.get() == false && numAutoGenDocs.get() < docCount) {
IndexResponse indexResponse = internalCluster().clusterManagerClient()
.prepareIndex(indexName)
.setSource("field", numAutoGenDocs.get())
.get();
if (indexResponse.status() == RestStatus.CREATED) {
numAutoGenDocs.incrementAndGet();
if (numAutoGenDocs.get() == docCount / 2) {
// Node is killed on this
if (random().nextInt(3) == 0) {
refresh(indexName);
} else if (random().nextInt(2) == 0) {
flush(indexName);
}
latch.countDown();
} else if (numAutoGenDocs.get() > docCount / 2) {
docsAfterFailover++;
if (docsAfterFailover == indexDocAfterFailover) {
finished.set(true);
}
}
}
}
logger.info("Done indexing");
logger.debug("Done indexing");
});
indexingThread.start();
latch.await();

ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
final int numShards = state.metadata().index(indexName).getNumberOfShards();
final ShardRouting primaryShard = state.routingTable().index(indexName).shard(randomIntBetween(0, numShards - 1)).primaryShard();
final DiscoveryNode randomNode = state.nodes().resolveNode(primaryShard.currentNodeId());
latch.countDown();

// stop the random data node, all remaining shards are promoted to primaries
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(randomNode.getName()));
ensureYellowAndNoInitializingShards(indexName);
indexingThread.join();
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), numAutoGenDocs.get());
refresh(indexName);
assertHitCount(client(internalCluster().getClusterManagerName()).prepareSearch(indexName).setSize(0).get(), numAutoGenDocs.get());
}
}

0 comments on commit 21f3d35

Please sign in to comment.