diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java new file mode 100644 index 0000000000000..ad3e99dd274ce --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.common.settings.Settings; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.transport.MockTransportService; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +public class BaseRemoteStoreRestoreIT extends RemoteStoreBaseIntegTestCase { static final String INDEX_NAME = "remote-store-test-idx-1"; static final String INDEX_NAMES = "test-remote-store-1,test-remote-store-2,remote-store-test-index-1,remote-store-test-index-2"; static final String INDEX_NAMES_WILDCARD = "test-remote-store-*,remote-store-test-index-*"; static final String TOTAL_OPERATIONS = "total-operations"; static final String REFRESHED_OR_FLUSHED_OPERATIONS = "refreshed-or-flushed-operations"; static final String MAX_SEQ_NO_TOTAL = "max-seq-no-total"; + + @Override + public Settings indexSettings() { + return remoteStoreIndexSettings(0); + } + + public Settings indexSettings(int shards, int replicas) { + return remoteStoreIndexSettings(replicas, shards); + } + + @Override + protected Collection<Class<? extends Plugin>> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class); + } + + protected void restore(String... 
indices) { + boolean restoreAllShards = randomBoolean(); + if (restoreAllShards) { + assertAcked(client().admin().indices().prepareClose(indices)); + } + client().admin() + .cluster() + .restoreRemoteStore( + new RestoreRemoteStoreRequest().indices(indices).restoreAllShards(restoreAllShards), + PlainActionFuture.newFuture() + ); + } + + protected void verifyRestoredData(Map<String, Long> indexStats, String indexName) throws Exception { + ensureYellowAndNoInitializingShards(indexName); + ensureGreen(indexName); + // This is to ensure that shards that were already assigned will get latest count + refresh(indexName); + assertBusy( + () -> assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS)), + 30, + TimeUnit.SECONDS + ); + IndexResponse response = indexSingleDoc(indexName); + if (indexStats.containsKey(MAX_SEQ_NO_TOTAL + "-shard-" + response.getShardId().id())) { + assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL + "-shard-" + response.getShardId().id()) + 1, response.getSeqNo()); + } + refresh(indexName); + assertBusy( + () -> assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) + 1), + 30, + TimeUnit.SECONDS + ); + } + + public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) { + prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY); + } + + public void prepareCluster( + int numClusterManagerNodes, + int numDataOnlyNodes, + String indices, + int replicaCount, + int shardCount, + Settings settings + ) { + prepareCluster(numClusterManagerNodes, numDataOnlyNodes, settings); + for (String index : indices.split(",")) { + createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount)); + ensureYellowAndNoInitializingShards(index); + ensureGreen(index); + } + } + + public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, Settings settings) { + internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings); + internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java new file mode 100644 index 0000000000000..cbead232e1262 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreClusterStateRestoreIT.java @@ -0,0 +1,293 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.remotestore; + +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.common.settings.Settings; +import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.io.IOException; +import java.nio.file.Files; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutionException; + +import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING; +import static org.opensearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE; +import static org.opensearch.indices.ShardLimitValidator.SETTING_MAX_SHARDS_PER_CLUSTER_KEY; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteStoreClusterStateRestoreIT extends BaseRemoteStoreRestoreIT { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .put(REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.getKey(), REPOSITORY_NAME) + .build(); + } + + private void addNewNodes(int dataNodeCount, int clusterManagerNodeCount) { + internalCluster().startNodes(dataNodeCount + clusterManagerNodeCount); + } + + private Map<String, Long> initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) { + prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount); + Map<String, Long> indexStats = indexData(1, false, INDEX_NAME); + assertEquals(shardCount * (replicaCount + 1), getNumShards(INDEX_NAME).totalNumShards); + ensureGreen(INDEX_NAME); + return indexStats; + } + + private void resetCluster(int dataNodeCount, int clusterManagerNodeCount) { + internalCluster().stopAllNodes(); + addNewNodes(dataNodeCount, clusterManagerNodeCount); + } + + private void restoreAndValidate(String clusterUUID, Map<String, Long> indexStats) throws Exception { + restoreAndValidate(clusterUUID, indexStats, true); + } + + private void restoreAndValidate(String clusterUUID, Map<String, Long> indexStats, boolean validate) throws Exception { + // TODO once auto restore is merged, the remote cluster state will be restored + + if (validate) { + // Step - 4 validate that the restore is successful. + ensureGreen(INDEX_NAME); + verifyRestoredData(indexStats, INDEX_NAME); + } + } + + private void restoreAndValidateFails( + String clusterUUID, + PlainActionFuture<RestoreRemoteStoreResponse> actionListener, + Class<? extends Throwable> clazz, + String errorSubString + ) { + + try { + restoreAndValidate(clusterUUID, null, false); + } catch (Exception e) { + assertTrue( + String.format(Locale.ROOT, "%s %s", clazz, e), + clazz.isAssignableFrom(e.getClass()) + || clazz.isAssignableFrom(e.getCause().getClass()) + || (e.getCause().getCause() != null && clazz.isAssignableFrom(e.getCause().getCause().getClass())) + ); + assertTrue( + String.format(Locale.ROOT, "Error message mismatch. Expected: [%s]. 
Actual: [%s]", errorSubString, e.getMessage()), + e.getMessage().contains(errorSubString) + ); + } + } + + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9834") + public void testFullClusterRestore() throws Exception { + int shardCount = randomIntBetween(1, 2); + int replicaCount = 1; + int dataNodeCount = shardCount * (replicaCount + 1); + int clusterManagerNodeCount = 1; + + // Step - 1 index some data to generate files in remote directory + Map<String, Long> indexStats = initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount); + String prevClusterUUID = clusterService().state().metadata().clusterUUID(); + + // Step - 2 Replace all nodes in the cluster with new nodes. This ensures new cluster state doesn't have previous index metadata + resetCluster(dataNodeCount, clusterManagerNodeCount); + + String newClusterUUID = clusterService().state().metadata().clusterUUID(); + assert !Objects.equals(newClusterUUID, prevClusterUUID) : "cluster restart not successful. cluster uuid is same"; + + // Step - 3 Trigger full cluster restore and validate + restoreAndValidate(prevClusterUUID, indexStats); + } + + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9834") + public void testFullClusterRestoreMultipleIndices() throws Exception { + int shardCount = randomIntBetween(1, 2); + int replicaCount = 1; + int dataNodeCount = shardCount * (replicaCount + 1); + int clusterManagerNodeCount = 1; + + // Step - 1 index some data to generate files in remote directory + Map<String, Long> indexStats = initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount); + + String secondIndexName = INDEX_NAME + "-2"; + createIndex(secondIndexName, remoteStoreIndexSettings(replicaCount, shardCount + 1)); + Map<String, Long> indexStats2 = indexData(1, false, secondIndexName); + assertEquals((shardCount + 1) * (replicaCount + 1), getNumShards(secondIndexName).totalNumShards); + ensureGreen(secondIndexName); + + String prevClusterUUID = clusterService().state().metadata().clusterUUID(); + + // Step - 2 Replace all nodes in the cluster with new nodes. This ensures new cluster state doesn't have previous index metadata + resetCluster(dataNodeCount, clusterManagerNodeCount); + + String newClusterUUID = clusterService().state().metadata().clusterUUID(); + assert !Objects.equals(newClusterUUID, prevClusterUUID) : "cluster restart not successful. 
cluster uuid is same"; + + // Step - 3 Trigger full cluster restore + restoreAndValidate(prevClusterUUID, indexStats); + ensureGreen(secondIndexName); + verifyRestoredData(indexStats2, secondIndexName); + } + + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9834") + public void testFullClusterRestoreFailureValidationFailures() throws Exception { + int shardCount = randomIntBetween(1, 2); + int replicaCount = 1; + int dataNodeCount = shardCount * (replicaCount + 1); + int clusterManagerNodeCount = 1; + + // index some data to generate files in remote directory + Map<String, Long> indexStats = initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount); + String prevClusterUUID = clusterService().state().metadata().clusterUUID(); + + // Start of Test - 1 + // Test - 1 Trigger full cluster restore and validate it fails due to incorrect cluster UUID + PlainActionFuture<RestoreRemoteStoreResponse> future = PlainActionFuture.newFuture(); + restoreAndValidateFails("randomUUID", future, IllegalStateException.class, "Remote Cluster State not found - randomUUID"); + // End of Test - 1 + + // Start of Test - 2 + // Test - 2 Trigger full cluster restore and validate it fails due to cluster UUID same as current cluster UUID + future = PlainActionFuture.newFuture(); + restoreAndValidateFails( + clusterService().state().metadata().clusterUUID(), + future, + IllegalArgumentException.class, + "clusterUUID to restore from should be different from current cluster UUID" + ); + // End of Test - 2 + + // Start of Test - 3 + // Step - 2 Replace all nodes in the cluster with new nodes. This ensures new cluster state doesn't have previous index metadata + // Restarting cluster with just 1 data node helps with applying cluster settings + resetCluster(1, clusterManagerNodeCount); + String newClusterUUID = clusterService().state().metadata().clusterUUID(); + assert !Objects.equals(newClusterUUID, prevClusterUUID) : "cluster restart not successful. cluster uuid is same"; + + reduceShardLimits(1, 1); + + // Step - 4 Trigger full cluster restore and validate it fails + future = PlainActionFuture.newFuture(); + restoreAndValidateFails( + prevClusterUUID, + future, + IllegalArgumentException.class, + "this action would add [2] total shards, but this cluster currently has [0]/[1] maximum shards open" + ); + resetShardLimits(); + // End of Test - 3 + + // Start of Test - 4 + // Test - 4 Reset cluster and trigger full restore with a same-name index present in the cluster + // Test - 4 Add required nodes for this test after the last reset. + addNewNodes(dataNodeCount - 1, 0); + + newClusterUUID = clusterService().state().metadata().clusterUUID(); + assert !Objects.equals(newClusterUUID, prevClusterUUID) : "cluster restart not successful. cluster uuid is same"; + + // Test - 4 Step - 2 Create a new index with the same name + createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 1)); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + future = PlainActionFuture.newFuture(); + + // Test - 4 Step - 3 Trigger full cluster restore and validate it fails + restoreAndValidateFails( + prevClusterUUID, + future, + IllegalStateException.class, + "cannot restore index [remote-store-test-idx-1] because an open index with same name/uuid already exists in the cluster" + ); + + // Test - 4 Step - 4 validate that the existing index is still green. 
+ ensureGreen(INDEX_NAME); + // End of Test - 4 + } + + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9834") + public void testFullClusterRestoreManifestFilePointsToInvalidIndexMetadataPathThrowsException() throws Exception { + int shardCount = randomIntBetween(1, 2); + int replicaCount = 1; + int dataNodeCount = shardCount * (replicaCount + 1); + int clusterManagerNodeCount = 1; + + // Step - 1 index some data to generate files in remote directory + initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount); + + String prevClusterUUID = clusterService().state().metadata().clusterUUID(); + + // Step - 2 Replace all nodes in the cluster with new nodes. This ensures new cluster state doesn't have previous index metadata + resetCluster(dataNodeCount, clusterManagerNodeCount); + + String newClusterUUID = clusterService().state().metadata().clusterUUID(); + assert !Objects.equals(newClusterUUID, prevClusterUUID) : "cluster restart not successful. cluster uuid is same"; + + // Step - 3 Move the index metadata directory in remote so that the manifest points to an invalid path + try { + Files.move( + segmentRepoPath.resolve( + RemoteClusterStateService.encodeString(clusterService().state().getClusterName().value()) + + "/cluster-state/" + + prevClusterUUID + + "/index" + ), + segmentRepoPath.resolve("cluster-state/") + ); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // Step - 4 Trigger full cluster restore and validate it fails + PlainActionFuture<RestoreRemoteStoreResponse> future = PlainActionFuture.newFuture(); + restoreAndValidateFails(prevClusterUUID, future, IllegalStateException.class, "asdsa"); + } + + private void reduceShardLimits(int maxShardsPerNode, int maxShardsPerCluster) { + // Step 3 - Reduce shard limits so that the shard limit is hit with fewer shards + try { + client().admin() + .cluster() + .updateSettings( + new ClusterUpdateSettingsRequest().transientSettings( + Settings.builder() + .put(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), maxShardsPerNode) + .put(SETTING_MAX_SHARDS_PER_CLUSTER_KEY, maxShardsPerCluster) + ) + ) + .get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + private void resetShardLimits() { + // Step - 5 Reset the cluster settings + ClusterUpdateSettingsRequest resetRequest = new ClusterUpdateSettingsRequest(); + resetRequest.transientSettings( + Settings.builder().putNull(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey()).putNull(SETTING_MAX_SHARDS_PER_CLUSTER_KEY) + ); + + try { + client().admin().cluster().updateSettings(resetRequest).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java index b84b9e38c63a3..489f4c52d4298 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java @@ -10,20 +10,14 @@ import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse; -import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.health.ClusterHealthStatus; -import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; 
-import org.opensearch.plugins.Plugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; -import org.opensearch.test.transport.MockTransportService; import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -33,79 +27,8 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.hamcrest.Matchers.greaterThan; -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class RemoteStoreRestoreIT extends RemoteStoreBaseIntegTestCase { - private static final String INDEX_NAME = "remote-store-test-idx-1"; - private static final String INDEX_NAMES = "test-remote-store-1,test-remote-store-2,remote-store-test-index-1,remote-store-test-index-2"; - private static final String INDEX_NAMES_WILDCARD = "test-remote-store-*,remote-store-test-index-*"; - private static final String TOTAL_OPERATIONS = "total-operations"; - private static final String MAX_SEQ_NO_TOTAL = "max-seq-no-total"; - - @Override - public Settings indexSettings() { - return remoteStoreIndexSettings(0); - } - - @Override - protected Collection<Class<? extends Plugin>> nodePlugins() { - return Arrays.asList(MockTransportService.TestPlugin.class); - } - - private void restore(String... indices) { - boolean restoreAllShards = randomBoolean(); - if (restoreAllShards) { - assertAcked(client().admin().indices().prepareClose(indices)); - } - client().admin() - .cluster() - .restoreRemoteStore( - new RestoreRemoteStoreRequest().indices(indices).restoreAllShards(restoreAllShards), - PlainActionFuture.newFuture() - ); - } - - private void verifyRestoredData(Map<String, Long> indexStats, String indexName) throws Exception { - ensureYellowAndNoInitializingShards(indexName); - ensureGreen(indexName); - // This is to ensure that shards that were already assigned will get latest count - refresh(indexName); - assertBusy( - () -> assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS)), - 30, - TimeUnit.SECONDS - ); - IndexResponse response = indexSingleDoc(indexName); - if (indexStats.containsKey(MAX_SEQ_NO_TOTAL + "-shard-" + response.getShardId().id())) { - assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL + "-shard-" + response.getShardId().id()) + 1, response.getSeqNo()); - } - refresh(indexName); - assertBusy( - () -> assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) + 1), - 30, - TimeUnit.SECONDS - ); - } - - private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) { - prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY); - } - - private void prepareCluster( - int numClusterManagerNodes, - int numDataOnlyNodes, - String indices, - int replicaCount, - int shardCount, - Settings clusterSettings - ) { - internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, clusterSettings); - internalCluster().startDataOnlyNodes(numDataOnlyNodes, clusterSettings); - for (String index : indices.split(",")) { - createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount)); - ensureYellowAndNoInitializingShards(index); - ensureGreen(index); - } - } +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0) 
+public class RemoteStoreRestoreIT extends BaseRemoteStoreRestoreIT { /** * Simulates all data restored using Remote Translog Store. diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java index fd70d319780c8..b12698c8a320e 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java @@ -454,7 +454,7 @@ public Builder initializeAsRemoteStoreRestore( IndexMetadata indexMetadata, RemoteStoreRecoverySource recoverySource, Map<ShardId, IndexShardRoutingTable> indexShardRoutingTableMap, - boolean restoreAllShards + boolean forceRecoverAllPrimaries ) { final UnassignedInfo unassignedInfo = new UnassignedInfo( UnassignedInfo.Reason.EXISTING_INDEX_RESTORED, @@ -466,21 +466,20 @@ public Builder initializeAsRemoteStoreRestore( } for (int shardNumber = 0; shardNumber < indexMetadata.getNumberOfShards(); shardNumber++) { ShardId shardId = new ShardId(index, shardNumber); - if (indexShardRoutingTableMap.containsKey(shardId) == false) { + if (forceRecoverAllPrimaries == false && indexShardRoutingTableMap.containsKey(shardId) == false) { throw new IllegalStateException("IndexShardRoutingTable is not present for shardId: " + shardId); } IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingTableMap.get(shardId); - if (restoreAllShards || indexShardRoutingTable.primaryShard().unassigned()) { + if (forceRecoverAllPrimaries || indexShardRoutingTable == null || indexShardRoutingTable.primaryShard().unassigned()) { // Primary shard to be recovered from remote store. indexShardRoutingBuilder.addShard(ShardRouting.newUnassigned(shardId, true, recoverySource, unassignedInfo)); // All the replica shards to be recovered from peer recovery. - indexShardRoutingTable.replicaShards() - .forEach( - shardRouting -> indexShardRoutingBuilder.addShard( - ShardRouting.newUnassigned(shardId, false, PeerRecoverySource.INSTANCE, unassignedInfo) - ) + for (int replicaNumber = 0; replicaNumber < indexMetadata.getNumberOfReplicas(); replicaNumber++) { + indexShardRoutingBuilder.addShard( + ShardRouting.newUnassigned(shardId, false, PeerRecoverySource.INSTANCE, unassignedInfo) ); + } } else { // Primary is either active or initializing. Do not trigger restore. 
indexShardRoutingBuilder.addShard(indexShardRoutingTable.primaryShard()); diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java index bcfc324b202b9..2b56163f852e8 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java @@ -576,10 +576,10 @@ public Builder addAsRemoteStoreRestore( IndexMetadata indexMetadata, RemoteStoreRecoverySource recoverySource, Map<ShardId, IndexShardRoutingTable> indexShardRoutingTableMap, - boolean restoreAllShards + boolean forceRecoveryPrimary ) { IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex()) - .initializeAsRemoteStoreRestore(indexMetadata, recoverySource, indexShardRoutingTableMap, restoreAllShards); + .initializeAsRemoteStoreRestore(indexMetadata, recoverySource, indexShardRoutingTableMap, forceRecoveryPrimary); add(indexRoutingBuilder); return this; } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 4697f8f83fe20..2c38bec4802d7 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -425,7 +425,7 @@ private BlobContainer indexMetadataContainer(String clusterName, String clusterU return blobStoreRepository.blobStore() .blobContainer( blobStoreRepository.basePath() - .add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8))) + .add(encodeString(clusterName)) .add("cluster-state") .add(clusterUUID) .add("index") @@ -437,11 +437,7 @@ private BlobContainer manifestContainer(String clusterName, String clusterUUID) // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest return blobStoreRepository.blobStore() .blobContainer( - blobStoreRepository.basePath() - .add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8))) - .add("cluster-state") - .add(clusterUUID) - .add("manifest") + blobStoreRepository.basePath().add(encodeString(clusterName)).add("cluster-state").add(clusterUUID).add("manifest") ); } @@ -471,6 +467,7 @@ private static String indexMetadataFileName(IndexMetadata indexMetadata) { * @return {@code Map<String, IndexMetadata>} latest IndexUUID to IndexMetadata map */ public Map<String, IndexMetadata> getLatestIndexMetadata(String clusterName, String clusterUUID) throws IOException { + ensureRepositorySet(); Map<String, IndexMetadata> remoteIndexMetadata = new HashMap<>(); ClusterMetadataManifest clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID); assert Objects.equals(clusterUUID, clusterMetadataManifest.getClusterUUID()) @@ -491,9 +488,10 @@ public Map<String, IndexMetadata> getLatestIndexMetadata(String clusterName, Str */ private IndexMetadata getIndexMetadata(String clusterName, String clusterUUID, UploadedIndexMetadata uploadedIndexMetadata) { try { + String[] splitPath = uploadedIndexMetadata.getUploadedFilename().split("/"); return INDEX_METADATA_FORMAT.read( indexMetadataContainer(clusterName, clusterUUID, uploadedIndexMetadata.getIndexUUID()), - uploadedIndexMetadata.getUploadedFilename(), + splitPath[splitPath.length - 1], blobStoreRepository.getNamedXContentRegistry() ); } catch (IOException e) { @@ -562,6 +560,10 @@ private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String cluste } } + public static 
String encodeString(String content) { + return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8)); + } + /** * Exception for IndexMetadata transfer failures to remote */ diff --git a/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java b/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java index 9fdd2ff9f759d..5cdff14cae360 100644 --- a/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java +++ b/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java @@ -10,28 +10,40 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.Version; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.MetadataCreateIndexService; +import org.opensearch.cluster.metadata.MetadataIndexUpgradeService; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Nullable; import org.opensearch.common.UUIDs; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.indices.ShardLimitValidator; import org.opensearch.repositories.IndexId; import org.opensearch.snapshots.RestoreInfo; import org.opensearch.snapshots.RestoreService; +import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -49,87 +61,47 @@ public class RemoteStoreRestoreService { private final AllocationService allocationService; - public RemoteStoreRestoreService(ClusterService clusterService, AllocationService allocationService) { + private final MetadataCreateIndexService createIndexService; + + private final MetadataIndexUpgradeService metadataIndexUpgradeService; + + private final ShardLimitValidator shardLimitValidator; + + private final RemoteClusterStateService remoteClusterStateService; + + public RemoteStoreRestoreService( + ClusterService clusterService, + AllocationService allocationService, + MetadataCreateIndexService createIndexService, + MetadataIndexUpgradeService metadataIndexUpgradeService, + ShardLimitValidator shardLimitValidator, + RemoteClusterStateService remoteClusterStateService + ) { this.clusterService = clusterService; this.allocationService = allocationService; + this.createIndexService = createIndexService; + this.metadataIndexUpgradeService = metadataIndexUpgradeService; + this.shardLimitValidator = shardLimitValidator; + this.remoteClusterStateService = remoteClusterStateService; } + /** + * Restores data from remote store for indices specified in the restore request. 
+ * + * @param request restore request + * @param listener restore listener + */ public void restore(RestoreRemoteStoreRequest request, final ActionListener<RestoreService.RestoreCompletionResponse> listener) { clusterService.submitStateUpdateTask("restore[remote_store]", new ClusterStateUpdateTask() { - final String restoreUUID = UUIDs.randomBase64UUID(); + String restoreUUID; RestoreInfo restoreInfo = null; @Override public ClusterState execute(ClusterState currentState) { - // Updating cluster state - ClusterState.Builder builder = ClusterState.builder(currentState); - Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); - ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); - RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable()); - - List<String> indicesToBeRestored = new ArrayList<>(); - int totalShards = 0; - for (String index : request.indices()) { - IndexMetadata currentIndexMetadata = currentState.metadata().index(index); - if (currentIndexMetadata == null) { - // ToDo: Handle index metadata does not exist case. (GitHub #3457) - logger.warn("Remote store restore is not supported for non-existent index. Skipping: {}", index); - continue; - } - if (currentIndexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false)) { - IndexMetadata updatedIndexMetadata = currentIndexMetadata; - if (request.restoreAllShards()) { - if (currentIndexMetadata.getState() != IndexMetadata.State.CLOSE) { - throw new IllegalStateException( - "cannot restore index [" - + index - + "] because an open index " - + "with same name already exists in the cluster. Close the existing index" - ); - } - updatedIndexMetadata = IndexMetadata.builder(currentIndexMetadata) - .state(IndexMetadata.State.OPEN) - .version(1 + currentIndexMetadata.getVersion()) - .mappingVersion(1 + currentIndexMetadata.getMappingVersion()) - .settingsVersion(1 + currentIndexMetadata.getSettingsVersion()) - .aliasesVersion(1 + currentIndexMetadata.getAliasesVersion()) - .build(); - } - - Map<ShardId, IndexShardRoutingTable> indexShardRoutingTableMap = currentState.routingTable() - .index(index) - .shards() - .values() - .stream() - .collect(Collectors.toMap(IndexShardRoutingTable::shardId, Function.identity())); - - IndexId indexId = new IndexId(index, updatedIndexMetadata.getIndexUUID()); - - RecoverySource.RemoteStoreRecoverySource recoverySource = new RecoverySource.RemoteStoreRecoverySource( - restoreUUID, - updatedIndexMetadata.getCreationVersion(), - indexId - ); - rtBuilder.addAsRemoteStoreRestore( - updatedIndexMetadata, - recoverySource, - indexShardRoutingTableMap, - request.restoreAllShards() - ); - blocks.updateBlocks(updatedIndexMetadata); - mdBuilder.put(updatedIndexMetadata, true); - indicesToBeRestored.add(index); - totalShards += updatedIndexMetadata.getNumberOfShards(); - } else { - logger.warn("Remote store is not enabled for index: {}", index); - } - } - - restoreInfo = new RestoreInfo("remote_store", indicesToBeRestored, totalShards, totalShards); - - RoutingTable rt = rtBuilder.build(); - ClusterState updatedState = builder.metadata(mdBuilder).blocks(blocks).routingTable(rt).build(); - return allocationService.reroute(updatedState, "restored from remote store"); + RemoteRestoreResult remoteRestoreResult = restore(currentState, null, request.restoreAllShards(), request.indices()); + restoreUUID = remoteRestoreResult.getRestoreUUID(); + restoreInfo = remoteRestoreResult.getRestoreInfo(); + return remoteRestoreResult.getClusterState(); } @Override @@ -140,7 +112,7 @@ public void onFailure(String source, 
Exception e) { @Override public TimeValue timeout() { - return request.masterNodeTimeout(); + return request.clusterManagerNodeTimeout(); } @Override @@ -148,6 +120,227 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS listener.onResponse(new RestoreService.RestoreCompletionResponse(restoreUUID, null, restoreInfo)); } }); + } + + /** + * Executes remote restore + * @param currentState current cluster state + * @param restoreClusterUUID cluster UUID used to restore IndexMetadata + * @param restoreAllShards indicates if all shards of the index need to be restored. This flag is ignored if restoreClusterUUID is provided + * @param indexNames list of indices to restore. This list is ignored if restoreClusterUUID is provided + * @return remote restore result + */ + public RemoteRestoreResult restore( + ClusterState currentState, + @Nullable String restoreClusterUUID, + boolean restoreAllShards, + String[] indexNames + ) { + Map<String, Tuple<Boolean, IndexMetadata>> indexMetadataMap = new HashMap<>(); + boolean metadataFromRemoteStore = (restoreClusterUUID == null + || restoreClusterUUID.isEmpty() + || restoreClusterUUID.isBlank()) == false; + if (metadataFromRemoteStore) { + try { + remoteClusterStateService.getLatestIndexMetadata(currentState.getClusterName().value(), restoreClusterUUID) + .values() + .forEach(indexMetadata -> { + indexMetadataMap.put(indexMetadata.getIndex().getName(), new Tuple<>(true, indexMetadata)); + }); + } catch (IOException e) { + throw new IllegalStateException("Unable to restore remote index metadata", e); + } + } else { + for (String indexName : indexNames) { + IndexMetadata indexMetadata = currentState.metadata().index(indexName); + if (indexMetadata == null) { + logger.warn("Index restore is not supported for non-existent index. 
Skipping: {}", indexName); + } else { + indexMetadataMap.put(indexName, new Tuple<>(false, indexMetadata)); + } + } + } + validate(currentState, indexMetadataMap, restoreClusterUUID, restoreAllShards); + return executeRestore(currentState, indexMetadataMap, restoreAllShards); + } + + /** + * Executes remote restore + * @param currentState current cluster state + * @param indexMetadataMap map of index metadata to restore + * @param restoreAllShards indicates if all shards of the index need to be restored + * @return remote restore result + */ + private RemoteRestoreResult executeRestore( + ClusterState currentState, + Map<String, Tuple<Boolean, IndexMetadata>> indexMetadataMap, + boolean restoreAllShards + ) { + final String restoreUUID = UUIDs.randomBase64UUID(); + List<String> indicesToBeRestored = new ArrayList<>(); + int totalShards = 0; + ClusterState.Builder builder = ClusterState.builder(currentState); + Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); + ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable()); + for (Map.Entry<String, Tuple<Boolean, IndexMetadata>> indexMetadataEntry : indexMetadataMap.entrySet()) { + String indexName = indexMetadataEntry.getKey(); + IndexMetadata indexMetadata = indexMetadataEntry.getValue().v2(); + boolean metadataFromRemoteStore = indexMetadataEntry.getValue().v1(); + IndexMetadata updatedIndexMetadata = indexMetadata; + if (restoreAllShards || metadataFromRemoteStore) { + updatedIndexMetadata = IndexMetadata.builder(indexMetadata) + .state(IndexMetadata.State.OPEN) + .version(1 + indexMetadata.getVersion()) + .mappingVersion(1 + indexMetadata.getMappingVersion()) + .settingsVersion(1 + indexMetadata.getSettingsVersion()) + .aliasesVersion(1 + indexMetadata.getAliasesVersion()) + .build(); + } + + IndexId indexId = new IndexId(indexName, updatedIndexMetadata.getIndexUUID()); + + Map<ShardId, IndexShardRoutingTable> indexShardRoutingTableMap = new HashMap<>(); + if (metadataFromRemoteStore == false) { + indexShardRoutingTableMap = currentState.routingTable() + .index(indexName) + .shards() + .values() + .stream() + .collect(Collectors.toMap(IndexShardRoutingTable::shardId, Function.identity())); + } + + RecoverySource.RemoteStoreRecoverySource recoverySource = new RecoverySource.RemoteStoreRecoverySource( + restoreUUID, + updatedIndexMetadata.getCreationVersion(), + indexId + ); + rtBuilder.addAsRemoteStoreRestore( + updatedIndexMetadata, + recoverySource, + indexShardRoutingTableMap, + restoreAllShards || metadataFromRemoteStore + ); + blocks.updateBlocks(updatedIndexMetadata); + mdBuilder.put(updatedIndexMetadata, true); + indicesToBeRestored.add(indexName); + totalShards += updatedIndexMetadata.getNumberOfShards(); + } + + RestoreInfo restoreInfo = new RestoreInfo("remote_store", indicesToBeRestored, totalShards, totalShards); + + RoutingTable rt = rtBuilder.build(); + ClusterState updatedState = builder.metadata(mdBuilder).blocks(blocks).routingTable(rt).build(); + return RemoteRestoreResult.build(restoreUUID, restoreInfo, allocationService.reroute(updatedState, "restored from remote store")); + } + + /** + * Performs various validations needed before executing restore + * @param currentState current cluster state + * @param indexMetadataMap map of index metadata to restore + * @param restoreClusterUUID cluster UUID used to restore IndexMetadata + * @param restoreAllShards indicates if all shards of the index need to be restored. 
This flag is ignored if restoreClusterUUID is provided + */ + private void validate( + ClusterState currentState, + Map<String, Tuple<Boolean, IndexMetadata>> indexMetadataMap, + @Nullable String restoreClusterUUID, + boolean restoreAllShards + ) throws IllegalStateException, IllegalArgumentException { + String errorMsg = "cannot restore index [%s] because an open index with same name/uuid already exists in the cluster."; + + // Restore with current cluster UUID will fail as same indices would be present in the cluster which we are trying to + // restore + if (currentState.metadata().clusterUUID().equals(restoreClusterUUID)) { + throw new IllegalArgumentException("clusterUUID to restore from should be different from current cluster UUID"); + } + for (Map.Entry<String, Tuple<Boolean, IndexMetadata>> indexMetadataEntry : indexMetadataMap.entrySet()) { + String indexName = indexMetadataEntry.getKey(); + IndexMetadata indexMetadata = indexMetadataEntry.getValue().v2(); + String indexUUID = indexMetadata.getIndexUUID(); + boolean metadataFromRemoteStore = indexMetadataEntry.getValue().v1(); + if (indexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false)) { + if (metadataFromRemoteStore) { + Set<String> graveyardIndexNames = new HashSet<>(); + Set<String> graveyardIndexUUID = new HashSet<>(); + Set<String> liveClusterIndexUUIDs = currentState.metadata() + .indices() + .values() + .stream() + .map(IndexMetadata::getIndexUUID) + .collect(Collectors.toSet()); + + currentState.metadata().indexGraveyard().getTombstones().forEach(tombstone -> { + graveyardIndexNames.add(tombstone.getIndex().getName()); + graveyardIndexUUID.add(tombstone.getIndex().getUUID()); + }); + + // Since updates to the graveyard are synced to remote, we should never land in a situation where the remote store + // contains index metadata for a graveyard index. + assert graveyardIndexNames.contains(indexName) == false : String.format( + Locale.ROOT, + "Index name [%s] exists in graveyard!", + indexName + ); + assert graveyardIndexUUID.contains(indexUUID) == false : String.format( + Locale.ROOT, + "Index UUID [%s] exists in graveyard!", + indexUUID + ); + + // Any indices being restored from remote cluster state should not already be part of the cluster as this causes + // conflict + boolean sameNameIndexExists = currentState.metadata().hasIndex(indexName); + boolean sameUUIDIndexExists = liveClusterIndexUUIDs.contains(indexUUID); + if (sameNameIndexExists || sameUUIDIndexExists) { + String finalErrorMsg = String.format(Locale.ROOT, errorMsg, indexName); + logger.info(finalErrorMsg); + throw new IllegalStateException(finalErrorMsg); + } + + Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion().minimumIndexCompatibilityVersion(); + metadataIndexUpgradeService.upgradeIndexMetadata(indexMetadata, minIndexCompatibilityVersion); + boolean isHidden = IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.getSettings()); + createIndexService.validateIndexName(indexName, currentState); + createIndexService.validateDotIndex(indexName, isHidden); + shardLimitValidator.validateShardLimit(indexName, indexMetadata.getSettings(), currentState); + } else if (restoreAllShards && IndexMetadata.State.CLOSE.equals(indexMetadata.getState()) == false) { + throw new IllegalStateException(String.format(Locale.ROOT, errorMsg, indexName) + " Close the existing index."); + } + } else { + logger.warn("Remote store is not enabled for index: {}", indexName); + } + } + } + + /** + * Result of a remote restore operation. 
+ */ + public static class RemoteRestoreResult { + private final ClusterState clusterState; + private final RestoreInfo restoreInfo; + private final String restoreUUID; + + private RemoteRestoreResult(String restoreUUID, RestoreInfo restoreInfo, ClusterState clusterState) { + this.clusterState = clusterState; + this.restoreInfo = restoreInfo; + this.restoreUUID = restoreUUID; + } + + public static RemoteRestoreResult build(String restoreUUID, RestoreInfo restoreInfo, ClusterState clusterState) { + return new RemoteRestoreResult(restoreUUID, restoreInfo, clusterState); + } + + public ClusterState getClusterState() { + return clusterState; + } + + public RestoreInfo getRestoreInfo() { + return restoreInfo; + } + + public String getRestoreUUID() { + return restoreUUID; + } } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index b9c5c3352adc9..837b0e3795fc0 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -974,9 +974,14 @@ protected Node( indicesService, clusterInfoService::getClusterInfo ); + RemoteStoreRestoreService remoteStoreRestoreService = new RemoteStoreRestoreService( clusterService, - clusterModule.getAllocationService() + clusterModule.getAllocationService(), + metadataCreateIndexService, + metadataIndexUpgradeService, + shardLimitValidator, + remoteClusterStateService ); final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor( diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index 95832dc9544ce..d3e24ccd90500 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -1848,6 +1848,27 @@ public synchronized void stopRandomNonMasterNode() throws IOException { stopRandomNonClusterManagerNode(); } + /** + * Stops all running nodes in the cluster + */ + public void stopAllNodes() { + try { + int totalDataNodes = numDataNodes(); + while (totalDataNodes > 0) { + stopRandomDataNode(); + totalDataNodes -= 1; + } + int totalClusterManagerNodes = numClusterManagerNodes(); + while (totalClusterManagerNodes > 1) { + stopRandomNonClusterManagerNode(); + totalClusterManagerNodes -= 1; + } + stopCurrentClusterManagerNode(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + private synchronized void startAndPublishNodesAndClients(List<NodeAndClient> nodeAndClients) { if (nodeAndClients.size() > 0) { final int newClusterManagers = (int) nodeAndClients.stream()