Skip to content

Commit

Permalink
Upload incremental cluster state on master re-election
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
shiv0408 committed Sep 3, 2024
1 parent 82762d4 commit 667544c
Show file tree
Hide file tree
Showing 12 changed files with 724 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.coordination.CoordinationState;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.gateway.GatewayMetaState;
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
Expand All @@ -32,9 +34,11 @@
import java.util.Base64;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
Expand All @@ -54,21 +58,13 @@ public class RemoteStatePublicationIT extends RemoteStoreBaseIntegTestCase {

private static String INDEX_NAME = "test-index";
private boolean isRemoteStateEnabled = true;
private String isRemotePublicationEnabled = "true";
private boolean isRemotePublicationEnabled = true;

@Before
public void setup() {
asyncUploadMockFsRepo = false;
isRemoteStateEnabled = true;
isRemotePublicationEnabled = "true";
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL, isRemotePublicationEnabled)
.build();
isRemotePublicationEnabled = true;
}

@Override
Expand All @@ -91,6 +87,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
.put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
.put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_ENABLED_SETTING.getKey(), true)
.put(REMOTE_PUBLICATION_EXPERIMENTAL, isRemotePublicationEnabled)
.build();
}

Expand Down Expand Up @@ -177,6 +174,59 @@ public void testRemotePublicationDownloadStats() {

}

/**
 * Verifies that a newly elected cluster manager re-uses the previously uploaded remote state.
 * <p>
 * The last accepted manifest is captured from the current cluster manager, that node is stopped,
 * and the manifest held by the newly elected cluster manager is compared against the captured one.
 * Only the coordination metadata is expected to differ; every other manifest attribute checked
 * below is expected to be equal.
 */
public void testMasterReElectionUsesIncrementalUpload() throws IOException {
    prepareCluster(3, 2, INDEX_NAME, 1, 1);
    PersistedStateRegistry persistedStateRegistry = internalCluster().getClusterManagerNodeInstance(PersistedStateRegistry.class);
    GatewayMetaState.RemotePersistedState remotePersistedState = (GatewayMetaState.RemotePersistedState) persistedStateRegistry
        .getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE);
    ClusterMetadataManifest manifest = remotePersistedState.getLastAcceptedManifest();
    // force elected master to step down
    internalCluster().stopCurrentClusterManagerNode();
    // 3 cluster manager nodes + 2 data nodes were started, one node has just been stopped
    ensureStableCluster(4);

    persistedStateRegistry = internalCluster().getClusterManagerNodeInstance(PersistedStateRegistry.class);
    CoordinationState.PersistedState persistedStateAfterElection = persistedStateRegistry.getPersistedState(
        PersistedStateRegistry.PersistedStateType.REMOTE
    );
    ClusterMetadataManifest manifestAfterElection = persistedStateAfterElection.getLastAcceptedManifest();

    // coordination metadata is updated, it will be unequal
    assertNotEquals(manifest.getCoordinationMetadata(), manifestAfterElection.getCoordinationMetadata());
    // all other attributes are not uploaded again and will be pointing to same files in manifest after new master is elected
    assertEquals(manifest.getClusterUUID(), manifestAfterElection.getClusterUUID());
    assertEquals(manifest.getIndices(), manifestAfterElection.getIndices());
    assertEquals(manifest.getSettingsMetadata(), manifestAfterElection.getSettingsMetadata());
    assertEquals(manifest.getTemplatesMetadata(), manifestAfterElection.getTemplatesMetadata());
    assertEquals(manifest.getCustomMetadataMap(), manifestAfterElection.getCustomMetadataMap());
    // compare against the post-election manifest; comparing the old manifest with itself can never fail
    assertEquals(manifest.getRoutingTableVersion(), manifestAfterElection.getRoutingTableVersion());
    assertEquals(manifest.getIndicesRouting(), manifestAfterElection.getIndicesRouting());
}

/**
 * Checks that the last committed voting configuration held by the REMOTE persisted state matches
 * the one held by the LOCAL persisted state after the cluster grows by two cluster manager nodes.
 * Registries that have no REMOTE persisted state are ignored.
 */
public void testVotingConfigAreCommitted() throws ExecutionException, InterruptedException {
    prepareCluster(3, 2, INDEX_NAME, 1, 2);
    ensureStableCluster(5);
    ensureGreen(INDEX_NAME);
    // add two new nodes to the cluster, to update the voting config
    internalCluster().startClusterManagerOnlyNodes(2, Settings.EMPTY);
    ensureStableCluster(7);

    internalCluster().getInstances(PersistedStateRegistry.class).forEach(registry -> {
        final CoordinationState.PersistedState local = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.LOCAL);
        final CoordinationState.PersistedState remote = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE);
        if (remote == null) {
            // nothing to compare on a registry without a REMOTE persisted state
            return;
        }
        assertEquals(
            local.getLastAcceptedState().getLastCommittedConfiguration(),
            remote.getLastAcceptedState().getLastCommittedConfiguration()
        );
        // 3 initial + 2 newly started cluster manager nodes
        assertEquals(5, remote.getLastAcceptedState().getLastCommittedConfiguration().getNodeIds().size());
    });
}

private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) {
// assert cluster state stats for data node
DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,29 +76,4 @@ protected void verifyRestoredData(Map<String, Long> indexStats, String indexName
/**
 * Convenience overload that delegates to the three-argument {@code verifyRestoredData},
 * passing {@code true} as the third argument.
 * NOTE(review): the meaning of the third argument is defined by the three-argument overload,
 * which is not visible here.
 *
 * @param indexStats stats forwarded unchanged to the three-argument overload
 * @param indexName  index name forwarded unchanged to the three-argument overload
 * @throws Exception whatever the three-argument overload throws
 */
protected void verifyRestoredData(Map<String, Long> indexStats, String indexName) throws Exception {
verifyRestoredData(indexStats, indexName, true);
}

/**
 * Convenience overload that delegates to the six-argument {@code prepareCluster},
 * passing {@link Settings#EMPTY} as the settings.
 */
public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY);
}

/**
 * Starts the requested nodes with the given settings, then creates every index named in
 * {@code indices} and waits for it via {@code ensureYellowAndNoInitializingShards} and
 * {@code ensureGreen}.
 *
 * @param numClusterManagerNodes number of cluster-manager-only nodes to start
 * @param numDataOnlyNodes       number of data-only nodes to start
 * @param indices                comma-separated list of index names to create
 * @param replicaCount           replica count passed to {@code remoteStoreIndexSettings}
 * @param shardCount             shard count passed to {@code remoteStoreIndexSettings}
 * @param settings               settings handed to the node-starting overload
 */
public void prepareCluster(
    int numClusterManagerNodes,
    int numDataOnlyNodes,
    String indices,
    int replicaCount,
    int shardCount,
    Settings settings
) {
    prepareCluster(numClusterManagerNodes, numDataOnlyNodes, settings);
    final String[] indexNames = indices.split(",");
    for (final String indexName : indexNames) {
        createIndex(indexName, remoteStoreIndexSettings(replicaCount, shardCount));
        ensureYellowAndNoInitializingShards(indexName);
        ensureGreen(indexName);
    }
}

/**
 * Starts the cluster nodes without creating any index: first
 * {@code numClusterManagerNodes} cluster-manager-only nodes, then
 * {@code numDataOnlyNodes} data-only nodes, all with the given settings.
 */
public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, Settings settings) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,7 @@ protected void restore(boolean restoreAllShards, String... indices) {
}

protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
internalCluster().startDataOnlyNodes(numDataOnlyNodes);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY);
}

protected void prepareCluster(
Expand All @@ -368,11 +362,16 @@ protected void prepareCluster(
int shardCount,
Settings settings
) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, settings);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
}

/**
 * Starts the cluster nodes without creating any index: first
 * {@code numClusterManagerNodes} cluster-manager-only nodes, then
 * {@code numDataOnlyNodes} data-only nodes, all with the given settings.
 */
protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, Settings settings) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -460,6 +461,10 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) {
clusterState.term()
);
persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState);

if (shouldUpdateRemotePersistedState(publishRequest)) {
updateRemotePersistedStateOnPublishRequest(publishRequest);
}
assert getLastAcceptedState() == clusterState;

return new PublishResponse(clusterState.term(), clusterState.version());
Expand Down Expand Up @@ -572,6 +577,9 @@ public void handleCommit(ApplyCommitRequest applyCommit) {
);

persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).markLastAcceptedStateAsCommitted();
if (shouldCommitRemotePersistedState()) {
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).markLastAcceptedStateAsCommitted();
}
assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration());
}

Expand Down Expand Up @@ -617,6 +625,37 @@ public void close() throws IOException {
IOUtils.close(persistedStateRegistry);
}

/**
 * Decides whether the REMOTE persisted state should be updated for an incoming publish request.
 *
 * @param publishRequest the publish request being handled
 * @return {@code true} only when remote publication is enabled, {@code localNode.isClusterManagerNode()}
 *         holds, and the accepted state does not report the local node as the elected cluster manager
 */
private boolean shouldUpdateRemotePersistedState(PublishRequest publishRequest) {
    if (isRemotePublicationEnabled == false || localNode.isClusterManagerNode() == false) {
        return false;
    }
    final boolean electedLocally = publishRequest.getAcceptedState().getNodes().isLocalNodeElectedClusterManager();
    return electedLocally == false;
}

/**
 * Mirrors an incoming publish request into the REMOTE persisted state.
 * <p>
 * When the request carries a manifest, both the accepted state and that manifest are stored.
 * Otherwise the last accepted state and manifest of the REMOTE persisted state are set to {@code null}.
 *
 * @param publishRequest the publish request being handled
 */
private void updateRemotePersistedStateOnPublishRequest(PublishRequest publishRequest) {
    final boolean manifestPresent = publishRequest.hasManifest();
    final PersistedState remoteState = persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE);
    if (manifestPresent == false) {
        // We will end up here if PublishRequest was sent not using Remote Store even with remotePublication enabled on this node
        remoteState.setLastAcceptedState(null);
        remoteState.setLastAcceptedManifest(null);
        return;
    }
    assert remoteState != null;
    remoteState.setLastAcceptedState(publishRequest.getAcceptedState());
    remoteState.setLastAcceptedManifest(publishRequest.getAcceptedManifest().get());
}

/**
 * Decides whether the REMOTE persisted state should be marked as committed.
 *
 * @return {@code true} only when remote publication is enabled, {@code localNode.isClusterManagerNode()}
 *         holds, the LOCAL last accepted state does not report the local node as the elected cluster
 *         manager, and the REMOTE persisted state exists with a non-null last accepted state and manifest
 */
private boolean shouldCommitRemotePersistedState() {
    if (isRemotePublicationEnabled == false || localNode.isClusterManagerNode() == false) {
        return false;
    }
    final boolean electedLocally = persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL)
        .getLastAcceptedState()
        .getNodes()
        .isLocalNodeElectedClusterManager();
    if (electedLocally) {
        return false;
    }
    final PersistedState remoteState = persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE);
    if (remoteState == null) {
        return false;
    }
    return remoteState.getLastAcceptedState() != null && remoteState.getLastAcceptedManifest() != null;
}

/**
* Pluggable persistence layer for {@link CoordinationState}.
*
Expand Down Expand Up @@ -654,6 +693,22 @@ public interface PersistedState extends Closeable {
*/
PersistedStateStats getStats();

/**
 * Returns the last accepted {@link ClusterMetadataManifest}.
 * <p>
 * This default implementation always returns {@code null}; implementations that track a
 * manifest must override it.
 *
 * @return The last accepted {@link ClusterMetadataManifest}, or null if no manifest
 * has been accepted yet (always null for the default implementation).
 */
default ClusterMetadataManifest getLastAcceptedManifest() {
// return null by default, this method needs to be overridden wherever required
return null;
}

/**
 * Sets the last accepted {@link ClusterMetadataManifest}.
 * <p>
 * This default implementation does nothing; implementations that track a manifest must override it.
 *
 * @param manifest the manifest to record as last accepted
 */
default void setLastAcceptedManifest(ClusterMetadataManifest manifest) {}

/**
* Marks the last accepted cluster state as committed.
* After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set,
Expand All @@ -662,14 +717,7 @@ public interface PersistedState extends Closeable {
*/
default void markLastAcceptedStateAsCommitted() {
final ClusterState lastAcceptedState = getLastAcceptedState();
Metadata.Builder metadataBuilder = null;
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata())
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
.build();
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
metadataBuilder.coordinationMetadata(coordinationMetadata);
}
Metadata.Builder metadataBuilder = commitVotingConfiguration(lastAcceptedState);
// if we receive a commit from a Zen1 cluster-manager that has not recovered its state yet,
// the cluster uuid might not been known yet.
assert lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false
Expand All @@ -694,6 +742,18 @@ default void markLastAcceptedStateAsCommitted() {
}
}

/**
 * Builds metadata in which the last committed voting configuration is set to the last accepted one.
 *
 * @param lastAcceptedState the cluster state whose voting configuration should be committed
 * @return a {@link Metadata.Builder} based on the state's metadata with updated coordination metadata,
 *         or {@code null} when the last accepted and last committed configurations are already equal
 */
default Metadata.Builder commitVotingConfiguration(ClusterState lastAcceptedState) {
    final boolean alreadyCommitted = lastAcceptedState.getLastAcceptedConfiguration()
        .equals(lastAcceptedState.getLastCommittedConfiguration());
    if (alreadyCommitted) {
        // nothing to change, callers receive null
        return null;
    }
    final CoordinationMetadata committedCoordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata())
        .lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
        .build();
    final Metadata.Builder builder = Metadata.builder(lastAcceptedState.metadata());
    builder.coordinationMetadata(committedCoordinationMetadata);
    return builder;
}

/**
 * Closes this persisted state. The default implementation does nothing.
 *
 * @throws IOException declared for overriding implementations; never thrown by this default
 */
default void close() throws IOException {}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
}
fullClusterStateReceivedCount.incrementAndGet();
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length());
final PublishWithJoinResponse response = acceptState(incomingState);
final PublishWithJoinResponse response = acceptState(incomingState, null);
lastSeenClusterState.set(incomingState);
return response;
} else {
Expand Down Expand Up @@ -230,7 +230,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
incomingState.stateUUID(),
request.bytes().length()
);
final PublishWithJoinResponse response = acceptState(incomingState);
final PublishWithJoinResponse response = acceptState(incomingState, null);
lastSeenClusterState.compareAndSet(lastSeen, incomingState);
return response;
}
Expand Down Expand Up @@ -281,7 +281,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
true
);
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
final PublishWithJoinResponse response = acceptState(clusterState, manifest);
lastSeenClusterState.set(clusterState);
return response;
} else {
Expand All @@ -300,7 +300,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
transportService.getLocalNode().getId()
);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
final PublishWithJoinResponse response = acceptState(clusterState, manifest);
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
return response;
}
Expand All @@ -314,7 +314,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
}
}

private PublishWithJoinResponse acceptState(ClusterState incomingState) {
private PublishWithJoinResponse acceptState(ClusterState incomingState, ClusterMetadataManifest manifest) {
// if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation)
if (transportService.getLocalNode().equals(incomingState.nodes().getClusterManagerNode())) {
final PublishRequest publishRequest = currentPublishRequestToSelf.get();
Expand All @@ -324,6 +324,9 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) {
return handlePublishRequest.apply(publishRequest);
}
}
if (manifest != null) {
return handlePublishRequest.apply(new PublishRequest(incomingState, manifest));
}
return handlePublishRequest.apply(new PublishRequest(incomingState));
}

Expand Down
Loading

0 comments on commit 667544c

Please sign in to comment.