Skip to content

Commit

Permalink
Remote publication using min node version for backward compatibility (o…
Browse files Browse the repository at this point in the history
…pensearch-project#15216)

* Publish remote state using min node version

Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha authored and dk2k committed Oct 17, 2024
1 parent d9d69f8 commit 77762a2
Show file tree
Hide file tree
Showing 14 changed files with 448 additions and 137 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add lower limit for primary and replica batch allocators timeout ([#14979](https://github.com/opensearch-project/OpenSearch/pull/14979))
- Optimize regexp-based include/exclude on aggregations when pattern matches prefixes ([#14371](https://github.com/opensearch-project/OpenSearch/pull/14371))
- Replace and block usages of org.apache.logging.log4j.util.Strings ([#15238](https://github.com/opensearch-project/OpenSearch/pull/15238))
- Remote publication using minimum node version for backward compatibility ([#15216](https://github.com/opensearch-project/OpenSearch/pull/15216))


### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
coordinationState.get().isRemotePublicationEnabled(),
persistedStateRegistry
);
logger.debug("initialized PublicationContext using class: {}", publicationContext.getClass().toString());

final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
final CoordinatorPublication publication = new CoordinatorPublication(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -97,6 +98,7 @@ public class PublicationTransportHandler {
private final AtomicLong fullClusterStateReceivedCount = new AtomicLong();
private final AtomicLong incompatibleClusterStateDiffReceivedCount = new AtomicLong();
private final AtomicLong compatibleClusterStateDiffReceivedCount = new AtomicLong();
private final AtomicBoolean allNodesRemotePublicationEnabled = new AtomicBoolean();
// -> no need to put a timeout on the options here, because we want the response to eventually be received
// and not log an error if it arrives after the timeout
private final TransportRequestOptions stateRequestOptions = TransportRequestOptions.builder()
Expand Down Expand Up @@ -332,11 +334,18 @@ public PublicationContext newPublicationContext(
boolean isRemotePublicationEnabled,
PersistedStateRegistry persistedStateRegistry
) {
final PublicationContext publicationContext = new PublicationContext(
clusterChangedEvent,
isRemotePublicationEnabled,
persistedStateRegistry
);
if (isRemotePublicationEnabled == true) {
if (allNodesRemotePublicationEnabled.get() == false) {
if (validateRemotePublicationOnAllNodes(clusterChangedEvent.state().nodes()) == true) {
allNodesRemotePublicationEnabled.set(true);
}
}
if (allNodesRemotePublicationEnabled.get() == true) {
// if all nodes are remote then create remote publication context
return new RemotePublicationContext(clusterChangedEvent, persistedStateRegistry);
}
}
final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, persistedStateRegistry);

// Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication
// straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and
Expand All @@ -345,6 +354,17 @@ public PublicationContext newPublicationContext(
return publicationContext;
}

private boolean validateRemotePublicationOnAllNodes(DiscoveryNodes discoveryNodes) {
assert ClusterMetadataManifest.getCodecForVersion(discoveryNodes.getMinNodeVersion()) >= ClusterMetadataManifest.CODEC_V0;
for (DiscoveryNode node : discoveryNodes.getNodes().values()) {
// if a node is non-remote then created local publication context
if (node.isRemoteStatePublicationEnabled() == false) {
return false;
}
}
return true;
}

// package private for testing
void setCurrentPublishRequestToSelf(PublishRequest publishRequest) {
this.currentPublishRequestToSelf.set(publishRequest);
Expand Down Expand Up @@ -385,25 +405,19 @@ private static BytesReference serializeDiffClusterState(Diff<ClusterState> diff,
*/
public class PublicationContext {

private final DiscoveryNodes discoveryNodes;
private final ClusterState newState;
private final ClusterState previousState;
private final boolean sendFullVersion;
protected final DiscoveryNodes discoveryNodes;
protected final ClusterState newState;
protected final ClusterState previousState;
protected final boolean sendFullVersion;
private final Map<Version, BytesReference> serializedStates = new HashMap<>();
private final Map<Version, BytesReference> serializedDiffs = new HashMap<>();
private final boolean sendRemoteState;
private final PersistedStateRegistry persistedStateRegistry;
protected final PersistedStateRegistry persistedStateRegistry;

PublicationContext(
ClusterChangedEvent clusterChangedEvent,
boolean isRemotePublicationEnabled,
PersistedStateRegistry persistedStateRegistry
) {
PublicationContext(ClusterChangedEvent clusterChangedEvent, PersistedStateRegistry persistedStateRegistry) {
discoveryNodes = clusterChangedEvent.state().nodes();
newState = clusterChangedEvent.state();
previousState = clusterChangedEvent.previousState();
sendFullVersion = previousState.getBlocks().disableStatePersistence();
sendRemoteState = isRemotePublicationEnabled;
this.persistedStateRegistry = persistedStateRegistry;
}

Expand Down Expand Up @@ -468,17 +482,7 @@ public void onFailure(Exception e) {
} else {
responseActionListener = listener;
}
// TODO Decide to send remote state before starting publication by checking remote publication on all nodes
if (sendRemoteState && destination.isRemoteStatePublicationEnabled()) {
logger.trace("sending remote cluster state version [{}] to [{}]", newState.version(), destination);
sendRemoteClusterState(destination, publishRequest.getAcceptedState(), responseActionListener);
} else if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
sendFullClusterState(destination, responseActionListener);
} else {
logger.trace("sending cluster state diff for version [{}] to [{}]", newState.version(), destination);
sendClusterStateDiff(destination, responseActionListener);
}
sendClusterState(destination, responseActionListener);
}

public void sendApplyCommit(
Expand Down Expand Up @@ -517,58 +521,14 @@ public String executor() {
);
}

private void sendRemoteClusterState(
final DiscoveryNode destination,
final ClusterState clusterState,
final ActionListener<PublishWithJoinResponse> listener
) {
try {
final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE))
.getLastUploadedManifestFile();
final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
discoveryNodes.getLocalNode(),
clusterState.term(),
clusterState.getVersion(),
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID(),
manifestFileName
);
final Consumer<TransportException> transportExceptionHandler = exp -> {
logger.debug(() -> new ParameterizedMessage("failed to send remote cluster state to {}", destination), exp);
listener.onFailure(exp);
};
final TransportResponseHandler<PublishWithJoinResponse> responseHandler = new TransportResponseHandler<>() {

@Override
public PublishWithJoinResponse read(StreamInput in) throws IOException {
return new PublishWithJoinResponse(in);
}

@Override
public void handleResponse(PublishWithJoinResponse response) {
listener.onResponse(response);
}

@Override
public void handleException(TransportException exp) {
transportExceptionHandler.accept(exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
};
transportService.sendRequest(
destination,
PUBLISH_REMOTE_STATE_ACTION_NAME,
remotePublishRequest,
stateRequestOptions,
responseHandler
);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("error sending remote cluster state to {}", destination), e);
listener.onFailure(e);
public void sendClusterState(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
logger.info("sending cluster state over transport to node: {}", destination.getName());
if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
sendFullClusterState(destination, listener);
} else {
logger.trace("sending cluster state diff for version [{}] to [{}]", newState.version(), destination);
sendClusterStateDiff(destination, listener);
}
}

Expand Down Expand Up @@ -648,4 +608,69 @@ public String executor() {
}
}

/**
* An extension of {@code PublicationContext} to support remote cluster state publication
*
* @opensearch.internal
*/
public class RemotePublicationContext extends PublicationContext {

RemotePublicationContext(ClusterChangedEvent clusterChangedEvent, PersistedStateRegistry persistedStateRegistry) {
super(clusterChangedEvent, persistedStateRegistry);
}

@Override
public void sendClusterState(final DiscoveryNode destination, final ActionListener<PublishWithJoinResponse> listener) {
try {
logger.info("sending remote cluster state to node: {}", destination.getName());
final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE))
.getLastUploadedManifestFile();
final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
discoveryNodes.getLocalNode(),
newState.term(),
newState.getVersion(),
newState.getClusterName().value(),
newState.metadata().clusterUUID(),
manifestFileName
);
final Consumer<TransportException> transportExceptionHandler = exp -> {
logger.debug(() -> new ParameterizedMessage("failed to send remote cluster state to {}", destination), exp);
listener.onFailure(exp);
};
final TransportResponseHandler<PublishWithJoinResponse> responseHandler = new TransportResponseHandler<>() {

@Override
public PublishWithJoinResponse read(StreamInput in) throws IOException {
return new PublishWithJoinResponse(in);
}

@Override
public void handleResponse(PublishWithJoinResponse response) {
listener.onResponse(response);
}

@Override
public void handleException(TransportException exp) {
transportExceptionHandler.accept(exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
};
transportService.sendRequest(
destination,
PUBLISH_REMOTE_STATE_ACTION_NAME,
remotePublishRequest,
stateRequestOptions,
responseHandler
);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("error sending remote cluster state to {}", destination), e);
listener.onFailure(e);
}
}
}

}
15 changes: 11 additions & 4 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,12 @@ public String getLastUploadedManifestFile() {
public void setLastAcceptedState(ClusterState clusterState) {
try {
final RemoteClusterStateManifestInfo manifestDetails;
if (shouldWriteFullClusterState(clusterState)) {
// Decide the codec version
int codecVersion = ClusterMetadataManifest.getCodecForVersion(clusterState.nodes().getMinNodeVersion());
assert codecVersion >= 0 : codecVersion;
logger.info("codec version is {}", codecVersion);

if (shouldWriteFullClusterState(clusterState, codecVersion)) {
final Optional<ClusterMetadataManifest> latestManifest = remoteClusterStateService.getLatestClusterMetadataManifest(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
Expand All @@ -718,7 +723,7 @@ public void setLastAcceptedState(ClusterState clusterState) {
clusterState.metadata().clusterUUID()
);
}
manifestDetails = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID);
manifestDetails = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID, codecVersion);
} else {
assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == true
: "Previous manifest and previous ClusterState are not in sync";
Expand Down Expand Up @@ -758,11 +763,13 @@ private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest,
return true;
}

private boolean shouldWriteFullClusterState(ClusterState clusterState) {
private boolean shouldWriteFullClusterState(ClusterState clusterState, int codecVersion) {
assert lastAcceptedManifest == null || lastAcceptedManifest.getCodecVersion() <= codecVersion;
if (lastAcceptedState == null
|| lastAcceptedManifest == null
|| lastAcceptedState.term() != clusterState.term()
|| lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT) {
|| lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT
|| lastAcceptedManifest.getCodecVersion() != codecVersion) {
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.gateway.remote.ClusterMetadataManifest.Builder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -46,6 +46,8 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment {
// required for state publication
public static final int CODEC_V3 = 3; // In Codec V3, we have introduced new diff field in diff-manifest's routing_table_diff

public static final int[] CODEC_VERSIONS = { CODEC_V0, CODEC_V1, CODEC_V2, CODEC_V3 };

private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term");
private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version");
private static final ParseField CLUSTER_UUID_FIELD = new ParseField("cluster_uuid");
Expand Down Expand Up @@ -237,12 +239,34 @@ private static ClusterStateDiffManifest diffManifest(Object[] fields) {
);

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> CURRENT_PARSER = PARSER_V3;
public static final int MANIFEST_CURRENT_CODEC_VERSION = CODEC_V3;

private static final Map<Version, Integer> VERSION_TO_CODEC_MAPPING;

static {
declareParser(PARSER_V0, CODEC_V0);
declareParser(PARSER_V1, CODEC_V1);
declareParser(PARSER_V2, CODEC_V2);
declareParser(PARSER_V3, CODEC_V3);

assert Arrays.stream(CODEC_VERSIONS).max().getAsInt() == MANIFEST_CURRENT_CODEC_VERSION;
Map<Version, Integer> versionToCodecMapping = new HashMap<>();
for (Version version : Version.getDeclaredVersions(Version.class)) {
if (version.onOrAfter(Version.V_2_10_0) && version.before(Version.V_2_12_0)) {
versionToCodecMapping.put(version, ClusterMetadataManifest.CODEC_V0);
} else if (version.onOrAfter(Version.V_2_12_0) && version.before(Version.V_2_15_0)) {
versionToCodecMapping.put(version, ClusterMetadataManifest.CODEC_V1);
} else if (version.onOrAfter(Version.V_2_15_0) && version.before(Version.V_2_16_0)) {
versionToCodecMapping.put(version, ClusterMetadataManifest.CODEC_V2);
} else if (version.onOrAfter(Version.V_2_16_0)) {
versionToCodecMapping.put(version, ClusterMetadataManifest.CODEC_V3);
}
}
VERSION_TO_CODEC_MAPPING = Collections.unmodifiableMap(versionToCodecMapping);
}

public static int getCodecForVersion(Version version) {
return VERSION_TO_CODEC_MAPPING.getOrDefault(version, -1);
}

private static void declareParser(ConstructingObjectParser<ClusterMetadataManifest, Void> parser, long codec_version) {
Expand Down
Loading

0 comments on commit 77762a2

Please sign in to comment.