diff --git a/docs/changelog/88562.yaml b/docs/changelog/88562.yaml
new file mode 100644
index 0000000000000..ce6d4b70cc45f
--- /dev/null
+++ b/docs/changelog/88562.yaml
@@ -0,0 +1,5 @@
+pr: 88562
+summary: Polling for cluster diagnostics information
+area: Health
+type: enhancement
+issues: []
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java
index 2e0303dbd5b4c..258f7af803036 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsService.java
@@ -15,6 +15,7 @@
 import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.StepListener;
 import org.elasticsearch.action.admin.cluster.coordination.ClusterFormationInfoAction;
+import org.elasticsearch.action.admin.cluster.coordination.CoordinationDiagnosticsAction;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -25,6 +26,7 @@
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
@@ -38,6 +40,7 @@
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -50,6 +53,8 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -66,7 +71,7 @@
  * Since this service needs to be able to run when there is no master at all, it does not depend on the dedicated health node (which
  * requires the existence of a master).
  */
-public class CoordinationDiagnosticsService implements ClusterStateListener {
+public class CoordinationDiagnosticsService implements ClusterStateListener, Coordinator.PeerFinderListener {
     private final ClusterService clusterService;
     private final TransportService transportService;
     private final Coordinator coordinator;
@@ -101,6 +106,33 @@ public class CoordinationDiagnosticsService implements ClusterStateListener {
     private static final Logger logger = LogManager.getLogger(CoordinationDiagnosticsService.class);
 
+    /*
+     * This is a reference to the task that periodically reaches out to a master eligible node to get its CoordinationDiagnosticsResult
+     * for diagnosis. It is null when no polling is occurring.
+     * The field is accessed (reads/writes) from multiple threads, and is also reassigned on multiple threads.
+     */
+    private volatile AtomicReference<Scheduler.Cancellable> remoteStableMasterHealthIndicatorTask = null;
+    /*
+     * This field holds the result of the task in the remoteStableMasterHealthIndicatorTask field above. The field is accessed
+     * (reads/writes) from multiple threads, and is also reassigned on multiple threads.
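+     * A new AtomicReference is swapped in whenever polling is restarted or cancelled, so writes from a stale poll land in an old
+     * reference that is no longer read.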
+     */
+    private volatile AtomicReference<RemoteMasterHealthResult> remoteCoordinationDiagnosisResult = new AtomicReference<>();
+
+    /**
+     * This field has a reference to an AtomicBoolean indicating whether the most recent attempt at polling for remote coordination
+     * diagnostics ought to still be running. There is one AtomicBoolean per remote coordination diagnostics poll. We keep track of the
+     * current (or most recent, anyway) one here so that cancelPollingRemoteStableMasterHealthIndicatorService() can cancel it. All
+     * older copies will have already been cancelled, which helps any still-running polls know that they have been cancelled (only
+     * one needs to be running at any given time).
+     */
+    private volatile AtomicBoolean remoteCoordinationDiagnosticsCancelled = new AtomicBoolean(true);
+
+    /*
+     * The previous three variables (remoteStableMasterHealthIndicatorTask, remoteCoordinationDiagnosisResult, and
+     * remoteCoordinationDiagnosticsCancelled) are reassigned on multiple threads. This mutex is used to protect those reassignments.
+     */
+    private final Object remoteDiagnosticsMutex = new Object();
+
     /**
      * This is the default amount of time we look back to see if we have had a master at all, before moving on with other checks
      */
@@ -146,6 +178,7 @@ public CoordinationDiagnosticsService(
         this.unacceptableNullTransitions = NO_MASTER_TRANSITIONS_THRESHOLD_SETTING.get(clusterService.getSettings());
         this.unacceptableIdentityChanges = IDENTITY_CHANGES_THRESHOLD_SETTING.get(clusterService.getSettings());
         clusterService.addListener(this);
+        coordinator.addPeerFinderListener(this);
     }
 
     /**
@@ -655,7 +688,9 @@ void beginPollingClusterFormationInfo(
             cancellableConsumer.accept(
                 fetchClusterFormationInfo(
                     masterEligibleNode,
-                    responseConsumer.andThen(rescheduleFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer))
+                    responseConsumer.andThen(
+                        rescheduleClusterFormationFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer)
+                    )
                 )
             );
         } catch (EsRejectedExecutionException e) {
@@ -669,14 +704,14 @@ void beginPollingClusterFormationInfo(
     }
 
     /**
-     * This wraps the responseConsumer in a Consumer that will run rescheduleFetchConsumer() after responseConsumer has
+     * This wraps the responseConsumer in a Consumer that will run rescheduleClusterFormationFetchConsumer() after responseConsumer has
      * completed, adding the resulting Cancellable to cancellableConsumer.
      * @param masterEligibleNode The node being polled
     * @param responseConsumer The response consumer to be wrapped
     * @param cancellableConsumer The list of Cancellables
     * @return
     */
-    private Consumer<ClusterFormationStateOrException> rescheduleFetchConsumer(
+    private Consumer<ClusterFormationStateOrException> rescheduleClusterFormationFetchConsumer(
         DiscoveryNode masterEligibleNode,
         Consumer<ClusterFormationStateOrException> responseConsumer,
         Consumer<Scheduler.Cancellable> cancellableConsumer
@@ -686,7 +721,9 @@ private Consumer<ClusterFormationStateOrException> rescheduleFetchConsumer(
-                    responseConsumer.andThen(rescheduleFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer))
+                    responseConsumer.andThen(
+                        rescheduleClusterFormationFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer)
+                    )
+
+    private void beginPollingRemoteStableMasterHealthIndicatorService(List<DiscoveryNode> masterEligibleNodes) {
+        synchronized (remoteDiagnosticsMutex) {
+            if (remoteCoordinationDiagnosticsCancelled.get()) { // Don't start a 2nd one if the last hasn't been cancelled
+                AtomicReference<Scheduler.Cancellable> cancellableReference = new AtomicReference<>();
+                AtomicReference<RemoteMasterHealthResult> resultReference = new AtomicReference<>();
+                AtomicBoolean isCancelled = new AtomicBoolean(false);
+                beginPollingRemoteStableMasterHealthIndicatorService(masterEligibleNodes, resultReference::set, task -> {
+                    synchronized (remoteDiagnosticsMutex) {
+                        if (isCancelled.get()) {
+                            /*
+                             * cancelPollingRemoteStableMasterHealthIndicatorService() has already been called, and if we were to put
+                             * this task into this cancellableReference it would be lost (and never cancelled).
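+                             * Cancelling the task right here instead makes sure that it does not keep running after its poll is gone.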
+                             */
+                            task.cancel();
+                            logger.trace("A Cancellable came in for a cancelled remote coordination diagnostics task");
+                        } else {
+                            cancellableReference.set(task);
+                        }
+                    }
+                }, isCancelled);
+                remoteStableMasterHealthIndicatorTask = cancellableReference;
+                remoteCoordinationDiagnosisResult = resultReference;
+                // We're leaving the old one forever true in case something is still running and checks it in the future
+                remoteCoordinationDiagnosticsCancelled = isCancelled;
+            }
+        }
+    }
+
+    /**
+     * This method returns quickly, but in the background it schedules a query of the remote node's cluster diagnostics in 10 seconds,
+     * and repeats doing that until cancel() is called on all of the Cancellables that this method sends to the cancellableConsumer.
+     * This method exists (rather than being just part of the beginPollingRemoteStableMasterHealthIndicatorService() above) in order to
+     * facilitate unit testing.
+     * @param masterEligibleNodes A collection of all master eligible nodes that may be polled
+     * @param responseConsumer A consumer for any results produced for a node by this method
+     * @param cancellableConsumer A consumer for any Cancellable tasks produced by this method
+     * @param isCancelled If true, this task has been cancelled in another thread and does not need to do any more work
+     */
+    // Non-private for testing
+    void beginPollingRemoteStableMasterHealthIndicatorService(
+        Collection<DiscoveryNode> masterEligibleNodes,
+        Consumer<RemoteMasterHealthResult> responseConsumer,
+        Consumer<Scheduler.Cancellable> cancellableConsumer,
+        AtomicBoolean isCancelled
+    ) {
+        masterEligibleNodes.stream().findAny().ifPresentOrElse(masterEligibleNode -> {
+            cancellableConsumer.accept(
+                fetchCoordinationDiagnostics(
+                    masterEligibleNode,
+                    responseConsumer.andThen(
+                        rescheduleDiagnosticsFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer, isCancelled)
+                    ),
+                    isCancelled
+                )
+            );
+        }, () -> logger.trace("No master eligible node found"));
+    }
+
+    /**
+     * This wraps the responseConsumer in a Consumer that will run rescheduleDiagnosticsFetchConsumer() after responseConsumer has
+     * completed, adding the resulting Cancellable to cancellableConsumer.
+     * @param masterEligibleNode The node being polled
+     * @param responseConsumer The response consumer to be wrapped
+     * @param cancellableConsumer The list of Cancellables
+     * @param isCancelled If true, this task has been cancelled in another thread and does not need to do any more work
+     * @return A wrapped Consumer that will run fetchCoordinationDiagnostics()
+     */
+    private Consumer<RemoteMasterHealthResult> rescheduleDiagnosticsFetchConsumer(
+        DiscoveryNode masterEligibleNode,
+        Consumer<RemoteMasterHealthResult> responseConsumer,
+        Consumer<Scheduler.Cancellable> cancellableConsumer,
+        AtomicBoolean isCancelled
+    ) {
+        return response -> {
+            if (isCancelled.get()) {
+                logger.trace("An attempt to reschedule a cancelled remote coordination diagnostics task is being ignored");
+            } else {
+                /*
+                 * We make sure that the task hasn't been cancelled. If it has, we don't reschedule it. Since this block is not
+                 * synchronized, it is possible that the task will be cancelled after the check and before the following is run. In
+                 * that case the cancellableConsumer will immediately cancel the task.
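+                 * (That cancellation happens in the Cancellable consumer that beginPollingRemoteStableMasterHealthIndicatorService()
+                 * builds, which cancels any task that arrives after isCancelled has been set.)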
+                 */
+                cancellableConsumer.accept(
+                    fetchCoordinationDiagnostics(
+                        masterEligibleNode,
+                        responseConsumer.andThen(
+                            rescheduleDiagnosticsFetchConsumer(masterEligibleNode, responseConsumer, cancellableConsumer, isCancelled)
+                        ),
+                        isCancelled
+                    )
+                );
+            }
+        };
+    }
+
+    /**
+     * This method returns quickly, but in the background it schedules a query of the remote node's cluster diagnostics in 10 seconds,
+     * unless cancel() is called on the Cancellable that this method returns.
+     * @param node The node to poll for cluster diagnostics
+     * @param responseConsumer The consumer of the cluster diagnostics for the node, or of the exception encountered while contacting it
+     * @param isCancelled If true, this task has been cancelled in another thread and does not need to do any more work
+     * @return A Cancellable for the task that is scheduled to fetch cluster diagnostics
+     */
+    private Scheduler.Cancellable fetchCoordinationDiagnostics(
+        DiscoveryNode node,
+        Consumer<RemoteMasterHealthResult> responseConsumer,
+        AtomicBoolean isCancelled
+    ) {
+        StepListener<Releasable> connectionListener = new StepListener<>();
+        StepListener<CoordinationDiagnosticsAction.Response> fetchCoordinationDiagnosticsListener = new StepListener<>();
+        long startTime = System.nanoTime();
+        connectionListener.whenComplete(releasable -> {
+            if (isCancelled.get()) {
+                IOUtils.close(releasable);
+                logger.trace(
+                    "Opened connection to {} for a remote coordination diagnostics request, but the task was cancelled and the "
+                        + "transport request will not be made",
+                    node
+                );
+            } else {
+                /*
+                 * Since this block is not synchronized, it is possible that this task is cancelled between the check above and when
+                 * the code below is run, but this is harmless and not worth the additional synchronization in the normal case. The
+                 * result will just be ignored.
+                 */
+                logger.trace("Opened connection to {}, making master stability request", node);
+                // If we don't get a response in 10 seconds that is a failure worth capturing on its own:
+                final TimeValue transportTimeout = TimeValue.timeValueSeconds(10);
+                transportService.sendRequest(
+                    node,
+                    CoordinationDiagnosticsAction.NAME,
+                    new CoordinationDiagnosticsAction.Request(true),
+                    TransportRequestOptions.timeout(transportTimeout),
+                    new ActionListenerResponseHandler<>(
+                        ActionListener.runBefore(fetchCoordinationDiagnosticsListener, () -> Releasables.close(releasable)),
+                        CoordinationDiagnosticsAction.Response::new
+                    )
+                );
+            }
+        }, e -> {
+            logger.warn("Exception connecting to master node", e);
+            responseConsumer.accept(new RemoteMasterHealthResult(node, null, e));
+        });
+
+        fetchCoordinationDiagnosticsListener.whenComplete(response -> {
+            long endTime = System.nanoTime();
+            logger.trace("Received master stability result from {} in {}", node, TimeValue.timeValueNanos(endTime - startTime));
+            /*
+             * It is possible that the task has been cancelled at this point, but it does not really matter. In that case this result
+             * will be ignored and soon garbage collected.
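+             * The responseConsumer only writes into the AtomicReference belonging to this poll, and nothing reads that reference
+             * once it has been replaced.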
+             */
+            responseConsumer.accept(new RemoteMasterHealthResult(node, response.getCoordinationDiagnosticsResult(), null));
+        }, e -> {
+            logger.warn("Exception in master stability request to master node", e);
+            responseConsumer.accept(new RemoteMasterHealthResult(node, null, e));
+        });
+
+        return transportService.getThreadPool().schedule(() -> {
+            Version minSupportedVersion = Version.V_8_4_0;
+            if (node.getVersion().onOrAfter(minSupportedVersion) == false) { // This was introduced in 8.4.0
+                logger.trace(
+                    "Cannot get cluster coordination info for {} because it is at version {} and {} is required",
+                    node,
+                    node.getVersion(),
+                    minSupportedVersion
+                );
+            } else if (isCancelled.get()) {
+                logger.trace("The remote coordination diagnostics task has been cancelled, so not opening a remote connection");
+            } else {
+                /*
+                 * Since this block is not synchronized, it is possible that this task is cancelled between the check above and when
+                 * the code below is run, but this is harmless and not worth the additional synchronization in the normal case. In
+                 * that case the connection will just be closed and the transport request will not be made.
+                 */
+                transportService.connectToNode(
+                    // Note: This connection must be explicitly closed in the connectionListener
+                    node,
+                    ConnectionProfile.buildDefaultConnectionProfile(clusterService.getSettings()),
+                    connectionListener
+                );
+            }
+        }, new TimeValue(10, TimeUnit.SECONDS), ThreadPool.Names.SAME);
+    }
+
+    private void cancelPollingRemoteStableMasterHealthIndicatorService() {
+        synchronized (remoteDiagnosticsMutex) {
+            if (remoteStableMasterHealthIndicatorTask != null && remoteCoordinationDiagnosticsCancelled.getAndSet(true) == false) {
+                remoteStableMasterHealthIndicatorTask.get().cancel();
+                remoteCoordinationDiagnosisResult = new AtomicReference<>();
+            }
+        }
+    }
+
+    @Override
+    public void onFoundPeersUpdated() {
+        /*
+         * If we are on a non-master-eligible node, and the list of peers in PeerFinder is non-empty, that implies that there is
+         * currently no master node elected. At the time that clusterChanged is called notifying us that there is no master, the list
+         * of peers is still empty (it is only populated after that, just before this method is called). That is why this logic is in
+         * here rather than in clusterChanged.
+         * This begins polling a random master-eligible node for its result from this service. However, there is a 10-second delay
+         * before the polling starts, so in the normal situation where a master transition flips from master1 -> null -> master2, the
+         * polling tasks will be cancelled before any requests are actually made.
+         * Note that this method can be called from multiple threads.
+         */
+        if (clusterService.localNode().isMasterNode() == false) {
+            /*
+             * Note that PeerFinder (the source of master eligible nodes) could be updating the master eligible nodes on a different
+             * thread, so we make a copy here so that it doesn't change for the short duration of this method.
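+             * The copy also keeps the isEmpty() check below consistent with the collection that is actually handed to the polling
+             * method.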
+             */
+            List<DiscoveryNode> masterEligibleNodes = new ArrayList<>(getMasterEligibleNodes());
+            if (masterEligibleNodes.isEmpty()) {
+                cancelPollingRemoteStableMasterHealthIndicatorService();
+            } else {
+                beginPollingRemoteStableMasterHealthIndicatorService(masterEligibleNodes);
+            }
+        }
+    }
+
     // Non-private for testing
     record ClusterFormationStateOrException(
         ClusterFormationFailureHelper.ClusterFormationState clusterFormationState,
@@ -923,4 +1178,7 @@ public void writeTo(StreamOutput out) throws IOException {
         }
     }
 
+
+    // Non-private for testing:
+    record RemoteMasterHealthResult(DiscoveryNode node, CoordinationDiagnosticsResult result, Exception remoteException) {}
 }
diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsServiceTests.java
index bf465fec9a368..689e05a4a70a5 100644
--- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationDiagnosticsServiceTests.java
@@ -40,6 +40,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
@@ -54,6 +55,7 @@
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.startsWith;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -1016,6 +1018,154 @@ public void testBeginPollingClusterFormationInfoCancel() {
         }
     }
 
+    public void testBeginPollingRemoteStableMasterHealthIndicatorService() {
+        /*
+         * This test sets up a 5-node cluster (3 master eligible). We call beginPollingRemoteStableMasterHealthIndicatorService() on
+         * each non-master-eligible node, once using the set of master-eligible nodes that were NOT the initially elected master, and
+         * once using just the initially elected master (the reason being that these two groups of nodes will have a different view of
+         * master stability after the disconnections later in this test). We simulate running for a while, and assert that we have
+         * diagnostic results on each non-master node. Then we disconnect all master-eligible nodes except for the elected master and
+         * simulate running for a while. Then when we check the diagnostic result we expect to see an exception where we were only
+         * polling the non-elected-master master-eligible nodes (because they have been disconnected), and we expect to see a RED
+         * status on the diagnostic result where we were polling the master-eligible node that is not disconnected (since it can't
+         * form a quorum any more).
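+         * At the end, the disconnected master-eligible nodes are healed so that the cluster can be torn down cleanly.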
+         */
+        try (Cluster cluster = new Cluster(3, true, Settings.EMPTY)) {
+            createAndAddNonMasterNode(cluster);
+            createAndAddNonMasterNode(cluster);
+            cluster.runRandomly();
+            cluster.stabilise();
+            List<DiscoveryNode> masterNodes = cluster.clusterNodes.stream()
+                .map(Cluster.ClusterNode::getLocalNode)
+                .filter(DiscoveryNode::isMasterNode)
+                .toList();
+            Cluster.ClusterNode electedMaster = cluster.getAnyLeader();
+            List<DiscoveryNode> masterNodesNotIncludingElectedMaster = masterNodes.stream()
+                .filter(node -> node.equals(electedMaster.getLocalNode()) == false)
+                .toList();
+            cluster.clusterNodes.stream().filter(node -> node.getLocalNode().isMasterNode() == false).forEach(node -> {
+                List<CoordinationDiagnosticsService.RemoteMasterHealthResult> healthResultsNotElectedMaster = new ArrayList<>();
+                node.coordinationDiagnosticsService.beginPollingRemoteStableMasterHealthIndicatorService(
+                    masterNodesNotIncludingElectedMaster,
+                    healthResultsNotElectedMaster::add,
+                    cancellable -> {},
+                    new AtomicBoolean(false)
+                );
+
+                List<CoordinationDiagnosticsService.RemoteMasterHealthResult> healthResultsOriginalMaster = new ArrayList<>();
+                node.coordinationDiagnosticsService.beginPollingRemoteStableMasterHealthIndicatorService(
+                    List.of(electedMaster.getLocalNode()),
+                    healthResultsOriginalMaster::add,
+                    cancellable -> {},
+                    new AtomicBoolean(false)
+                );
+
+                cluster.runRandomly(false, true, EXTREME_DELAY_VARIABILITY);
+                cluster.stabilise();
+
+                /*
+                 * The cluster has now run normally for some period of time, so check that the output of
+                 * beginPollingRemoteStableMasterHealthIndicatorService() is present and not an exception, both when we're polling a
+                 * node that was not elected master, and when we're polling the node that was elected master:
+                 */
+                assertThat(healthResultsNotElectedMaster.size(), greaterThanOrEqualTo(1));
+                CoordinationDiagnosticsService.RemoteMasterHealthResult result = healthResultsNotElectedMaster.get(
+                    healthResultsNotElectedMaster.size() - 1
+                );
+                assertThat(result.result(), notNullValue());
+                assertThat(result.node(), notNullValue());
+                assertThat(result.remoteException(), nullValue());
+                CoordinationDiagnosticsService.CoordinationDiagnosticsResult diagResult = result.result();
+                assertThat(diagResult.status(), equalTo(CoordinationDiagnosticsService.CoordinationDiagnosticsStatus.GREEN));
+
+                assertThat(healthResultsOriginalMaster.size(), greaterThanOrEqualTo(1));
+                result = healthResultsOriginalMaster.get(healthResultsOriginalMaster.size() - 1);
+                assertThat(result.result(), notNullValue());
+                assertThat(result.node(), notNullValue());
+                assertThat(result.remoteException(), nullValue());
+                diagResult = result.result();
+                assertThat(diagResult.status(), equalTo(CoordinationDiagnosticsService.CoordinationDiagnosticsStatus.GREEN));
+
+                /*
+                 * Now disconnect all master-eligible nodes except the elected master (the fact that it's the elected master does not
+                 * really matter, it's just a convenient way to distinguish it).
+                 */
+                for (Cluster.ClusterNode clusterNode : cluster.clusterNodes) {
+                    if (clusterNode.getLocalNode().isMasterNode()
+                        && clusterNode.getLocalNode().equals(electedMaster.getLocalNode()) == false) {
+                        clusterNode.disconnect();
+                    }
+                }
+                cluster.runFor(DEFAULT_STABILISATION_TIME, "Cannot call stabilise() because there is no master");
+
+                /*
+                 * At this point the cluster has been running for a while with 2 of its 3 master-eligible nodes missing. So when a
+                 * non-master-eligible node tries to poll one of the disconnected nodes, we expect it to have received an exception:
+                 */
+                assertThat(healthResultsNotElectedMaster.size(), greaterThanOrEqualTo(1));
+                result = healthResultsNotElectedMaster.get(healthResultsNotElectedMaster.size() - 1);
+                assertThat(result.result(), nullValue());
+                assertThat(result.node(), notNullValue());
+                assertThat(result.remoteException(), notNullValue());
+
+                /*
+                 * Since the lone master-eligible node we can talk to cannot form a quorum, we expect its status to be red:
+                 */
+                assertThat(healthResultsOriginalMaster.size(), greaterThanOrEqualTo(1));
+                result = healthResultsOriginalMaster.get(healthResultsOriginalMaster.size() - 1);
+                assertThat(result.result(), notNullValue());
+                assertThat(result.node(), notNullValue());
+                assertThat(result.remoteException(), nullValue());
+                diagResult = result.result();
+                assertThat(diagResult.status(), equalTo(CoordinationDiagnosticsService.CoordinationDiagnosticsStatus.RED));
+
+                for (Cluster.ClusterNode clusterNode : cluster.clusterNodes) {
+                    if (clusterNode.getLocalNode().isMasterNode()) {
+                        clusterNode.heal();
+                    }
+                }
+            });
+
+        }
+    }
+
+    public void testBeginPollingRemoteStableMasterHealthIndicatorServiceCancel() {
+        /*
+         * This test sets up a 5-node cluster (3 master eligible). We call beginPollingRemoteStableMasterHealthIndicatorService() on
+         * each non-master-eligible node, but we immediately call cancel, which is what will happen in practice most often, since
+         * usually the master becomes null and then is immediately non-null again when a new master is elected. Because polling starts
+         * only after a 10-second delay, it will never actually begin, and we expect no results.
+         */
+        try (Cluster cluster = new Cluster(3, true, Settings.EMPTY)) {
+            createAndAddNonMasterNode(cluster);
+            createAndAddNonMasterNode(cluster);
+            cluster.runRandomly();
+            cluster.stabilise();
+            List<DiscoveryNode> masterNodes = cluster.clusterNodes.stream()
+                .map(Cluster.ClusterNode::getLocalNode)
+                .filter(DiscoveryNode::isMasterNode)
+                .toList();
+            cluster.clusterNodes.stream().filter(node -> node.getLocalNode().isMasterNode() == false).forEach(node -> {
+                List<CoordinationDiagnosticsService.RemoteMasterHealthResult> healthResults = new ArrayList<>();
+                node.coordinationDiagnosticsService.beginPollingRemoteStableMasterHealthIndicatorService(
+                    masterNodes,
+                    healthResults::add,
+                    Scheduler.Cancellable::cancel,
+                    new AtomicBoolean(false)
+                );
+
+                cluster.runRandomly(false, true, EXTREME_DELAY_VARIABILITY);
+                cluster.stabilise();
+
+                /*
+                 * The cluster has now run normally for some period of time, but cancel() was called before polling began, so we
+                 * expect no results:
+                 */
+                assertThat(healthResults.size(), equalTo(0));
+            });
+
+        }
+    }
+
     public void testResultSerialization() {
         CoordinationDiagnosticsService.CoordinationDiagnosticsStatus status = getRandomStatus();
         CoordinationDiagnosticsService.CoordinationDiagnosticsDetails details = getRandomDetails();
diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java
index 7beaa3c9456eb..971cf9c57d484 100644
--- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java
@@ -16,6 +16,7 @@
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.coordination.ClusterFormationInfoAction;
+import org.elasticsearch.action.admin.cluster.coordination.CoordinationDiagnosticsAction;
 import org.elasticsearch.action.admin.cluster.coordination.MasterHistoryAction;
 import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction;
 import org.elasticsearch.action.admin.cluster.node.hotthreads.TransportNodesHotThreadsAction;
@@ -1251,6 +1252,12 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() {
                 getElectionStrategy(),
                 nodeHealthService
             );
+            coordinationDiagnosticsService = new CoordinationDiagnosticsService(
+                clusterService,
+                transportService,
+                coordinator,
+                masterHistoryService
+            );
             client.initialize(
                 Map.of(
                     NodesHotThreadsAction.INSTANCE,
@@ -1258,7 +1265,13 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() {
                     MasterHistoryAction.INSTANCE,
                     new MasterHistoryAction.TransportAction(transportService, new ActionFilters(Set.of()), masterHistoryService),
                     ClusterFormationInfoAction.INSTANCE,
-                    new ClusterFormationInfoAction.TransportAction(transportService, new ActionFilters(Set.of()), coordinator)
+                    new ClusterFormationInfoAction.TransportAction(transportService, new ActionFilters(Set.of()), coordinator),
+                    CoordinationDiagnosticsAction.INSTANCE,
+                    new CoordinationDiagnosticsAction.TransportAction(
+                        transportService,
+                        new ActionFilters(Set.of()),
+                        coordinationDiagnosticsService
+                    )
                 ),
                 transportService.getTaskManager(),
                 localNode::getId,
                 null,
                 getNamedWriteableRegistry()
             );
-            coordinationDiagnosticsService = new CoordinationDiagnosticsService(
-                clusterService,
-                transportService,
-                coordinator,
-                masterHistoryService
-            );
             stableMasterHealthIndicatorService = new StableMasterHealthIndicatorService(coordinationDiagnosticsService);
             masterService.setClusterStatePublisher(coordinator);
             final GatewayService gatewayService = new GatewayService(