Skip to content

Commit

Permalink
[Remote Store] Add support to restore only unassigned shards of an in…
Browse files Browse the repository at this point in the history
…dex (#8792)

* Add support to restore only unassigned shards of an index

---------

Signed-off-by: Sachin Kale <kalsac@amazon.com>
Signed-off-by: Sachin Kale <sachinpkale@gmail.com>
Co-authored-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale and Sachin Kale authored Jul 27, 2023
1 parent 99f28cb commit c25c175
Show file tree
Hide file tree
Showing 13 changed files with 236 additions and 78 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Change http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773))
- Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792))
- Remote Segment Store Repository setting moved from `index.remote_store.repository` to `index.remote_store.segment.repository` and `cluster.remote_store.repository` to `cluster.remote_store.segment.repository` respectively for Index and Cluster level settings ([#8719](https://github.com/opensearch-project/OpenSearch/pull/8719))
- [Remote Store] Add support to restore only unassigned shards of an index ([#8792](https://github.com/opensearch-project/OpenSearch/pull/8792))
- Replace the deprecated IndexReader APIs with new storedFields() & termVectors() ([#7792](https://github.com/opensearch-project/OpenSearch/pull/7792))

### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand All @@ -37,6 +38,13 @@ public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
protected static final int REPLICA_COUNT = 1;
protected Path absolutePath;
protected Path absolutePath2;
private final List<String> documentKeys = List.of(
randomAlphaOfLength(5),
randomAlphaOfLength(5),
randomAlphaOfLength(5),
randomAlphaOfLength(5),
randomAlphaOfLength(5)
);

@Override
protected boolean addMockInternalEngine() {
Expand All @@ -59,7 +67,7 @@ public Settings indexSettings() {
IndexResponse indexSingleDoc(String indexName) {
return client().prepareIndex(indexName)
.setId(UUIDs.randomBase64UUID())
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
.setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5))
.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,17 @@ private void testRestoreWithMergeFlow(int numberOfIterations, boolean invokeFlus
Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush, flushAfterMerge, deletedDocs);

internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
assertAcked(client().admin().indices().prepareClose(INDEX_NAME));

client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());
boolean restoreAllShards = randomBoolean();
if (restoreAllShards) {
assertAcked(client().admin().indices().prepareClose(INDEX_NAME));
}
client().admin()
.cluster()
.restoreRemoteStore(
new RestoreRemoteStoreRequest().indices(INDEX_NAME).restoreAllShards(restoreAllShards),
PlainActionFuture.newFuture()
);
ensureGreen(INDEX_NAME);

if (deletedDocs == -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
import org.hamcrest.MatcherAssert;
import org.junit.Before;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.shard.RemoteStoreRefreshListener;
import org.opensearch.indices.recovery.RecoveryState;
Expand All @@ -34,15 +34,16 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.oneOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.oneOf;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0)
public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";
Expand All @@ -68,13 +69,6 @@ public Settings indexSettings() {
return remoteStoreIndexSettings(0);
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
.get();
}

private Map<String, Long> indexData(int numberOfIterations, boolean invokeFlush, String index) {
long totalOperations = 0;
long refreshedOrFlushedOperations = 0;
Expand All @@ -93,7 +87,7 @@ private Map<String, Long> indexData(int numberOfIterations, boolean invokeFlush,
refreshedOrFlushedOperations = totalOperations;
int numberOfOperations = randomIntBetween(20, 50);
for (int j = 0; j < numberOfOperations; j++) {
IndexResponse response = INDEX_NAME.equals(index) ? indexSingleDoc() : indexSingleDoc(index);
IndexResponse response = indexSingleDoc(index);
maxSeqNo = response.getSeqNo();
shardId = response.getShardId().id();
indexingStats.put(MAX_SEQ_NO_TOTAL + "-shard-" + shardId, maxSeqNo);
Expand All @@ -109,12 +103,14 @@ private Map<String, Long> indexData(int numberOfIterations, boolean invokeFlush,
}

private void verifyRestoredData(Map<String, Long> indexStats, boolean checkTotal, String indexName) {
// This is required to get updated number from already active shards which were not restored
refresh(indexName);
String statsGranularity = checkTotal ? TOTAL_OPERATIONS : REFRESHED_OR_FLUSHED_OPERATIONS;
String maxSeqNoGranularity = checkTotal ? MAX_SEQ_NO_TOTAL : MAX_SEQ_NO_REFRESHED_OR_FLUSHED;
ensureYellowAndNoInitializingShards(indexName);
ensureGreen(indexName);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(statsGranularity));
IndexResponse response = INDEX_NAME.equals(indexName) ? indexSingleDoc() : indexSingleDoc(indexName);
IndexResponse response = indexSingleDoc(indexName);
assertEquals(indexStats.get(maxSeqNoGranularity + "-shard-" + response.getShardId().id()) + 1, response.getSeqNo());
refresh(indexName);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(statsGranularity) + 1);
Expand All @@ -130,6 +126,28 @@ private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, St
}
}

private void restore(String... indices) {
boolean restoreAllShards = randomBoolean();
if (restoreAllShards) {
assertAcked(client().admin().indices().prepareClose(indices));
}
client().admin()
.cluster()
.restoreRemoteStore(
new RestoreRemoteStoreRequest().indices(indices).restoreAllShards(restoreAllShards),
PlainActionFuture.newFuture()
);
}

private void restoreAndVerify(int shardCount, int replicaCount, Map<String, Long> indexStats) {
restore(INDEX_NAME);
ensureGreen(INDEX_NAME);
// This is required to get updated number from already active shards which were not restored
assertEquals(shardCount * (1 + replicaCount), getNumShards(INDEX_NAME).totalNumShards);
assertEquals(replicaCount, getNumShards(INDEX_NAME).numReplicas);
verifyRestoredData(indexStats, true, INDEX_NAME);
}

/**
* Helper function to test restoring an index with no replication from remote store. Only primary node is dropped.
* @param numberOfIterations Number of times a refresh/flush should be invoked, followed by indexing some data.
Expand All @@ -144,23 +162,16 @@ private void testRestoreFlow(int numberOfIterations, boolean invokeFlush, int sh
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
ensureRed(INDEX_NAME);

assertAcked(client().admin().indices().prepareClose(INDEX_NAME));
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());

ensureGreen(INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);
verifyRestoredData(indexStats, true, INDEX_NAME);
restoreAndVerify(shardCount, 0, indexStats);
}

/**
* Helper function to test restoring an index having replicas from remote store when all the nodes housing the primary/replica drop.
* @param remoteTranslog If true, Remote Translog Store is also enabled in addition to Remote Segment Store.
* @param numberOfIterations Number of times a refresh/flush should be invoked, followed by indexing some data.
* @param invokeFlush If true, a flush is invoked. Otherwise, a refresh is invoked.
* @throws IOException IO Exception.
*/
private void testRestoreFlowBothPrimaryReplicasDown(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush, int shardCount)
throws IOException {
private void testRestoreFlowBothPrimaryReplicasDown(int numberOfIterations, boolean invokeFlush, int shardCount) throws IOException {
prepareCluster(1, 2, INDEX_NAME, 1, shardCount);
Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush, INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);
Expand All @@ -170,14 +181,7 @@ private void testRestoreFlowBothPrimaryReplicasDown(boolean remoteTranslog, int
ensureRed(INDEX_NAME);
internalCluster().startDataOnlyNodes(2);

assertAcked(client().admin().indices().prepareClose(INDEX_NAME));
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());

ensureGreen(INDEX_NAME);

assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);
assertEquals(0, getNumShards(INDEX_NAME).numReplicas);
verifyRestoredData(indexStats, true, INDEX_NAME);
restoreAndVerify(shardCount, 1, indexStats);
}

/**
Expand Down Expand Up @@ -212,17 +216,54 @@ private void testRestoreFlowMultipleIndices(int numberOfIterations, boolean invo
ensureRed(indices);
internalCluster().startDataOnlyNodes(3);

assertAcked(client().admin().indices().prepareClose(indices));
boolean restoreAllShards = randomBoolean();
if (restoreAllShards) {
assertAcked(client().admin().indices().prepareClose(indices));
}
client().admin()
.cluster()
.restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAMES_WILDCARD.split(",")), PlainActionFuture.newFuture());
.restoreRemoteStore(
new RestoreRemoteStoreRequest().indices(INDEX_NAMES_WILDCARD.split(",")).restoreAllShards(restoreAllShards),
PlainActionFuture.newFuture()
);
ensureGreen(indices);
for (String index : indices) {
assertEquals(shardCount, getNumShards(index).totalNumShards);
verifyRestoredData(indicesStats.get(index), true, index);
}
}

public void testRestoreFlowAllShardsNoRedIndex() throws InterruptedException {
int shardCount = randomIntBetween(1, 5);
prepareCluster(0, 3, INDEX_NAME, 0, shardCount);
indexData(randomIntBetween(2, 5), true, INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);

PlainActionFuture<RestoreRemoteStoreResponse> future = PlainActionFuture.newFuture();
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME).restoreAllShards(true), future);
try {
future.get();
} catch (ExecutionException e) {
// If the request goes to co-ordinator, e.getCause() can be RemoteTransportException
assertTrue(e.getCause() instanceof IllegalStateException || e.getCause().getCause() instanceof IllegalStateException);
}
}

public void testRestoreFlowNoRedIndex() {
int shardCount = randomIntBetween(1, 5);
prepareCluster(0, 3, INDEX_NAME, 0, shardCount);
Map<String, Long> indexStats = indexData(randomIntBetween(2, 5), true, INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);

client().admin()
.cluster()
.restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME).restoreAllShards(false), PlainActionFuture.newFuture());

ensureGreen(INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);
verifyRestoredData(indexStats, true, INDEX_NAME);
}

/**
* Simulates all data restored using Remote Translog Store.
* @throws IOException IO Exception.
Expand Down Expand Up @@ -265,7 +306,7 @@ public void testRemoteTranslogRestoreWithCommittedData() throws IOException {
// @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6188")
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479")
public void testRTSRestoreWithNoDataPostCommitPrimaryReplicaDown() throws IOException {
testRestoreFlowBothPrimaryReplicasDown(true, 1, true, randomIntBetween(1, 5));
testRestoreFlowBothPrimaryReplicasDown(1, true, randomIntBetween(1, 5));
}

/**
Expand All @@ -274,7 +315,7 @@ public void testRTSRestoreWithNoDataPostCommitPrimaryReplicaDown() throws IOExce
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479")
public void testRTSRestoreWithNoDataPostRefreshPrimaryReplicaDown() throws IOException {
testRestoreFlowBothPrimaryReplicasDown(true, 1, false, randomIntBetween(1, 5));
testRestoreFlowBothPrimaryReplicasDown(1, false, randomIntBetween(1, 5));
}

/**
Expand All @@ -284,7 +325,7 @@ public void testRTSRestoreWithNoDataPostRefreshPrimaryReplicaDown() throws IOExc
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479")
public void testRTSRestoreWithRefreshedDataPrimaryReplicaDown() throws IOException {
testRestoreFlowBothPrimaryReplicasDown(true, randomIntBetween(2, 5), false, randomIntBetween(1, 5));
testRestoreFlowBothPrimaryReplicasDown(randomIntBetween(2, 5), false, randomIntBetween(1, 5));
}

/**
Expand All @@ -294,7 +335,7 @@ public void testRTSRestoreWithRefreshedDataPrimaryReplicaDown() throws IOExcepti
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479")
public void testRTSRestoreWithCommittedDataPrimaryReplicaDown() throws IOException {
testRestoreFlowBothPrimaryReplicasDown(true, randomIntBetween(2, 5), true, randomIntBetween(1, 5));
testRestoreFlowBothPrimaryReplicasDown(randomIntBetween(2, 5), true, randomIntBetween(1, 5));
}

/**
Expand Down Expand Up @@ -341,10 +382,7 @@ public void testRTSRestoreWithCommittedDataDefaultAllIndices() throws IOExceptio
ensureRed(indices);
internalCluster().startDataOnlyNodes(3);

assertAcked(client().admin().indices().prepareClose(indices));
client().admin()
.cluster()
.restoreRemoteStore(new RestoreRemoteStoreRequest().indices(new String[] {}), PlainActionFuture.newFuture());
restore(indices);
ensureGreen(indices);

for (String index : indices) {
Expand Down Expand Up @@ -381,10 +419,16 @@ public void testRTSRestoreWithCommittedDataNotAllRedRemoteIndices() throws IOExc
ensureRed(indices);
internalCluster().startDataOnlyNodes(3);

assertAcked(client().admin().indices().prepareClose(indices[0], indices[1]));
boolean restoreAllShards = randomBoolean();
if (restoreAllShards) {
assertAcked(client().admin().indices().prepareClose(indices[0], indices[1]));
}
client().admin()
.cluster()
.restoreRemoteStore(new RestoreRemoteStoreRequest().indices(indices[0], indices[1]), PlainActionFuture.newFuture());
.restoreRemoteStore(
new RestoreRemoteStoreRequest().indices(indices[0], indices[1]).restoreAllShards(restoreAllShards),
PlainActionFuture.newFuture()
);
ensureGreen(indices[0], indices[1]);
assertEquals(shardCount, getNumShards(indices[0]).totalNumShards);
verifyRestoredData(indicesStats.get(indices[0]), true, indices[0]);
Expand Down Expand Up @@ -427,10 +471,16 @@ public void testRTSRestoreWithCommittedDataExcludeIndicesPatterns() throws IOExc
ensureRed(indices);
internalCluster().startDataOnlyNodes(3);

assertAcked(client().admin().indices().prepareClose(indices[0], indices[1]));
boolean restoreAllShards = randomBoolean();
if (restoreAllShards) {
assertAcked(client().admin().indices().prepareClose(indices[0], indices[1]));
}
client().admin()
.cluster()
.restoreRemoteStore(new RestoreRemoteStoreRequest().indices("*", "-remote-store-test-index-*"), PlainActionFuture.newFuture());
.restoreRemoteStore(
new RestoreRemoteStoreRequest().indices("*", "-remote-store-test-index-*").restoreAllShards(restoreAllShards),
PlainActionFuture.newFuture()
);
ensureGreen(indices[0], indices[1]);
assertEquals(shardCount, getNumShards(indices[0]).totalNumShards);
verifyRestoredData(indicesStats.get(indices[0]), true, indices[0]);
Expand Down Expand Up @@ -490,7 +540,7 @@ private void testPeerRecovery(int numberOfIterations, boolean invokeFlush) throw
assertEquals(0, recoverySource.get().getIndex().recoveredFileCount());
}

IndexResponse response = indexSingleDoc();
IndexResponse response = indexSingleDoc(INDEX_NAME);
assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL) + 1, response.getSeqNo());
refresh(INDEX_NAME);
assertBusy(
Expand Down
Loading

0 comments on commit c25c175

Please sign in to comment.