Add allocation tests for remote shards
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
kotwanikunal committed Nov 4, 2022
1 parent cce0411 commit 8f2a00f
Showing 5 changed files with 198 additions and 62 deletions.
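At a high level, the new integration tests share one flow: create an index with documents, snapshot it into an fs repository, delete the local index, bring up search-role nodes, restore the snapshot as a remote_snapshot index, and then stop and add search nodes to exercise remote shard allocation and failover. The sketch below condenses that flow; it is illustrative only (the method name failoverSketch is hypothetical) and assumes it sits inside the searchable snapshot integration test class changed below, so the helpers it calls are the private methods and InternalTestCluster APIs introduced by this commit.

    // Condensed, illustrative sketch of the failover scenario exercised by the new tests.
    // createIndexWithDocsAndEnsureGreen, takeSnapshot, deleteIndicesAndEnsureGreen and
    // restoreSnapshotAndEnsureGreen are the private helpers added in this commit;
    // ensureAtLeastNumSearchNodes and stopRandomSearchNode are the new InternalTestCluster APIs.
    public void failoverSketch() throws Exception {
        final Client client = client();

        internalCluster().ensureAtLeastNumDataNodes(2);     // the writable index lives on data nodes
        createIndexWithDocsAndEnsureGreen(1, "test-idx");    // 1 replica, 100 random docs
        takeSnapshot(client, "test-idx");                    // snapshot into the "test-repo" fs repository
        deleteIndicesAndEnsureGreen(client, "test-idx");     // drop the local copy

        internalCluster().ensureAtLeastNumSearchNodes(2);    // remote shards require search-role nodes
        restoreSnapshotAndEnsureGreen(client);               // restore as remote_snapshot -> "test-idx-copy"

        internalCluster().stopRandomSearchNode();            // lose one shard holder -> yellow
        ensureYellow("test-idx-copy");
        internalCluster().stopRandomSearchNode();            // lose the last search node -> red
        ensureRed("test-idx-copy");

        internalCluster().ensureAtLeastNumSearchNodes(3);    // adding search nodes back recovers the index
        ensureGreen("test-idx-copy");
        assertDocCount("test-idx-copy", 100);
    }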
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -61,7 +61,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Data node side change([#4204](https://github.com/opensearch-project/OpenSearch/pull/4204))
- on-boarding of tasks([#4542](https://github.com/opensearch-project/OpenSearch/pull/4542))
- Integs ([4588](https://github.com/opensearch-project/OpenSearch/pull/4588))

- Integration tests for searchable snapshots ([#5048](https://github.com/opensearch-project/OpenSearch/pull/5048))
### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
- Bumps `reactor-netty-http` from 1.0.18 to 1.0.23
@@ -60,109 +60,167 @@ protected Settings.Builder randomRepositorySettings() {
return settings;
}

/**
* Tests a happy-path scenario for searchable snapshots: creates two indices,
* takes a snapshot, and restores both as searchable snapshots.
* Ensures that sufficient data nodes and search-capable nodes are available.
*/
public void testCreateSearchableSnapshot() throws Exception {
final int numReplicasIndex1 = randomIntBetween(1, 4);
final int numReplicasIndex2 = randomIntBetween(0, 2);
internalCluster().ensureAtLeastNumDataNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1);
final Client client = client();
createRepository("test-repo", "fs");
createIndex(
"test-idx-1",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex1))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
.build()
);
createIndex(
"test-idx-2",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex2))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
.build()
);
ensureGreen();
indexRandomDocs("test-idx-1", 100);
indexRandomDocs("test-idx-2", 100);

logger.info("--> snapshot");
final CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true)
.setIndices("test-idx-1", "test-idx-2")
.get();
MatcherAssert.assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
MatcherAssert.assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
);
internalCluster().ensureAtLeastNumDataNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1);
createIndexWithDocsAndEnsureGreen(numReplicasIndex1, "test-idx-1");
createIndexWithDocsAndEnsureGreen(numReplicasIndex2, "test-idx-2");

assertTrue(client.admin().indices().prepareDelete("test-idx-1", "test-idx-2").get().isAcknowledged());
takeSnapshot(client, "test-idx-1", "test-idx-2");
deleteIndicesAndEnsureGreen(client, "test-idx-1", "test-idx-2");

internalCluster().ensureAtLeastNumSearchNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1);

logger.info("--> restore indices as 'remote_snapshot'");
client.admin()
.cluster()
.prepareRestoreSnapshot("test-repo", "test-snap")
.setRenamePattern("(.+)")
.setRenameReplacement("$1-copy")
.setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
.setWaitForCompletion(true)
.execute()
.actionGet();
ensureGreen();
restoreSnapshotAndEnsureGreen(client);

assertDocCount("test-idx-1-copy", 100L);
assertDocCount("test-idx-2-copy", 100L);
assertIndexDirectoryDoesNotExist("test-idx-1-copy", "test-idx-2-copy");
}

/**
* Tests remote shard allocation to ensure that remote shards can be assigned to a
* node that already holds local shards, provided the node carries the search role.
*/
public void testSearchableSnapshotAllocationForLocalAndRemoteShardsOnSameNode() throws Exception {
final int numReplicasIndex = randomIntBetween(1, 4);
final String indexName = "test-idx";
final String restoredIndexName = indexName + "-copy";
final Client client = client();

internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicasIndex + 1);
createIndexWithDocsAndEnsureGreen(numReplicasIndex, indexName);
takeSnapshot(client, indexName);

restoreSnapshotAndEnsureGreen(client);

assertDocCount(restoredIndexName, 100L);
assertDocCount(indexName, 100L);
}

/**
* Tests remote shard allocation under failover: ensures the allocator handles dropped
* nodes and that the cluster returns to a healthy state once nodes with search
* capabilities are added back to the cluster.
*/
public void testSearchableSnapshotAllocationForFailoverAndRecovery() throws Exception {
final int numReplicasIndex = 1;
final String indexName = "test-idx";
final String restoredIndexName = indexName + "-copy";
final Client client = client();

internalCluster().ensureAtLeastNumDataNodes(numReplicasIndex + 1);
createIndexWithDocsAndEnsureGreen(numReplicasIndex, indexName);

takeSnapshot(client, indexName);
deleteIndicesAndEnsureGreen(client, indexName);

internalCluster().ensureAtLeastNumSearchNodes(numReplicasIndex + 1);
restoreSnapshotAndEnsureGreen(client);
assertDocCount(restoredIndexName, 100L);

logger.info("--> stop a random search node");
internalCluster().stopRandomSearchNode();
ensureYellow(restoredIndexName);
assertDocCount(restoredIndexName, 100L);

logger.info("--> stop the last search node");
internalCluster().stopRandomSearchNode();
ensureRed(restoredIndexName);

logger.info("--> add 3 new search nodes");
internalCluster().ensureAtLeastNumSearchNodes(numReplicasIndex + 2);
ensureGreen(restoredIndexName);
assertDocCount(restoredIndexName, 100);

logger.info("--> stop a random search node");
internalCluster().stopRandomSearchNode();
ensureGreen(restoredIndexName);
assertDocCount(restoredIndexName, 100);
}

/**
* Tests that the index write block is enforced on a searchable snapshot index.
*/
public void testSearchableSnapshotIndexIsReadOnly() throws Exception {
final String indexName = "test-index";
final String restoredIndexName = indexName + "-copy";
final Client client = client();
createRepository("test-repo", "fs");

createIndexWithDocsAndEnsureGreen(0, indexName);
takeSnapshot(client, indexName);
deleteIndicesAndEnsureGreen(client, indexName);

internalCluster().ensureAtLeastNumSearchNodes(1);
restoreSnapshotAndEnsureGreen(client);

assertIndexingBlocked(restoredIndexName);
assertIndexSettingChangeBlocked(restoredIndexName);
assertTrue(client.admin().indices().prepareDelete(restoredIndexName).get().isAcknowledged());
assertThrows(
"Expect index to not exist",
IndexNotFoundException.class,
() -> client.admin().indices().prepareGetIndex().setIndices(restoredIndexName).execute().actionGet()
);
}

private void createIndexWithDocsAndEnsureGreen(int numReplicasIndex, String indexName) throws InterruptedException {
createIndex(
indexName,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0").put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1").build()
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
.build()
);
ensureGreen();

logger.info("--> snapshot");
indexRandomDocs(indexName, 100);
ensureGreen();
}

private void takeSnapshot(Client client, String... indices) {
logger.info("--> Create a repository");
createRepository("test-repo", "fs");
logger.info("--> Take a snapshot");
final CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true)
.setIndices(indexName)
.setIndices(indices)
.get();

MatcherAssert.assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
MatcherAssert.assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
);
}

assertTrue(client.admin().indices().prepareDelete(indexName).get().isAcknowledged());
private void deleteIndicesAndEnsureGreen(Client client, String... indices) {
assertTrue(client.admin().indices().prepareDelete(indices).get().isAcknowledged());
ensureGreen();
}

private void restoreSnapshotAndEnsureGreen(Client client) {
logger.info("--> restore indices as 'remote_snapshot'");
client.admin()
.cluster()
.prepareRestoreSnapshot("test-repo", "test-snap")
.setRenamePattern("(.+)")
.setRenameReplacement("$1")
.setRenameReplacement("$1-copy")
.setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
.setWaitForCompletion(true)
.execute()
.actionGet();
ensureGreen();

assertIndexingBlocked(indexName);
assertIndexSettingChangeBlocked(indexName);
assertTrue(client.admin().indices().prepareDelete(indexName).get().isAcknowledged());
assertThrows(
"Expect index to not exist",
IndexNotFoundException.class,
() -> client.admin().indices().prepareGetIndex().setIndices(indexName).execute().actionGet()
);
}

private void assertIndexingBlocked(String index) {
@@ -21,6 +21,7 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.opensearch.common.Randomness;
import org.opensearch.cluster.routing.RecoverySource;

import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -57,6 +58,7 @@ public RemoteShardsBalancer(Logger logger, RoutingAllocation allocation) {
*/
@Override
void allocateUnassigned() {
unassignIgnoredRemoteShards(allocation);
if (routingNodes.unassigned().isEmpty()) {
logger.debug("No unassigned remote shards found.");
return;
@@ -273,7 +275,6 @@ MoveDecision decideRebalance(ShardRouting shardRouting) {
*/
public Map<String, UnassignedIndexShards> groupUnassignedShardsByIndex() {
HashMap<String, UnassignedIndexShards> unassignedShardMap = new HashMap<>();
unassignIgnoredRemoteShards(allocation);
for (ShardRouting shard : routingNodes.unassigned().drain()) {
String index = shard.getIndexName();
if (!RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation))) {
@@ -298,7 +299,17 @@ private void unassignIgnoredRemoteShards(RoutingAllocation routingAllocation) {
for (ShardRouting shard : unassignedShards.drainIgnored()) {
RoutingPool pool = RoutingPool.getShardPool(shard, routingAllocation);
if (pool == RoutingPool.REMOTE_CAPABLE && shard.unassigned() && (shard.primary() || !shard.unassignedInfo().isDelayed())) {
unassignedShards.add(shard);
ShardRouting unassignedShard = shard;
// When a shard is moved back to an unassigned state, its recovery source is updated to
// ExistingStoreRecoverySource. Remote shards have no existing store to recover from, so they
// are recovered from an empty source instead, re-fetching the shard blocks from the repository.
if (shard.primary()) {
if (!RecoverySource.Type.SNAPSHOT.equals(shard.recoverySource().getType())) {
unassignedShard = shard.updateUnassigned(shard.unassignedInfo(), RecoverySource.EmptyStoreRecoverySource.INSTANCE);
}
}

unassignedShards.add(unassignedShard);
} else {
unassignedShards.ignoreShard(shard, shard.unassignedInfo().getLastAllocationStatus(), routingAllocation.changes());
}
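For readability, the branch added above can be restated as a small stand-alone helper. This is a hypothetical sketch of the same logic (the method name is illustrative, not part of the change): a remote-capable primary that re-enters the unassigned list must not keep ExistingStoreRecoverySource, since there is no local store, so it is re-queued with an empty-store source and the shard blocks are fetched again from the repository.

    // Hypothetical restatement of the recovery-source handling above; the name is illustrative.
    private ShardRouting resetRemotePrimaryRecoverySource(ShardRouting shard) {
        // Only primaries carry a recovery source to fix up; snapshot-sourced primaries are left as-is.
        if (shard.primary() && !RecoverySource.Type.SNAPSHOT.equals(shard.recoverySource().getType())) {
            return shard.updateUnassigned(shard.unassignedInfo(), RecoverySource.EmptyStoreRecoverySource.INSTANCE);
        }
        return shard; // replicas and snapshot-recovering primaries are re-queued unchanged
    }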
@@ -163,6 +163,7 @@
import static org.opensearch.discovery.DiscoveryModule.DISCOVERY_TYPE_SETTING;
import static org.opensearch.discovery.DiscoveryModule.ZEN2_DISCOVERY_TYPE;
import static org.opensearch.discovery.FileBasedSeedHostsProvider.UNICAST_HOSTS_FILE;
import static org.opensearch.test.NodeRoles.onlyRoles;
import static org.opensearch.test.OpenSearchTestCase.assertBusy;
import static org.opensearch.test.OpenSearchTestCase.randomFrom;
import static org.opensearch.test.NodeRoles.dataOnlyNode;
Expand Down Expand Up @@ -204,6 +205,11 @@ public final class InternalTestCluster extends TestCluster {
DiscoveryNodeRole.SEARCH_ROLE
);

private static final Predicate<NodeAndClient> SEARCH_AND_DATA_NODE_PREDICATE = nodeAndClient -> DiscoveryNode.hasRole(
nodeAndClient.node.settings(),
DiscoveryNodeRole.SEARCH_ROLE
) && DiscoveryNode.isDataNode(nodeAndClient.node.settings());

private static final Predicate<NodeAndClient> NO_DATA_NO_CLUSTER_MANAGER_PREDICATE = nodeAndClient -> DiscoveryNode
.isClusterManagerNode(nodeAndClient.node.settings()) == false
&& DiscoveryNode.isDataNode(nodeAndClient.node.settings()) == false;
@@ -687,6 +693,27 @@ public synchronized void ensureAtLeastNumSearchNodes(int n) {
}
}

/**
* Ensures that at least <code>n</code> nodes carrying both the data and search roles are present in the cluster.
* If more than <code>n</code> such nodes are present, this method will not
* stop any of the running nodes.
*/
public synchronized void ensureAtLeastNumSearchAndDataNodes(int n) {
int size = numSearchAndDataNodes();
if (size < n) {
logger.info("increasing cluster size from {} to {}", size, n);
if (numSharedDedicatedClusterManagerNodes > 0) {
startDataAndSearchNodes(n - size);
} else {
Set<DiscoveryNodeRole> searchAndDataRoles = new HashSet<>();
searchAndDataRoles.add(DiscoveryNodeRole.DATA_ROLE);
searchAndDataRoles.add(DiscoveryNodeRole.SEARCH_ROLE);
startNodes(n - size, Settings.builder().put(onlyRoles(Settings.EMPTY, searchAndDataRoles)).build());
}
validateClusterFormed();
}
}

/**
* Ensures that at most <code>n</code> are up and running.
* If less nodes that <code>n</code> are running this method
@@ -1719,6 +1746,20 @@ public InetSocketAddress[] httpAddresses() {
return addresses.toArray(new InetSocketAddress[addresses.size()]);
}

/**
* Stops a random search node in the cluster. Returns true if a node was found to stop, false otherwise.
*/
public synchronized boolean stopRandomSearchNode() throws IOException {
ensureOpen();
NodeAndClient nodeAndClient = getRandomNodeAndClient(SEARCH_NODE_PREDICATE);
if (nodeAndClient != null) {
logger.info("Closing random node [{}] ", nodeAndClient.name);
stopNodesAndClient(nodeAndClient);
return true;
}
return false;
}

/**
* Stops a random data node in the cluster. Returns true if a node was found to stop, false otherwise.
*/
@@ -2306,6 +2347,17 @@ public List<String> startMasterOnlyNodes(int numNodes, Settings settings) {
return startClusterManagerOnlyNodes(numNodes, settings);
}

public List<String> startDataAndSearchNodes(int numNodes) {
return startDataAndSearchNodes(numNodes, Settings.EMPTY);
}

public List<String> startDataAndSearchNodes(int numNodes, Settings settings) {
Set<DiscoveryNodeRole> searchAndDataRoles = new HashSet<>();
searchAndDataRoles.add(DiscoveryNodeRole.DATA_ROLE);
searchAndDataRoles.add(DiscoveryNodeRole.SEARCH_ROLE);
return startNodes(numNodes, Settings.builder().put(onlyRoles(settings, searchAndDataRoles)).build());
}

public List<String> startDataOnlyNodes(int numNodes) {
return startDataOnlyNodes(numNodes, Settings.EMPTY);
}
@@ -2382,6 +2434,10 @@ public int numSearchNodes() {
return searchNodeAndClients().size();
}

public int numSearchAndDataNodes() {
return searchDataNodeAndClients().size();
}

@Override
public int numDataAndClusterManagerNodes() {
return filterNodes(nodes, DATA_NODE_PREDICATE.or(CLUSTER_MANAGER_NODE_PREDICATE)).size();
@@ -2445,6 +2501,10 @@ private Collection<NodeAndClient> searchNodeAndClients() {
return filterNodes(nodes, SEARCH_NODE_PREDICATE);
}

private Collection<NodeAndClient> searchDataNodeAndClients() {
return filterNodes(nodes, SEARCH_AND_DATA_NODE_PREDICATE);
}

private static Collection<NodeAndClient> filterNodes(
Map<String, InternalTestCluster.NodeAndClient> map,
Predicate<NodeAndClient> predicate
