Remote publication using min node version for backward compatibility #15216

Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -65,6 +65,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add lower limit for primary and replica batch allocators timeout ([#14979](https://github.com/opensearch-project/OpenSearch/pull/14979))
- Optimize regexp-based include/exclude on aggregations when pattern matches prefixes ([#14371](https://github.com/opensearch-project/OpenSearch/pull/14371))
- Replace and block usages of org.apache.logging.log4j.util.Strings ([#15238](https://github.com/opensearch-project/OpenSearch/pull/15238))
- Remote publication using minimum node version for backward compatibility ([#15216](https://github.com/opensearch-project/OpenSearch/pull/15216))


### Deprecated

@@ -1337,6 +1337,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
coordinationState.get().isRemotePublicationEnabled(),
persistedStateRegistry
);
logger.debug("initialized PublicationContext using class: {}", publicationContext.getClass().toString());

final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
final CoordinatorPublication publication = new CoordinatorPublication(
server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java
@@ -62,6 +62,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
@@ -97,6 +98,7 @@
private final AtomicLong fullClusterStateReceivedCount = new AtomicLong();
private final AtomicLong incompatibleClusterStateDiffReceivedCount = new AtomicLong();
private final AtomicLong compatibleClusterStateDiffReceivedCount = new AtomicLong();
private final AtomicBoolean allNodesRemotePublicationEnabled = new AtomicBoolean();
// -> no need to put a timeout on the options here, because we want the response to eventually be received
// and not log an error if it arrives after the timeout
private final TransportRequestOptions stateRequestOptions = TransportRequestOptions.builder()
@@ -332,11 +334,18 @@
boolean isRemotePublicationEnabled,
PersistedStateRegistry persistedStateRegistry
) {
final PublicationContext publicationContext = new PublicationContext(
clusterChangedEvent,
isRemotePublicationEnabled,
persistedStateRegistry
);
if (isRemotePublicationEnabled == true) {
if (allNodesRemotePublicationEnabled.get() == false) {
if (validateRemotePublicationOnAllNodes(clusterChangedEvent.state().nodes()) == true) {
allNodesRemotePublicationEnabled.set(true);
}
}
if (allNodesRemotePublicationEnabled.get() == true) {
// if all nodes are remote then create remote publication context
return new RemotePublicationContext(clusterChangedEvent, persistedStateRegistry);
}
}
final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, persistedStateRegistry);
Review comment on lines +337 to +348 (Collaborator):
Sorry, why is this needed? Can we publish the cluster state in dual mode based on the destination node attribute?

// Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication
// straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and
@@ -345,6 +354,17 @@
return publicationContext;
}

private boolean validateRemotePublicationOnAllNodes(DiscoveryNodes discoveryNodes) {
assert ClusterMetadataManifest.getCodecForVersion(discoveryNodes.getMinNodeVersion()) >= ClusterMetadataManifest.CODEC_V0;
for (DiscoveryNode node : discoveryNodes.getNodes().values()) {
// if any node is non-remote then create the local publication context
if (node.isRemoteStatePublicationEnabled() == false) {
return false;
}
}
return true;
}

// package private for testing
void setCurrentPublishRequestToSelf(PublishRequest publishRequest) {
this.currentPublishRequestToSelf.set(publishRequest);
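Taken together, the gating above means the coordinator publishes through the remote store only once every node advertises remote-publication support; until then (for example, during a rolling upgrade with older nodes still present) it keeps using transport-based publication. The standalone sketch below models that decision outside the OpenSearch tree. The Node record and chooseContext method are hypothetical stand-ins for DiscoveryNode and newPublicationContext; only the latched allNodesRemotePublicationEnabled flag mirrors the real field.

// Standalone sketch (not OpenSearch code): models the publication-context gating added in
// newPublicationContext(...) and validateRemotePublicationOnAllNodes(...) above.
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class PublicationGatingSketch {

    record Node(String id, boolean remoteStatePublicationEnabled) {}

    // Latches to true once every node in the cluster advertises remote-publication support.
    private final AtomicBoolean allNodesRemotePublicationEnabled = new AtomicBoolean();

    String chooseContext(boolean isRemotePublicationEnabled, List<Node> nodes) {
        if (isRemotePublicationEnabled) {
            if (allNodesRemotePublicationEnabled.get() == false
                && nodes.stream().allMatch(Node::remoteStatePublicationEnabled)) {
                allNodesRemotePublicationEnabled.set(true);
            }
            if (allNodesRemotePublicationEnabled.get()) {
                // every node can read the remote store, so publish via the manifest
                return "RemotePublicationContext";
            }
        }
        // at least one node (e.g. an older version) still needs the state over transport
        return "PublicationContext";
    }

    public static void main(String[] args) {
        PublicationGatingSketch sketch = new PublicationGatingSketch();
        List<Node> mixedCluster = List.of(new Node("a", true), new Node("b", false));
        List<Node> allRemote = List.of(new Node("a", true), new Node("b", true));
        System.out.println(sketch.chooseContext(true, mixedCluster)); // PublicationContext
        System.out.println(sketch.chooseContext(true, allRemote));    // RemotePublicationContext
    }
}

Note that, as in the PR code, the flag latches: once the per-node validation has passed, later publications skip the check.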
@@ -385,25 +405,19 @@
*/
public class PublicationContext {

private final DiscoveryNodes discoveryNodes;
private final ClusterState newState;
private final ClusterState previousState;
private final boolean sendFullVersion;
protected final DiscoveryNodes discoveryNodes;
protected final ClusterState newState;
protected final ClusterState previousState;
protected final boolean sendFullVersion;
private final Map<Version, BytesReference> serializedStates = new HashMap<>();
private final Map<Version, BytesReference> serializedDiffs = new HashMap<>();
private final boolean sendRemoteState;
private final PersistedStateRegistry persistedStateRegistry;
protected final PersistedStateRegistry persistedStateRegistry;

PublicationContext(
ClusterChangedEvent clusterChangedEvent,
boolean isRemotePublicationEnabled,
PersistedStateRegistry persistedStateRegistry
) {
PublicationContext(ClusterChangedEvent clusterChangedEvent, PersistedStateRegistry persistedStateRegistry) {
discoveryNodes = clusterChangedEvent.state().nodes();
newState = clusterChangedEvent.state();
previousState = clusterChangedEvent.previousState();
sendFullVersion = previousState.getBlocks().disableStatePersistence();
sendRemoteState = isRemotePublicationEnabled;
this.persistedStateRegistry = persistedStateRegistry;
}

@@ -468,17 +482,7 @@
} else {
responseActionListener = listener;
}
// TODO Decide to send remote state before starting publication by checking remote publication on all nodes
if (sendRemoteState && destination.isRemoteStatePublicationEnabled()) {
logger.trace("sending remote cluster state version [{}] to [{}]", newState.version(), destination);
sendRemoteClusterState(destination, publishRequest.getAcceptedState(), responseActionListener);
} else if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
sendFullClusterState(destination, responseActionListener);
} else {
logger.trace("sending cluster state diff for version [{}] to [{}]", newState.version(), destination);
sendClusterStateDiff(destination, responseActionListener);
}
sendClusterState(destination, responseActionListener);
}

public void sendApplyCommit(
@@ -517,58 +521,14 @@
);
}

private void sendRemoteClusterState(
final DiscoveryNode destination,
final ClusterState clusterState,
final ActionListener<PublishWithJoinResponse> listener
) {
try {
final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE))
.getLastUploadedManifestFile();
final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
discoveryNodes.getLocalNode(),
clusterState.term(),
clusterState.getVersion(),
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID(),
manifestFileName
);
final Consumer<TransportException> transportExceptionHandler = exp -> {
logger.debug(() -> new ParameterizedMessage("failed to send remote cluster state to {}", destination), exp);
listener.onFailure(exp);
};
final TransportResponseHandler<PublishWithJoinResponse> responseHandler = new TransportResponseHandler<>() {

@Override
public PublishWithJoinResponse read(StreamInput in) throws IOException {
return new PublishWithJoinResponse(in);
}

@Override
public void handleResponse(PublishWithJoinResponse response) {
listener.onResponse(response);
}

@Override
public void handleException(TransportException exp) {
transportExceptionHandler.accept(exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
};
transportService.sendRequest(
destination,
PUBLISH_REMOTE_STATE_ACTION_NAME,
remotePublishRequest,
stateRequestOptions,
responseHandler
);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("error sending remote cluster state to {}", destination), e);
listener.onFailure(e);
public void sendClusterState(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
logger.info("sending cluster state over transport to node: {}", destination.getName());
if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
sendFullClusterState(destination, listener);
} else {
logger.trace("sending cluster state diff for version [{}] to [{}]", newState.version(), destination);
sendClusterStateDiff(destination, listener);
}
}

Expand Down Expand Up @@ -648,4 +608,69 @@
}
}

/**
* An extension of {@code PublicationContext} to support remote cluster state publication
*
* @opensearch.internal
*/
public class RemotePublicationContext extends PublicationContext {

RemotePublicationContext(ClusterChangedEvent clusterChangedEvent, PersistedStateRegistry persistedStateRegistry) {
super(clusterChangedEvent, persistedStateRegistry);
}

@Override
public void sendClusterState(final DiscoveryNode destination, final ActionListener<PublishWithJoinResponse> listener) {
try {
logger.info("sending remote cluster state to node: {}", destination.getName());
final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE))
.getLastUploadedManifestFile();
final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
discoveryNodes.getLocalNode(),
newState.term(),
newState.getVersion(),
newState.getClusterName().value(),
newState.metadata().clusterUUID(),
manifestFileName
);
final Consumer<TransportException> transportExceptionHandler = exp -> {
logger.debug(() -> new ParameterizedMessage("failed to send remote cluster state to {}", destination), exp);
listener.onFailure(exp);
};

final TransportResponseHandler<PublishWithJoinResponse> responseHandler = new TransportResponseHandler<>() {

@Override
public PublishWithJoinResponse read(StreamInput in) throws IOException {
return new PublishWithJoinResponse(in);

}

@Override
public void handleResponse(PublishWithJoinResponse response) {
listener.onResponse(response);
}

@Override
public void handleException(TransportException exp) {
transportExceptionHandler.accept(exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;

}
};
transportService.sendRequest(
destination,
PUBLISH_REMOTE_STATE_ACTION_NAME,
remotePublishRequest,
stateRequestOptions,
responseHandler
);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("error sending remote cluster state to {}", destination), e);
listener.onFailure(e);

}
}
}

}
@@ -701,7 +701,12 @@ public String getLastUploadedManifestFile() {
public void setLastAcceptedState(ClusterState clusterState) {
try {
final RemoteClusterStateManifestInfo manifestDetails;
if (shouldWriteFullClusterState(clusterState)) {
// Decide the codec version
int codecVersion = ClusterMetadataManifest.getCodecForVersion(clusterState.nodes().getMinNodeVersion());
assert codecVersion >= 0 : codecVersion;
logger.info("codec version is {}", codecVersion);

if (shouldWriteFullClusterState(clusterState, codecVersion)) {
final Optional<ClusterMetadataManifest> latestManifest = remoteClusterStateService.getLatestClusterMetadataManifest(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
@@ -718,7 +723,7 @@ public void setLastAcceptedState(ClusterState clusterState) {
clusterState.metadata().clusterUUID()
);
}
manifestDetails = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID);
manifestDetails = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID, codecVersion);
} else {
assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == true
: "Previous manifest and previous ClusterState are not in sync";
@@ -758,11 +763,13 @@ private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest,
return true;
}

private boolean shouldWriteFullClusterState(ClusterState clusterState) {
private boolean shouldWriteFullClusterState(ClusterState clusterState, int codecVersion) {
assert lastAcceptedManifest == null || lastAcceptedManifest.getCodecVersion() <= codecVersion;
if (lastAcceptedState == null
|| lastAcceptedManifest == null
|| lastAcceptedState.term() != clusterState.term()
|| lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT) {
|| lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT
|| lastAcceptedManifest.getCodecVersion() != codecVersion) {
return true;
}
return false;
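The net effect is that the manifest codec is now derived from the minimum node version in the cluster, and a change in that codec (not just a term or OpenSearch-version change) forces the next accepted state to be written as a full manifest. Below is a minimal, self-contained model of that decision; the ManifestInfo record and shouldWriteFull method are hypothetical names, not the RemotePersistedState API. It illustrates a rolling upgrade where the last 2.15 node leaves and the resolved codec moves from CODEC_V2 to CODEC_V3.

// Standalone sketch (hypothetical names, not the RemotePersistedState API): mirrors the
// conditions checked by shouldWriteFullClusterState(clusterState, codecVersion) above.
public class FullWriteDecisionSketch {

    record ManifestInfo(long term, String opensearchVersion, int codecVersion) {}

    static boolean shouldWriteFull(ManifestInfo lastAcceptedManifest, long term, String currentVersion, int codecForMinNodeVersion) {
        return lastAcceptedManifest == null
            || lastAcceptedManifest.term() != term
            || lastAcceptedManifest.opensearchVersion().equals(currentVersion) == false
            || lastAcceptedManifest.codecVersion() != codecForMinNodeVersion; // new in this change: a codec bump forces a full write
    }

    public static void main(String[] args) {
        // Manifest written while a 2.15 node was still in the cluster (codec 2, i.e. CODEC_V2).
        ManifestInfo last = new ManifestInfo(5L, "2.16.0", 2);
        // Same term and OpenSearch version, but the minimum node version is now 2.16, so the codec resolves to 3 (CODEC_V3).
        System.out.println(shouldWriteFull(last, 5L, "2.16.0", 3)); // true  -> write a full manifest
        System.out.println(shouldWriteFull(last, 5L, "2.16.0", 2)); // false -> incremental write path
    }
}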
@@ -20,10 +20,10 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.gateway.remote.ClusterMetadataManifest.Builder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -46,6 +46,8 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment {
// required for state publication
public static final int CODEC_V3 = 3; // In Codec V3, we have introduced new diff field in diff-manifest's routing_table_diff

public static final int[] CODEC_VERSIONS = { CODEC_V0, CODEC_V1, CODEC_V2, CODEC_V3 };

private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term");
private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version");
private static final ParseField CLUSTER_UUID_FIELD = new ParseField("cluster_uuid");
@@ -237,12 +239,34 @@ private static ClusterStateDiffManifest diffManifest(Object[] fields) {
);

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> CURRENT_PARSER = PARSER_V3;
public static final int MANIFEST_CURRENT_CODEC_VERSION = CODEC_V3;

private static final Map<Version, Integer> VERSION_TO_CODEC_MAPPING;

static {
declareParser(PARSER_V0, CODEC_V0);
declareParser(PARSER_V1, CODEC_V1);
declareParser(PARSER_V2, CODEC_V2);
declareParser(PARSER_V3, CODEC_V3);

assert Arrays.stream(CODEC_VERSIONS).max().getAsInt() == MANIFEST_CURRENT_CODEC_VERSION;
Map<Version, Integer> versionToCodecMapping = new HashMap<>();
for (Version version : Version.getDeclaredVersions(Version.class)) {
if (version.onOrAfter(Version.V_2_10_0) && version.before(Version.V_2_12_0)) {
versionToCodecMapping.put(version, ClusterMetadataManifest.CODEC_V0);
} else if (version.onOrAfter(Version.V_2_12_0) && version.before(Version.V_2_15_0)) {
versionToCodecMapping.put(version, ClusterMetadataManifest.CODEC_V1);
} else if (version.onOrAfter(Version.V_2_15_0) && version.before(Version.V_2_16_0)) {
versionToCodecMapping.put(version, ClusterMetadataManifest.CODEC_V2);
} else if (version.onOrAfter(Version.V_2_16_0)) {
versionToCodecMapping.put(version, ClusterMetadataManifest.CODEC_V3);
}
}
VERSION_TO_CODEC_MAPPING = Collections.unmodifiableMap(versionToCodecMapping);
}

public static int getCodecForVersion(Version version) {
return VERSION_TO_CODEC_MAPPING.getOrDefault(version, -1);
}

private static void declareParser(ConstructingObjectParser<ClusterMetadataManifest, Void> parser, long codec_version) {
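For reference, the mapping above resolves a node version to a manifest codec as sketched below. This is a test-style snippet that assumes it is compiled inside the OpenSearch source tree, where org.opensearch.Version and ClusterMetadataManifest are on the classpath; the class name and the check helper are illustrative.

// Test-style sketch: exercises ClusterMetadataManifest.getCodecForVersion(...) using the
// version ranges from VERSION_TO_CODEC_MAPPING above.
import org.opensearch.Version;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

public class CodecForVersionSketch {

    public static void main(String[] args) {
        check(ClusterMetadataManifest.getCodecForVersion(Version.V_2_10_0), ClusterMetadataManifest.CODEC_V0); // 2.10.x - 2.11.x
        check(ClusterMetadataManifest.getCodecForVersion(Version.V_2_12_0), ClusterMetadataManifest.CODEC_V1); // 2.12.x - 2.14.x
        check(ClusterMetadataManifest.getCodecForVersion(Version.V_2_15_0), ClusterMetadataManifest.CODEC_V2); // 2.15.x only
        check(ClusterMetadataManifest.getCodecForVersion(Version.V_2_16_0), ClusterMetadataManifest.CODEC_V3); // 2.16.x and later
        // Versions outside the mapped ranges (anything before 2.10) fall back to -1 via getOrDefault.
    }

    private static void check(int actual, int expected) {
        if (actual != expected) {
            throw new AssertionError("expected codec " + expected + " but got " + actual);
        }
    }
}

The -1 fallback is why the assertion in validateRemotePublicationOnAllNodes requires the codec resolved for the minimum node version to be at least CODEC_V0, i.e. the minimum node version must be covered by this map.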