Skip to content

Commit

Permalink
Parallelize ESIntegTestCase.ensureClusterStateConsistency across mult…
Browse files Browse the repository at this point in the history
…iple nodes

We can get a non-trivial speedup here but running all the nodes in
parallel. This is quite helpful for end-to-end integ test performance
since we run this method after effectively every test.
  • Loading branch information
original-brownbear committed Aug 9, 2024
1 parent fd916c2 commit 89d2c13
Showing 1 changed file with 63 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
Expand Down Expand Up @@ -1236,47 +1237,72 @@ protected void ensureClusterSizeConsistency() {
protected void ensureClusterStateConsistency() throws IOException {
if (cluster() != null && cluster().size() > 0) {
final NamedWriteableRegistry namedWriteableRegistry = cluster().getNamedWriteableRegistry();
final Client masterClient = client();
ClusterState masterClusterState = masterClient.admin().cluster().prepareState().all().get().getState();
byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState);
// remove local node reference
masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null, namedWriteableRegistry);
Map<String, Object> masterStateMap = convertToMap(masterClusterState);
int masterClusterStateSize = ClusterState.Builder.toBytes(masterClusterState).length;
String masterId = masterClusterState.nodes().getMasterNodeId();
final PlainActionFuture<Void> future = new PlainActionFuture<>();
final List<SubscribableListener<ClusterStateResponse>> localStates = new ArrayList<>(cluster().size());
for (Client client : cluster().getClients()) {
ClusterState localClusterState = client.admin().cluster().prepareState().all().setLocal(true).get().getState();
byte[] localClusterStateBytes = ClusterState.Builder.toBytes(localClusterState);
// remove local node reference
localClusterState = ClusterState.Builder.fromBytes(localClusterStateBytes, null, namedWriteableRegistry);
final Map<String, Object> localStateMap = convertToMap(localClusterState);
final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length;
// Check that the non-master node has the same version of the cluster state as the master and
// that the master node matches the master (otherwise there is no requirement for the cluster state to match)
if (masterClusterState.version() == localClusterState.version()
&& masterId.equals(localClusterState.nodes().getMasterNodeId())) {
try {
assertEquals("cluster state UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID());
// We cannot compare serialization bytes since serialization order of maps is not guaranteed
// but we can compare serialization sizes - they should be the same
assertEquals("cluster state size does not match", masterClusterStateSize, localClusterStateSize);
// Compare JSON serialization
assertNull(
"cluster state JSON serialization does not match",
differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap)
);
} catch (final AssertionError error) {
logger.error(
"Cluster state from master:\n{}\nLocal cluster state:\n{}",
masterClusterState.toString(),
localClusterState.toString()
localStates.add(
SubscribableListener.newForked(l -> client.admin().cluster().prepareState().all().setLocal(true).execute(l))
);
}
try (RefCountingListener refCountingListener = new RefCountingListener(future)) {
SubscribableListener.<ClusterStateResponse>newForked(l -> client().admin().cluster().prepareState().all().execute(l))
.andThenAccept(masterStateResponse -> {
byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterStateResponse.getState());
// remove local node reference
final ClusterState masterClusterState = ClusterState.Builder.fromBytes(
masterClusterStateBytes,
null,
namedWriteableRegistry
);
throw error;
}
}
Map<String, Object> masterStateMap = convertToMap(masterClusterState);
int masterClusterStateSize = ClusterState.Builder.toBytes(masterClusterState).length;
String masterId = masterClusterState.nodes().getMasterNodeId();
for (SubscribableListener<ClusterStateResponse> localStateListener : localStates) {
localStateListener.andThenAccept(localClusterStateResponse -> {
byte[] localClusterStateBytes = ClusterState.Builder.toBytes(localClusterStateResponse.getState());
// remove local node reference
final ClusterState localClusterState = ClusterState.Builder.fromBytes(
localClusterStateBytes,
null,
namedWriteableRegistry
);
final Map<String, Object> localStateMap = convertToMap(localClusterState);
final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length;
// Check that the non-master node has the same version of the cluster state as the master and
// that the master node matches the master (otherwise there is no requirement for the cluster state to
// match)
if (masterClusterState.version() == localClusterState.version()
&& masterId.equals(localClusterState.nodes().getMasterNodeId())) {
try {
assertEquals(
"cluster state UUID does not match",
masterClusterState.stateUUID(),
localClusterState.stateUUID()
);
// We cannot compare serialization bytes since serialization order of maps is not guaranteed
// but we can compare serialization sizes - they should be the same
assertEquals("cluster state size does not match", masterClusterStateSize, localClusterStateSize);
// Compare JSON serialization
assertNull(
"cluster state JSON serialization does not match",
differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap)
);
} catch (final AssertionError error) {
logger.error(
"Cluster state from master:\n{}\nLocal cluster state:\n{}",
masterClusterState.toString(),
localClusterState.toString()
);
throw error;
}
}
}).addListener(refCountingListener.acquire());
}
})
.addListener(refCountingListener.acquire());
}
safeGet(future);
}

}

protected void ensureClusterStateCanBeReadByNodeTool() throws IOException {
Expand Down

0 comments on commit 89d2c13

Please sign in to comment.