Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add allocation tests for searchable snapshot remote shards #5048

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Data node side change([#4204](https://github.com/opensearch-project/OpenSearch/pull/4204))
- on-boarding of tasks([#4542](https://github.com/opensearch-project/OpenSearch/pull/4542))
- Integs ([4588](https://github.com/opensearch-project/OpenSearch/pull/4588))

- Integration tests for searchable snapshots ([#5048](https://github.com/opensearch-project/OpenSearch/pull/5048))
### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
- Bumps `reactor-netty-http` from 1.0.18 to 1.0.23
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,35 @@ private Settings.Builder chunkedRepositorySettings() {
return settings;
}

/**
* Tests a happy path scenario for searchable snapshots by creating 2 indices,
* taking a snapshot, restoring them as searchable snapshots.
* Ensures availability of sufficient data nodes and search capable nodes.
*/
public void testCreateSearchableSnapshot() throws Exception {
final int numReplicasIndex1 = randomIntBetween(1, 4);
final int numReplicasIndex2 = randomIntBetween(0, 2);
final Client client = client();

internalCluster().ensureAtLeastNumDataNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1);
createIndexWithDocsAndEnsureGreen(numReplicasIndex1, 100, "test-idx-1");
createIndexWithDocsAndEnsureGreen(numReplicasIndex2, 100, "test-idx-2");

takeSnapshot(client, "test-idx-1", "test-idx-2");
deleteIndicesAndEnsureGreen(client, "test-idx-1", "test-idx-2");

internalCluster().ensureAtLeastNumSearchNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1);
restoreSnapshotAndEnsureGreen(client);

assertDocCount("test-idx-1-copy", 100L);
assertDocCount("test-idx-2-copy", 100L);
assertIndexDirectoryDoesNotExist("test-idx-1-copy", "test-idx-2-copy");
}

/**
* Tests a chunked repository scenario for searchable snapshots by creating an index,
* taking a snapshot, restoring it as a searchable snapshot index.
*/
public void testCreateSearchableSnapshotWithChunks() throws Exception {
final int numReplicasIndex = randomIntBetween(1, 4);
final String indexName = "test-idx";
Expand All @@ -76,153 +105,160 @@ public void testCreateSearchableSnapshotWithChunks() throws Exception {

Settings.Builder repositorySettings = chunkedRepositorySettings();

internalCluster().ensureAtLeastNumDataNodes(numReplicasIndex + 1);
createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
.build()
);
ensureGreen();
indexRandomDocs(indexName, 1000);
internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicasIndex + 1);
createIndexWithDocsAndEnsureGreen(numReplicasIndex, 1000, indexName);
takeSnapshot(client, repositorySettings, indexName);

createRepository("test-repo", "fs", repositorySettings);
logger.info("--> snapshot");
final CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true)
.setIndices(indexName)
.get();
MatcherAssert.assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
MatcherAssert.assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
);
deleteIndicesAndEnsureGreen(client, indexName);
restoreSnapshotAndEnsureGreen(client);

assertTrue(client.admin().indices().prepareDelete(indexName).get().isAcknowledged());
assertDocCount(restoredIndexName, 1000L);
}

internalCluster().ensureAtLeastNumSearchNodes(numReplicasIndex + 1);
logger.info("--> restore indices as 'remote_snapshot'");
client.admin()
.cluster()
.prepareRestoreSnapshot("test-repo", "test-snap")
.setRenamePattern("(.+)")
.setRenameReplacement("$1-copy")
.setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
.setWaitForCompletion(true)
.execute()
.actionGet();
ensureGreen();
/**
* Tests the functionality of remote shard allocation to
* ensure it can assign remote shards to a node with local shards given it has the
* search role capabilities.
*/
public void testSearchableSnapshotAllocationForLocalAndRemoteShardsOnSameNode() throws Exception {
final int numReplicasIndex = randomIntBetween(1, 4);
final String indexName = "test-idx";
final String restoredIndexName = indexName + "-copy";
final Client client = client();

assertDocCount(restoredIndexName, 1000L);
internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicasIndex + 1);
createIndexWithDocsAndEnsureGreen(numReplicasIndex, 100, indexName);
takeSnapshot(client, indexName);

restoreSnapshotAndEnsureGreen(client);

assertDocCount(restoredIndexName, 100L);
assertDocCount(indexName, 100L);
}

public void testCreateSearchableSnapshot() throws Exception {
final int numReplicasIndex1 = randomIntBetween(1, 4);
final int numReplicasIndex2 = randomIntBetween(0, 2);
internalCluster().ensureAtLeastNumDataNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1);
/**
* Tests the functionality of remote shard allocation to
* ensure it can handle node drops for failover scenarios and the cluster gets back to a healthy state when
* nodes with search capabilities are added back to the cluster.
*/
public void testSearchableSnapshotAllocationForFailoverAndRecovery() throws Exception {
final int numReplicasIndex = 1;
final String indexName = "test-idx";
final String restoredIndexName = indexName + "-copy";
final Client client = client();
createRepository("test-repo", "fs");
createIndex(
"test-idx-1",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex1))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
.build()
);
createIndex(
"test-idx-2",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex2))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
.build()
);
ensureGreen();
indexRandomDocs("test-idx-1", 100);
indexRandomDocs("test-idx-2", 100);

logger.info("--> snapshot");
final CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true)
.setIndices("test-idx-1", "test-idx-2")
.get();
MatcherAssert.assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
MatcherAssert.assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
);
internalCluster().ensureAtLeastNumDataNodes(numReplicasIndex + 1);
createIndexWithDocsAndEnsureGreen(numReplicasIndex, 100, indexName);

assertTrue(client.admin().indices().prepareDelete("test-idx-1", "test-idx-2").get().isAcknowledged());
takeSnapshot(client, indexName);
deleteIndicesAndEnsureGreen(client, indexName);

internalCluster().ensureAtLeastNumSearchNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1);
internalCluster().ensureAtLeastNumSearchNodes(numReplicasIndex + 1);
restoreSnapshotAndEnsureGreen(client);
assertDocCount(restoredIndexName, 100L);

logger.info("--> restore indices as 'remote_snapshot'");
client.admin()
.cluster()
.prepareRestoreSnapshot("test-repo", "test-snap")
.setRenamePattern("(.+)")
.setRenameReplacement("$1-copy")
.setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
.setWaitForCompletion(true)
.execute()
.actionGet();
ensureGreen();
logger.info("--> stop a random search node");
internalCluster().stopRandomSearchNode();
ensureYellow(restoredIndexName);
assertDocCount(restoredIndexName, 100L);

assertDocCount("test-idx-1-copy", 100L);
assertDocCount("test-idx-2-copy", 100L);
assertIndexDirectoryDoesNotExist("test-idx-1-copy", "test-idx-2-copy");
logger.info("--> stop the last search node");
internalCluster().stopRandomSearchNode();
ensureRed(restoredIndexName);

logger.info("--> add 3 new search nodes");
internalCluster().ensureAtLeastNumSearchNodes(numReplicasIndex + 2);
ensureGreen(restoredIndexName);
assertDocCount(restoredIndexName, 100);

logger.info("--> stop a random search node");
internalCluster().stopRandomSearchNode();
ensureGreen(restoredIndexName);
assertDocCount(restoredIndexName, 100);
}

/**
* Tests the functionality of index write block on a searchable snapshot index.
*/
public void testSearchableSnapshotIndexIsReadOnly() throws Exception {
final String indexName = "test-index";
final String restoredIndexName = indexName + "-copy";
final Client client = client();
createRepository("test-repo", "fs");

createIndexWithDocsAndEnsureGreen(0, 100, indexName);
takeSnapshot(client, indexName);
deleteIndicesAndEnsureGreen(client, indexName);

internalCluster().ensureAtLeastNumSearchNodes(1);
restoreSnapshotAndEnsureGreen(client);

assertIndexingBlocked(restoredIndexName);
assertIndexSettingChangeBlocked(restoredIndexName);
assertTrue(client.admin().indices().prepareDelete(restoredIndexName).get().isAcknowledged());
assertThrows(
"Expect index to not exist",
IndexNotFoundException.class,
() -> client.admin().indices().prepareGetIndex().setIndices(restoredIndexName).execute().actionGet()
);
}

private void createIndexWithDocsAndEnsureGreen(int numReplicasIndex, int numOfDocs, String indexName) throws InterruptedException {
createIndex(
indexName,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0").put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1").build()
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, Integer.toString(numReplicasIndex))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
.build()
);
ensureGreen();

logger.info("--> snapshot");
indexRandomDocs(indexName, numOfDocs);
ensureGreen();
}

private void takeSnapshot(Client client, String... indices) {
takeSnapshot(client, null, indices);
}

private void takeSnapshot(Client client, Settings.Builder repositorySettings, String... indices) {
logger.info("--> Create a repository");
if (repositorySettings == null) {
createRepository("test-repo", "fs");
} else {
createRepository("test-repo", "fs", repositorySettings);
}
logger.info("--> Take a snapshot");
final CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true)
.setIndices(indexName)
.setIndices(indices)
.get();

MatcherAssert.assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
MatcherAssert.assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
);
}

assertTrue(client.admin().indices().prepareDelete(indexName).get().isAcknowledged());
private void deleteIndicesAndEnsureGreen(Client client, String... indices) {
assertTrue(client.admin().indices().prepareDelete(indices).get().isAcknowledged());
ensureGreen();
}

private void restoreSnapshotAndEnsureGreen(Client client) {
logger.info("--> restore indices as 'remote_snapshot'");
client.admin()
.cluster()
.prepareRestoreSnapshot("test-repo", "test-snap")
.setRenamePattern("(.+)")
.setRenameReplacement("$1")
.setRenameReplacement("$1-copy")
.setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
.setWaitForCompletion(true)
.execute()
.actionGet();
ensureGreen();

assertIndexingBlocked(indexName);
assertIndexSettingChangeBlocked(indexName);
assertTrue(client.admin().indices().prepareDelete(indexName).get().isAcknowledged());
assertThrows(
"Expect index to not exist",
IndexNotFoundException.class,
() -> client.admin().indices().prepareGetIndex().setIndices(indexName).execute().actionGet()
);
}

private void assertIndexingBlocked(String index) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.opensearch.common.Randomness;
import org.opensearch.cluster.routing.RecoverySource;

import java.util.ArrayDeque;
import java.util.ArrayList;
Expand Down Expand Up @@ -57,6 +58,7 @@ public RemoteShardsBalancer(Logger logger, RoutingAllocation allocation) {
*/
@Override
void allocateUnassigned() {
unassignIgnoredRemoteShards(allocation);
if (routingNodes.unassigned().isEmpty()) {
logger.debug("No unassigned remote shards found.");
return;
Expand Down Expand Up @@ -273,7 +275,6 @@ MoveDecision decideRebalance(ShardRouting shardRouting) {
*/
public Map<String, UnassignedIndexShards> groupUnassignedShardsByIndex() {
HashMap<String, UnassignedIndexShards> unassignedShardMap = new HashMap<>();
unassignIgnoredRemoteShards(allocation);
for (ShardRouting shard : routingNodes.unassigned().drain()) {
String index = shard.getIndexName();
if (!RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation))) {
Expand All @@ -298,7 +299,17 @@ private void unassignIgnoredRemoteShards(RoutingAllocation routingAllocation) {
for (ShardRouting shard : unassignedShards.drainIgnored()) {
RoutingPool pool = RoutingPool.getShardPool(shard, routingAllocation);
if (pool == RoutingPool.REMOTE_CAPABLE && shard.unassigned() && (shard.primary() || !shard.unassignedInfo().isDelayed())) {
unassignedShards.add(shard);
ShardRouting unassignedShard = shard;
// Shard when moved to an unassigned state updates the recovery source to be ExistingStoreRecoverySource
// Remote shards do not have an existing store to recover from and can be recovered from an empty source
// to re-fetch any shard blocks from the repository.
if (shard.primary()) {
if (!RecoverySource.Type.SNAPSHOT.equals(shard.recoverySource().getType())) {
unassignedShard = shard.updateUnassigned(shard.unassignedInfo(), RecoverySource.EmptyStoreRecoverySource.INSTANCE);
}
}

unassignedShards.add(unassignedShard);
} else {
unassignedShards.ignoreShard(shard, shard.unassignedInfo().getLastAllocationStatus(), routingAllocation.changes());
}
Expand Down
Loading