Skip to content

Commit

Permalink
Merge branch 'main' into fix-10682
Browse files Browse the repository at this point in the history
  • Loading branch information
shiv0408 committed Sep 5, 2024
2 parents bb26188 + 2eb148c commit 40ef9f6
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.CoordinationState;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.cluster.coordination.PublishClusterStateStats;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.discovery.DiscoveryStats;
Expand All @@ -29,20 +31,28 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
import org.junit.Before;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.action.admin.cluster.node.info.NodesInfoRequest.Metric.SETTINGS;
import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.DISCOVERY;
import static org.opensearch.cluster.metadata.Metadata.isGlobalStateEquals;
import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
Expand Down Expand Up @@ -217,11 +227,121 @@ public void testRemotePublicationDownloadStats() {
NodesStatsResponse nodesStatsResponseDataNode = client().admin()
.cluster()
.prepareNodesStats(dataNode)
.addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName())
.addMetric(DISCOVERY.metricName())
.get();

assertDataNodeDownloadStats(nodesStatsResponseDataNode);
}

/**
 * Restarts every cluster-manager node one at a time with {@code REMOTE_PUBLICATION_EXPERIMENTAL} set to
 * {@code false} and checks, during each restart and once more after the last one, that the non-active
 * cluster managers are in the publication mode implied by the currently active cluster manager.
 *
 * @throws Exception if restarting a node or waiting for the cluster fails
 */
public void testRemotePublicationDisabledByRollingRestart() throws Exception {
    prepareCluster(3, 2, INDEX_NAME, 1, 2);
    ensureStableCluster(5);
    ensureGreen(INDEX_NAME);

    Set<String> clusterManagers = internalCluster().getClusterManagerNames();
    // Names of the cluster managers that have already been stopped and handed the new setting.
    Set<String> restartedClusterManagers = new HashSet<>();

    for (String clusterManager : clusterManagers) {
        internalCluster().restartNode(clusterManager, new InternalTestCluster.RestartCallback() {
            @Override
            public Settings onNodeStopped(String nodeName) {
                restartedClusterManagers.add(nodeName);
                return Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, false).build();
            }

            @Override
            public void doAfterNodes(int n, Client client) {
                String activeCM = internalCluster().getClusterManagerName();
                assertPublicationModeOnFollowingClusterManagers(
                    clusterManagers,
                    activeCM,
                    restartedClusterManagers.contains(activeCM)
                );
            }
        });
    }
    ensureGreen(INDEX_NAME);
    ensureStableCluster(5);

    // Every cluster manager has been restarted by now, so the active one must be a restarted node.
    assertPublicationModeOnFollowingClusterManagers(clusterManagers, internalCluster().getClusterManagerName(), true);
}

/**
 * Asserts the publication mode seen by the cluster managers other than {@code activeCM}.
 * <p>
 * When {@code activeCMRestarted} is {@code true}: each of them must have received at least one full or
 * compatible-diff cluster state (publication over transport), the active cluster manager must report the
 * remote publication setting as disabled, and their REMOTE persisted state must hold no accepted state.
 * <p>
 * When {@code activeCMRestarted} is {@code false}: they must have received nothing over transport, the
 * active cluster manager must report the setting as enabled, and their REMOTE persisted state must match
 * the LOCAL one.
 *
 * @param clusterManagers   names of all cluster-manager nodes
 * @param activeCM          name of the currently active cluster manager
 * @param activeCMRestarted whether {@code activeCM} has already been restarted with the setting disabled
 */
private void assertPublicationModeOnFollowingClusterManagers(Set<String> clusterManagers, String activeCM, boolean activeCMRestarted) {
    Set<String> followingCMs = clusterManagers.stream().filter(node -> !Objects.equals(node, activeCM)).collect(Collectors.toSet());

    NodesStatsResponse statsResponse = client().admin()
        .cluster()
        .prepareNodesStats(followingCMs.toArray(new String[0]))
        .clear()
        .addMetric(DISCOVERY.metricName())
        .get();
    // Once the active cluster manager is a restarted node, publication should happen on transport.
    statsResponse.getNodes().forEach(nodeStats -> {
        PublishClusterStateStats stats = nodeStats.getDiscoveryStats().getPublishStats();
        if (activeCMRestarted) {
            assertTrue(
                "expected a cluster state received over transport from restarted cluster manager [" + activeCM + "]",
                stats.getFullClusterStateReceivedCount() > 0 || stats.getCompatibleClusterStateDiffReceivedCount() > 0
            );
            assertEquals(0, stats.getIncompatibleClusterStateDiffReceivedCount());
        } else {
            assertEquals(0, stats.getFullClusterStateReceivedCount());
            assertEquals(0, stats.getCompatibleClusterStateDiffReceivedCount());
            assertEquals(0, stats.getIncompatibleClusterStateDiffReceivedCount());
        }
    });

    NodesInfoResponse nodesInfoResponse = client().admin()
        .cluster()
        .prepareNodesInfo(activeCM)
        .clear()
        .addMetric(SETTINGS.metricName())
        .get();
    // The setting on the active cluster manager is the inverse of "has it been restarted yet".
    boolean remotePublicationEnabled = REMOTE_PUBLICATION_EXPERIMENTAL_SETTING.get(nodesInfoResponse.getNodes().get(0).getSettings());
    assertEquals(
        "unexpected remote publication setting on active cluster manager [" + activeCM + "]",
        !activeCMRestarted,
        remotePublicationEnabled
    );

    followingCMs.forEach(node -> {
        PersistedStateRegistry registry = internalCluster().getInstance(PersistedStateRegistry.class, node);
        CoordinationState.PersistedState remoteState = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE);
        if (activeCMRestarted) {
            assertNull(remoteState.getLastAcceptedState());
            // TODO(review): a check that the last accepted manifest is also null was left disabled here;
            // confirm whether it should be enforced.
        } else {
            ClusterState localState = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.LOCAL).getLastAcceptedState();
            ClusterState remotePersistedState = remoteState.getLastAcceptedState();
            assertTrue(isGlobalStateEquals(localState.metadata(), remotePersistedState.metadata()));
            assertEquals(localState.nodes(), remotePersistedState.nodes());
            assertEquals(localState.routingTable(), remotePersistedState.routingTable());
            assertEquals(localState.customs(), remotePersistedState.customs());
        }
    });
}

public void testMasterReElectionUsesIncrementalUpload() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2143,6 +2143,17 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception
}
}

/**
 * Returns the names (the keys of {@code nodes}) of every node whose entry value satisfies
 * {@code CLUSTER_MANAGER_NODE_PREDICATE}.
 *
 * @return the set of matching node names; empty when no node matches
 */
public Set<String> getClusterManagerNames() {
    return nodes.entrySet()
        .stream()
        .filter(nodeEntry -> CLUSTER_MANAGER_NODE_PREDICATE.test(nodeEntry.getValue()))
        .collect(Collectors.mapping(nodeEntry -> nodeEntry.getKey(), Collectors.toSet()));
}

/**
* Returns the name of the current cluster-manager node in the cluster.
*/
Expand Down

0 comments on commit 40ef9f6

Please sign in to comment.