diff --git a/CHANGELOG.md b/CHANGELOG.md index 5df33dc6e3ace..1103dd0b1efc3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -96,6 +96,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [BWC and API enforcement] Decorate the existing APIs with proper annotations (part 1) ([#9520](https://github.com/opensearch-project/OpenSearch/pull/9520)) - Add concurrent segment search related metrics to node and index stats ([#9622](https://github.com/opensearch-project/OpenSearch/issues/9622)) - Decouple replication lag from logic to fail stale replicas ([#9507](https://github.com/opensearch-project/OpenSearch/pull/9507)) +- [Remote Store] Changes to introduce repository registration during bootstrap via node attributes. ([#9105](https://github.com/opensearch-project/OpenSearch/pull/9105)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307)) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/RemoteStoreNode.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/RemoteStoreNode.java new file mode 100644 index 0000000000000..2006d46edc443 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/RemoteStoreNode.java @@ -0,0 +1,124 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.remotestore; + +import org.opensearch.cluster.metadata.RepositoriesMetadata; +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.settings.Settings; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * This is an extension of {@Code DiscoveryNode} which provides an abstraction for validating and storing information + * specific to remote backed storage nodes. + * + * @opensearch.internal + */ +public class RemoteStoreNode extends DiscoveryNode { + + public static final String REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX = "remote_store"; + public static final String REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.segment.repository"; + public static final String REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.translog.repository"; + public static final String REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT = "remote_store.repository.%s.type"; + public static final String REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX = "remote_store.repository.%s.settings."; + private final DiscoveryNode node; + private final RepositoriesMetadata repositoriesMetadata; + + /** + * Creates a new {@link RemoteStoreNode} + */ + public RemoteStoreNode(DiscoveryNode node) { + super(node.getName(), node.getId(), node.getAddress(), node.getAttributes(), node.getRoles(), node.getVersion()); + this.node = node; + this.repositoriesMetadata = buildRepositoriesMetadata(node); + } + + private String validateAttributeNonNull(DiscoveryNode node, String attributeKey) { + String attributeValue = node.getAttributes().get(attributeKey); + if (attributeValue == null || attributeValue.isEmpty()) { + throw new IllegalStateException("joining node [" + node + "] doesn't have the node attribute [" + attributeKey + "]."); + } + + return attributeValue; + } + + private Map validateSettingsAttributesNonNull(DiscoveryNode node, String settingsAttributeKeyPrefix) { + return node.getAttributes() + .keySet() + .stream() + .filter(key -> key.startsWith(settingsAttributeKeyPrefix)) + .collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix, ""), key -> validateAttributeNonNull(node, key))); + } + + // TODO: Add logic to mark these repository as System Repository once thats merged. + private RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) { + String type = validateAttributeNonNull( + node, + String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name) + ); + Map settingsMap = validateSettingsAttributesNonNull( + node, + String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, name) + ); + + Settings.Builder settings = Settings.builder(); + settingsMap.forEach(settings::put); + + return new RepositoryMetadata(name, type, settings.build()); + } + + private RepositoriesMetadata buildRepositoriesMetadata(DiscoveryNode node) { + String segmentRepositoryName = node.getAttributes().get(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY); + String translogRepositoryName = node.getAttributes().get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY); + if (segmentRepositoryName.equals(translogRepositoryName)) { + return new RepositoriesMetadata(Collections.singletonList(buildRepositoryMetadata(node, segmentRepositoryName))); + } else { + List repositoryMetadataList = new ArrayList<>(); + repositoryMetadataList.add(buildRepositoryMetadata(node, segmentRepositoryName)); + repositoryMetadataList.add(buildRepositoryMetadata(node, translogRepositoryName)); + return new RepositoriesMetadata(repositoryMetadataList); + } + } + + RepositoriesMetadata getRepositoriesMetadata() { + return this.repositoriesMetadata; + } + + @Override + public int hashCode() { + // We will hash the id and repositories metadata as its highly unlikely that two nodes will have same id and + // repositories metadata but are actually different. + return Objects.hash(node.getEphemeralId(), repositoriesMetadata); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + RemoteStoreNode that = (RemoteStoreNode) o; + + return this.getRepositoriesMetadata().equalsIgnoreGenerations(that.getRepositoriesMetadata()); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append('{').append(this.node).append('}'); + sb.append('{').append(this.repositoriesMetadata).append('}'); + return super.toString(); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/RemoteStoreService.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/RemoteStoreService.java new file mode 100644 index 0000000000000..be67355389987 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/RemoteStoreService.java @@ -0,0 +1,204 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.remotestore; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.RepositoriesMetadata; +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.settings.Setting; +import org.opensearch.core.action.ActionListener; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryMissingException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.function.Supplier; + +/** + * Contains all the method needed for a remote store node lifecycle. + */ +public class RemoteStoreService { + + private static final Logger logger = LogManager.getLogger(RemoteStoreService.class); + private final Supplier repositoriesService; + public static final Setting REMOTE_STORE_MIGRATION_SETTING = Setting.simpleString( + "remote_store.migration", + MigrationTypes.NOT_MIGRATING.value, + MigrationTypes::validate, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public enum MigrationTypes { + NOT_MIGRATING("not_migrating"), + MIGRATING_TO_REMOTE_STORE("migrating_to_remote_store"), + MIGRATING_TO_HOT("migrating_to_hot"); + + public static MigrationTypes validate(String migrationType) { + try { + return MigrationTypes.valueOf(migrationType.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + "[" + + migrationType + + "] migration type is not supported. " + + "Supported migration types are [" + + MigrationTypes.values().toString() + + "]" + ); + } + } + + public final String value; + + MigrationTypes(String value) { + this.value = value; + } + } + + public RemoteStoreService(Supplier repositoriesService) { + this.repositoriesService = repositoriesService; + } + + public void verifyRepository(RepositoryMetadata repositoryMetadata) { + ActionListener listener = new ActionListener<>() { + + @Override + public void onResponse(VerifyRepositoryResponse verifyRepositoryResponse) { + logger.info("Successfully verified repository : " + verifyRepositoryResponse.toString()); + } + + @Override + public void onFailure(Exception e) { + throw new IllegalStateException("Failed to finish remote store repository verification" + e.getMessage()); + } + }; + + repositoriesService.get() + .verifyRepository( + repositoryMetadata.name(), + ActionListener.delegateFailure( + listener, + (delegatedListener, verifyResponse) -> delegatedListener.onResponse( + new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0])) + ) + ) + ); + } + + private ClusterState createRepository(RepositoryMetadata newRepositoryMetadata, ClusterState currentState) { + RepositoriesService.validate(newRepositoryMetadata.name()); + + Metadata metadata = currentState.metadata(); + Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); + RepositoriesMetadata repositories = metadata.custom(RepositoriesMetadata.TYPE); + if (repositories == null) { + Repository repository = repositoriesService.get().createRepository(newRepositoryMetadata); + logger.info( + "Remote store repository with name {} and type {} created.", + repository.getMetadata().name(), + repository.getMetadata().type() + ); + repositories = new RepositoriesMetadata(Collections.singletonList(newRepositoryMetadata)); + } else { + List repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1); + + for (RepositoryMetadata repositoryMetadata : repositories.repositories()) { + if (repositoryMetadata.name().equals(newRepositoryMetadata.name())) { + if (newRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata)) { + return new ClusterState.Builder(currentState).build(); + } else { + throw new IllegalStateException( + "new repository metadata [" + + newRepositoryMetadata + + "] supplied by joining node is different from existing repository metadata [" + + repositoryMetadata + + "]." + ); + } + } else { + repositoriesMetadata.add(repositoryMetadata); + } + } + Repository repository = repositoriesService.get().createRepository(newRepositoryMetadata); + logger.info( + "Remote store repository with name {} and type {} created", + repository.getMetadata().name(), + repository.getMetadata().type() + ); + repositoriesMetadata.add(newRepositoryMetadata); + repositories = new RepositoriesMetadata(repositoriesMetadata); + } + mdBuilder.putCustom(RepositoriesMetadata.TYPE, repositories); + return ClusterState.builder(currentState).metadata(mdBuilder).build(); + } + + private boolean isRepositoryCreated(RepositoryMetadata repositoryMetadata) { + try { + repositoriesService.get().repository(repositoryMetadata.name()); + return true; + } catch (RepositoryMissingException e) { + return false; + } + } + + private boolean isRepositoryAddedInClusterState(RepositoryMetadata repositoryMetadata, ClusterState currentState) { + RepositoriesMetadata repositoriesMetadata = currentState.metadata().custom(RepositoriesMetadata.TYPE); + if (repositoriesMetadata == null) { + return false; + } + for (RepositoryMetadata existingRepositoryMetadata : repositoriesMetadata.repositories()) { + existingRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata); + return true; + } + return false; + } + + private ClusterState createOrVerifyRepository(RepositoriesMetadata repositoriesMetadata, ClusterState currentState) { + ClusterState newState = ClusterState.builder(currentState).build(); + for (RepositoryMetadata repositoryMetadata : repositoriesMetadata.repositories()) { + if (isRepositoryCreated(repositoryMetadata)) { + verifyRepository(repositoryMetadata); + } else { + if (!isRepositoryAddedInClusterState(repositoryMetadata, currentState)) { + newState = ClusterState.builder(createRepository(repositoryMetadata, newState)).build(); + // verifyRepository(repositoryMetadata); + } + } + } + return newState; + } + + public ClusterState joinCluster(RemoteStoreNode joiningRemoteStoreNode, ClusterState currentState) { + List existingNodes = new ArrayList<>(currentState.nodes().getNodes().values()); + if (existingNodes.isEmpty()) { + return currentState; + } + ClusterState.Builder newState = ClusterState.builder(currentState); + if (existingNodes.get(0).isRemoteStoreNode()) { + RemoteStoreNode existingRemoteStoreNode = new RemoteStoreNode(existingNodes.get(0)); + if (existingRemoteStoreNode.equals(joiningRemoteStoreNode)) { + newState = ClusterState.builder(createOrVerifyRepository(joiningRemoteStoreNode.getRepositoriesMetadata(), currentState)); + } + } else { + throw new IllegalStateException( + "a remote store node [" + joiningRemoteStoreNode + "] is trying to join a non remote store cluster." + ); + } + return newState.build(); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 0274073ddfdc7..00c7c7bd6e655 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.action.admin.cluster.remotestore.RemoteStoreService; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; @@ -181,6 +182,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private JoinHelper.JoinAccumulator joinAccumulator; private Optional currentPublication = Optional.empty(); private final NodeHealthService nodeHealthService; + private final RemoteStoreService remoteStoreService; /** * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}. @@ -201,7 +203,8 @@ public Coordinator( Random random, RerouteService rerouteService, ElectionStrategy electionStrategy, - NodeHealthService nodeHealthService + NodeHealthService nodeHealthService, + RemoteStoreService remoteStoreService ) { this.settings = settings; this.transportService = transportService; @@ -223,7 +226,8 @@ public Coordinator( rerouteService, nodeHealthService, this::onNodeCommissionStatusChange, - namedWriteableRegistry + namedWriteableRegistry, + remoteStoreService ); this.persistedStateSupplier = persistedStateSupplier; this.noClusterManagerBlockService = new NoClusterManagerBlockService(settings, clusterSettings); @@ -287,6 +291,7 @@ public Coordinator( ); this.nodeHealthService = nodeHealthService; this.localNodeCommissioned = true; + this.remoteStoreService = remoteStoreService; } private ClusterFormationState getClusterFormationState() { @@ -605,6 +610,8 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback // we are checking source node commission status here to reject any join request coming from a decommissioned node // even before executing the join task to fail fast JoinTaskExecutor.ensureNodeCommissioned(joinRequest.getSourceNode(), stateForJoinValidation.metadata()); + + JoinTaskExecutor.ensureRemoteStoreNodesCompatibility(joinRequest.getSourceNode(), stateForJoinValidation); } sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback); } else { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java index 9a1f14295fad8..5f8b332525cb1 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java @@ -37,6 +37,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.Version; import org.opensearch.action.ActionListenerResponseHandler; +import org.opensearch.action.admin.cluster.remotestore.RemoteStoreService; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateTaskConfig; import org.opensearch.cluster.ClusterStateTaskListener; @@ -143,7 +144,8 @@ public class JoinHelper { RerouteService rerouteService, NodeHealthService nodeHealthService, Consumer nodeCommissioned, - NamedWriteableRegistry namedWriteableRegistry + NamedWriteableRegistry namedWriteableRegistry, + RemoteStoreService remoteStoreService ) { this.clusterManagerService = clusterManagerService; this.transportService = transportService; @@ -152,7 +154,13 @@ public class JoinHelper { this.nodeCommissioned = nodeCommissioned; this.namedWriteableRegistry = namedWriteableRegistry; - this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService) { + this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor( + settings, + allocationService, + logger, + rerouteService, + remoteStoreService + ) { private final long term = currentTermSupplier.getAsLong(); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index 564819a70111d..c879898e9effb 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -33,6 +33,8 @@ import org.apache.logging.log4j.Logger; import org.opensearch.Version; +import org.opensearch.action.admin.cluster.remotestore.RemoteStoreNode; +import org.opensearch.action.admin.cluster.remotestore.RemoteStoreService; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateTaskExecutor; import org.opensearch.cluster.NotClusterManagerException; @@ -74,6 +76,8 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor execute(ClusterState currentState, List jo // processing any joins Map joiniedNodeNameIds = new HashMap<>(); for (final Task joinTask : joiningNodes) { + final DiscoveryNode node = joinTask.node(); if (joinTask.isBecomeClusterManagerTask() || joinTask.isFinishElectionTask()) { // noop - } else if (currentNodes.nodeExistsWithSameRoles(joinTask.node())) { - logger.debug("received a join request for an existing node [{}]", joinTask.node()); + } else if (currentNodes.nodeExistsWithSameRoles(node)) { + logger.debug("received a join request for an existing node [{}]", node); + if (node.isRemoteStoreNode()) { + /** joinCluster for remote store node is invoked here as elect leader task can have same node + * present in join task as well as current node. We want the repositories to be registered during + * first node join. See + * {@link org.opensearch.gateway.GatewayMetaState#prepareInitialClusterState(TransportService, ClusterService, ClusterState)} **/ + newState = ClusterState.builder(remoteStoreService.joinCluster(new RemoteStoreNode(node), currentState)); + } } else { - final DiscoveryNode node = joinTask.node(); + if (node.isRemoteStoreNode()) { + newState = ClusterState.builder(remoteStoreService.joinCluster(new RemoteStoreNode(node), currentState)); + } try { if (enforceMajorVersion) { ensureMajorVersionBarrier(node.getVersion(), minClusterNodeVersion); @@ -187,6 +208,8 @@ public ClusterTasksResult execute(ClusterState currentState, List jo // we have added the same check in handleJoinRequest method and adding it here as this method // would guarantee that a decommissioned node would never be able to join the cluster and ensures correctness ensureNodeCommissioned(node, currentState.metadata()); + + ensureRemoteStoreNodesCompatibility(node, currentState); nodesBuilder.add(node); nodesChanged = true; minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion()); @@ -422,6 +445,61 @@ public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata) } } + /** + * The method ensures two conditions - + * 1. The joining node is remote store if it is joining a remote store cluster. + * 2. The joining node is non-remote store if it is joining a non-remote store cluster. + * A remote store node is the one which holds all the remote store attributes and a remote store cluster is + * the one which has only homogeneous remote store nodes with same node attributes + * + * TODO: When we support migration from remote store cluster to non remote store and vice versa the migration + * setting {@link RemoteStoreService::REMOTE_STORE_MIGRATION_SETTING} will be help determine if a non + * remote store node is allowed to join the remote store cluster and vice versa. + */ + public static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, ClusterState currentState) { + List existingNodes = new ArrayList<>(currentState.getNodes().getNodes().values()); + + // If there are no node in the cluster state we will No op the compatibility check as at this point we cannot + // determine if this is a remote store cluster or non-remote store cluster. + if (existingNodes.isEmpty()) { + return; + } + + // TODO: The below check is valid till we support migration, once we start supporting migration a remote + // store node will be able to join a non remote store cluster and vice versa. #7986 + if (RemoteStoreService.MigrationTypes.NOT_MIGRATING.value.equals( + RemoteStoreService.REMOTE_STORE_MIGRATION_SETTING.get(currentState.metadata().settings()) + )) { + DiscoveryNode existingNode = existingNodes.get(0); + if (joiningNode.isRemoteStoreNode()) { + if (existingNode.isRemoteStoreNode()) { + RemoteStoreNode joiningRemoteStoreNode = new RemoteStoreNode(joiningNode); + RemoteStoreNode existingRemoteStoreNode = new RemoteStoreNode(existingNode); + if (existingRemoteStoreNode.equals(joiningRemoteStoreNode) == false) { + throw new IllegalStateException( + "a remote store node [" + + joiningNode + + "] is trying to join a remote store cluster with incompatible node attributes in " + + "comparison with existing node [" + + existingNode + + "]." + ); + } + } else { + throw new IllegalStateException( + "a remote store node [" + joiningNode + "] is trying to join a non remote store cluster." + ); + } + } else { + if (existingNode.isRemoteStoreNode()) { + throw new IllegalStateException( + "a non remote store node [" + joiningNode + "] is trying to join a remote store cluster." + ); + } + } + } + } + public static Collection> addBuiltInJoinValidators( Collection> onJoinValidators ) { @@ -430,6 +508,7 @@ public static Collection> addBuiltInJoin ensureNodesCompatibility(node.getVersion(), state.getNodes()); ensureIndexCompatibility(node.getVersion(), state.getMetadata()); ensureNodeCommissioned(node, state.getMetadata()); + ensureRemoteStoreNodesCompatibility(node, state); }); validators.addAll(onJoinValidators); return Collections.unmodifiableCollection(validators); diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index a04b0d9de912d..de4f2ab74a7bf 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -60,6 +60,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.opensearch.action.admin.cluster.remotestore.RemoteStoreNode.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.NodeRoleSettings.NODE_ROLES_SETTING; /** @@ -460,6 +461,15 @@ public boolean isSearchNode() { return roles.contains(DiscoveryNodeRole.SEARCH_ROLE); } + /** + * Returns whether the node is a remote store node. + * + * @return true if the node contains remote store node attributes, false otherwise + */ + public boolean isRemoteStoreNode() { + return this.getAttributes().keySet().stream().anyMatch(key -> key.startsWith(REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX)); + } + /** * Returns a set of all the roles that the node has. The roles are returned in sorted order by the role name. *

diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index e00e7e3bf4ea7..aa337b9b432bf 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -33,6 +33,7 @@ import org.apache.logging.log4j.LogManager; import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; +import org.opensearch.action.admin.cluster.remotestore.RemoteStoreService; import org.opensearch.action.admin.indices.close.TransportCloseIndexAction; import org.opensearch.action.search.CreatePitController; import org.opensearch.action.search.TransportSearchAction; @@ -670,6 +671,7 @@ public void apply(Settings value, Settings current, Settings previous) { // Remote cluster state settings RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING, RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING + RemoteStoreService.REMOTE_STORE_MIGRATION_SETTING ) ) ); diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java index 68fce4d9b9bb4..4f9ead6a3883f 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.cluster.remotestore.RemoteStoreService; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.Coordinator; import org.opensearch.cluster.coordination.ElectionStrategy; @@ -129,7 +130,8 @@ public DiscoveryModule( Path configFile, GatewayMetaState gatewayMetaState, RerouteService rerouteService, - NodeHealthService nodeHealthService + NodeHealthService nodeHealthService, + RemoteStoreService remoteStoreService ) { final Collection> joinValidators = new ArrayList<>(); final Map> hostProviders = new HashMap<>(); @@ -205,7 +207,8 @@ public DiscoveryModule( new Random(Randomness.get().nextLong()), rerouteService, electionStrategy, - nodeHealthService + nodeHealthService, + remoteStoreService ); } else { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 51cc7c9007159..cf41040942111 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -43,6 +43,7 @@ import org.opensearch.action.ActionModule; import org.opensearch.action.ActionModule.DynamicActionRegistry; import org.opensearch.action.ActionType; +import org.opensearch.action.admin.cluster.remotestore.RemoteStoreService; import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; import org.opensearch.action.search.SearchExecutionStatsCollector; import org.opensearch.action.search.SearchPhaseController; @@ -963,6 +964,8 @@ protected Node( ); clusterInfoService.addListener(diskThresholdMonitor::onNewInfo); + final RemoteStoreService remoteStoreService = new RemoteStoreService(repositoriesServiceReference::get); + final DiscoveryModule discoveryModule = new DiscoveryModule( settings, threadPool, @@ -977,7 +980,8 @@ protected Node( environment.configDir(), gatewayMetaState, rerouteService, - fsHealthService + fsHealthService, + remoteStoreService ); final SearchPipelineService searchPipelineService = new SearchPipelineService( clusterService, diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java index f00bf3942c9a9..0d84035ced9cc 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java @@ -576,6 +576,13 @@ private void archiveRepositoryStats(Repository repository, long clusterStateVers } } + /** + * Creates repository holder. This method starts the non-internal repository + */ + public Repository createRepository(RepositoryMetadata repositoryMetadata) { + return this.createRepository(repositoryMetadata, typesRegistry); + } + /** * Creates repository holder. This method starts the repository */ @@ -600,7 +607,7 @@ private Repository createRepository(RepositoryMetadata repositoryMetadata, Map(repositoriesService)::get); + nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); - joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, (s, p, r) -> {}); + joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, (s, p, r) -> {}, remoteStoreService); } public ClusterState createIndex(ClusterState state, CreateIndexRequest request) {