From f1d00c1faede1cf847d87d0cb855cf5f01138260 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 25 Jan 2019 16:20:23 +0100 Subject: [PATCH 1/5] Extract TransportRequestDeduplication from ShardStateAction * Extracted the logic for master request duplication so it can be reused by the snapshotting logic * Removed custom listener used by `ShardStateAction` to not leak these into future users of this class * Changed semantics slightly to get rid of redundant instantiations of the composite listener * Relates #37686 --- .../TransportReplicationAction.java | 6 +- .../action/shard/ShardStateAction.java | 141 ++---------------- .../cluster/IndicesClusterStateService.java | 3 +- .../TransportRequestDeduplicator.java | 113 ++++++++++++++ .../action/shard/ShardStateActionTests.java | 93 +++--------- .../discovery/ClusterDisruptionIT.java | 7 +- .../TransportRequestDeduplicatorTests.java | 99 ++++++++++++ 7 files changed, 253 insertions(+), 209 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java create mode 100644 server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 4894ca1f772c4..c0f0278479a0c 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -1192,12 +1192,12 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, R onSuccess.run(); } - protected final ShardStateAction.Listener createShardActionListener(final Runnable onSuccess, + protected final ActionListener createShardActionListener(final Runnable onSuccess, final Consumer onPrimaryDemoted, final Consumer onIgnoredFailure) { - return new ShardStateAction.Listener() { + return new ActionListener() { @Override - public void onSuccess() { + public void onResponse(Void aVoid) { onSuccess.run(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index e5c46cbb0ee08..ba2e56eff34f9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; @@ -48,18 +49,17 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.EmptyTransportResponseHandler; -import org.elasticsearch.transport.NodeDisconnectedException; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestDeduplicator; import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; @@ -71,7 +71,6 @@ import java.util.Locale; import java.util.Objects; import java.util.Set; -import java.util.concurrent.ConcurrentMap; import java.util.function.Predicate; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; @@ -89,7 +88,7 @@ public class ShardStateAction { // a list of shards that failed during replication // we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard. - private final ConcurrentMap remoteFailedShardsCache = ConcurrentCollections.newConcurrentMap(); + private final TransportRequestDeduplicator remoteFailedShardsCache = new TransportRequestDeduplicator<>(); @Inject public ShardStateAction(ClusterService clusterService, TransportService transportService, @@ -106,7 +105,7 @@ public ShardStateAction(ClusterService clusterService, TransportService transpor } private void sendShardAction(final String actionName, final ClusterState currentState, - final TransportRequest request, final Listener listener) { + final TransportRequest request, final ActionListener listener) { ClusterStateObserver observer = new ClusterStateObserver(currentState, clusterService, null, logger, threadPool.getThreadContext()); DiscoveryNode masterNode = currentState.nodes().getMasterNode(); @@ -120,7 +119,7 @@ private void sendShardAction(final String actionName, final ClusterState current actionName, request, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleResponse(TransportResponse.Empty response) { - listener.onSuccess(); + listener.onResponse(null); } @Override @@ -163,32 +162,12 @@ private static boolean isMasterChannelException(TransportException exp) { * @param listener callback upon completion of the request */ public void remoteShardFailed(final ShardId shardId, String allocationId, long primaryTerm, boolean markAsStale, final String message, - @Nullable final Exception failure, Listener listener) { + @Nullable final Exception failure, ActionListener listener) { assert primaryTerm > 0L : "primary term should be strictly positive"; final FailedShardEntry shardEntry = new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale); - final CompositeListener compositeListener = new CompositeListener(listener); - final CompositeListener existingListener = remoteFailedShardsCache.putIfAbsent(shardEntry, compositeListener); - if (existingListener == null) { - sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), shardEntry, new Listener() { - @Override - public void onSuccess() { - try { - compositeListener.onSuccess(); - } finally { - remoteFailedShardsCache.remove(shardEntry); - } - } - @Override - public void onFailure(Exception e) { - try { - compositeListener.onFailure(e); - } finally { - remoteFailedShardsCache.remove(shardEntry); - } - } - }); - } else { - existingListener.addListener(listener); + final ActionListener transportListener = remoteFailedShardsCache.register(shardEntry, listener); + if (transportListener != null) { + sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), shardEntry, transportListener); } } @@ -200,7 +179,7 @@ int remoteShardFailedCacheSize() { * Send a shard failed request to the master node to update the cluster state when a shard on the local node failed. */ public void localShardFailed(final ShardRouting shardRouting, final String message, - @Nullable final Exception failure, Listener listener) { + @Nullable final Exception failure, ActionListener listener) { localShardFailed(shardRouting, message, failure, listener, clusterService.state()); } @@ -208,7 +187,7 @@ public void localShardFailed(final ShardRouting shardRouting, final String messa * Send a shard failed request to the master node to update the cluster state when a shard on the local node failed. */ public void localShardFailed(final ShardRouting shardRouting, final String message, @Nullable final Exception failure, - Listener listener, final ClusterState currentState) { + ActionListener listener, final ClusterState currentState) { FailedShardEntry shardEntry = new FailedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), 0L, message, failure, true); sendShardAction(SHARD_FAILED_ACTION_NAME, currentState, shardEntry, listener); @@ -216,7 +195,8 @@ public void localShardFailed(final ShardRouting shardRouting, final String messa // visible for testing protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, - TransportRequest request, Listener listener, Predicate changePredicate) { + TransportRequest request, ActionListener listener, + Predicate changePredicate) { observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { @@ -494,10 +474,10 @@ public int hashCode() { } } - public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener) { + public void shardStarted(ShardRouting shardRouting, String message, ActionListener listener) { shardStarted(shardRouting, message, listener, clusterService.state()); } - public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener, ClusterState currentState) { + public void shardStarted(ShardRouting shardRouting, String message, ActionListener listener, ClusterState currentState) { StartedShardEntry shardEntry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), message); sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, shardEntry, listener); } @@ -641,97 +621,6 @@ public String toString() { } } - public interface Listener { - - default void onSuccess() { - } - - /** - * Notification for non-channel exceptions that are not handled - * by {@link ShardStateAction}. - * - * The exceptions that are handled by {@link ShardStateAction} - * are: - * - {@link NotMasterException} - * - {@link NodeDisconnectedException} - * - {@link FailedToCommitClusterStateException} - * - * Any other exception is communicated to the requester via - * this notification. - * - * @param e the unexpected cause of the failure on the master - */ - default void onFailure(final Exception e) { - } - - } - - /** - * A composite listener that allows registering multiple listeners dynamically. - */ - static final class CompositeListener implements Listener { - private boolean isNotified = false; - private Exception failure = null; - private final List listeners = new ArrayList<>(); - - CompositeListener(Listener listener) { - listeners.add(listener); - } - - void addListener(Listener listener) { - final boolean ready; - synchronized (this) { - ready = this.isNotified; - if (ready == false) { - listeners.add(listener); - } - } - if (ready) { - if (failure != null) { - listener.onFailure(failure); - } else { - listener.onSuccess(); - } - } - } - - private void onCompleted(Exception failure) { - synchronized (this) { - this.failure = failure; - this.isNotified = true; - } - RuntimeException firstException = null; - for (Listener listener : listeners) { - try { - if (failure != null) { - listener.onFailure(failure); - } else { - listener.onSuccess(); - } - } catch (RuntimeException innerEx) { - if (firstException == null) { - firstException = innerEx; - } else { - firstException.addSuppressed(innerEx); - } - } - } - if (firstException != null) { - throw firstException; - } - } - - @Override - public void onSuccess() { - onCompleted(null); - } - - @Override - public void onFailure(Exception failure) { - onCompleted(failure); - } - } - public static class NoLongerPrimaryShardException extends ElasticsearchException { public NoLongerPrimaryShardException(ShardId shardId, String msg) { diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index c8afe92be8d37..a2af50ac81da0 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -106,8 +106,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final ShardStateAction shardStateAction; private final NodeMappingRefreshAction nodeMappingRefreshAction; - private static final ShardStateAction.Listener SHARD_STATE_ACTION_LISTENER = new ShardStateAction.Listener() { - }; + private static final ActionListener SHARD_STATE_ACTION_LISTENER = ActionListener.wrap(() -> {}); private final Settings settings; // a list of shards that failed during recovery diff --git a/server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java b/server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java new file mode 100644 index 0000000000000..017b566780e20 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java @@ -0,0 +1,113 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentMap; + +/** + * Deduplicator for {@link TransportRequest}s that keeps track of {@link TransportRequest}s that should + * not be sent in parallel. + * @param Transport Request Class + */ +public final class TransportRequestDeduplicator { + + private final ConcurrentMap requests = ConcurrentCollections.newConcurrentMap(); + + /** + * Register a listener for the given request with the deduplicator. + * If the request is not yet registered with the deduplicator it will return an {@link ActionListener} + * that must be completed by the called when the request completes. If the request is already known to + * the deduplicator it will keep track of the given listener and invoke it when the listener returned + * for the first invocation with the request is completed. + * The caller of this method should therefore execute the transport request if returned an instance + * of {@link ActionListener} and invoke that listener when completing the request and do nothing when + * being returned {@code null}. + * @param request Request to deduplicate + * @param listener Listener to invoke on request completion + * @return Listener that must be invoked by the caller or null when the request is already known + */ + public ActionListener register(T request, ActionListener listener) { + return requests.computeIfAbsent(request, CompositeListener::new).addListener(listener); + } + + public int size() { + return requests.size(); + } + + private final class CompositeListener implements ActionListener { + + private final List> listeners = new ArrayList<>(); + + private final T request; + + private boolean isNotified; + private Exception failure; + + CompositeListener(T request) { + this.request = request; + } + + CompositeListener addListener(ActionListener listener) { + synchronized (this) { + if (this.isNotified == false) { + listeners.add(listener); + return listeners.size() == 1 ? this : null; + } + } + if (failure != null) { + listener.onFailure(failure); + } else { + listener.onResponse(null); + } + return null; + } + + private void onCompleted(Exception failure) { + synchronized (this) { + this.failure = failure; + this.isNotified = true; + } + try { + if (failure == null) { + ActionListener.onResponse(listeners, null); + } else { + ActionListener.onFailure(listeners, failure); + } + } finally { + requests.remove(request); + } + } + + @Override + public void onResponse(final Void aVoid) { + onCompleted(null); + } + + @Override + public void onFailure(Exception failure) { + onCompleted(failure); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index 2a994e2861836..4d7dd68bc844b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -21,6 +21,7 @@ import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; @@ -73,7 +74,6 @@ import static org.elasticsearch.test.ClusterServiceUtils.setState; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; @@ -107,7 +107,7 @@ public void setOnAfterWaitForNewMasterAndRetry(Runnable onAfterWaitForNewMasterA @Override protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, TransportRequest request, - Listener listener, Predicate changePredicate) { + ActionListener listener, Predicate changePredicate) { onBeforeWaitForNewMasterAndRetry.run(); super.waitForNewMasterAndRetry(actionName, observer, request, listener, changePredicate); onAfterWaitForNewMasterAndRetry.run(); @@ -160,9 +160,9 @@ public void testSuccess() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); ShardRouting shardRouting = getRandomShardRouting(index); - shardStateAction.localShardFailed(shardRouting, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + shardStateAction.localShardFailed(shardRouting, "test", getSimulatedFailure(), new ActionListener() { @Override - public void onSuccess() { + public void onResponse(Void aVoid) { success.set(true); latch.countDown(); } @@ -209,9 +209,9 @@ public void testNoMaster() throws InterruptedException { }); ShardRouting failedShard = getRandomShardRouting(index); - shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ActionListener() { @Override - public void onSuccess() { + public void onResponse(Void aVoid) { success.set(true); latch.countDown(); } @@ -258,9 +258,9 @@ public void testMasterChannelException() throws InterruptedException { setUpMasterRetryVerification(numberOfRetries, retries, latch, retryLoop); ShardRouting failedShard = getRandomShardRouting(index); - shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ActionListener() { @Override - public void onSuccess() { + public void onResponse(Void aVoid) { success.set(true); latch.countDown(); } @@ -294,9 +294,9 @@ public void testUnhandledFailure() { AtomicBoolean failure = new AtomicBoolean(); ShardRouting failedShard = getRandomShardRouting(index); - shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ActionListener() { @Override - public void onSuccess() { + public void onResponse(Void aVoid) { failure.set(false); assert false; } @@ -326,9 +326,9 @@ public void testShardNotFound() throws InterruptedException { ShardRouting failedShard = getRandomShardRouting(index); RoutingTable routingTable = RoutingTable.builder(clusterService.state().getRoutingTable()).remove(index).build(); setState(clusterService, ClusterState.builder(clusterService.state()).routingTable(routingTable)); - shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + shardStateAction.localShardFailed(failedShard, "test", getSimulatedFailure(), new ActionListener() { @Override - public void onSuccess() { + public void onResponse(Void aVoid) { success.set(true); latch.countDown(); } @@ -362,9 +362,9 @@ public void testNoLongerPrimaryShardException() throws InterruptedException { assertThat(primaryTerm, greaterThanOrEqualTo(1L)); shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(), primaryTerm + 1, randomBoolean(), "test", getSimulatedFailure(), - new ShardStateAction.Listener() { + new ActionListener() { @Override - public void onSuccess() { + public void onResponse(Void aVoid) { failure.set(null); latch.countDown(); } @@ -397,9 +397,9 @@ public void testCacheRemoteShardFailed() throws Exception { long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); for (int i = 0; i < numListeners; i++) { shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(), - primaryTerm, markAsStale, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + primaryTerm, markAsStale, "test", getSimulatedFailure(), new ActionListener() { @Override - public void onSuccess() { + public void onResponse(Void aVoid) { latch.countDown(); } @Override @@ -448,9 +448,9 @@ public void testRemoteShardFailedConcurrently() throws Exception { ShardRouting failedShard = randomFrom(failedShards); shardStateAction.remoteShardFailed(failedShard.shardId(), failedShard.allocationId().getId(), randomLongBetween(1, Long.MAX_VALUE), randomBoolean(), "test", getSimulatedFailure(), - new ShardStateAction.Listener() { + new ActionListener() { @Override - public void onSuccess() { + public void onResponse(Void aVoid) { notifiedResponses.incrementAndGet(); } @Override @@ -543,61 +543,4 @@ BytesReference serialize(Writeable writeable, Version version) throws IOExceptio return out.bytes(); } } - - public void testCompositeListener() throws Exception { - AtomicInteger successCount = new AtomicInteger(); - AtomicInteger failureCount = new AtomicInteger(); - Exception failure = randomBoolean() ? getSimulatedFailure() : null; - ShardStateAction.CompositeListener compositeListener = new ShardStateAction.CompositeListener(new ShardStateAction.Listener() { - @Override - public void onSuccess() { - successCount.incrementAndGet(); - } - @Override - public void onFailure(Exception e) { - assertThat(e, sameInstance(failure)); - failureCount.incrementAndGet(); - } - }); - int iterationsPerThread = scaledRandomIntBetween(100, 1000); - Thread[] threads = new Thread[between(1, 4)]; - Phaser barrier = new Phaser(threads.length + 1); - for (int i = 0; i < threads.length; i++) { - threads[i] = new Thread(() -> { - barrier.arriveAndAwaitAdvance(); - for (int n = 0; n < iterationsPerThread; n++) { - compositeListener.addListener(new ShardStateAction.Listener() { - @Override - public void onSuccess() { - successCount.incrementAndGet(); - } - @Override - public void onFailure(Exception e) { - assertThat(e, sameInstance(failure)); - failureCount.incrementAndGet(); - } - }); - } - }); - threads[i].start(); - } - barrier.arriveAndAwaitAdvance(); - if (failure != null) { - compositeListener.onFailure(failure); - } else { - compositeListener.onSuccess(); - } - for (Thread t : threads) { - t.join(); - } - assertBusy(() -> { - if (failure != null) { - assertThat(successCount.get(), equalTo(0)); - assertThat(failureCount.get(), equalTo(threads.length*iterationsPerThread + 1)); - } else { - assertThat(successCount.get(), equalTo(threads.length*iterationsPerThread + 1)); - assertThat(failureCount.get(), equalTo(0)); - } - }); - } } diff --git a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java index d94c34c7b33eb..b7dcb6f504d1a 100644 --- a/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; @@ -316,10 +317,10 @@ public void testSendingShardFailure() throws Exception { setDisruptionScheme(networkDisruption); networkDisruption.startDisrupting(); - service.localShardFailed(failedShard, "simulated", new CorruptIndexException("simulated", (String) null), new - ShardStateAction.Listener() { + service.localShardFailed(failedShard, "simulated", new CorruptIndexException("simulated", (String) null), + new ActionListener() { @Override - public void onSuccess() { + public void onResponse(final Void aVoid) { success.set(true); latch.countDown(); } diff --git a/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java b/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java new file mode 100644 index 0000000000000..803bd95cdd9a9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java @@ -0,0 +1,99 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.transport; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.ESTestCase; + +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.sameInstance; + +public class TransportRequestDeduplicatorTests extends ESTestCase { + + public void testRequestDeduplication() throws Exception { + AtomicInteger successCount; + successCount = new AtomicInteger(); + AtomicInteger failureCount = new AtomicInteger(); + Exception failure = randomBoolean() ? new TransportException("simulated") : null; + final TransportRequest request = new TransportRequest() { + @Override + public void setParentTask(final TaskId taskId) { + } + }; + final TransportRequestDeduplicator deduplicator = new TransportRequestDeduplicator<>(); + final ActionListener listener = deduplicator.register(request, new ActionListener() { + @Override + public void onResponse(Void aVoid) { + successCount.incrementAndGet(); + } + + @Override + public void onFailure(Exception e) { + assertThat(e, sameInstance(failure)); + failureCount.incrementAndGet(); + } + }); + int iterationsPerThread = scaledRandomIntBetween(100, 1000); + Thread[] threads = new Thread[between(1, 4)]; + Phaser barrier = new Phaser(threads.length + 1); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + barrier.arriveAndAwaitAdvance(); + for (int n = 0; n < iterationsPerThread; n++) { + deduplicator.register(request, new ActionListener() { + @Override + public void onResponse(final Void aVoid) { + successCount.incrementAndGet(); + } + + @Override + public void onFailure(Exception e) { + assertThat(e, sameInstance(failure)); + failureCount.incrementAndGet(); + } + }); + } + }); + threads[i].start(); + } + barrier.arriveAndAwaitAdvance(); + for (Thread t : threads) { + t.join(); + } + if (failure != null) { + listener.onFailure(failure); + } else { + listener.onResponse(null); + } + assertBusy(() -> { + if (failure != null) { + assertThat(successCount.get(), equalTo(0)); + assertThat(failureCount.get(), equalTo(threads.length * iterationsPerThread + 1)); + } else { + assertThat(successCount.get(), equalTo(threads.length * iterationsPerThread + 1)); + assertThat(failureCount.get(), equalTo(0)); + } + }); + } + +} From 2f7227474194924a7eabe26eb18f78dc4f2c195f Mon Sep 17 00:00:00 2001 From: Armin Date: Wed, 30 Jan 2019 05:58:12 +0100 Subject: [PATCH 2/5] CR: fix type, renaming, change method signature --- .../action/shard/ShardStateAction.java | 12 +++++------ .../TransportRequestDeduplicator.java | 20 +++++++++---------- .../TransportRequestDeduplicatorTests.java | 11 ++++++---- 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 3669e34167e1b..071885202c458 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -88,7 +88,7 @@ public class ShardStateAction { // a list of shards that failed during replication // we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard. - private final TransportRequestDeduplicator remoteFailedShardsCache = new TransportRequestDeduplicator<>(); + private final TransportRequestDeduplicator remoteFailedShardsDeduplicator = new TransportRequestDeduplicator<>(); @Inject public ShardStateAction(ClusterService clusterService, TransportService transportService, @@ -164,15 +164,13 @@ private static boolean isMasterChannelException(TransportException exp) { public void remoteShardFailed(final ShardId shardId, String allocationId, long primaryTerm, boolean markAsStale, final String message, @Nullable final Exception failure, ActionListener listener) { assert primaryTerm > 0L : "primary term should be strictly positive"; - final FailedShardEntry shardEntry = new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale); - final ActionListener transportListener = remoteFailedShardsCache.register(shardEntry, listener); - if (transportListener != null) { - sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), shardEntry, transportListener); - } + remoteFailedShardsDeduplicator.executeOnce( + new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale), listener, + (req, reqListener) -> sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), req, reqListener)); } int remoteShardFailedCacheSize() { - return remoteFailedShardsCache.size(); + return remoteFailedShardsDeduplicator.size(); } /** diff --git a/server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java b/server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java index 017b566780e20..c4bb2c1720788 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentMap; +import java.util.function.BiConsumer; /** * Deduplicator for {@link TransportRequest}s that keeps track of {@link TransportRequest}s that should @@ -37,19 +38,18 @@ public final class TransportRequestDeduplicator { /** * Register a listener for the given request with the deduplicator. - * If the request is not yet registered with the deduplicator it will return an {@link ActionListener} - * that must be completed by the called when the request completes. If the request is already known to - * the deduplicator it will keep track of the given listener and invoke it when the listener returned - * for the first invocation with the request is completed. - * The caller of this method should therefore execute the transport request if returned an instance - * of {@link ActionListener} and invoke that listener when completing the request and do nothing when - * being returned {@code null}. + * If the request is not yet registered with the deduplicator it will invoke the passed callback with an {@link ActionListener} + * that must be completed by the caller when the request completes. If the request is already known to the deduplicator it will keep + * track of the given listener and invoke it when the listener passed to the callback on first invocation is completed. * @param request Request to deduplicate * @param listener Listener to invoke on request completion - * @return Listener that must be invoked by the caller or null when the request is already known + * @param callback Callback to be invoked with request and completion listener the first time the request is added to the deduplicator */ - public ActionListener register(T request, ActionListener listener) { - return requests.computeIfAbsent(request, CompositeListener::new).addListener(listener); + public void executeOnce(T request, ActionListener listener, BiConsumer> callback) { + ActionListener completionListener = requests.computeIfAbsent(request, CompositeListener::new).addListener(listener); + if (completionListener != null) { + callback.accept(request, completionListener); + } } public int size() { diff --git a/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java b/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java index 803bd95cdd9a9..f727ed3785364 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicInteger; @@ -41,7 +42,8 @@ public void setParentTask(final TaskId taskId) { } }; final TransportRequestDeduplicator deduplicator = new TransportRequestDeduplicator<>(); - final ActionListener listener = deduplicator.register(request, new ActionListener() { + final CompletableFuture> listenerFuture = new CompletableFuture<>(); + deduplicator.executeOnce(request, new ActionListener() { @Override public void onResponse(Void aVoid) { successCount.incrementAndGet(); @@ -52,7 +54,7 @@ public void onFailure(Exception e) { assertThat(e, sameInstance(failure)); failureCount.incrementAndGet(); } - }); + }, (req, reqListener) -> listenerFuture.complete(reqListener)); int iterationsPerThread = scaledRandomIntBetween(100, 1000); Thread[] threads = new Thread[between(1, 4)]; Phaser barrier = new Phaser(threads.length + 1); @@ -60,7 +62,7 @@ public void onFailure(Exception e) { threads[i] = new Thread(() -> { barrier.arriveAndAwaitAdvance(); for (int n = 0; n < iterationsPerThread; n++) { - deduplicator.register(request, new ActionListener() { + deduplicator.executeOnce(request, new ActionListener() { @Override public void onResponse(final Void aVoid) { successCount.incrementAndGet(); @@ -71,7 +73,7 @@ public void onFailure(Exception e) { assertThat(e, sameInstance(failure)); failureCount.incrementAndGet(); } - }); + }, (req, reqListener) -> fail("Callback should not be invoked more than once.")); } }); threads[i].start(); @@ -80,6 +82,7 @@ public void onFailure(Exception e) { for (Thread t : threads) { t.join(); } + final ActionListener listener = listenerFuture.get(); if (failure != null) { listener.onFailure(failure); } else { From 1dca2ca36ea5373133475d1a389a84f08f4649d5 Mon Sep 17 00:00:00 2001 From: Armin Date: Wed, 30 Jan 2019 06:01:28 +0100 Subject: [PATCH 3/5] adjust test --- .../TransportRequestDeduplicatorTests.java | 26 +++++-------------- 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java b/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java index f727ed3785364..b496de7998b64 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java @@ -18,11 +18,11 @@ */ package org.elasticsearch.transport; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicInteger; @@ -42,19 +42,7 @@ public void setParentTask(final TaskId taskId) { } }; final TransportRequestDeduplicator deduplicator = new TransportRequestDeduplicator<>(); - final CompletableFuture> listenerFuture = new CompletableFuture<>(); - deduplicator.executeOnce(request, new ActionListener() { - @Override - public void onResponse(Void aVoid) { - successCount.incrementAndGet(); - } - - @Override - public void onFailure(Exception e) { - assertThat(e, sameInstance(failure)); - failureCount.incrementAndGet(); - } - }, (req, reqListener) -> listenerFuture.complete(reqListener)); + final SetOnce> listenerHolder = new SetOnce<>(); int iterationsPerThread = scaledRandomIntBetween(100, 1000); Thread[] threads = new Thread[between(1, 4)]; Phaser barrier = new Phaser(threads.length + 1); @@ -64,7 +52,7 @@ public void onFailure(Exception e) { for (int n = 0; n < iterationsPerThread; n++) { deduplicator.executeOnce(request, new ActionListener() { @Override - public void onResponse(final Void aVoid) { + public void onResponse(Void aVoid) { successCount.incrementAndGet(); } @@ -73,7 +61,7 @@ public void onFailure(Exception e) { assertThat(e, sameInstance(failure)); failureCount.incrementAndGet(); } - }, (req, reqListener) -> fail("Callback should not be invoked more than once.")); + }, (req, reqListener) -> listenerHolder.set(reqListener)); } }); threads[i].start(); @@ -82,7 +70,7 @@ public void onFailure(Exception e) { for (Thread t : threads) { t.join(); } - final ActionListener listener = listenerFuture.get(); + final ActionListener listener = listenerHolder.get(); if (failure != null) { listener.onFailure(failure); } else { @@ -91,9 +79,9 @@ public void onFailure(Exception e) { assertBusy(() -> { if (failure != null) { assertThat(successCount.get(), equalTo(0)); - assertThat(failureCount.get(), equalTo(threads.length * iterationsPerThread + 1)); + assertThat(failureCount.get(), equalTo(threads.length * iterationsPerThread)); } else { - assertThat(successCount.get(), equalTo(threads.length * iterationsPerThread + 1)); + assertThat(successCount.get(), equalTo(threads.length * iterationsPerThread)); assertThat(failureCount.get(), equalTo(0)); } }); From 47598ee7079dada5709a5af32ecb5cf90d1209d1 Mon Sep 17 00:00:00 2001 From: Armin Date: Wed, 30 Jan 2019 06:06:26 +0100 Subject: [PATCH 4/5] check dedup size --- .../transport/TransportRequestDeduplicatorTests.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java b/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java index b496de7998b64..ab178134995a7 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportRequestDeduplicatorTests.java @@ -32,8 +32,7 @@ public class TransportRequestDeduplicatorTests extends ESTestCase { public void testRequestDeduplication() throws Exception { - AtomicInteger successCount; - successCount = new AtomicInteger(); + AtomicInteger successCount = new AtomicInteger(); AtomicInteger failureCount = new AtomicInteger(); Exception failure = randomBoolean() ? new TransportException("simulated") : null; final TransportRequest request = new TransportRequest() { @@ -71,11 +70,13 @@ public void onFailure(Exception e) { t.join(); } final ActionListener listener = listenerHolder.get(); + assertThat(deduplicator.size(), equalTo(1)); if (failure != null) { listener.onFailure(failure); } else { listener.onResponse(null); } + assertThat(deduplicator.size(), equalTo(0)); assertBusy(() -> { if (failure != null) { assertThat(successCount.get(), equalTo(0)); From beb4962475456adaabf923cf86fc5150c7442907 Mon Sep 17 00:00:00 2001 From: Armin Date: Wed, 30 Jan 2019 16:30:49 +0100 Subject: [PATCH 5/5] CR: reword JDoc --- .../transport/TransportRequestDeduplicator.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java b/server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java index c4bb2c1720788..d929ef34ce2c3 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportRequestDeduplicator.java @@ -37,9 +37,10 @@ public final class TransportRequestDeduplicator { private final ConcurrentMap requests = ConcurrentCollections.newConcurrentMap(); /** - * Register a listener for the given request with the deduplicator. - * If the request is not yet registered with the deduplicator it will invoke the passed callback with an {@link ActionListener} - * that must be completed by the caller when the request completes. If the request is already known to the deduplicator it will keep + * Ensures a given request not executed multiple times when another equal request is already in-flight. + * If the request is not yet known to the deduplicator it will invoke the passed callback with an {@link ActionListener} + * that must be completed by the caller when the request completes. Once that listener is completed the request will be removed from + * the deduplicator's internal state. If the request is already known to the deduplicator it will keep * track of the given listener and invoke it when the listener passed to the callback on first invocation is completed. * @param request Request to deduplicate * @param listener Listener to invoke on request completion