From 5bb51f621b7426c632e8e3397dd2833e39aa0094 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 23 May 2018 11:03:27 +0100 Subject: [PATCH 1/5] Decouple ClusterStateTaskListener & ClusterApplier Today, the `ClusterApplier` and `MasterService` both use the `ClusterStateTaskListener` interface to notify their callers when asynchronous activities have completed. However, this is not wholly appropriate: none of the callers into the `ClusterApplier` care about the `ClusterState` arguments that they receive. This change introduces a dedicated ClusterApplyListener interface for callers into the `ClusterApplier`, to distinguish these listeners from the real `ClusterStateTaskListener`s that are waiting for responses from the `MasterService`. --- .../cluster/service/ClusterApplier.java | 20 +++++++++- .../service/ClusterApplierService.java | 33 ++++++++--------- .../discovery/single/SingleNodeDiscovery.java | 16 ++++++-- .../discovery/zen/ZenDiscovery.java | 16 ++++++-- .../indices/store/IndicesStore.java | 14 ++++++- .../health/ClusterStateHealthTests.java | 11 +++++- .../service/ClusterApplierServiceTests.java | 37 ++++++++++--------- .../single/SingleNodeDiscoveryTests.java | 5 +-- .../discovery/zen/ZenDiscoveryUnitTests.java | 5 +-- .../store/IndicesStoreIntegrationIT.java | 5 ++- .../test/ClusterServiceUtils.java | 9 +++-- .../BlockClusterStateProcessing.java | 13 ++++++- .../SlowClusterStateProcessing.java | 13 ++++++- 13 files changed, 135 insertions(+), 62 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java index 0a2ef347d0665..6be7e56278a70 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java @@ -20,7 +20,6 @@ package org.elasticsearch.cluster.service; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateTaskListener; import java.util.function.Supplier; @@ -38,11 +37,28 @@ public interface ClusterApplier { * @param clusterStateSupplier the cluster state supplier which provides the latest cluster state to apply * @param listener callback that is invoked after cluster state is applied */ - void onNewClusterState(String source, Supplier clusterStateSupplier, ClusterStateTaskListener listener); + void onNewClusterState(String source, Supplier clusterStateSupplier, ClusterApplyListener listener); /** * Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs. */ ClusterState.Builder newClusterStateBuilder(); + /** + * Listener for results of cluster state application + */ + interface ClusterApplyListener { + /** + * Called on successful cluster state application + * @param source information where the cluster state came from + */ + void onSuccess(String source); + + /** + * Called on failure during cluster state application + * @param source information where the cluster state came from + * @param e exception that occurred + */ + void onFailure(String source, Exception e); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index 01fa5837387c8..ff581ec3fdedd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -141,10 +141,10 @@ protected synchronized void doStart() { } class UpdateTask extends SourcePrioritizedRunnable implements Function { - final ClusterStateTaskListener listener; + final ClusterApplyListener listener; final Function updateFunction; - UpdateTask(Priority priority, String source, ClusterStateTaskListener listener, + UpdateTask(Priority priority, String source, ClusterApplyListener listener, Function updateFunction) { super(priority, source); this.listener = listener; @@ -301,7 +301,7 @@ public void run() { } public void runOnApplierThread(final String source, Consumer clusterStateConsumer, - final ClusterStateTaskListener listener, Priority priority) { + final ClusterApplyListener listener, Priority priority) { submitStateUpdateTask(source, ClusterStateTaskConfig.build(priority), (clusterState) -> { clusterStateConsumer.accept(clusterState); @@ -311,13 +311,13 @@ public void runOnApplierThread(final String source, Consumer clust } public void runOnApplierThread(final String source, Consumer clusterStateConsumer, - final ClusterStateTaskListener listener) { + final ClusterApplyListener listener) { runOnApplierThread(source, clusterStateConsumer, listener, Priority.HIGH); } @Override public void onNewClusterState(final String source, final Supplier clusterStateSupplier, - final ClusterStateTaskListener listener) { + final ClusterApplyListener listener) { Function applyFunction = currentState -> { ClusterState nextState = clusterStateSupplier.get(); if (nextState != null) { @@ -331,12 +331,12 @@ public void onNewClusterState(final String source, final Supplier private void submitStateUpdateTask(final String source, final ClusterStateTaskConfig config, final Function executor, - final ClusterStateTaskListener listener) { + final ClusterApplyListener listener) { if (!lifecycle.started()) { return; } try { - UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterStateTaskListener(listener, logger), executor); + UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterApplyListener(listener, logger), executor); if (config.timeout() != null) { threadPoolExecutor.execute(updateTask, config.timeout(), () -> threadPool.generic().execute( @@ -417,7 +417,7 @@ protected void runTask(UpdateTask task) { } if (previousClusterState == newClusterState) { - task.listener.clusterStateProcessed(task.source, newClusterState, newClusterState); + task.listener.onSuccess(task.source); TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS))); logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime); warnAboutSlowTaskIfNeeded(executionTime, task.source); @@ -486,7 +486,7 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl callClusterStateListeners(clusterChangedEvent); - task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState); + task.listener.onSuccess(task.source); } private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent) { @@ -511,11 +511,11 @@ private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent) }); } - private static class SafeClusterStateTaskListener implements ClusterStateTaskListener { - private final ClusterStateTaskListener listener; + private static class SafeClusterApplyListener implements ClusterApplyListener { + private final ClusterApplyListener listener; private final Logger logger; - SafeClusterStateTaskListener(ClusterStateTaskListener listener, Logger logger) { + SafeClusterApplyListener(ClusterApplyListener listener, Logger logger) { this.listener = listener; this.logger = logger; } @@ -532,14 +532,13 @@ public void onFailure(String source, Exception e) { } @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + public void onSuccess(String source) { try { - listener.clusterStateProcessed(source, oldState, newState); + listener.onSuccess(source); } catch (Exception e) { logger.error(new ParameterizedMessage( - "exception thrown by listener while notifying of cluster state processed from [{}], old cluster state:\n" + - "{}\nnew cluster state:\n{}", - source, oldState, newState), e); + "exception thrown by listener while notifying of cluster state processed from [{}]", + source), e); } } } diff --git a/server/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java b/server/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java index 94ea33d1a16ab..969de0728cfbb 100644 --- a/server/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java +++ b/server/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java @@ -20,6 +20,7 @@ package org.elasticsearch.discovery.single; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskListener; @@ -27,6 +28,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterApplier; +import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.Settings; @@ -65,9 +67,9 @@ public synchronized void publish(final ClusterChangedEvent event, clusterState = event.state(); CountDownLatch latch = new CountDownLatch(1); - ClusterStateTaskListener listener = new ClusterStateTaskListener() { + ClusterApplyListener listener = new ClusterApplyListener() { @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + public void onSuccess(String source) { latch.countDown(); ackListener.onNodeAck(transportService.getLocalNode(), null); } @@ -101,7 +103,15 @@ public synchronized void startInitialJoin() { // apply a fresh cluster state just so that state recovery gets triggered by GatewayService // TODO: give discovery module control over GatewayService clusterState = ClusterState.builder(clusterState).build(); - clusterApplier.onNewClusterState("single-node-start-initial-join", () -> clusterState, (source, e) -> {}); + clusterApplier.onNewClusterState("single-node-start-initial-join", () -> clusterState, new ClusterApplyListener() { + @Override + public void onSuccess(String source) { + } + + @Override + public void onFailure(String source, Exception e) { + } + }); } @Override diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 4621e6769e962..5165802c8ffbb 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; @@ -789,9 +790,9 @@ boolean processNextCommittedClusterState(String reason) { clusterApplier.onNewClusterState("apply cluster state (from master [" + reason + "])", this::clusterState, - new ClusterStateTaskListener() { + new ClusterApplyListener() { @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + public void onSuccess(String source) { try { pendingStatesQueue.markAsProcessed(newClusterState); } catch (Exception e) { @@ -997,7 +998,16 @@ protected void rejoin(String reason) { .build(); committedState.set(clusterState); - clusterApplier.onNewClusterState(reason, this::clusterState, (source, e) -> {}); // don't wait for state to be applied + + clusterApplier.onNewClusterState(reason, this::clusterState, new ClusterApplyListener() { + @Override + public void onSuccess(String source) { + } + + @Override + public void onFailure(String source, Exception e) { + } + }); // don't wait for state to be applied } } diff --git a/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index 29f6e7aeeecc1..d0bdfdb5ed692 100644 --- a/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; @@ -290,8 +291,17 @@ private void allNodesResponded() { logger.debug(() -> new ParameterizedMessage("{} failed to delete unallocated shard, ignoring", shardId), ex); } }, - (source, e) -> logger.error(() -> new ParameterizedMessage("{} unexpected error during deletion of unallocated shard", shardId), e) - ); + new ClusterApplyListener() { + @Override + public void onSuccess(String source) { + logger.trace("{} successfully deleted unallocated shard", shardId); + } + + @Override + public void onFailure(String source, Exception e) { + logger.error(() -> new ParameterizedMessage("{} unexpected error during deletion of unallocated shard", shardId), e); + } + }); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java b/server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java index 0f5f4870ae1bb..ea09d8e5dd40b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.cluster.routing.RoutingTableGenerator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.ImmutableOpenIntMap; @@ -134,7 +135,15 @@ public void testClusterHealthWaitsForClusterStateApplication() throws Interrupte clusterService.getClusterApplierService().onNewClusterState("restore master", () -> ClusterState.builder(currentState) .nodes(DiscoveryNodes.builder(currentState.nodes()).masterNodeId(currentState.nodes().getLocalNodeId())).build(), - (source, e) -> {}); + new ClusterApplyListener() { + @Override + public void onSuccess(String source) { + } + + @Override + public void onFailure(String source, Exception e) { + } + }); logger.info("--> waiting for listener to be called and cluster state being blocked"); listenerCalled.await(); diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java index 7a8261776bd41..3e7c415db7b96 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -135,9 +136,9 @@ public void testClusterStateUpdateLogging() throws Exception { clusterApplierService.currentTimeOverride = System.nanoTime(); clusterApplierService.runOnApplierThread("test1", currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos(), - new ClusterStateTaskListener() { + new ClusterApplyListener() { @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + public void onSuccess(String source) { latch.countDown(); } @@ -151,9 +152,9 @@ public void onFailure(String source, Exception e) { clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(2).nanos(); throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task"); }, - new ClusterStateTaskListener() { + new ClusterApplyListener() { @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + public void onSuccess(String source) { fail(); } @@ -166,9 +167,9 @@ public void onFailure(String source, Exception e) { // We don't check logging for this on since there is no guarantee that it will occur before our check clusterApplierService.runOnApplierThread("test3", currentState -> {}, - new ClusterStateTaskListener() { + new ClusterApplyListener() { @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + public void onSuccess(String source) { latch.countDown(); } @@ -216,9 +217,9 @@ public void testLongClusterStateUpdateLogging() throws Exception { clusterApplierService.currentTimeOverride = System.nanoTime(); clusterApplierService.runOnApplierThread("test1", currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos(), - new ClusterStateTaskListener() { + new ClusterApplyListener() { @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + public void onSuccess(String source) { latch.countDown(); processedFirstTask.countDown(); } @@ -234,9 +235,9 @@ public void onFailure(String source, Exception e) { clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(32).nanos(); throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task"); }, - new ClusterStateTaskListener() { + new ClusterApplyListener() { @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + public void onSuccess(String source) { fail(); } @@ -247,9 +248,9 @@ public void onFailure(String source, Exception e) { }); clusterApplierService.runOnApplierThread("test3", currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(34).nanos(), - new ClusterStateTaskListener() { + new ClusterApplyListener() { @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + public void onSuccess(String source) { latch.countDown(); } @@ -262,9 +263,9 @@ public void onFailure(String source, Exception e) { // We don't check logging for this on since there is no guarantee that it will occur before our check clusterApplierService.runOnApplierThread("test4", currentState -> {}, - new ClusterStateTaskListener() { + new ClusterApplyListener() { @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + public void onSuccess(String source) { latch.countDown(); } @@ -340,10 +341,10 @@ public void testClusterStateApplierCantSampleClusterState() throws InterruptedEx CountDownLatch latch = new CountDownLatch(1); clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(), - new ClusterStateTaskListener() { + new ClusterApplyListener() { @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + public void onSuccess(String source) { latch.countDown(); } @@ -390,9 +391,9 @@ public void onTimeout(TimeValue timeout) { CountDownLatch latch = new CountDownLatch(1); clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(), - new ClusterStateTaskListener() { + new ClusterApplyListener() { @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + public void onSuccess(String source) { latch.countDown(); } diff --git a/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryTests.java b/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryTests.java index 23a510a257f21..d045adcaead21 100644 --- a/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryTests.java @@ -23,7 +23,6 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterApplier; @@ -72,9 +71,9 @@ public ClusterState.Builder newClusterStateBuilder() { @Override public void onNewClusterState(String source, Supplier clusterStateSupplier, - ClusterStateTaskListener listener) { + ClusterApplyListener listener) { clusterState.set(clusterStateSupplier.get()); - listener.clusterStateProcessed(source, clusterState.get(), clusterState.get()); + listener.onSuccess(source); } }); discovery.start(); diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index 0ecb5a296f570..a2121d3ca4e37 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -26,7 +26,6 @@ import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -314,8 +313,8 @@ public ClusterState.Builder newClusterStateBuilder() { } @Override - public void onNewClusterState(String source, Supplier clusterStateSupplier, ClusterStateTaskListener listener) { - listener.clusterStateProcessed(source, clusterStateSupplier.get(), clusterStateSupplier.get()); + public void onNewClusterState(String source, Supplier clusterStateSupplier, ClusterApplyListener listener) { + listener.onSuccess(source); } }; ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), diff --git a/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java b/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java index 7f6155979c916..6b57c9757366a 100644 --- a/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java +++ b/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java @@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; @@ -446,9 +447,9 @@ public void testShardActiveElseWhere() throws Exception { .routingTable(RoutingTable.builder().add(indexRoutingTableBuilder).build()) .build(); CountDownLatch latch = new CountDownLatch(1); - clusterApplierService.onNewClusterState("test", () -> newState, new ClusterStateTaskListener() { + clusterApplierService.onNewClusterState("test", () -> newState, new ClusterApplyListener() { @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + public void onSuccess(String source) { latch.countDown(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java index df1e216f4bbac..626a5fc8f889f 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterApplier; +import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterService; @@ -72,9 +73,9 @@ public static void setState(ClusterApplierService executor, ClusterState cluster CountDownLatch latch = new CountDownLatch(1); AtomicReference exception = new AtomicReference<>(); executor.onNewClusterState("test setting state", - () -> ClusterState.builder(clusterState).version(clusterState.version() + 1).build(), new ClusterStateTaskListener() { + () -> ClusterState.builder(clusterState).version(clusterState.version() + 1).build(), new ClusterApplyListener() { @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + public void onSuccess(String source) { latch.countDown(); } @@ -163,9 +164,9 @@ public static BiConsumer createClusterStatePub CountDownLatch latch = new CountDownLatch(1); AtomicReference ex = new AtomicReference<>(); clusterApplier.onNewClusterState("mock_publish_to_self[" + event.source() + "]", () -> event.state(), - new ClusterStateTaskListener() { + new ClusterApplyListener() { @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + public void onSuccess(String source) { latch.countDown(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java index f144cb0b118a0..6b5dada5d108d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java @@ -19,6 +19,7 @@ package org.elasticsearch.test.disruption; import org.apache.logging.log4j.core.util.Throwables; +import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.unit.TimeValue; @@ -68,8 +69,16 @@ public void startDisrupting() { Throwables.rethrow(e); } } - }, (source, e) -> logger.error("unexpected error during disruption", e), - Priority.IMMEDIATE); + }, new ClusterApplyListener() { + @Override + public void onSuccess(String source) { + } + + @Override + public void onFailure(String source, Exception e) { + logger.error("unexpected error during disruption", e); + } + }, Priority.IMMEDIATE); try { started.await(); } catch (InterruptedException e) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java index 03ffe1d690ae5..ff043fd132289 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java @@ -19,6 +19,7 @@ package org.elasticsearch.test.disruption; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.unit.TimeValue; @@ -116,8 +117,16 @@ private boolean interruptClusterStateProcessing(final TimeValue duration) throws } catch (InterruptedException e) { ExceptionsHelper.reThrowIfNotNull(e); } - }, (source, e) -> countDownLatch.countDown(), - Priority.IMMEDIATE); + }, new ClusterApplyListener() { + @Override + public void onSuccess(String source) { + } + + @Override + public void onFailure(String source, Exception e) { + countDownLatch.countDown(); + } + }, Priority.IMMEDIATE); try { countDownLatch.await(); } catch (InterruptedException e) { From ca622b42e89cbc086e40657b15240c3c291bb2e1 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 23 May 2018 14:18:38 +0100 Subject: [PATCH 2/5] Introduce default empty onSuccess method --- .../cluster/service/ClusterApplier.java | 3 ++- .../cluster/service/ClusterApplierService.java | 1 - .../discovery/single/SingleNodeDiscovery.java | 14 +------------- .../discovery/zen/ZenDiscovery.java | 17 +++-------------- .../indices/store/IndicesStore.java | 12 +----------- .../cluster/health/ClusterStateHealthTests.java | 11 +---------- .../elasticsearch/test/ClusterServiceUtils.java | 1 - .../disruption/BlockClusterStateProcessing.java | 11 +---------- .../disruption/SlowClusterStateProcessing.java | 11 +---------- 9 files changed, 10 insertions(+), 71 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java index 6be7e56278a70..c587ab272e903 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java @@ -52,7 +52,8 @@ interface ClusterApplyListener { * Called on successful cluster state application * @param source information where the cluster state came from */ - void onSuccess(String source); + default void onSuccess(String source) { + } /** * Called on failure during cluster state application diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index ff581ec3fdedd..adf6dc3556205 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -27,7 +27,6 @@ import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ClusterStateTaskConfig; -import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.TimeoutClusterStateListener; diff --git a/server/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java b/server/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java index 969de0728cfbb..b7a56ca29e906 100644 --- a/server/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java +++ b/server/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java @@ -20,10 +20,8 @@ package org.elasticsearch.discovery.single; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -34,8 +32,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryStats; -import org.elasticsearch.discovery.zen.PendingClusterStateStats; -import org.elasticsearch.discovery.zen.PublishClusterStateStats; import org.elasticsearch.transport.TransportService; import java.io.IOException; @@ -103,15 +99,7 @@ public synchronized void startInitialJoin() { // apply a fresh cluster state just so that state recovery gets triggered by GatewayService // TODO: give discovery module control over GatewayService clusterState = ClusterState.builder(clusterState).build(); - clusterApplier.onNewClusterState("single-node-start-initial-join", () -> clusterState, new ClusterApplyListener() { - @Override - public void onSuccess(String source) { - } - - @Override - public void onFailure(String source, Exception e) { - } - }); + clusterApplier.onNewClusterState("single-node-start-initial-join", () -> clusterState, (source, e) -> { }); } @Override diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 5165802c8ffbb..9e2db94a607f0 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -21,9 +21,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -35,12 +32,11 @@ import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplier; +import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -55,6 +51,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.DiscoveryStats; @@ -999,15 +996,7 @@ protected void rejoin(String reason) { committedState.set(clusterState); - clusterApplier.onNewClusterState(reason, this::clusterState, new ClusterApplyListener() { - @Override - public void onSuccess(String source) { - } - - @Override - public void onFailure(String source, Exception e) { - } - }); // don't wait for state to be applied + clusterApplier.onNewClusterState(reason, this::clusterState, (source, e) -> {}); // don't wait for state to be applied } } diff --git a/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index d0bdfdb5ed692..b05813e88a8a4 100644 --- a/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -291,17 +291,7 @@ private void allNodesResponded() { logger.debug(() -> new ParameterizedMessage("{} failed to delete unallocated shard, ignoring", shardId), ex); } }, - new ClusterApplyListener() { - @Override - public void onSuccess(String source) { - logger.trace("{} successfully deleted unallocated shard", shardId); - } - - @Override - public void onFailure(String source, Exception e) { - logger.error(() -> new ParameterizedMessage("{} unexpected error during deletion of unallocated shard", shardId), e); - } - }); + (source, e) -> logger.error(() -> new ParameterizedMessage("{} unexpected error during deletion of unallocated shard", shardId), e)); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java b/server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java index ea09d8e5dd40b..92f5b11daa521 100644 --- a/server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java @@ -40,7 +40,6 @@ import org.elasticsearch.cluster.routing.RoutingTableGenerator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; -import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.ImmutableOpenIntMap; @@ -135,15 +134,7 @@ public void testClusterHealthWaitsForClusterStateApplication() throws Interrupte clusterService.getClusterApplierService().onNewClusterState("restore master", () -> ClusterState.builder(currentState) .nodes(DiscoveryNodes.builder(currentState.nodes()).masterNodeId(currentState.nodes().getLocalNodeId())).build(), - new ClusterApplyListener() { - @Override - public void onSuccess(String source) { - } - - @Override - public void onFailure(String source, Exception e) { - } - }); + (source, e) -> { }); logger.info("--> waiting for listener to be called and cluster state being blocked"); listenerCalled.await(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java index 626a5fc8f889f..8c4076e327d70 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java @@ -24,7 +24,6 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.block.ClusterBlocks; diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java index 6b5dada5d108d..bbcb30dc24395 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java @@ -69,16 +69,7 @@ public void startDisrupting() { Throwables.rethrow(e); } } - }, new ClusterApplyListener() { - @Override - public void onSuccess(String source) { - } - - @Override - public void onFailure(String source, Exception e) { - logger.error("unexpected error during disruption", e); - } - }, Priority.IMMEDIATE); + }, (source, e) -> logger.error("unexpected error during disruption", e), Priority.IMMEDIATE); try { started.await(); } catch (InterruptedException e) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java index ff043fd132289..1981ba97bfcf7 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java @@ -117,16 +117,7 @@ private boolean interruptClusterStateProcessing(final TimeValue duration) throws } catch (InterruptedException e) { ExceptionsHelper.reThrowIfNotNull(e); } - }, new ClusterApplyListener() { - @Override - public void onSuccess(String source) { - } - - @Override - public void onFailure(String source, Exception e) { - countDownLatch.countDown(); - } - }, Priority.IMMEDIATE); + }, (source, e) -> countDownLatch.countDown(), Priority.IMMEDIATE); try { countDownLatch.await(); } catch (InterruptedException e) { From bfbb6292f08d22a02064f317dd9996376ae9ca64 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 23 May 2018 14:21:08 +0100 Subject: [PATCH 3/5] Whitespace and imports --- .../elasticsearch/discovery/single/SingleNodeDiscovery.java | 2 +- .../java/org/elasticsearch/discovery/zen/ZenDiscovery.java | 2 +- .../java/org/elasticsearch/indices/store/IndicesStore.java | 4 ++-- .../elasticsearch/cluster/health/ClusterStateHealthTests.java | 2 +- .../test/disruption/BlockClusterStateProcessing.java | 4 ++-- .../test/disruption/SlowClusterStateProcessing.java | 4 ++-- 6 files changed, 9 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java b/server/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java index b7a56ca29e906..cd775e29f5a2f 100644 --- a/server/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java +++ b/server/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java @@ -99,7 +99,7 @@ public synchronized void startInitialJoin() { // apply a fresh cluster state just so that state recovery gets triggered by GatewayService // TODO: give discovery module control over GatewayService clusterState = ClusterState.builder(clusterState).build(); - clusterApplier.onNewClusterState("single-node-start-initial-join", () -> clusterState, (source, e) -> { }); + clusterApplier.onNewClusterState("single-node-start-initial-join", () -> clusterState, (source, e) -> {}); } @Override diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 9e2db94a607f0..047b4b6bdfb6f 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -51,7 +52,6 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.DiscoveryStats; diff --git a/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index b05813e88a8a4..29f6e7aeeecc1 100644 --- a/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/server/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -32,7 +32,6 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; @@ -291,7 +290,8 @@ private void allNodesResponded() { logger.debug(() -> new ParameterizedMessage("{} failed to delete unallocated shard, ignoring", shardId), ex); } }, - (source, e) -> logger.error(() -> new ParameterizedMessage("{} unexpected error during deletion of unallocated shard", shardId), e)); + (source, e) -> logger.error(() -> new ParameterizedMessage("{} unexpected error during deletion of unallocated shard", shardId), e) + ); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java b/server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java index 92f5b11daa521..0f5f4870ae1bb 100644 --- a/server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/health/ClusterStateHealthTests.java @@ -134,7 +134,7 @@ public void testClusterHealthWaitsForClusterStateApplication() throws Interrupte clusterService.getClusterApplierService().onNewClusterState("restore master", () -> ClusterState.builder(currentState) .nodes(DiscoveryNodes.builder(currentState.nodes()).masterNodeId(currentState.nodes().getLocalNodeId())).build(), - (source, e) -> { }); + (source, e) -> {}); logger.info("--> waiting for listener to be called and cluster state being blocked"); listenerCalled.await(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java index bbcb30dc24395..f144cb0b118a0 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/BlockClusterStateProcessing.java @@ -19,7 +19,6 @@ package org.elasticsearch.test.disruption; import org.apache.logging.log4j.core.util.Throwables; -import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.unit.TimeValue; @@ -69,7 +68,8 @@ public void startDisrupting() { Throwables.rethrow(e); } } - }, (source, e) -> logger.error("unexpected error during disruption", e), Priority.IMMEDIATE); + }, (source, e) -> logger.error("unexpected error during disruption", e), + Priority.IMMEDIATE); try { started.await(); } catch (InterruptedException e) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java index 1981ba97bfcf7..03ffe1d690ae5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/SlowClusterStateProcessing.java @@ -19,7 +19,6 @@ package org.elasticsearch.test.disruption; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.unit.TimeValue; @@ -117,7 +116,8 @@ private boolean interruptClusterStateProcessing(final TimeValue duration) throws } catch (InterruptedException e) { ExceptionsHelper.reThrowIfNotNull(e); } - }, (source, e) -> countDownLatch.countDown(), Priority.IMMEDIATE); + }, (source, e) -> countDownLatch.countDown(), + Priority.IMMEDIATE); try { countDownLatch.await(); } catch (InterruptedException e) { From f95f05c1cc1d83a1755232e3ba4b123696b3a44e Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 23 May 2018 14:27:19 +0100 Subject: [PATCH 4/5] Whitespace --- .../elasticsearch/cluster/service/ClusterApplierService.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index adf6dc3556205..2fb7c25671c88 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -536,8 +536,7 @@ public void onSuccess(String source) { listener.onSuccess(source); } catch (Exception e) { logger.error(new ParameterizedMessage( - "exception thrown by listener while notifying of cluster state processed from [{}]", - source), e); + "exception thrown by listener while notifying of cluster state processed from [{}]", source), e); } } } From 64fe7a76147020aaf18627271b5209e7647b945d Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 23 May 2018 14:28:07 +0100 Subject: [PATCH 5/5] Whitespace --- .../main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 047b4b6bdfb6f..a4144306eb058 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -995,7 +995,6 @@ protected void rejoin(String reason) { .build(); committedState.set(clusterState); - clusterApplier.onNewClusterState(reason, this::clusterState, (source, e) -> {}); // don't wait for state to be applied } }