Skip to content

Commit

Permalink
modifying the test
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
  • Loading branch information
gbbafna committed Jul 10, 2023
1 parent 0fccd95 commit 384374d
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,20 @@ public class IndexPrimaryRelocationIT extends OpenSearchIntegTestCase {

private static final int RELOCATION_COUNT = 15;

public void setup() {
}

public Settings indexSettings() {
return Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build();
}

public void testPrimaryRelocationWhileIndexing() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(randomIntBetween(2, 3));
setup() ;
client().admin()
.indices()
.prepareCreate("test")
.setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0))
.setSettings(indexSettings())
.setMapping("field", "type=text")
.get();
ensureGreen("test");
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.recovery.IndexPrimaryRelocationIT;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
public class RemoteIndexPrimaryRelocationIT extends IndexPrimaryRelocationIT {

protected static final String REPOSITORY_NAME = "test-remore-store-repo";

protected Path absolutePath;


public void setup() {
absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.REMOTE_STORE, "true")
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
.build();
}


public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,35 +134,40 @@ public void testFailoverWhileIndexing() throws Exception {
shard_count = scaledRandomIntBetween(1, 5);
createIndex(indexName);
ensureGreen(indexName);
int docCount = 10;
AtomicInteger numAutoGenDocs = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean finished = new AtomicBoolean(false);
Thread indexingThread = new Thread(() -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
while (finished.get() == false && numAutoGenDocs.get() < 10 ) {
logger.info("Keeping indexing");
IndexResponse indexResponse = internalCluster().clusterManagerClient().prepareIndex(indexName).setId("id").setSource("field", numAutoGenDocs.get()).get();
while (finished.get() == false && numAutoGenDocs.get() < docCount ) {
IndexResponse indexResponse = internalCluster().clusterManagerClient().prepareIndex(indexName).setSource("field", numAutoGenDocs.get()).get();
if(indexResponse.status() == RestStatus.CREATED) {
numAutoGenDocs.incrementAndGet();
if (numAutoGenDocs.get() == docCount/2) {
if (random().nextInt(3) == 0) {
refresh(indexName);
} else if (random().nextInt(2) == 0) {
flush(indexName);
}
latch.countDown();
}
}
}
logger.info("Done indexing");
logger.debug("Done indexing");
});
indexingThread.start();
latch.await();

ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
final int numShards = state.metadata().index(indexName).getNumberOfShards();
final ShardRouting primaryShard = state.routingTable().index(indexName).shard(randomIntBetween(0, numShards - 1)).primaryShard();
final DiscoveryNode randomNode = state.nodes().resolveNode(primaryShard.currentNodeId());
latch.countDown();

// stop the random data node, all remaining shards are promoted to primaries
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(randomNode.getName()));
ensureYellowAndNoInitializingShards(indexName);
indexingThread.join();
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), numAutoGenDocs.get());
refresh(indexName);
assertHitCount(client(internalCluster().getClusterManagerName()).prepareSearch(indexName).setSize(0).get(), numAutoGenDocs.get());
}
}

0 comments on commit 384374d

Please sign in to comment.