Skip to content

Commit

Permalink
[Remote Store] Fix sleep time bug during remote store sync (opensear…
Browse files Browse the repository at this point in the history
…ch-project#14342)

* [Remote Store] Fix sleep time bug during remote store sync (opensearch-project#14037)

---------

Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>

* Fix remote migration ITs

Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>

---------

Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
  • Loading branch information
gbbafna authored Jun 14, 2024
1 parent 79405ed commit 5f2bff3
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,24 @@
package org.opensearch.remotemigration;

import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.common.Priority;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;
Expand All @@ -39,6 +45,7 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

public class MigrationBaseTestCase extends OpenSearchIntegTestCase {
protected static final String REPOSITORY_NAME = "test-remote-store-repo";
Expand Down Expand Up @@ -114,6 +121,10 @@ public void initDocRepToRemoteMigration() {
);
}

public ClusterHealthStatus ensureGreen(String... indices) {
return ensureGreen(TimeValue.timeValueSeconds(60), indices);
}

public BulkResponse indexBulk(String indexName, int numDocs) {
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < numDocs; i++) {
Expand Down Expand Up @@ -181,14 +192,12 @@ private Thread getIndexingThread() {
long currentDocCount = indexedDocs.incrementAndGet();
if (currentDocCount > 0 && currentDocCount % refreshFrequency == 0) {
if (rarely()) {
logger.info("--> [iteration {}] flushing index", currentDocCount);
client().admin().indices().prepareFlush(indexName).get();
logger.info("Completed ingestion of {} docs. Flushing now", currentDocCount);
} else {
logger.info("--> [iteration {}] refreshing index", currentDocCount);
client().admin().indices().prepareRefresh(indexName).get();
}
}
logger.info("Completed ingestion of {} docs", currentDocCount);
}
});
}
Expand Down Expand Up @@ -218,4 +227,38 @@ public void stopShardRebalancing() {
.get()
);
}

public ClusterHealthStatus waitForRelocation() {
ClusterHealthRequest request = Requests.clusterHealthRequest()
.waitForNoRelocatingShards(true)
.timeout(TimeValue.timeValueSeconds(60))
.waitForEvents(Priority.LANGUID);
ClusterHealthResponse actionGet = client().admin().cluster().health(request).actionGet();
if (actionGet.isTimedOut()) {
logger.info(
"waitForRelocation timed out, cluster state:\n{}\n{}",
client().admin().cluster().prepareState().get().getState(),
client().admin().cluster().preparePendingClusterTasks().get()
);
assertThat("timed out waiting for relocation", actionGet.isTimedOut(), equalTo(false));
}
return actionGet.getStatus();
}

public ClusterHealthStatus waitForRelocation(TimeValue t) {
ClusterHealthRequest request = Requests.clusterHealthRequest()
.waitForNoRelocatingShards(true)
.timeout(t)
.waitForEvents(Priority.LANGUID);
ClusterHealthResponse actionGet = client().admin().cluster().health(request).actionGet();
if (actionGet.isTimedOut()) {
logger.info(
"waitForRelocation timed out, cluster state:\n{}\n{}",
client().admin().cluster().prepareState().get().getState(),
client().admin().cluster().preparePendingClusterTasks().get()
);
assertThat("timed out waiting for relocation", actionGet.isTimedOut(), equalTo(false));
}
return actionGet.getStatus();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,7 @@ public void testRemotePrimaryRelocation() throws Exception {
.add(new MoveAllocationCommand("test", 0, primaryNodeName("test"), remoteNode))
.execute()
.actionGet();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(60))
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.execute()
.actionGet();

assertEquals(0, clusterHealthResponse.getRelocatingShards());
waitForRelocation();
assertEquals(remoteNode, primaryNodeName("test"));
logger.info("--> relocation from docrep to remote complete");

Expand All @@ -123,16 +114,7 @@ public void testRemotePrimaryRelocation() throws Exception {
.add(new MoveAllocationCommand("test", 0, remoteNode, remoteNode2))
.execute()
.actionGet();
clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(60))
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.execute()
.actionGet();

assertEquals(0, clusterHealthResponse.getRelocatingShards());
waitForRelocation();
assertEquals(remoteNode2, primaryNodeName("test"));

logger.info("--> relocation from remote to remote complete");
Expand All @@ -155,7 +137,6 @@ public void testRemotePrimaryRelocation() throws Exception {

public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
String docRepNode = internalCluster().startNode();
Client client = internalCluster().client(docRepNode);
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,12 @@

package org.opensearch.remotemigration;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -83,16 +80,8 @@ public void testReplicaRecovery() throws Exception {
.add(new MoveAllocationCommand("test", 0, primaryNode, remoteNode))
.execute()
.actionGet();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(60))
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.execute()
.actionGet();

assertEquals(0, clusterHealthResponse.getRelocatingShards());
waitForRelocation();
logger.info("--> relocation of primary from docrep to remote complete");

logger.info("--> getting up the new replicas now to doc rep node as well as remote node ");
Expand All @@ -109,17 +98,7 @@ public void testReplicaRecovery() throws Exception {
)
.get();

client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(60))
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.execute()
.actionGet();
logger.info("--> replica is up now on another docrep now as well as remote node");

assertEquals(0, clusterHealthResponse.getRelocatingShards());
waitForRelocation();
asyncIndexingService.stopIndexing();
refresh("test");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@

package org.opensearch.remotemigration;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
Expand All @@ -28,6 +26,7 @@
import java.util.List;
import java.util.Map;

import static org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand All @@ -48,6 +47,10 @@ protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build();
}

protected int maximumNumberOfShards() {
return 5;
}

public void testMixedModeAddRemoteNodes() throws Exception {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
List<String> cmNodes = internalCluster().startNodes(1);
Expand Down Expand Up @@ -155,7 +158,11 @@ public void testEndToEndRemoteMigration() throws Exception {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
List<String> docRepNodes = internalCluster().startNodes(2);
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
updateSettingsRequest.persistentSettings(
Settings.builder()
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
.put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), maximumNumberOfShards())
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
ensureGreen("test");
Expand Down Expand Up @@ -189,16 +196,7 @@ public void testEndToEndRemoteMigration() throws Exception {
)
.get()
);

ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(45))
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.execute()
.actionGet();
assertTrue(clusterHealthResponse.getRelocatingShards() == 0);
waitForRelocation(TimeValue.timeValueSeconds(90));
logger.info("---> Stopping indexing thread");
asyncIndexingService.stopIndexing();
Map<String, Integer> shardCountByNodeId = getShardCountByNodeId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2146,7 +2146,7 @@ public void waitForRemoteStoreSync(Runnable onProgress) throws IOException {
segmentUploadeCount = directory.getSegmentsUploadedToRemoteStore().size();
}
try {
Thread.sleep(TimeValue.timeValueSeconds(30).seconds());
Thread.sleep(TimeValue.timeValueSeconds(30).millis());
} catch (InterruptedException ie) {
throw new OpenSearchException("Interrupted waiting for completion of [{}]", ie);
}
Expand Down

0 comments on commit 5f2bff3

Please sign in to comment.