Skip to content

Commit

Permalink
[Remote Store Migration] Reconcile remote store based index settings …
Browse files Browse the repository at this point in the history
…during STRICT mode switch (#14792)

Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com>
  • Loading branch information
shourya035 authored Jul 19, 2024
1 parent cb74371 commit 18da095
Show file tree
Hide file tree
Showing 6 changed files with 375 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
Expand Down Expand Up @@ -277,4 +278,30 @@ protected IndexShard getIndexShard(String dataNode, String indexName) throws Exe
IndexService indexService = indicesService.indexService(new Index(indexName, uuid));
return indexService.getShard(0);
}

public void changeReplicaCountAndEnsureGreen(int replicaCount, String indexName) {
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount))
);
ensureGreen(indexName);
}

public void completeDocRepToRemoteMigration() {
assertTrue(
internalCluster().client()
.admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(
Settings.builder()
.putNull(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey())
.putNull(MIGRATION_DIRECTION_SETTING.getKey())
)
.get()
.isAcknowledged()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,73 @@ public void testRemoteIndexPathFileExistsAfterMigration() throws Exception {
assertTrue(Arrays.stream(files).anyMatch(file -> file.toString().contains(fileNamePrefix)));
}

/**
* Scenario:
* Creates an index with 1 pri 1 rep setup with 3 docrep nodes (1 cluster manager + 2 data nodes),
* initiate migration and create 3 remote nodes (1 cluster manager + 2 data nodes) and moves over
* only primary shard copy of the index
* After the primary shard copy is relocated, decrease replica count to 0, stop all docrep nodes
* and conclude migration. Remote store index settings should be applied to the index at this point.
*/
public void testIndexSettingsUpdateDuringReplicaCountDecrement() throws Exception {
String indexName = "migration-index-replica-decrement";
String docrepClusterManager = internalCluster().startClusterManagerOnlyNode();

logger.info("---> Starting 2 docrep nodes");
List<String> docrepNodeNames = internalCluster().startDataOnlyNodes(2);
internalCluster().validateClusterFormed();

logger.info("---> Creating index with 1 primary and 1 replica");
Settings oneReplica = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.build();
createIndexAndAssertDocrepProperties(indexName, oneReplica);

int docsToIndex = randomIntBetween(10, 100);
logger.info("---> Indexing {} on both indices", docsToIndex);
indexBulk(indexName, docsToIndex);

logger.info(
"---> Stopping shard rebalancing to ensure shards do not automatically move over to newer nodes after they are launched"
);
stopShardRebalancing();

logger.info("---> Starting 3 remote store enabled nodes");
initDocRepToRemoteMigration();
setAddRemote(true);
internalCluster().startClusterManagerOnlyNode();
List<String> remoteNodeNames = internalCluster().startDataOnlyNodes(2);
internalCluster().validateClusterFormed();

String primaryNode = primaryNodeName(indexName);

logger.info("---> Moving over primary to remote store enabled nodes");
assertAcked(
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(indexName, 0, primaryNode, remoteNodeNames.get(0)))
.execute()
.actionGet()
);
waitForRelocation();
waitNoPendingTasksOnAll();

logger.info("---> Reducing replica count to 0 for the index");
changeReplicaCountAndEnsureGreen(0, indexName);

logger.info("---> Stopping all docrep nodes");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(docrepClusterManager));
for (String node : docrepNodeNames) {
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node));
}
internalCluster().validateClusterFormed();
completeDocRepToRemoteMigration();
waitNoPendingTasksOnAll();
assertRemoteProperties(indexName);
}

private void createIndexAndAssertDocrepProperties(String index, Settings settings) {
createIndexAssertHealthAndDocrepProperties(index, settings, this::ensureGreen);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
Expand All @@ -59,17 +58,13 @@
import org.opensearch.common.settings.SettingsException;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater.indexHasAllRemoteStoreRelatedMetadata;
import static org.opensearch.index.remote.RemoteStoreUtils.checkAndFinalizeRemoteStoreMigration;

/**
* Transport action for updating cluster settings
Expand Down Expand Up @@ -262,13 +257,14 @@ public void onFailure(String source, Exception e) {

@Override
public ClusterState execute(final ClusterState currentState) {
validateCompatibilityModeSettingRequest(request, state);
final ClusterState clusterState = updater.updateSettings(
boolean isCompatibilityModeChanging = validateCompatibilityModeSettingRequest(request, state);
ClusterState clusterState = updater.updateSettings(
currentState,
clusterSettings.upgradeSettings(request.transientSettings()),
clusterSettings.upgradeSettings(request.persistentSettings()),
logger
);
clusterState = checkAndFinalizeRemoteStoreMigration(isCompatibilityModeChanging, request, clusterState, logger);
changed = clusterState != currentState;
return clusterState;
}
Expand All @@ -278,19 +274,23 @@ public ClusterState execute(final ClusterState currentState) {

/**
* Runs various checks associated with changing cluster compatibility mode
*
* @param request cluster settings update request, for settings to be updated and new values
* @param clusterState current state of cluster, for information on nodes
* @return true if the incoming cluster settings update request is switching compatibility modes
*/
public void validateCompatibilityModeSettingRequest(ClusterUpdateSettingsRequest request, ClusterState clusterState) {
public boolean validateCompatibilityModeSettingRequest(ClusterUpdateSettingsRequest request, ClusterState clusterState) {
Settings settings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build();
if (RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.exists(settings)) {
String value = RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(settings).mode;
validateAllNodesOfSameVersion(clusterState.nodes());
if (RemoteStoreNodeService.CompatibilityMode.STRICT.mode.equals(value)) {
if (RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(
settings
) == RemoteStoreNodeService.CompatibilityMode.STRICT) {
validateAllNodesOfSameType(clusterState.nodes());
validateIndexSettings(clusterState);
}
return true;
}
return false;
}

/**
Expand All @@ -310,31 +310,18 @@ private void validateAllNodesOfSameVersion(DiscoveryNodes discoveryNodes) {
* @param discoveryNodes current discovery nodes in the cluster
*/
private void validateAllNodesOfSameType(DiscoveryNodes discoveryNodes) {
Set<Boolean> nodeTypes = discoveryNodes.getNodes()
boolean allNodesDocrepEnabled = discoveryNodes.getNodes()
.values()
.stream()
.map(DiscoveryNode::isRemoteStoreNode)
.collect(Collectors.toSet());
if (nodeTypes.size() != 1) {
.allMatch(discoveryNode -> discoveryNode.isRemoteStoreNode() == false);
boolean allNodesRemoteStoreEnabled = discoveryNodes.getNodes()
.values()
.stream()
.allMatch(discoveryNode -> discoveryNode.isRemoteStoreNode());
if (allNodesDocrepEnabled == false && allNodesRemoteStoreEnabled == false) {
throw new SettingsException(
"can not switch to STRICT compatibility mode when the cluster contains both remote and non-remote nodes"
);
}
}

/**
* Verifies that while trying to switch to STRICT compatibility mode,
* all indices in the cluster have {@link RemoteMigrationIndexMetadataUpdater#indexHasAllRemoteStoreRelatedMetadata(IndexMetadata)} as <code>true</code>.
* If not, throws {@link SettingsException}
* @param clusterState current cluster state
*/
private void validateIndexSettings(ClusterState clusterState) {
Collection<IndexMetadata> allIndicesMetadata = clusterState.metadata().indices().values();
if (allIndicesMetadata.isEmpty() == false
&& allIndicesMetadata.stream().anyMatch(indexMetadata -> indexHasAllRemoteStoreRelatedMetadata(indexMetadata) == false)) {
throw new SettingsException(
"can not switch to STRICT compatibility mode since all indices in the cluster does not have remote store based index settings"
);
}
}
}
123 changes: 123 additions & 0 deletions server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,33 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.node.remotestore.RemoteStoreNodeService;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater.indexHasRemoteStoreSettings;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA;
Expand Down Expand Up @@ -250,4 +258,119 @@ public static Map<String, String> getRemoteStoreRepoName(DiscoveryNodes discover
.findFirst();
return remoteNode.map(RemoteStoreNodeAttribute::getDataRepoNames).orElseGet(HashMap::new);
}

/**
* Invoked after a cluster settings update.
* Checks if the applied cluster settings has switched the cluster to STRICT mode.
* If so, checks and applies appropriate index settings depending on the current set
* of node types in the cluster
* This has been intentionally done after the cluster settings update
* flow. That way we are not interfering with the usual settings update
* and the cluster state mutation that comes along with it
*
* @param isCompatibilityModeChanging flag passed from cluster settings update call to denote if a compatibility mode change has been done
* @param request request payload passed from cluster settings update
* @param currentState cluster state generated after changing cluster settings were applied
* @param logger Logger reference
* @return Mutated cluster state with remote store index settings applied, no-op if the cluster is not switching to `STRICT` compatibility mode
*/
public static ClusterState checkAndFinalizeRemoteStoreMigration(
boolean isCompatibilityModeChanging,
ClusterUpdateSettingsRequest request,
ClusterState currentState,
Logger logger
) {
if (isCompatibilityModeChanging && isSwitchToStrictCompatibilityMode(request)) {
return finalizeMigration(currentState, logger);
}
return currentState;
}

/**
* Finalizes the docrep to remote-store migration process by applying remote store based index settings
* on indices that are missing them. No-Op if all indices already have the settings applied through
* IndexMetadataUpdater
*
* @param incomingState mutated cluster state after cluster settings were applied
* @return new cluster state with index settings updated
*/
public static ClusterState finalizeMigration(ClusterState incomingState, Logger logger) {
Map<String, DiscoveryNode> discoveryNodeMap = incomingState.nodes().getNodes();
if (discoveryNodeMap.isEmpty() == false) {
// At this point, we have already validated that all nodes in the cluster are of uniform type.
// Either all of them are remote store enabled, or all of them are docrep enabled
boolean remoteStoreEnabledNodePresent = discoveryNodeMap.values().stream().findFirst().get().isRemoteStoreNode();
if (remoteStoreEnabledNodePresent == true) {
List<IndexMetadata> indicesWithoutRemoteStoreSettings = getIndicesWithoutRemoteStoreSettings(incomingState, logger);
if (indicesWithoutRemoteStoreSettings.isEmpty() == true) {
logger.info("All indices in the cluster has remote store based index settings");
} else {
Metadata mutatedMetadata = applyRemoteStoreSettings(incomingState, indicesWithoutRemoteStoreSettings, logger);
return ClusterState.builder(incomingState).metadata(mutatedMetadata).build();
}
} else {
logger.debug("All nodes in the cluster are not remote nodes. Skipping.");
}
}
return incomingState;
}

/**
* Filters out indices which does not have remote store based
* index settings applied even after all shard copies have
* migrated to remote store enabled nodes
*/
private static List<IndexMetadata> getIndicesWithoutRemoteStoreSettings(ClusterState clusterState, Logger logger) {
Collection<IndexMetadata> allIndicesMetadata = clusterState.metadata().indices().values();
if (allIndicesMetadata.isEmpty() == false) {
List<IndexMetadata> indicesWithoutRemoteSettings = allIndicesMetadata.stream()
.filter(idxMd -> indexHasRemoteStoreSettings(idxMd.getSettings()) == false)
.collect(Collectors.toList());
logger.debug(
"Attempting to switch to strict mode. Count of indices without remote store settings {}",
indicesWithoutRemoteSettings.size()
);
return indicesWithoutRemoteSettings;
}
return Collections.emptyList();
}

/**
* Applies remote store index settings through {@link RemoteMigrationIndexMetadataUpdater}
*/
private static Metadata applyRemoteStoreSettings(
ClusterState clusterState,
List<IndexMetadata> indicesWithoutRemoteStoreSettings,
Logger logger
) {
Metadata.Builder metadataBuilder = Metadata.builder(clusterState.getMetadata());
RoutingTable currentRoutingTable = clusterState.getRoutingTable();
DiscoveryNodes currentDiscoveryNodes = clusterState.getNodes();
Settings currentClusterSettings = clusterState.metadata().settings();
for (IndexMetadata indexMetadata : indicesWithoutRemoteStoreSettings) {
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata);
RemoteMigrationIndexMetadataUpdater indexMetadataUpdater = new RemoteMigrationIndexMetadataUpdater(
currentDiscoveryNodes,
currentRoutingTable,
indexMetadata,
currentClusterSettings,
logger
);
indexMetadataUpdater.maybeAddRemoteIndexSettings(indexMetadataBuilder, indexMetadata.getIndex().getName());
metadataBuilder.put(indexMetadataBuilder);
}
return metadataBuilder.build();
}

/**
* Checks if the incoming cluster settings payload is attempting to switch
* the cluster to `STRICT` compatibility mode
* Visible only for tests
*/
public static boolean isSwitchToStrictCompatibilityMode(ClusterUpdateSettingsRequest request) {
Settings incomingSettings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build();
return RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(
incomingSettings
) == RemoteStoreNodeService.CompatibilityMode.STRICT;
}
}
Loading

0 comments on commit 18da095

Please sign in to comment.