diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
index 32d14a3519659..381e9d1e95ae4 100644
--- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -96,6 +96,7 @@
import org.opensearch.gateway.DanglingIndicesState;
import org.opensearch.gateway.GatewayService;
import org.opensearch.gateway.PersistedClusterStateService;
+import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
@@ -660,7 +661,11 @@ public void apply(Settings value, Settings current, Settings previous) {
// Related to monitoring of task cancellation
TaskCancellationMonitoringSettings.IS_ENABLED_SETTING,
- TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING
+ TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING,
+
+ // Remote cluster state settings
+ RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
+ RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING
)
)
);
diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
index af894bdbc117e..c6ccb4e15a516 100644
--- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
+++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
@@ -60,6 +60,8 @@
import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.env.NodeMetadata;
+import org.opensearch.gateway.remote.ClusterMetadataMarker;
+import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.Node;
import org.opensearch.plugins.MetadataUpgrader;
import org.opensearch.threadpool.ThreadPool;
@@ -84,19 +86,19 @@
/**
* Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts.
*
- * When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that
- * the state being loaded when constructing the instance of this class is not necessarily the state that will be used as {@link
- * ClusterState#metadata()} because it might be stale or incomplete. Cluster-manager-eligible nodes must perform an election to find a complete and
- * non-stale state, and cluster-manager-ineligible nodes receive the real cluster state from the elected cluster-manager after joining the cluster.
+ * When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that the state being
+ * loaded when constructing the instance of this class is not necessarily the state that will be used as {@link ClusterState#metadata()} because it might be
+ * stale or incomplete. Cluster-manager-eligible nodes must perform an election to find a complete and non-stale state, and cluster-manager-ineligible nodes
+ * receive the real cluster state from the elected cluster-manager after joining the cluster.
*
* @opensearch.internal
*/
public class GatewayMetaState implements Closeable {
/**
- * Fake node ID for a voting configuration written by a cluster-manager-ineligible data node to indicate that its on-disk state is potentially
- * stale (since it is written asynchronously after application, rather than before acceptance). This node ID means that if the node is
- * restarted as a cluster-manager-eligible node then it does not win any elections until it has received a fresh cluster state.
+ * Fake node ID for a voting configuration written by a cluster-manager-ineligible data node to indicate that its on-disk state is potentially stale (since
+ * it is written asynchronously after application, rather than before acceptance). This node ID means that if the node is restarted as a
+ * cluster-manager-eligible node then it does not win any elections until it has received a fresh cluster state.
*/
public static final String STALE_STATE_CONFIG_NODE_ID = "STALE_STATE_CONFIG";
@@ -234,8 +236,8 @@ Metadata upgradeMetadataForNode(
}
/**
- * This method calls {@link MetadataIndexUpgradeService} to makes sure that indices are compatible with the current
- * version. The MetadataIndexUpgradeService might also update obsolete settings if needed.
+ * This method calls {@link MetadataIndexUpgradeService} to makes sure that indices are compatible with the current version. The MetadataIndexUpgradeService
+ * might also update obsolete settings if needed.
*
* @return input metadata
if no upgrade is needed or an upgraded metadata
*/
@@ -599,4 +601,105 @@ public void close() throws IOException {
IOUtils.close(persistenceWriter.getAndSet(null));
}
}
+
+ /**
+ * Encapsulates the writing of metadata to a remote store using {@link RemoteClusterStateService}.
+ */
+ public static class RemotePersistedState implements PersistedState {
+
+ private ClusterState lastAcceptedState;
+ private ClusterMetadataMarker lastAcceptedMarker;
+ private final RemoteClusterStateService remoteClusterStateService;
+
+ public RemotePersistedState(final RemoteClusterStateService remoteClusterStateService) {
+ this.remoteClusterStateService = remoteClusterStateService;
+ }
+
+ @Override
+ public long getCurrentTerm() {
+ return lastAcceptedState != null ? lastAcceptedState.term() : 0L;
+ }
+
+ @Override
+ public ClusterState getLastAcceptedState() {
+ if (lastAcceptedMarker != null) {
+ assert lastAcceptedState != null : "Last accepted state is not set";
+ assert lastAcceptedState.metadata().indices().size() == lastAcceptedMarker.getIndices().size()
+ : "Number of indices in last accepted state and marker are different";
+ lastAcceptedMarker.getIndices().stream().forEach(md -> {
+ assert lastAcceptedState.metadata().indices().containsKey(md.getIndexName())
+ : "Last accepted state and marker are not in sync";
+ assert lastAcceptedState.metadata().indices().get(md.getIndexName()).getIndexUUID().equals(md.getIndexUUID())
+ : "Last accepted state and marker are not in sync";
+ });
+ }
+ return lastAcceptedState;
+ }
+
+ @Override
+ public void setCurrentTerm(long currentTerm) {
+ // no-op
+ // For LucenePersistedState, setCurrentTerm is used only while handling StartJoinRequest by all follower nodes.
+ // But for RemotePersistedState, the state is only pushed by the active cluster. So this method is not required.
+ }
+
+ @Override
+ public void setLastAcceptedState(ClusterState clusterState) {
+ try {
+ if (lastAcceptedState == null || lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
+ // On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out.
+ lastAcceptedState = clusterState;
+ return;
+ }
+ final ClusterMetadataMarker marker;
+ if (shouldWriteFullClusterState(clusterState)) {
+ marker = remoteClusterStateService.writeFullMetadata(clusterState);
+ } else {
+ marker = remoteClusterStateService.writeIncrementalMetadata(lastAcceptedState, clusterState, lastAcceptedMarker);
+ }
+ lastAcceptedMarker = marker;
+ lastAcceptedState = clusterState;
+ } catch (Exception e) {
+ handleExceptionOnWrite(e);
+ }
+ }
+
+ private boolean shouldWriteFullClusterState(ClusterState clusterState) {
+ if (lastAcceptedState == null
+ || lastAcceptedMarker == null
+ || lastAcceptedState.term() != clusterState.term()
+ || lastAcceptedMarker.getOpensearchVersion() != Version.CURRENT) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void markLastAcceptedStateAsCommitted() {
+ try {
+ if (lastAcceptedState == null
+ || lastAcceptedMarker == null
+ || lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
+ // On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out.
+ return;
+ }
+ final ClusterMetadataMarker committedMarker = remoteClusterStateService.markLastStateAsCommitted(
+ lastAcceptedState,
+ lastAcceptedMarker
+ );
+ lastAcceptedMarker = committedMarker;
+ } catch (Exception e) {
+ handleExceptionOnWrite(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ PersistedState.super.close();
+ }
+
+ private void handleExceptionOnWrite(Exception e) {
+ throw ExceptionsHelper.convertToRuntime(e);
+ }
+ }
}
diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataMarker.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataMarker.java
new file mode 100644
index 0000000000000..14be75ef166b6
--- /dev/null
+++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataMarker.java
@@ -0,0 +1,381 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.gateway.remote;
+
+import org.opensearch.Version;
+import org.opensearch.core.ParseField;
+import org.opensearch.core.common.Strings;
+import org.opensearch.core.common.io.stream.StreamOutput;
+import org.opensearch.core.common.io.stream.Writeable;
+import org.opensearch.core.xcontent.ConstructingObjectParser;
+import org.opensearch.core.xcontent.MediaTypeRegistry;
+import org.opensearch.core.xcontent.ToXContentFragment;
+import org.opensearch.core.xcontent.XContentBuilder;
+import org.opensearch.core.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Marker file which contains the details of the uploaded entity metadata
+ *
+ * @opensearch.internal
+ */
+public class ClusterMetadataMarker implements Writeable, ToXContentFragment {
+
+ private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term");
+ private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version");
+ private static final ParseField CLUSTER_UUID_FIELD = new ParseField("cluster_uuid");
+ private static final ParseField STATE_UUID_FIELD = new ParseField("state_uuid");
+ private static final ParseField OPENSEARCH_VERSION_FIELD = new ParseField("opensearch_version");
+ private static final ParseField COMMITTED_FIELD = new ParseField("committed");
+ private static final ParseField INDICES_FIELD = new ParseField("indices");
+
+ private static long term(Object[] fields) {
+ return (long) fields[0];
+ }
+
+ private static long version(Object[] fields) {
+ return (long) fields[1];
+ }
+
+ private static String clusterUUID(Object[] fields) {
+ return (String) fields[2];
+ }
+
+ private static String stateUUID(Object[] fields) {
+ return (String) fields[3];
+ }
+
+ private static Version opensearchVersion(Object[] fields) {
+ return Version.fromId((int) fields[4]);
+ }
+
+ private static boolean committed(Object[] fields) {
+ return (boolean) fields[5];
+ }
+
+ private static List indices(Object[] fields) {
+ return (List) fields[6];
+ }
+
+ private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(
+ "cluster_metadata_marker",
+ fields -> new ClusterMetadataMarker(
+ term(fields),
+ version(fields),
+ clusterUUID(fields),
+ stateUUID(fields),
+ opensearchVersion(fields),
+ committed(fields),
+ indices(fields)
+ )
+ );
+
+ static {
+ PARSER.declareLong(ConstructingObjectParser.constructorArg(), CLUSTER_TERM_FIELD);
+ PARSER.declareLong(ConstructingObjectParser.constructorArg(), STATE_VERSION_FIELD);
+ PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_FIELD);
+ PARSER.declareString(ConstructingObjectParser.constructorArg(), STATE_UUID_FIELD);
+ PARSER.declareInt(ConstructingObjectParser.constructorArg(), OPENSEARCH_VERSION_FIELD);
+ PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), COMMITTED_FIELD);
+ PARSER.declareObjectArray(
+ ConstructingObjectParser.constructorArg(),
+ (p, c) -> UploadedIndexMetadata.fromXContent(p),
+ INDICES_FIELD
+ );
+ }
+
+ private final List indices;
+ private final long clusterTerm;
+ private final long stateVersion;
+ private final String clusterUUID;
+ private final String stateUUID;
+ private final Version opensearchVersion;
+ private final boolean committed;
+
+ public List getIndices() {
+ return indices;
+ }
+
+ public long getClusterTerm() {
+ return clusterTerm;
+ }
+
+ public long getStateVersion() {
+ return stateVersion;
+ }
+
+ public String getClusterUUID() {
+ return clusterUUID;
+ }
+
+ public String getStateUUID() {
+ return stateUUID;
+ }
+
+ public Version getOpensearchVersion() {
+ return opensearchVersion;
+ }
+
+ public boolean isCommitted() {
+ return committed;
+ }
+
+ public ClusterMetadataMarker(
+ long clusterTerm,
+ long version,
+ String clusterUUID,
+ String stateUUID,
+ Version opensearchVersion,
+ boolean committed,
+ List indices
+ ) {
+ this.clusterTerm = clusterTerm;
+ this.stateVersion = version;
+ this.clusterUUID = clusterUUID;
+ this.stateUUID = stateUUID;
+ this.opensearchVersion = opensearchVersion;
+ this.committed = committed;
+ this.indices = Collections.unmodifiableList(indices);
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.field(CLUSTER_TERM_FIELD.getPreferredName(), getClusterTerm())
+ .field(STATE_VERSION_FIELD.getPreferredName(), getStateVersion())
+ .field(CLUSTER_UUID_FIELD.getPreferredName(), getClusterUUID())
+ .field(STATE_UUID_FIELD.getPreferredName(), getStateUUID())
+ .field(OPENSEARCH_VERSION_FIELD.getPreferredName(), getOpensearchVersion().id)
+ .field(COMMITTED_FIELD.getPreferredName(), isCommitted());
+ builder.startArray(INDICES_FIELD.getPreferredName());
+ {
+ for (UploadedIndexMetadata uploadedIndexMetadata : indices) {
+ uploadedIndexMetadata.toXContent(builder, params);
+ }
+ }
+ builder.endArray();
+ return builder;
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeVLong(clusterTerm);
+ out.writeVLong(stateVersion);
+ out.writeString(clusterUUID);
+ out.writeString(stateUUID);
+ out.writeInt(opensearchVersion.id);
+ out.writeBoolean(committed);
+ out.writeCollection(indices);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final ClusterMetadataMarker that = (ClusterMetadataMarker) o;
+ return Objects.equals(indices, that.indices)
+ && clusterTerm == that.clusterTerm
+ && stateVersion == that.stateVersion
+ && Objects.equals(clusterUUID, that.clusterUUID)
+ && Objects.equals(stateUUID, that.stateUUID)
+ && Objects.equals(opensearchVersion, that.opensearchVersion)
+ && Objects.equals(committed, that.committed);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(indices, clusterTerm, stateVersion, clusterUUID, stateUUID, opensearchVersion, committed);
+ }
+
+ @Override
+ public String toString() {
+ return Strings.toString(MediaTypeRegistry.JSON, this);
+ }
+
+ public static ClusterMetadataMarker fromXContent(XContentParser parser) throws IOException {
+ return PARSER.parse(parser, null);
+ }
+
+ /**
+ * Builder for ClusterMetadataMarker
+ *
+ * @opensearch.internal
+ */
+ public static class Builder {
+
+ private List indices;
+ private long clusterTerm;
+ private long stateVersion;
+ private String clusterUUID;
+ private String stateUUID;
+ private Version opensearchVersion;
+ private boolean committed;
+
+ public Builder indices(List indices) {
+ this.indices = indices;
+ return this;
+ }
+
+ public Builder clusterTerm(long clusterTerm) {
+ this.clusterTerm = clusterTerm;
+ return this;
+ }
+
+ public Builder stateVersion(long stateVersion) {
+ this.stateVersion = stateVersion;
+ return this;
+ }
+
+ public Builder clusterUUID(String clusterUUID) {
+ this.clusterUUID = clusterUUID;
+ return this;
+ }
+
+ public Builder stateUUID(String stateUUID) {
+ this.stateUUID = stateUUID;
+ return this;
+ }
+
+ public Builder opensearchVersion(Version opensearchVersion) {
+ this.opensearchVersion = opensearchVersion;
+ return this;
+ }
+
+ public Builder committed(boolean committed) {
+ this.committed = committed;
+ return this;
+ }
+
+ public List getIndices() {
+ return indices;
+ }
+
+ public Builder() {
+ indices = new ArrayList<>();
+ }
+
+ public ClusterMetadataMarker build() {
+ return new ClusterMetadataMarker(clusterTerm, stateVersion, clusterUUID, stateUUID, opensearchVersion, committed, indices);
+ }
+
+ }
+
+ /**
+ * Metadata for uploaded index metadata
+ *
+ * @opensearch.internal
+ */
+ public static class UploadedIndexMetadata implements Writeable, ToXContentFragment {
+
+ private static final ParseField INDEX_NAME_FIELD = new ParseField("index_name");
+ private static final ParseField INDEX_UUID_FIELD = new ParseField("index_uuid");
+ private static final ParseField UPLOADED_FILENAME_FIELD = new ParseField("uploaded_filename");
+
+ private static String indexName(Object[] fields) {
+ return (String) fields[0];
+ }
+
+ private static String indexUUID(Object[] fields) {
+ return (String) fields[1];
+ }
+
+ private static String uploadedFilename(Object[] fields) {
+ return (String) fields[2];
+ }
+
+ private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(
+ "uploaded_index_metadata",
+ fields -> new UploadedIndexMetadata(indexName(fields), indexUUID(fields), uploadedFilename(fields))
+ );
+
+ static {
+ PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_NAME_FIELD);
+ PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_UUID_FIELD);
+ PARSER.declareString(ConstructingObjectParser.constructorArg(), UPLOADED_FILENAME_FIELD);
+ }
+
+ private final String indexName;
+ private final String indexUUID;
+ private final String uploadedFilename;
+
+ public UploadedIndexMetadata(String indexName, String indexUUID, String uploadedFileName) {
+ this.indexName = indexName;
+ this.indexUUID = indexUUID;
+ this.uploadedFilename = uploadedFileName;
+ }
+
+ public String getUploadedFilename() {
+ return uploadedFilename;
+ }
+
+ public String getIndexName() {
+ return indexName;
+ }
+
+ public String getIndexUUID() {
+ return indexUUID;
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ return builder.startObject()
+ .field(INDEX_NAME_FIELD.getPreferredName(), getIndexName())
+ .field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID())
+ .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilename())
+ .endObject();
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeString(indexName);
+ out.writeString(indexUUID);
+ out.writeString(uploadedFilename);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final UploadedIndexMetadata that = (UploadedIndexMetadata) o;
+ return Objects.equals(indexName, that.indexName)
+ && Objects.equals(indexUUID, that.indexUUID)
+ && Objects.equals(uploadedFilename, that.uploadedFilename);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(indexName, indexUUID, uploadedFilename);
+ }
+
+ @Override
+ public String toString() {
+ return Strings.toString(MediaTypeRegistry.JSON, this);
+ }
+
+ public static UploadedIndexMetadata fromXContent(XContentParser parser) throws IOException {
+ return PARSER.parse(parser, null);
+ }
+ }
+}
diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java
new file mode 100644
index 0000000000000..f5e0bb14e6743
--- /dev/null
+++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java
@@ -0,0 +1,358 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.gateway.remote;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.Version;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.common.Nullable;
+import org.opensearch.common.blobstore.BlobContainer;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Setting;
+import org.opensearch.common.settings.Setting.Property;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.gateway.remote.ClusterMetadataMarker.UploadedIndexMetadata;
+import org.opensearch.index.remote.RemoteStoreUtils;
+import org.opensearch.repositories.RepositoriesService;
+import org.opensearch.repositories.Repository;
+import org.opensearch.repositories.blobstore.BlobStoreRepository;
+import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD;
+
+/**
+ * A Service which provides APIs to upload and download cluster metadata from remote store.
+ *
+ * @opensearch.internal
+ */
+public class RemoteClusterStateService {
+
+ public static final String METADATA_NAME_FORMAT = "%s.dat";
+
+ public static final String METADATA_MARKER_NAME_FORMAT = "%s";
+
+ public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
+ "index-metadata",
+ METADATA_NAME_FORMAT,
+ IndexMetadata::fromXContent
+ );
+
+ public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MARKER_FORMAT = new ChecksumBlobStoreFormat<>(
+ "cluster-metadata-marker",
+ METADATA_MARKER_NAME_FORMAT,
+ ClusterMetadataMarker::fromXContent
+ );
+ /**
+ * Used to specify if cluster state metadata should be publish to remote store
+ */
+ public static final Setting REMOTE_CLUSTER_STATE_ENABLED_SETTING = Setting.boolSetting(
+ "cluster.remote_store.state.enabled",
+ false,
+ Property.NodeScope,
+ Property.Final
+ );
+ /**
+ * Used to specify default repo to use for cluster state metadata upload
+ */
+ public static final Setting REMOTE_CLUSTER_STATE_REPOSITORY_SETTING = Setting.simpleString(
+ "cluster.remote_store.state.repository",
+ "",
+ Property.NodeScope,
+ Property.Final
+ );
+ private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class);
+
+ private static final String DELIMITER = "__";
+
+ private final Supplier repositoriesService;
+ private final Settings settings;
+ private final LongSupplier relativeTimeMillisSupplier;
+ private BlobStoreRepository blobStoreRepository;
+ private volatile TimeValue slowWriteLoggingThreshold;
+
+ public RemoteClusterStateService(
+ Supplier repositoriesService,
+ Settings settings,
+ ClusterSettings clusterSettings,
+ LongSupplier relativeTimeMillisSupplier
+ ) {
+ this.repositoriesService = repositoriesService;
+ this.settings = settings;
+ this.relativeTimeMillisSupplier = relativeTimeMillisSupplier;
+ this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
+ clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
+ }
+
+ /**
+ * This method uploads entire cluster state metadata to the configured blob store. For now only index metadata upload is supported. This method should be
+ * invoked by the elected cluster manager when the remote cluster state is enabled.
+ *
+ * @return A metadata/marker object which contains the details of uploaded entity metadata.
+ */
+ @Nullable
+ public ClusterMetadataMarker writeFullMetadata(ClusterState clusterState) throws IOException {
+ final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();
+ if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) {
+ logger.error("Local node is not elected cluster manager. Exiting");
+ return null;
+ }
+ assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled";
+ ensureRepositorySet();
+
+ final List allUploadedIndexMetadata = new ArrayList<>();
+ // todo parallel upload
+ // any validations before/after upload ?
+ for (IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
+ // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200
+ final String indexMetadataKey = writeIndexMetadata(
+ clusterState.getClusterName().value(),
+ clusterState.getMetadata().clusterUUID(),
+ indexMetadata,
+ indexMetadataFileName(indexMetadata)
+ );
+ final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(
+ indexMetadata.getIndex().getName(),
+ indexMetadata.getIndexUUID(),
+ indexMetadataKey
+ );
+ allUploadedIndexMetadata.add(uploadedIndexMetadata);
+ }
+ final ClusterMetadataMarker marker = uploadMarker(clusterState, allUploadedIndexMetadata, false);
+ final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;
+ if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
+ logger.warn(
+ "writing cluster state took [{}ms] which is above the warn threshold of [{}]; " + "wrote full state with [{}] indices",
+ durationMillis,
+ slowWriteLoggingThreshold,
+ allUploadedIndexMetadata.size()
+ );
+ } else {
+ // todo change to debug
+ logger.info(
+ "writing cluster state took [{}ms]; " + "wrote full state with [{}] indices",
+ durationMillis,
+ allUploadedIndexMetadata.size()
+ );
+ }
+ return marker;
+ }
+
+ /**
+ * This method uploads the diff between the previous cluster state and the current cluster state. The previous marker file is needed to create the new
+ * marker. The new marker file is created by using the unchanged metadata from the previous marker and the new metadata changes from the current cluster
+ * state.
+ *
+ * @return The uploaded ClusterMetadataMarker file
+ */
+ @Nullable
+ public ClusterMetadataMarker writeIncrementalMetadata(
+ ClusterState previousClusterState,
+ ClusterState clusterState,
+ ClusterMetadataMarker previousMarker
+ ) throws IOException {
+ final long startTimeMillis = relativeTimeMillisSupplier.getAsLong();
+ if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) {
+ logger.error("Local node is not elected cluster manager. Exiting");
+ return null;
+ }
+ assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term();
+ assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled";
+ final Map previousStateIndexMetadataVersionByName = new HashMap<>();
+ for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) {
+ previousStateIndexMetadataVersionByName.put(indexMetadata.getIndex().getName(), indexMetadata.getVersion());
+ }
+
+ int numIndicesUpdated = 0;
+ int numIndicesUnchanged = 0;
+ final Map allUploadedIndexMetadata = previousMarker.getIndices()
+ .stream()
+ .collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity()));
+ for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) {
+ final Long previousVersion = previousStateIndexMetadataVersionByName.get(indexMetadata.getIndex().getName());
+ if (previousVersion == null || indexMetadata.getVersion() != previousVersion) {
+ logger.trace(
+ "updating metadata for [{}], changing version from [{}] to [{}]",
+ indexMetadata.getIndex(),
+ previousVersion,
+ indexMetadata.getVersion()
+ );
+ numIndicesUpdated++;
+ final String indexMetadataKey = writeIndexMetadata(
+ clusterState.getClusterName().value(),
+ clusterState.getMetadata().clusterUUID(),
+ indexMetadata,
+ indexMetadataFileName(indexMetadata)
+ );
+ final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(
+ indexMetadata.getIndex().getName(),
+ indexMetadata.getIndexUUID(),
+ indexMetadataKey
+ );
+ allUploadedIndexMetadata.put(indexMetadata.getIndex().getName(), uploadedIndexMetadata);
+ } else {
+ numIndicesUnchanged++;
+ }
+ previousStateIndexMetadataVersionByName.remove(indexMetadata.getIndex().getName());
+ }
+
+ for (String removedIndexName : previousStateIndexMetadataVersionByName.keySet()) {
+ allUploadedIndexMetadata.remove(removedIndexName);
+ }
+ final ClusterMetadataMarker marker = uploadMarker(
+ clusterState,
+ allUploadedIndexMetadata.values().stream().collect(Collectors.toList()),
+ false
+ );
+ final long durationMillis = relativeTimeMillisSupplier.getAsLong() - startTimeMillis;
+ if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
+ logger.warn(
+ "writing cluster state took [{}ms] which is above the warn threshold of [{}]; "
+ + "wrote metadata for [{}] indices and skipped [{}] unchanged indices",
+ durationMillis,
+ slowWriteLoggingThreshold,
+ numIndicesUpdated,
+ numIndicesUnchanged
+ );
+ } else {
+ // todo change to debug
+ logger.info(
+ "writing cluster state took [{}ms]; " + "wrote and metadata for [{}] indices and skipped [{}] unchanged indices",
+ durationMillis,
+ numIndicesUpdated,
+ numIndicesUnchanged
+ );
+ }
+ return marker;
+ }
+
+ @Nullable
+ public ClusterMetadataMarker markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataMarker previousMarker)
+ throws IOException {
+ if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) {
+ logger.error("Local node is not elected cluster manager. Exiting");
+ return null;
+ }
+ assert clusterState != null : "Last accepted cluster state is not set";
+ assert previousMarker != null : "Last cluster metadata marker is not set";
+ assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled";
+ return uploadMarker(clusterState, previousMarker.getIndices(), true);
+ }
+
+ public ClusterState getLatestClusterState(String clusterUUID) {
+ // todo
+ return null;
+ }
+
+ // Visible for testing
+ void ensureRepositorySet() {
+ if (blobStoreRepository != null) {
+ return;
+ }
+ final String remoteStoreRepo = REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.get(settings);
+ assert remoteStoreRepo != null : "Remote Cluster State repository is not configured";
+ final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
+ assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
+ blobStoreRepository = (BlobStoreRepository) repository;
+ }
+
+ private ClusterMetadataMarker uploadMarker(
+ ClusterState clusterState,
+ List uploadedIndexMetadata,
+ boolean committed
+ ) throws IOException {
+ synchronized (this) {
+ final String markerFileName = getMarkerFileName(clusterState.term(), clusterState.version());
+ final ClusterMetadataMarker marker = new ClusterMetadataMarker(
+ clusterState.term(),
+ clusterState.getVersion(),
+ clusterState.metadata().clusterUUID(),
+ clusterState.stateUUID(),
+ Version.CURRENT,
+ committed,
+ uploadedIndexMetadata
+ );
+ writeMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), marker, markerFileName);
+ return marker;
+ }
+ }
+
+ private String writeIndexMetadata(String clusterName, String clusterUUID, IndexMetadata uploadIndexMetadata, String fileName)
+ throws IOException {
+ final BlobContainer indexMetadataContainer = indexMetadataContainer(clusterName, clusterUUID, uploadIndexMetadata.getIndexUUID());
+ INDEX_METADATA_FORMAT.write(uploadIndexMetadata, indexMetadataContainer, fileName, blobStoreRepository.getCompressor());
+ // returning full path
+ return indexMetadataContainer.path().buildAsString() + fileName;
+ }
+
+ private void writeMetadataMarker(String clusterName, String clusterUUID, ClusterMetadataMarker uploadMarker, String fileName)
+ throws IOException {
+ final BlobContainer metadataMarkerContainer = markerContainer(clusterName, clusterUUID);
+ CLUSTER_METADATA_MARKER_FORMAT.write(uploadMarker, metadataMarkerContainer, fileName, blobStoreRepository.getCompressor());
+ }
+
+ private BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) {
+ // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX
+ return blobStoreRepository.blobStore()
+ .blobContainer(
+ blobStoreRepository.basePath()
+ .add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8)))
+ .add("cluster-state")
+ .add(clusterUUID)
+ .add("index")
+ .add(indexUUID)
+ );
+ }
+
+ private BlobContainer markerContainer(String clusterName, String clusterUUID) {
+ // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker
+ return blobStoreRepository.blobStore()
+ .blobContainer(
+ blobStoreRepository.basePath()
+ .add(Base64.getUrlEncoder().withoutPadding().encodeToString(clusterName.getBytes(StandardCharsets.UTF_8)))
+ .add("cluster-state")
+ .add(clusterUUID)
+ .add("marker")
+ );
+ }
+
+ private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) {
+ this.slowWriteLoggingThreshold = slowWriteLoggingThreshold;
+ }
+
+ private static String getMarkerFileName(long term, long version) {
+ // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker/2147483642_2147483637_456536447_marker
+ return String.join(
+ DELIMITER,
+ "marker",
+ RemoteStoreUtils.invertLong(term),
+ RemoteStoreUtils.invertLong(version),
+ RemoteStoreUtils.invertLong(System.currentTimeMillis())
+ );
+ }
+
+ private static String indexMetadataFileName(IndexMetadata indexMetadata) {
+ return String.join(DELIMITER, "metadata", String.valueOf(indexMetadata.getVersion()), String.valueOf(System.currentTimeMillis()));
+ }
+
+}
diff --git a/server/src/main/java/org/opensearch/gateway/remote/package-info.java b/server/src/main/java/org/opensearch/gateway/remote/package-info.java
new file mode 100644
index 0000000000000..286e739f66289
--- /dev/null
+++ b/server/src/main/java/org/opensearch/gateway/remote/package-info.java
@@ -0,0 +1,12 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/**
+ * Package containing class to perform operations on remote cluster state
+ */
+package org.opensearch.gateway.remote;
diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
index 693022a60cc09..c5b18378c2060 100644
--- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
@@ -789,6 +789,10 @@ public RepositoryMetadata getMetadata() {
return metadata;
}
+ public Compressor getCompressor() {
+ return compressor;
+ }
+
@Override
public RepositoryStats stats() {
final BlobStore store = blobStore.get();
diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataMarkerTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataMarkerTests.java
new file mode 100644
index 0000000000000..d2d7b85579a89
--- /dev/null
+++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataMarkerTests.java
@@ -0,0 +1,46 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.gateway.remote;
+
+import org.opensearch.Version;
+import org.opensearch.common.xcontent.json.JsonXContent;
+import org.opensearch.core.common.bytes.BytesReference;
+import org.opensearch.core.xcontent.ToXContent;
+import org.opensearch.core.xcontent.XContentBuilder;
+import org.opensearch.core.xcontent.XContentParser;
+import org.opensearch.gateway.remote.ClusterMetadataMarker.UploadedIndexMetadata;
+import org.opensearch.test.OpenSearchTestCase;
+
+import java.io.IOException;
+import java.util.Collections;
+
+public class ClusterMetadataMarkerTests extends OpenSearchTestCase {
+
+ public void testXContent() throws IOException {
+ UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path");
+ ClusterMetadataMarker originalMarker = new ClusterMetadataMarker(
+ 1L,
+ 1L,
+ "test-cluster-uuid",
+ "test-state-uuid",
+ Version.CURRENT,
+ false,
+ Collections.singletonList(uploadedIndexMetadata)
+ );
+ final XContentBuilder builder = JsonXContent.contentBuilder();
+ builder.startObject();
+ originalMarker.toXContent(builder, ToXContent.EMPTY_PARAMS);
+ builder.endObject();
+
+ try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) {
+ final ClusterMetadataMarker fromXContentMarker = ClusterMetadataMarker.fromXContent(parser);
+ assertEquals(originalMarker, fromXContentMarker);
+ }
+ }
+}
diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java
new file mode 100644
index 0000000000000..243b4e0f55516
--- /dev/null
+++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java
@@ -0,0 +1,254 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.gateway.remote;
+
+import org.opensearch.Version;
+import org.opensearch.cluster.ClusterName;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.coordination.CoordinationMetadata;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.cluster.metadata.Metadata;
+import org.opensearch.cluster.node.DiscoveryNodes;
+import org.opensearch.common.blobstore.BlobContainer;
+import org.opensearch.common.blobstore.BlobPath;
+import org.opensearch.common.blobstore.BlobStore;
+import org.opensearch.common.compress.DeflateCompressor;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.core.index.Index;
+import org.opensearch.gateway.remote.ClusterMetadataMarker.UploadedIndexMetadata;
+import org.opensearch.repositories.FilterRepository;
+import org.opensearch.repositories.RepositoriesService;
+import org.opensearch.repositories.RepositoryMissingException;
+import org.opensearch.repositories.blobstore.BlobStoreRepository;
+import org.opensearch.test.OpenSearchTestCase;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+
+import org.mockito.ArgumentMatchers;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class RemoteClusterStateServiceTests extends OpenSearchTestCase {
+
+ private RemoteClusterStateService remoteClusterStateService;
+ private Supplier repositoriesServiceSupplier;
+ private RepositoriesService repositoriesService;
+ private BlobStoreRepository blobStoreRepository;
+
+ @Before
+ public void setup() {
+ repositoriesServiceSupplier = mock(Supplier.class);
+ repositoriesService = mock(RepositoriesService.class);
+ when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService);
+ final Settings settings = Settings.builder()
+ .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
+ .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.getKey(), "remote_store_repository")
+ .build();
+ blobStoreRepository = mock(BlobStoreRepository.class);
+ when(repositoriesService.repository("remote_store_repository")).thenReturn(blobStoreRepository);
+ remoteClusterStateService = new RemoteClusterStateService(
+ repositoriesServiceSupplier,
+ settings,
+ new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
+ () -> 0L
+ );
+ }
+
+ public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException {
+ final ClusterState clusterState = generateClusterStateWithOneIndex().build();
+ final ClusterMetadataMarker marker = remoteClusterStateService.writeFullMetadata(clusterState);
+ Assert.assertThat(marker, nullValue());
+ }
+
+ public void testFailWriteFullMetadataWhenRemoteStateDisabled() throws IOException {
+ final Settings settings = Settings.builder().build();
+ remoteClusterStateService = spy(
+ new RemoteClusterStateService(
+ repositoriesServiceSupplier,
+ settings,
+ new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
+ () -> 0L
+ )
+ );
+ final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
+ assertThrows(AssertionError.class, () -> remoteClusterStateService.writeFullMetadata(clusterState));
+ }
+
+ public void testFailWriteFullMetadataWhenRepositoryNotSet() {
+ final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
+ doThrow(new RepositoryMissingException("repository missing")).when(repositoriesService).repository("remote_store_repository");
+ assertThrows(RepositoryMissingException.class, () -> remoteClusterStateService.writeFullMetadata(clusterState));
+ }
+
+ public void testFailWriteFullMetadataWhenNotBlobRepository() {
+ final FilterRepository filterRepository = mock(FilterRepository.class);
+ when(repositoriesService.repository("remote_store_repository")).thenReturn(filterRepository);
+ final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
+ assertThrows(AssertionError.class, () -> remoteClusterStateService.writeFullMetadata(clusterState));
+ }
+
+ public void testWriteFullMetadataSuccess() throws IOException {
+ final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
+ mockBlobStoreObjects();
+ final ClusterMetadataMarker marker = remoteClusterStateService.writeFullMetadata(clusterState);
+ final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename");
+ List indices = List.of(uploadedIndexMetadata);
+
+ final ClusterMetadataMarker expectedMarker = ClusterMetadataMarker.builder()
+ .indices(indices)
+ .clusterTerm(1L)
+ .stateVersion(1L)
+ .stateUUID("state-uuid")
+ .clusterUUID("cluster-uuid")
+ .build();
+
+ assertThat(marker.getIndices().size(), is(1));
+ assertThat(marker.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName()));
+ assertThat(marker.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID()));
+ assertThat(marker.getIndices().get(0).getUploadedFilename(), notNullValue());
+ assertThat(marker.getClusterTerm(), is(expectedMarker.getClusterTerm()));
+ assertThat(marker.getStateVersion(), is(expectedMarker.getStateVersion()));
+ assertThat(marker.getClusterUUID(), is(expectedMarker.getClusterUUID()));
+ assertThat(marker.getStateUUID(), is(expectedMarker.getStateUUID()));
+ }
+
+ public void testFailWriteIncrementalMetadataNonClusterManagerNode() throws IOException {
+ final ClusterState clusterState = generateClusterStateWithOneIndex().build();
+ final ClusterMetadataMarker marker = remoteClusterStateService.writeIncrementalMetadata(clusterState, clusterState, null);
+ Assert.assertThat(marker, nullValue());
+ }
+
+ public void testFailWriteIncrementalMetadataWhenTermChanged() {
+ final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
+ final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(2L).build();
+ final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT)
+ .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata))
+ .build();
+ assertThrows(
+ AssertionError.class,
+ () -> remoteClusterStateService.writeIncrementalMetadata(previousClusterState, clusterState, null)
+ );
+ }
+
+ public void testWriteIncrementalMetadataSuccess() throws IOException {
+ final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
+ mockBlobStoreObjects();
+ final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build();
+ final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT)
+ .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata))
+ .build();
+
+ final ClusterMetadataMarker previousMarker = ClusterMetadataMarker.builder().indices(Collections.emptyList()).build();
+
+ remoteClusterStateService.ensureRepositorySet();
+ final ClusterMetadataMarker marker = remoteClusterStateService.writeIncrementalMetadata(
+ previousClusterState,
+ clusterState,
+ previousMarker
+ );
+ final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename");
+ final List indices = List.of(uploadedIndexMetadata);
+
+ final ClusterMetadataMarker expectedMarker = ClusterMetadataMarker.builder()
+ .indices(indices)
+ .clusterTerm(1L)
+ .stateVersion(1L)
+ .stateUUID("state-uuid")
+ .clusterUUID("cluster-uuid")
+ .build();
+
+ assertThat(marker.getIndices().size(), is(1));
+ assertThat(marker.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName()));
+ assertThat(marker.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID()));
+ assertThat(marker.getIndices().get(0).getUploadedFilename(), notNullValue());
+ assertThat(marker.getClusterTerm(), is(expectedMarker.getClusterTerm()));
+ assertThat(marker.getStateVersion(), is(expectedMarker.getStateVersion()));
+ assertThat(marker.getClusterUUID(), is(expectedMarker.getClusterUUID()));
+ assertThat(marker.getStateUUID(), is(expectedMarker.getStateUUID()));
+ }
+
+ public void testMarkLastStateAsCommittedSuccess() throws IOException {
+ final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
+ mockBlobStoreObjects();
+ remoteClusterStateService.ensureRepositorySet();
+ final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename");
+ List indices = List.of(uploadedIndexMetadata);
+ final ClusterMetadataMarker previousMarker = ClusterMetadataMarker.builder().indices(indices).build();
+
+ final ClusterMetadataMarker marker = remoteClusterStateService.markLastStateAsCommitted(clusterState, previousMarker);
+
+ final ClusterMetadataMarker expectedMarker = ClusterMetadataMarker.builder()
+ .indices(indices)
+ .clusterTerm(1L)
+ .stateVersion(1L)
+ .stateUUID("state-uuid")
+ .clusterUUID("cluster-uuid")
+ .build();
+
+ assertThat(marker.getIndices().size(), is(1));
+ assertThat(marker.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName()));
+ assertThat(marker.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID()));
+ assertThat(marker.getIndices().get(0).getUploadedFilename(), notNullValue());
+ assertThat(marker.getClusterTerm(), is(expectedMarker.getClusterTerm()));
+ assertThat(marker.getStateVersion(), is(expectedMarker.getStateVersion()));
+ assertThat(marker.getClusterUUID(), is(expectedMarker.getClusterUUID()));
+ assertThat(marker.getStateUUID(), is(expectedMarker.getStateUUID()));
+ }
+
+ private void mockBlobStoreObjects() {
+ final BlobStore blobStore = mock(BlobStore.class);
+ when(blobStoreRepository.blobStore()).thenReturn(blobStore);
+ final BlobPath blobPath = mock(BlobPath.class);
+ when((blobStoreRepository.basePath())).thenReturn(blobPath);
+ when(blobPath.add(anyString())).thenReturn(blobPath);
+ when(blobPath.buildAsString()).thenReturn("/blob/path/");
+ final BlobContainer blobContainer = mock(BlobContainer.class);
+ when(blobContainer.path()).thenReturn(blobPath);
+ when(blobStore.blobContainer(ArgumentMatchers.any())).thenReturn(blobContainer);
+ when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor());
+ }
+
+ private static ClusterState.Builder generateClusterStateWithOneIndex() {
+ final Index index = new Index("test-index", "index-uuid");
+ final Settings idxSettings = Settings.builder()
+ .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+ .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID())
+ .build();
+ final IndexMetadata indexMetadata = new IndexMetadata.Builder(index.getName()).settings(idxSettings)
+ .numberOfShards(1)
+ .numberOfReplicas(0)
+ .build();
+ final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build();
+
+ return ClusterState.builder(ClusterName.DEFAULT)
+ .version(1L)
+ .stateUUID("state-uuid")
+ .metadata(
+ Metadata.builder().put(indexMetadata, true).clusterUUID("cluster-uuid").coordinationMetadata(coordinationMetadata).build()
+ );
+ }
+
+ private static DiscoveryNodes nodesWithLocalNodeClusterManager() {
+ return DiscoveryNodes.builder().clusterManagerNodeId("cluster-manager-id").localNodeId("cluster-manager-id").build();
+ }
+
+}