diff --git a/CHANGELOG.md b/CHANGELOG.md index e14e20ce212e4..cbdff54911b82 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -62,6 +62,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add lower limit for primary and replica batch allocators timeout ([#14979](https://github.com/opensearch-project/OpenSearch/pull/14979)) - Optimize regexp-based include/exclude on aggregations when pattern matches prefixes ([#14371](https://github.com/opensearch-project/OpenSearch/pull/14371)) - Replace and block usages of org.apache.logging.log4j.util.Strings ([#15238](https://github.com/opensearch-project/OpenSearch/pull/15238)) +- Remote publication using minimum node version for backward compatibility ([#15216](https://github.com/opensearch-project/OpenSearch/pull/15216)) + ### Deprecated diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index bbfa265d50bcf..26d23eeeb8bf5 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -1338,6 +1338,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId()) coordinationState.get().isRemotePublicationEnabled(), persistedStateRegistry ); + logger.debug("initialized PublicationContext using class: {}", publicationContext.getClass().toString()); final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState); final CoordinatorPublication publication = new CoordinatorPublication( diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index 62885a12222be..f7e4eff655e9d 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -62,6 +62,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -97,6 +98,7 @@ public class PublicationTransportHandler { private final AtomicLong fullClusterStateReceivedCount = new AtomicLong(); private final AtomicLong incompatibleClusterStateDiffReceivedCount = new AtomicLong(); private final AtomicLong compatibleClusterStateDiffReceivedCount = new AtomicLong(); + private final AtomicBoolean allNodesRemotePublicationEnabled = new AtomicBoolean(); // -> no need to put a timeout on the options here, because we want the response to eventually be received // and not log an error if it arrives after the timeout private final TransportRequestOptions stateRequestOptions = TransportRequestOptions.builder() @@ -332,11 +334,18 @@ public PublicationContext newPublicationContext( boolean isRemotePublicationEnabled, PersistedStateRegistry persistedStateRegistry ) { - final PublicationContext publicationContext = new PublicationContext( - clusterChangedEvent, - isRemotePublicationEnabled, - persistedStateRegistry - ); + if (isRemotePublicationEnabled == true) { + if (allNodesRemotePublicationEnabled.get() == false) { + if (validateRemotePublicationOnAllNodes(clusterChangedEvent.state().nodes()) == true) { + allNodesRemotePublicationEnabled.set(true); + } + } + if (allNodesRemotePublicationEnabled.get() == true) { + // if all nodes are remote then create remote publication context + return new RemotePublicationContext(clusterChangedEvent, persistedStateRegistry); + } + } + final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, persistedStateRegistry); // Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication // straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and @@ -345,6 +354,17 @@ public PublicationContext newPublicationContext( return publicationContext; } + private boolean validateRemotePublicationOnAllNodes(DiscoveryNodes discoveryNodes) { + assert ClusterMetadataManifest.getCodecForVersion(discoveryNodes.getMinNodeVersion()) >= ClusterMetadataManifest.CODEC_V0; + for (DiscoveryNode node : discoveryNodes.getNodes().values()) { + // if a node is non-remote then created local publication context + if (node.isRemoteStatePublicationEnabled() == false) { + return false; + } + } + return true; + } + // package private for testing void setCurrentPublishRequestToSelf(PublishRequest publishRequest) { this.currentPublishRequestToSelf.set(publishRequest); @@ -385,25 +405,19 @@ private static BytesReference serializeDiffClusterState(Diff diff, */ public class PublicationContext { - private final DiscoveryNodes discoveryNodes; - private final ClusterState newState; - private final ClusterState previousState; - private final boolean sendFullVersion; + protected final DiscoveryNodes discoveryNodes; + protected final ClusterState newState; + protected final ClusterState previousState; + protected final boolean sendFullVersion; private final Map serializedStates = new HashMap<>(); private final Map serializedDiffs = new HashMap<>(); - private final boolean sendRemoteState; - private final PersistedStateRegistry persistedStateRegistry; + protected final PersistedStateRegistry persistedStateRegistry; - PublicationContext( - ClusterChangedEvent clusterChangedEvent, - boolean isRemotePublicationEnabled, - PersistedStateRegistry persistedStateRegistry - ) { + PublicationContext(ClusterChangedEvent clusterChangedEvent, PersistedStateRegistry persistedStateRegistry) { discoveryNodes = clusterChangedEvent.state().nodes(); newState = clusterChangedEvent.state(); previousState = clusterChangedEvent.previousState(); sendFullVersion = previousState.getBlocks().disableStatePersistence(); - sendRemoteState = isRemotePublicationEnabled; this.persistedStateRegistry = persistedStateRegistry; } @@ -468,17 +482,7 @@ public void onFailure(Exception e) { } else { responseActionListener = listener; } - // TODO Decide to send remote state before starting publication by checking remote publication on all nodes - if (sendRemoteState && destination.isRemoteStatePublicationEnabled()) { - logger.trace("sending remote cluster state version [{}] to [{}]", newState.version(), destination); - sendRemoteClusterState(destination, publishRequest.getAcceptedState(), responseActionListener); - } else if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) { - logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination); - sendFullClusterState(destination, responseActionListener); - } else { - logger.trace("sending cluster state diff for version [{}] to [{}]", newState.version(), destination); - sendClusterStateDiff(destination, responseActionListener); - } + sendClusterState(destination, responseActionListener); } public void sendApplyCommit( @@ -517,58 +521,14 @@ public String executor() { ); } - private void sendRemoteClusterState( - final DiscoveryNode destination, - final ClusterState clusterState, - final ActionListener listener - ) { - try { - final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)) - .getLastUploadedManifestFile(); - final RemotePublishRequest remotePublishRequest = new RemotePublishRequest( - discoveryNodes.getLocalNode(), - clusterState.term(), - clusterState.getVersion(), - clusterState.getClusterName().value(), - clusterState.metadata().clusterUUID(), - manifestFileName - ); - final Consumer transportExceptionHandler = exp -> { - logger.debug(() -> new ParameterizedMessage("failed to send remote cluster state to {}", destination), exp); - listener.onFailure(exp); - }; - final TransportResponseHandler responseHandler = new TransportResponseHandler<>() { - - @Override - public PublishWithJoinResponse read(StreamInput in) throws IOException { - return new PublishWithJoinResponse(in); - } - - @Override - public void handleResponse(PublishWithJoinResponse response) { - listener.onResponse(response); - } - - @Override - public void handleException(TransportException exp) { - transportExceptionHandler.accept(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.GENERIC; - } - }; - transportService.sendRequest( - destination, - PUBLISH_REMOTE_STATE_ACTION_NAME, - remotePublishRequest, - stateRequestOptions, - responseHandler - ); - } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("error sending remote cluster state to {}", destination), e); - listener.onFailure(e); + public void sendClusterState(DiscoveryNode destination, ActionListener listener) { + logger.info("sending cluster state over transport to node: {}", destination.getName()); + if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) { + logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination); + sendFullClusterState(destination, listener); + } else { + logger.trace("sending cluster state diff for version [{}] to [{}]", newState.version(), destination); + sendClusterStateDiff(destination, listener); } } @@ -648,4 +608,69 @@ public String executor() { } } + /** + * An extension of {@code PublicationContext} to support remote cluster state publication + * + * @opensearch.internal + */ + public class RemotePublicationContext extends PublicationContext { + + RemotePublicationContext(ClusterChangedEvent clusterChangedEvent, PersistedStateRegistry persistedStateRegistry) { + super(clusterChangedEvent, persistedStateRegistry); + } + + @Override + public void sendClusterState(final DiscoveryNode destination, final ActionListener listener) { + try { + logger.info("sending remote cluster state to node: {}", destination.getName()); + final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)) + .getLastUploadedManifestFile(); + final RemotePublishRequest remotePublishRequest = new RemotePublishRequest( + discoveryNodes.getLocalNode(), + newState.term(), + newState.getVersion(), + newState.getClusterName().value(), + newState.metadata().clusterUUID(), + manifestFileName + ); + final Consumer transportExceptionHandler = exp -> { + logger.debug(() -> new ParameterizedMessage("failed to send remote cluster state to {}", destination), exp); + listener.onFailure(exp); + }; + final TransportResponseHandler responseHandler = new TransportResponseHandler<>() { + + @Override + public PublishWithJoinResponse read(StreamInput in) throws IOException { + return new PublishWithJoinResponse(in); + } + + @Override + public void handleResponse(PublishWithJoinResponse response) { + listener.onResponse(response); + } + + @Override + public void handleException(TransportException exp) { + transportExceptionHandler.accept(exp); + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + }; + transportService.sendRequest( + destination, + PUBLISH_REMOTE_STATE_ACTION_NAME, + remotePublishRequest, + stateRequestOptions, + responseHandler + ); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("error sending remote cluster state to {}", destination), e); + listener.onFailure(e); + } + } + } + } diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index 811a5161a6f6e..95b109dcdc4f0 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -702,7 +702,12 @@ public String getLastUploadedManifestFile() { public void setLastAcceptedState(ClusterState clusterState) { try { final RemoteClusterStateManifestInfo manifestDetails; - if (shouldWriteFullClusterState(clusterState)) { + // Decide the codec version + int codecVersion = ClusterMetadataManifest.getCodecForVersion(clusterState.nodes().getMinNodeVersion()); + assert codecVersion >= 0 : codecVersion; + logger.info("codec version is {}", codecVersion); + + if (shouldWriteFullClusterState(clusterState, codecVersion)) { final Optional latestManifest = remoteClusterStateService.getLatestClusterMetadataManifest( clusterState.getClusterName().value(), clusterState.metadata().clusterUUID() @@ -719,7 +724,7 @@ public void setLastAcceptedState(ClusterState clusterState) { clusterState.metadata().clusterUUID() ); } - manifestDetails = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID); + manifestDetails = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID, codecVersion); } else { assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == true : "Previous manifest and previous ClusterState are not in sync"; @@ -759,11 +764,13 @@ private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest, return true; } - private boolean shouldWriteFullClusterState(ClusterState clusterState) { + private boolean shouldWriteFullClusterState(ClusterState clusterState, int codecVersion) { + assert lastAcceptedManifest == null || lastAcceptedManifest.getCodecVersion() <= codecVersion; if (lastAcceptedState == null || lastAcceptedManifest == null || lastAcceptedState.term() != clusterState.term() - || lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT) { + || lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT + || lastAcceptedManifest.getCodecVersion() != codecVersion) { return true; } return false; diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 71815b6ee324c..ca730a73dd89b 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -20,10 +20,10 @@ import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.gateway.remote.ClusterMetadataManifest.Builder; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -46,6 +46,8 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment { // required for state publication public static final int CODEC_V3 = 3; // In Codec V3, we have introduced new diff field in diff-manifest's routing_table_diff + public static final int[] CODEC_VERSIONS = { CODEC_V0, CODEC_V1, CODEC_V2, CODEC_V3 }; + private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term"); private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version"); private static final ParseField CLUSTER_UUID_FIELD = new ParseField("cluster_uuid"); @@ -237,12 +239,34 @@ private static ClusterStateDiffManifest diffManifest(Object[] fields) { ); private static final ConstructingObjectParser CURRENT_PARSER = PARSER_V3; + public static final int MANIFEST_CURRENT_CODEC_VERSION = CODEC_V3; + + private static final Map VERSION_TO_CODEC_MAPPING; static { declareParser(PARSER_V0, CODEC_V0); declareParser(PARSER_V1, CODEC_V1); declareParser(PARSER_V2, CODEC_V2); declareParser(PARSER_V3, CODEC_V3); + + assert Arrays.stream(CODEC_VERSIONS).max().getAsInt() == MANIFEST_CURRENT_CODEC_VERSION; + Map versionToCodecMapping = new HashMap<>(); + for (Version version : Version.getDeclaredVersions(Version.class)) { + if (version.onOrAfter(Version.V_2_10_0) && version.before(Version.V_2_12_0)) { + versionToCodecMapping.put(version, ClusterMetadataManifest.CODEC_V0); + } else if (version.onOrAfter(Version.V_2_12_0) && version.before(Version.V_2_15_0)) { + versionToCodecMapping.put(version, ClusterMetadataManifest.CODEC_V1); + } else if (version.onOrAfter(Version.V_2_15_0) && version.before(Version.V_2_16_0)) { + versionToCodecMapping.put(version, ClusterMetadataManifest.CODEC_V2); + } else if (version.onOrAfter(Version.V_2_16_0)) { + versionToCodecMapping.put(version, ClusterMetadataManifest.CODEC_V3); + } + } + VERSION_TO_CODEC_MAPPING = Collections.unmodifiableMap(versionToCodecMapping); + } + + public static int getCodecForVersion(Version version) { + return VERSION_TO_CODEC_MAPPING.getOrDefault(version, -1); } private static void declareParser(ConstructingObjectParser parser, long codec_version) { diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 910f601a81ca8..05be827e65dc3 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -217,7 +217,8 @@ public RemoteClusterStateService( * @return A manifest object which contains the details of uploaded entity metadata. */ @Nullable - public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterState, String previousClusterUUID) throws IOException { + public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterState, String previousClusterUUID, int codecVersion) + throws IOException { final long startTimeNanos = relativeTimeNanosSupplier.getAsLong(); if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { logger.error("Local node is not elected cluster manager. Exiting"); @@ -252,7 +253,8 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat uploadedMetadataResults, previousClusterUUID, clusterStateDiffManifest, - false + false, + codecVersion ); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); @@ -443,7 +445,8 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), clusterStateDiffManifest, - false + false, + previousManifest.getCodecVersion() ); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); @@ -878,7 +881,8 @@ public RemoteClusterStateManifestInfo markLastStateAsCommitted(ClusterState clus uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), previousManifest.getDiffManifest(), - true + true, + previousManifest.getCodecVersion() ); if (!previousManifest.isClusterUUIDCommitted() && committedManifestDetails.getClusterMetadataManifest().isClusterUUIDCommitted()) { remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, committedManifestDetails.getClusterMetadataManifest()); diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java index 0ccadd7dd18da..67b1b77447bed 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java @@ -98,7 +98,8 @@ RemoteClusterStateManifestInfo uploadManifest( RemoteClusterStateUtils.UploadedMetadataResults uploadedMetadataResult, String previousClusterUUID, ClusterStateDiffManifest clusterDiffManifest, - boolean committed + boolean committed, + int codecVersion ) { synchronized (this) { ClusterMetadataManifest.Builder manifestBuilder = ClusterMetadataManifest.builder(); @@ -109,7 +110,7 @@ RemoteClusterStateManifestInfo uploadManifest( .opensearchVersion(Version.CURRENT) .nodeId(nodeId) .committed(committed) - .codecVersion(RemoteClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION) + .codecVersion(codecVersion) .indices(uploadedMetadataResult.uploadedIndexMetadata) .previousClusterUUID(previousClusterUUID) .clusterUUIDCommitted(clusterState.metadata().clusterUUIDCommitted()) diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java index 5f79b690af574..e890c30700eb6 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java @@ -35,7 +35,6 @@ public class RemoteClusterMetadataManifest extends AbstractClusterMetadataWritea public static final int SPLITTED_MANIFEST_FILE_LENGTH = 6; public static final String METADATA_MANIFEST_NAME_FORMAT = "%s"; - public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V3; public static final String COMMITTED = "C"; public static final String PUBLISHED = "P"; @@ -150,7 +149,7 @@ int getManifestCodecVersion() { private ChecksumBlobStoreFormat getClusterMetadataManifestBlobStoreFormat() { long codecVersion = getManifestCodecVersion(); - if (codecVersion == MANIFEST_CURRENT_CODEC_VERSION) { + if (codecVersion == ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION) { return CLUSTER_METADATA_MANIFEST_FORMAT; } else if (codecVersion == ClusterMetadataManifest.CODEC_V2) { return CLUSTER_METADATA_MANIFEST_FORMAT_V2; diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index 3ee2278aec546..ee9a2951ec541 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -67,6 +67,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; @@ -947,7 +948,7 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep .previousClusterUUID(randomAlphaOfLength(10)) .clusterUUIDCommitted(true) .build(); - Mockito.when(remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID)) + Mockito.when(remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION)) .thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest")); final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); @@ -978,7 +979,8 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep final CoordinationState coordinationState = createCoordinationState(persistedStateRegistry, node1, settings); coordinationState.handlePrePublish(clusterState); - Mockito.verify(remoteClusterStateService, Mockito.times(1)).writeFullMetadata(clusterState, previousClusterUUID); + Mockito.verify(remoteClusterStateService, Mockito.times(1)) + .writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION); assertThat(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState(), equalTo(clusterState)); Mockito.when(remoteClusterStateService.markLastStateAsCommitted(any(), any())) diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java index 08e3f47100d8c..65bc228cd5704 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTransportHandlerTests.java @@ -37,31 +37,43 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.Diff; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration; +import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; +import org.opensearch.cluster.coordination.PublicationTransportHandler.PublicationContext; +import org.opensearch.cluster.coordination.PublicationTransportHandler.RemotePublicationContext; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.gateway.GatewayMetaState.RemotePersistedState; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.node.Node; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.CapturingTransport; +import org.opensearch.test.transport.CapturingTransport.CapturedRequest; import org.opensearch.transport.TransportService; import org.junit.Before; import java.io.IOException; import java.util.Collections; +import java.util.Map; import java.util.Optional; import java.util.function.Function; import org.mockito.Mockito; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; @@ -80,6 +92,8 @@ public class PublicationTransportHandlerTests extends OpenSearchTestCase { private DiscoveryNode localNode; private DiscoveryNode secondNode; + private CapturingTransport capturingTransport; + @Before public void setup() { deterministicTaskQueue = new DeterministicTaskQueue( @@ -89,7 +103,8 @@ public void setup() { final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); localNode = new DiscoveryNode(LOCAL_NODE_ID, buildNewFakeTransportAddress(), Version.CURRENT); secondNode = new DiscoveryNode("secondNode", buildNewFakeTransportAddress(), Version.CURRENT); - transportService = new CapturingTransport().createTransportService( + capturingTransport = new CapturingTransport(); + transportService = capturingTransport.createTransportService( Settings.EMPTY, deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR, @@ -288,6 +303,74 @@ public void testHandleIncomingRemotePublishRequestWhenNoLastSeenState() throws I Mockito.verify(remoteClusterStateService, times(1)).getClusterMetadataManifestByFileName(Mockito.any(), Mockito.any()); } + public void testNewPublicationContext() { + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); + Function handlePublishRequest = p -> expectedPublishResponse; + final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService); + + // Remote publication disabled + ClusterChangedEvent event1 = new ClusterChangedEvent( + "source1", + buildClusterState(TERM, VERSION + 1), + buildClusterState(TERM, VERSION) + ); + PublicationContext publicationContext = handler.newPublicationContext(event1, false, new PersistedStateRegistry()); + assertNotNull(publicationContext); + assertThat(publicationContext, not(instanceOf(RemotePublicationContext.class))); + + // Remote publication enabled but some nodes are remote enabled and some remote disabled + ClusterChangedEvent event2 = new ClusterChangedEvent( + "source2", + buildClusterStateWithMixedNodes(TERM, VERSION + 1), + buildClusterState(TERM, VERSION) + ); + PublicationContext publicationContext2 = handler.newPublicationContext(event2, true, new PersistedStateRegistry()); + assertNotNull(publicationContext2); + assertThat(publicationContext2, not(instanceOf(RemotePublicationContext.class))); + + // Remote publication enabled and all nodes are remote enabled + ClusterChangedEvent event3 = new ClusterChangedEvent( + "source3", + buildClusterStateWithRemoteNodes(TERM, VERSION + 1), + buildClusterState(TERM, VERSION) + ); + PublicationContext publicationContext3 = handler.newPublicationContext(event3, true, new PersistedStateRegistry()); + assertNotNull(publicationContext3); + assertThat(publicationContext3, instanceOf(RemotePublicationContext.class)); + } + + public void testRemotePublicationContext() throws Exception { + ClusterChangedEvent event = new ClusterChangedEvent( + "source3", + buildClusterStateWithRemoteNodes(TERM, VERSION + 1), + buildClusterState(TERM, VERSION) + ); + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + PublishWithJoinResponse expectedPublishResponse = new PublishWithJoinResponse(new PublishResponse(TERM, VERSION), Optional.empty()); + Function handlePublishRequest = p -> expectedPublishResponse; + final PublicationTransportHandler handler = getPublicationTransportHandler(handlePublishRequest, remoteClusterStateService); + PersistedStateRegistry persistedStateRegistry = new PersistedStateRegistry(); + RemotePersistedState remotePersistedState = mock(RemotePersistedState.class); + when(remotePersistedState.getLastUploadedManifestFile()).thenReturn("/path/to/manifest"); + persistedStateRegistry.addPersistedState(PersistedStateType.REMOTE, remotePersistedState); + + PublicationContext publicationContext3 = handler.newPublicationContext(event, true, persistedStateRegistry); + ActionListener listener = new ActionListener<>() { + @Override + public void onResponse(PublishWithJoinResponse publishWithJoinResponse) {} + + @Override + public void onFailure(Exception e) {} + }; + DiscoveryNode discoveryNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT); + publicationContext3.sendClusterState(discoveryNode, listener); + CapturedRequest[] capturedRequests1 = capturingTransport.getCapturedRequestsAndClear(); + assertThat(capturedRequests1.length, equalTo(1)); + CapturedRequest capturedRequest1 = capturedRequests1[0]; + assertThat(capturedRequest1.request, instanceOf(RemotePublishRequest.class)); + } + private PublicationTransportHandler getPublicationTransportHandler( Function handlePublishRequest, RemoteClusterStateService remoteClusterStateService @@ -310,4 +393,54 @@ private ClusterState buildClusterState(long term, long version) { DiscoveryNodes nodes = DiscoveryNodes.builder().add(localNode).add(secondNode).localNodeId(LOCAL_NODE_ID).build(); return ClusterState.builder(ClusterState.EMPTY_STATE).version(version).metadata(newMetadata).nodes(nodes).build(); } + + private ClusterState buildClusterStateWithMixedNodes(long term, long version) { + CoordinationMetadata.Builder coordMetadataBuilder = CoordinationMetadata.builder().term(term); + Metadata newMetadata = Metadata.builder().coordinationMetadata(coordMetadataBuilder.build()).build(); + DiscoveryNode remoteNode = new DiscoveryNode( + "remoteNode", + buildNewFakeTransportAddress(), + Map.of( + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, + "remote_state_repo", + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, + "remote_routing_repo" + ), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + DiscoveryNodes nodes = DiscoveryNodes.builder().add(localNode).add(remoteNode).localNodeId(LOCAL_NODE_ID).build(); + return ClusterState.builder(ClusterState.EMPTY_STATE).version(version).metadata(newMetadata).nodes(nodes).build(); + } + + private ClusterState buildClusterStateWithRemoteNodes(long term, long version) { + CoordinationMetadata.Builder coordMetadataBuilder = CoordinationMetadata.builder().term(term); + Metadata newMetadata = Metadata.builder().coordinationMetadata(coordMetadataBuilder.build()).build(); + DiscoveryNode remoteNode1 = new DiscoveryNode( + "remoteNode1", + buildNewFakeTransportAddress(), + Map.of( + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, + "remote_state_repo", + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, + "remote_routing_repo" + ), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + DiscoveryNode remoteNode2 = new DiscoveryNode( + "remoteNode2", + buildNewFakeTransportAddress(), + Map.of( + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, + "remote_state_repo", + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, + "remote_routing_repo" + ), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + DiscoveryNodes nodes = DiscoveryNodes.builder().add(remoteNode1).add(remoteNode2).localNodeId(remoteNode1.getId()).build(); + return ClusterState.builder(ClusterState.EMPTY_STATE).version(version).metadata(newMetadata).nodes(nodes).build(); + } } diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 8e8d80c870ddf..529302bea6758 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -98,6 +98,8 @@ import org.mockito.Mockito; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; @@ -109,6 +111,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -213,6 +216,18 @@ private ClusterState createClusterState(long version, Metadata metadata) { .build(); } + private ClusterState createClusterStateWithNodes(long version, Metadata metadata) { + DiscoveryNode oldNode = new DiscoveryNode( + "node2", + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Sets.newHashSet(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), + Version.V_2_13_0 + ); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).add(oldNode).build(); + return ClusterState.builder(clusterName).nodes(discoveryNodes).version(version).metadata(metadata).build(); + } + private CoordinationMetadata createCoordinationMetadata(long term) { CoordinationMetadata.Builder builder = CoordinationMetadata.builder(); builder.term(term); @@ -725,7 +740,7 @@ public void testRemotePersistedState() throws IOException { final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder().clusterTerm(1L).stateVersion(5L).build(); final String previousClusterUUID = "prev-cluster-uuid"; - Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())) + Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION))) .thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest")); Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())) @@ -742,7 +757,7 @@ public void testRemotePersistedState() throws IOException { ); remotePersistedState.setLastAcceptedState(clusterState); - Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterState, previousClusterUUID); + Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION); assertThat(remotePersistedState.getLastAcceptedState(), equalTo(clusterState)); assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm)); @@ -753,7 +768,8 @@ public void testRemotePersistedState() throws IOException { ); remotePersistedState.setLastAcceptedState(secondClusterState); - Mockito.verify(remoteClusterStateService, times(1)).writeFullMetadata(secondClusterState, previousClusterUUID); + Mockito.verify(remoteClusterStateService, times(1)) + .writeFullMetadata(secondClusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION); assertThat(remotePersistedState.getLastAcceptedState(), equalTo(secondClusterState)); assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm)); @@ -776,6 +792,54 @@ public void testRemotePersistedState() throws IOException { assertThat(remotePersistedState.getLastAcceptedState().metadata().clusterUUIDCommitted(), equalTo(true)); } + public void testRemotePersistedStateWithDifferentNodeConfiguration() throws IOException { + final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); + final String previousClusterUUID = "prev-cluster-uuid"; + final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder() + .clusterTerm(1L) + .stateVersion(5L) + .codecVersion(CODEC_V1) + .opensearchVersion(Version.CURRENT) + .build(); + Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any(), eq(CODEC_V1))) + .thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest2")); + + CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID); + + ClusterState clusterState1 = createClusterStateWithNodes( + randomNonNegativeLong(), + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(1L).build()).build() + ); + remotePersistedState.setLastAcceptedState(clusterState1); + + Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterState1, previousClusterUUID, CODEC_V1); + + ClusterState clusterState2 = createClusterState( + randomNonNegativeLong(), + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(1L).build()).build() + ); + final ClusterMetadataManifest manifest2 = ClusterMetadataManifest.builder() + .clusterTerm(1L) + .stateVersion(5L) + .codecVersion(MANIFEST_CURRENT_CODEC_VERSION) + .opensearchVersion(Version.CURRENT) + .build(); + Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION))) + .thenReturn(new RemoteClusterStateManifestInfo(manifest2, "path/to/manifest")); + remotePersistedState.setLastAcceptedState(clusterState2); + Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterState2, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION); + + ClusterState clusterState3 = createClusterState( + randomNonNegativeLong(), + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(1L).build()).build() + ); + Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())) + .thenReturn(new RemoteClusterStateManifestInfo(manifest2, "path/to/manifest3")); + remotePersistedState.setLastAcceptedState(clusterState3); + Mockito.verify(remoteClusterStateService).writeIncrementalMetadata(clusterState2, clusterState3, manifest2); + + } + public void testRemotePersistedStateNotCommitted() throws IOException { final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); final String previousClusterUUID = "prev-cluster-uuid"; @@ -786,7 +850,7 @@ public void testRemotePersistedStateNotCommitted() throws IOException { .build(); Mockito.when(remoteClusterStateService.getLatestClusterMetadataManifest(Mockito.any(), Mockito.any())) .thenReturn(Optional.of(manifest)); - Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())) + Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION))) .thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest")); Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())) @@ -811,14 +875,17 @@ public void testRemotePersistedStateNotCommitted() throws IOException { remotePersistedState.setLastAcceptedState(clusterState); ArgumentCaptor previousClusterUUIDCaptor = ArgumentCaptor.forClass(String.class); ArgumentCaptor clusterStateCaptor = ArgumentCaptor.forClass(ClusterState.class); - Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterStateCaptor.capture(), previousClusterUUIDCaptor.capture()); + Mockito.verify(remoteClusterStateService) + .writeFullMetadata(clusterStateCaptor.capture(), previousClusterUUIDCaptor.capture(), eq(MANIFEST_CURRENT_CODEC_VERSION)); assertEquals(previousClusterUUID, previousClusterUUIDCaptor.getValue()); } public void testRemotePersistedStateExceptionOnFullStateUpload() throws IOException { final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); final String previousClusterUUID = "prev-cluster-uuid"; - Mockito.doThrow(IOException.class).when(remoteClusterStateService).writeFullMetadata(Mockito.any(), Mockito.any()); + Mockito.doThrow(IOException.class) + .when(remoteClusterStateService) + .writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION)); CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID); @@ -835,7 +902,9 @@ public void testRemotePersistedStateFailureStats() throws IOException { RemotePersistenceStats remoteStateStats = new RemotePersistenceStats(); final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); final String previousClusterUUID = "prev-cluster-uuid"; - Mockito.doThrow(IOException.class).when(remoteClusterStateService).writeFullMetadata(Mockito.any(), Mockito.any()); + Mockito.doThrow(IOException.class) + .when(remoteClusterStateService) + .writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION)); when(remoteClusterStateService.getStats()).thenReturn(remoteStateStats); doCallRealMethod().when(remoteClusterStateService).writeMetadataFailed(); CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID); diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 8a6dd6bc96e72..418cfa104a911 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -39,6 +39,8 @@ import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V0; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V3; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_BLOCKS; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES; import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA; @@ -115,7 +117,7 @@ public void testClusterMetadataManifestXContent() throws IOException { .opensearchVersion(Version.CURRENT) .nodeId("test-node-id") .committed(false) - .codecVersion(ClusterMetadataManifest.CODEC_V2) + .codecVersion(CODEC_V2) .indices(Collections.singletonList(uploadedIndexMetadata)) .previousClusterUUID("prev-cluster-uuid") .clusterUUIDCommitted(true) @@ -162,7 +164,7 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { .opensearchVersion(Version.CURRENT) .nodeId("B10RX1f5RJenMQvYccCgSQ") .committed(true) - .codecVersion(ClusterMetadataManifest.CODEC_V3) + .codecVersion(CODEC_V3) .indices(randomUploadedIndexMetadataList()) .previousClusterUUID("yfObdx8KSMKKrXf8UyHhM") .clusterUUIDCommitted(true) @@ -495,7 +497,7 @@ public void testClusterMetadataManifestXContentV2() throws IOException { .opensearchVersion(Version.CURRENT) .nodeId("test-node-id") .committed(false) - .codecVersion(ClusterMetadataManifest.CODEC_V2) + .codecVersion(CODEC_V2) .indices(Collections.singletonList(uploadedIndexMetadata)) .previousClusterUUID("prev-cluster-uuid") .clusterUUIDCommitted(true) @@ -561,7 +563,7 @@ public void testClusterMetadataManifestXContentV3() throws IOException { .opensearchVersion(Version.CURRENT) .nodeId("test-node-id") .committed(false) - .codecVersion(ClusterMetadataManifest.CODEC_V3) + .codecVersion(CODEC_V3) .indices(Collections.singletonList(uploadedIndexMetadata)) .previousClusterUUID("prev-cluster-uuid") .clusterUUIDCommitted(true) @@ -630,7 +632,7 @@ public void testClusterMetadataManifestXContentV2WithoutEphemeral() throws IOExc .opensearchVersion(Version.CURRENT) .nodeId("test-node-id") .committed(false) - .codecVersion(ClusterMetadataManifest.CODEC_V2) + .codecVersion(CODEC_V2) .indices(Collections.singletonList(uploadedIndexMetadata)) .previousClusterUUID("prev-cluster-uuid") .clusterUUIDCommitted(true) @@ -712,6 +714,17 @@ public void testUploadedIndexMetadataWithoutComponentPrefix() throws IOException } } + public void testGetCodecForVersion() { + assertEquals(-1, ClusterMetadataManifest.getCodecForVersion(Version.fromString("1.3.0"))); + assertEquals(-1, ClusterMetadataManifest.getCodecForVersion(Version.V_2_1_0)); + assertEquals(CODEC_V0, ClusterMetadataManifest.getCodecForVersion(Version.V_2_10_0)); + assertEquals(CODEC_V1, ClusterMetadataManifest.getCodecForVersion(Version.V_2_12_0)); + assertEquals(CODEC_V1, ClusterMetadataManifest.getCodecForVersion(Version.V_2_13_0)); + assertEquals(CODEC_V2, ClusterMetadataManifest.getCodecForVersion(Version.V_2_15_0)); + assertEquals(CODEC_V3, ClusterMetadataManifest.getCodecForVersion(Version.V_2_16_0)); + assertEquals(CODEC_V3, ClusterMetadataManifest.getCodecForVersion(Version.V_2_17_0)); + } + private UploadedIndexMetadata randomlyChangingUploadedIndexMetadata(UploadedIndexMetadata uploadedIndexMetadata) { switch (randomInt(2)) { case 0: diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 1871f4d08ba43..168c74c7cb584 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -109,6 +109,7 @@ import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_BLOCKS; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTE; import static org.opensearch.gateway.remote.RemoteClusterStateTestUtils.CustomMetadata1; @@ -124,7 +125,6 @@ import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT; import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS_FORMAT; import static org.opensearch.gateway.remote.model.RemoteClusterBlocksTests.randomClusterBlocks; -import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.model.RemoteClusterStateCustoms.CLUSTER_STATE_CUSTOM; import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA; import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA_FORMAT; @@ -271,7 +271,8 @@ public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException final ClusterState clusterState = generateClusterStateWithOneIndex().build(); final RemoteClusterStateManifestInfo manifestDetails = remoteClusterStateService.writeFullMetadata( clusterState, - randomAlphaOfLength(10) + randomAlphaOfLength(10), + MANIFEST_CURRENT_CODEC_VERSION ); Assert.assertThat(manifestDetails, nullValue()); } @@ -309,8 +310,11 @@ public void testWriteFullMetadataSuccess() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") - .getClusterMetadataManifest(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata( + clusterState, + "prev-cluster-uuid", + MANIFEST_CURRENT_CODEC_VERSION + ).getClusterMetadataManifest(); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); List indices = List.of(uploadedIndexMetadata); @@ -370,8 +374,11 @@ public void testWriteFullMetadataSuccessPublicationEnabled() throws IOException .build(); mockBlobStoreObjects(); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") - .getClusterMetadataManifest(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata( + clusterState, + "prev-cluster-uuid", + MANIFEST_CURRENT_CODEC_VERSION + ).getClusterMetadataManifest(); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); List indices = List.of(uploadedIndexMetadata); @@ -419,8 +426,11 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { }).when(container).asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture()); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") - .getClusterMetadataManifest(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata( + clusterState, + "prev-cluster-uuid", + MANIFEST_CURRENT_CODEC_VERSION + ).getClusterMetadataManifest(); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); List indices = List.of(uploadedIndexMetadata); @@ -497,7 +507,7 @@ public void run() { remoteClusterStateService.start(); assertThrows( RemoteStateTransferException.class, - () -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)) + () -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10), MANIFEST_CURRENT_CODEC_VERSION) ); } @@ -541,7 +551,7 @@ public void testTimeoutWhileWritingManifestFile() throws IOException { ).thenReturn(new RemoteClusterStateUtils.UploadedMetadataResults()); RemoteStateTransferException ex = expectThrows( RemoteStateTransferException.class, - () -> spiedService.writeFullMetadata(clusterState, randomAlphaOfLength(10)) + () -> spiedService.writeFullMetadata(clusterState, randomAlphaOfLength(10), MANIFEST_CURRENT_CODEC_VERSION) ); assertTrue(ex.getMessage().contains("Timed out waiting for transfer of following metadata to complete")); } @@ -563,7 +573,7 @@ public void testWriteFullMetadataInParallelFailureForIndexMetadata() throws IOEx remoteClusterStateService.start(); assertThrows( RemoteStateTransferException.class, - () -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)) + () -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10), MANIFEST_CURRENT_CODEC_VERSION) ); assertEquals(0, remoteClusterStateService.getStats().getSuccessCount()); } @@ -1801,8 +1811,8 @@ private void verifyCodecMigrationManifest(int previousCodec) throws IOException // global metadata is updated assertThat(manifestAfterUpdate.hasMetadataAttributesFiles(), is(true)); - // Manifest file with codec version with 1 is updated. - assertThat(manifestAfterUpdate.getCodecVersion(), is(MANIFEST_CURRENT_CODEC_VERSION)); + // During incremental update, codec version will not change. + assertThat(manifestAfterUpdate.getCodecVersion(), is(previousCodec)); } public void testWriteIncrementalGlobalMetadataFromCodecV0Success() throws IOException { @@ -1837,7 +1847,7 @@ private void verifyWriteIncrementalGlobalMetadataFromOlderCodecSuccess(ClusterMe ).getClusterMetadataManifest(); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() - .codecVersion(MANIFEST_CURRENT_CODEC_VERSION) + .codecVersion(previousManifest.getCodecVersion()) .indices(Collections.emptyList()) .clusterTerm(1L) .stateVersion(1L) @@ -2026,8 +2036,11 @@ public void testCustomMetadataDeletedUpdatedAndAdded() throws IOException { // Initial cluster state with index. final ClusterState initialClusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); remoteClusterStateService.start(); - final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_") - .getClusterMetadataManifest(); + final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata( + initialClusterState, + "_na_", + MANIFEST_CURRENT_CODEC_VERSION + ).getClusterMetadataManifest(); ClusterState clusterState1 = ClusterState.builder(initialClusterState) .metadata( @@ -2105,8 +2118,11 @@ public void testIndexMetadataDeletedUpdatedAndAdded() throws IOException { // Initial cluster state with index. final ClusterState initialClusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); remoteClusterStateService.start(); - final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_") - .getClusterMetadataManifest(); + final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata( + initialClusterState, + "_na_", + MANIFEST_CURRENT_CODEC_VERSION + ).getClusterMetadataManifest(); String initialIndex = "test-index"; Index index1 = new Index("test-index-1", "index-uuid-1"); Index index2 = new Index("test-index-2", "index-uuid-2"); @@ -2184,8 +2200,11 @@ private void verifyMetadataAttributeOnlyUpdated( // Initial cluster state with index. final ClusterState initialClusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); remoteClusterStateService.start(); - final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_") - .getClusterMetadataManifest(); + final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata( + initialClusterState, + "_na_", + MANIFEST_CURRENT_CODEC_VERSION + ).getClusterMetadataManifest(); ClusterState newClusterState = clusterStateUpdater.apply(initialClusterState); @@ -2198,8 +2217,11 @@ private void verifyMetadataAttributeOnlyUpdated( initialManifest ).getClusterMetadataManifest(); } else { - manifestAfterMetadataUpdate = remoteClusterStateService.writeFullMetadata(newClusterState, initialClusterState.stateUUID()) - .getClusterMetadataManifest(); + manifestAfterMetadataUpdate = remoteClusterStateService.writeFullMetadata( + newClusterState, + initialClusterState.stateUUID(), + MANIFEST_CURRENT_CODEC_VERSION + ).getClusterMetadataManifest(); } assertions.accept(initialManifest, manifestAfterMetadataUpdate); @@ -2572,8 +2594,11 @@ public void testRemoteStateStats() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") - .getClusterMetadataManifest(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata( + clusterState, + "prev-cluster-uuid", + MANIFEST_CURRENT_CODEC_VERSION + ).getClusterMetadataManifest(); assertTrue(remoteClusterStateService.getStats() != null); assertEquals(1, remoteClusterStateService.getStats().getSuccessCount()); @@ -2620,8 +2645,11 @@ public void testWriteFullMetadataSuccessWithRoutingTable() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") - .getClusterMetadataManifest(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata( + clusterState, + "prev-cluster-uuid", + MANIFEST_CURRENT_CODEC_VERSION + ).getClusterMetadataManifest(); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); final UploadedIndexMetadata uploadedIndiceRoutingMetadata = new UploadedIndexMetadata( "test-index", @@ -2670,8 +2698,11 @@ public void testWriteFullMetadataInParallelSuccessWithRoutingTable() throws IOEx when((blobStoreRepository.basePath())).thenReturn(BlobPath.cleanPath().add("base-path")); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") - .getClusterMetadataManifest(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata( + clusterState, + "prev-cluster-uuid", + MANIFEST_CURRENT_CODEC_VERSION + ).getClusterMetadataManifest(); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); final UploadedIndexMetadata uploadedIndiceRoutingMetadata = new UploadedIndexMetadata( diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifestTests.java index de1befbecd924..f399dd2f3d7a5 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifestTests.java @@ -43,8 +43,8 @@ import static java.util.stream.Collectors.toList; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V0; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2; +import static org.opensearch.gateway.remote.ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST; -import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo;