Skip to content

Commit a070b8a

Browse files
Extract TransportRequestDeduplication from ShardStateAction (#37870)
* Extracted the logic for master request duplication so it can be reused by the snapshotting logic * Removed custom listener used by `ShardStateAction` to not leak these into future users of this class * Changed semantics slightly to get rid of redundant instantiations of the composite listener * Relates #37686
1 parent 6500b0c commit a070b8a

File tree

7 files changed

+241
-206
lines changed

7 files changed

+241
-206
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1192,12 +1192,12 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, R
11921192
onSuccess.run();
11931193
}
11941194

1195-
protected final ShardStateAction.Listener createShardActionListener(final Runnable onSuccess,
1195+
protected final ActionListener<Void> createShardActionListener(final Runnable onSuccess,
11961196
final Consumer<Exception> onPrimaryDemoted,
11971197
final Consumer<Exception> onIgnoredFailure) {
1198-
return new ShardStateAction.Listener() {
1198+
return new ActionListener<Void>() {
11991199
@Override
1200-
public void onSuccess() {
1200+
public void onResponse(Void aVoid) {
12011201
onSuccess.run();
12021202
}
12031203

server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

Lines changed: 16 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.ElasticsearchException;
2626
import org.elasticsearch.ExceptionsHelper;
2727
import org.elasticsearch.Version;
28+
import org.elasticsearch.action.ActionListener;
2829
import org.elasticsearch.cluster.ClusterChangedEvent;
2930
import org.elasticsearch.cluster.ClusterState;
3031
import org.elasticsearch.cluster.ClusterStateObserver;
@@ -48,18 +49,17 @@
4849
import org.elasticsearch.common.io.stream.StreamInput;
4950
import org.elasticsearch.common.io.stream.StreamOutput;
5051
import org.elasticsearch.common.unit.TimeValue;
51-
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
5252
import org.elasticsearch.index.shard.ShardId;
5353
import org.elasticsearch.node.NodeClosedException;
5454
import org.elasticsearch.tasks.Task;
5555
import org.elasticsearch.threadpool.ThreadPool;
5656
import org.elasticsearch.transport.ConnectTransportException;
5757
import org.elasticsearch.transport.EmptyTransportResponseHandler;
58-
import org.elasticsearch.transport.NodeDisconnectedException;
5958
import org.elasticsearch.transport.RemoteTransportException;
6059
import org.elasticsearch.transport.TransportChannel;
6160
import org.elasticsearch.transport.TransportException;
6261
import org.elasticsearch.transport.TransportRequest;
62+
import org.elasticsearch.transport.TransportRequestDeduplicator;
6363
import org.elasticsearch.transport.TransportRequestHandler;
6464
import org.elasticsearch.transport.TransportResponse;
6565
import org.elasticsearch.transport.TransportService;
@@ -71,7 +71,6 @@
7171
import java.util.Locale;
7272
import java.util.Objects;
7373
import java.util.Set;
74-
import java.util.concurrent.ConcurrentMap;
7574
import java.util.function.Predicate;
7675

7776
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
@@ -89,7 +88,7 @@ public class ShardStateAction {
8988

9089
// a list of shards that failed during replication
9190
// we keep track of these shards in order to avoid sending duplicate failed shard requests for a single failing shard.
92-
private final ConcurrentMap<FailedShardEntry, CompositeListener> remoteFailedShardsCache = ConcurrentCollections.newConcurrentMap();
91+
private final TransportRequestDeduplicator<FailedShardEntry> remoteFailedShardsDeduplicator = new TransportRequestDeduplicator<>();
9392

9493
@Inject
9594
public ShardStateAction(ClusterService clusterService, TransportService transportService,
@@ -106,7 +105,7 @@ public ShardStateAction(ClusterService clusterService, TransportService transpor
106105
}
107106

108107
private void sendShardAction(final String actionName, final ClusterState currentState,
109-
final TransportRequest request, final Listener listener) {
108+
final TransportRequest request, final ActionListener<Void> listener) {
110109
ClusterStateObserver observer =
111110
new ClusterStateObserver(currentState, clusterService, null, logger, threadPool.getThreadContext());
112111
DiscoveryNode masterNode = currentState.nodes().getMasterNode();
@@ -120,7 +119,7 @@ private void sendShardAction(final String actionName, final ClusterState current
120119
actionName, request, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
121120
@Override
122121
public void handleResponse(TransportResponse.Empty response) {
123-
listener.onSuccess();
122+
listener.onResponse(null);
124123
}
125124

126125
@Override
@@ -163,60 +162,39 @@ private static boolean isMasterChannelException(TransportException exp) {
163162
* @param listener callback upon completion of the request
164163
*/
165164
public void remoteShardFailed(final ShardId shardId, String allocationId, long primaryTerm, boolean markAsStale, final String message,
166-
@Nullable final Exception failure, Listener listener) {
165+
@Nullable final Exception failure, ActionListener<Void> listener) {
167166
assert primaryTerm > 0L : "primary term should be strictly positive";
168-
final FailedShardEntry shardEntry = new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale);
169-
final CompositeListener compositeListener = new CompositeListener(listener);
170-
final CompositeListener existingListener = remoteFailedShardsCache.putIfAbsent(shardEntry, compositeListener);
171-
if (existingListener == null) {
172-
sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), shardEntry, new Listener() {
173-
@Override
174-
public void onSuccess() {
175-
try {
176-
compositeListener.onSuccess();
177-
} finally {
178-
remoteFailedShardsCache.remove(shardEntry);
179-
}
180-
}
181-
@Override
182-
public void onFailure(Exception e) {
183-
try {
184-
compositeListener.onFailure(e);
185-
} finally {
186-
remoteFailedShardsCache.remove(shardEntry);
187-
}
188-
}
189-
});
190-
} else {
191-
existingListener.addListener(listener);
192-
}
167+
remoteFailedShardsDeduplicator.executeOnce(
168+
new FailedShardEntry(shardId, allocationId, primaryTerm, message, failure, markAsStale), listener,
169+
(req, reqListener) -> sendShardAction(SHARD_FAILED_ACTION_NAME, clusterService.state(), req, reqListener));
193170
}
194171

195172
int remoteShardFailedCacheSize() {
196-
return remoteFailedShardsCache.size();
173+
return remoteFailedShardsDeduplicator.size();
197174
}
198175

199176
/**
200177
* Send a shard failed request to the master node to update the cluster state when a shard on the local node failed.
201178
*/
202179
public void localShardFailed(final ShardRouting shardRouting, final String message,
203-
@Nullable final Exception failure, Listener listener) {
180+
@Nullable final Exception failure, ActionListener<Void> listener) {
204181
localShardFailed(shardRouting, message, failure, listener, clusterService.state());
205182
}
206183

207184
/**
208185
* Send a shard failed request to the master node to update the cluster state when a shard on the local node failed.
209186
*/
210187
public void localShardFailed(final ShardRouting shardRouting, final String message, @Nullable final Exception failure,
211-
Listener listener, final ClusterState currentState) {
188+
ActionListener<Void> listener, final ClusterState currentState) {
212189
FailedShardEntry shardEntry = new FailedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(),
213190
0L, message, failure, true);
214191
sendShardAction(SHARD_FAILED_ACTION_NAME, currentState, shardEntry, listener);
215192
}
216193

217194
// visible for testing
218195
protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer,
219-
TransportRequest request, Listener listener, Predicate<ClusterState> changePredicate) {
196+
TransportRequest request, ActionListener<Void> listener,
197+
Predicate<ClusterState> changePredicate) {
220198
observer.waitForNextChange(new ClusterStateObserver.Listener() {
221199
@Override
222200
public void onNewClusterState(ClusterState state) {
@@ -497,14 +475,14 @@ public int hashCode() {
497475
public void shardStarted(final ShardRouting shardRouting,
498476
final long primaryTerm,
499477
final String message,
500-
final Listener listener) {
478+
final ActionListener<Void> listener) {
501479
shardStarted(shardRouting, primaryTerm, message, listener, clusterService.state());
502480
}
503481

504482
public void shardStarted(final ShardRouting shardRouting,
505483
final long primaryTerm,
506484
final String message,
507-
final Listener listener,
485+
final ActionListener<Void> listener,
508486
final ClusterState currentState) {
509487
StartedShardEntry entry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message);
510488
sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, entry, listener);
@@ -670,97 +648,6 @@ public String toString() {
670648
}
671649
}
672650

673-
public interface Listener {
674-
675-
default void onSuccess() {
676-
}
677-
678-
/**
679-
* Notification for non-channel exceptions that are not handled
680-
* by {@link ShardStateAction}.
681-
*
682-
* The exceptions that are handled by {@link ShardStateAction}
683-
* are:
684-
* - {@link NotMasterException}
685-
* - {@link NodeDisconnectedException}
686-
* - {@link FailedToCommitClusterStateException}
687-
*
688-
* Any other exception is communicated to the requester via
689-
* this notification.
690-
*
691-
* @param e the unexpected cause of the failure on the master
692-
*/
693-
default void onFailure(final Exception e) {
694-
}
695-
696-
}
697-
698-
/**
699-
* A composite listener that allows registering multiple listeners dynamically.
700-
*/
701-
static final class CompositeListener implements Listener {
702-
private boolean isNotified = false;
703-
private Exception failure = null;
704-
private final List<Listener> listeners = new ArrayList<>();
705-
706-
CompositeListener(Listener listener) {
707-
listeners.add(listener);
708-
}
709-
710-
void addListener(Listener listener) {
711-
final boolean ready;
712-
synchronized (this) {
713-
ready = this.isNotified;
714-
if (ready == false) {
715-
listeners.add(listener);
716-
}
717-
}
718-
if (ready) {
719-
if (failure != null) {
720-
listener.onFailure(failure);
721-
} else {
722-
listener.onSuccess();
723-
}
724-
}
725-
}
726-
727-
private void onCompleted(Exception failure) {
728-
synchronized (this) {
729-
this.failure = failure;
730-
this.isNotified = true;
731-
}
732-
RuntimeException firstException = null;
733-
for (Listener listener : listeners) {
734-
try {
735-
if (failure != null) {
736-
listener.onFailure(failure);
737-
} else {
738-
listener.onSuccess();
739-
}
740-
} catch (RuntimeException innerEx) {
741-
if (firstException == null) {
742-
firstException = innerEx;
743-
} else {
744-
firstException.addSuppressed(innerEx);
745-
}
746-
}
747-
}
748-
if (firstException != null) {
749-
throw firstException;
750-
}
751-
}
752-
753-
@Override
754-
public void onSuccess() {
755-
onCompleted(null);
756-
}
757-
758-
@Override
759-
public void onFailure(Exception failure) {
760-
onCompleted(failure);
761-
}
762-
}
763-
764651
public static class NoLongerPrimaryShardException extends ElasticsearchException {
765652

766653
public NoLongerPrimaryShardException(ShardId shardId, String msg) {

server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
109109
private final ShardStateAction shardStateAction;
110110
private final NodeMappingRefreshAction nodeMappingRefreshAction;
111111

112-
private static final ShardStateAction.Listener SHARD_STATE_ACTION_LISTENER = new ShardStateAction.Listener() {
113-
};
112+
private static final ActionListener<Void> SHARD_STATE_ACTION_LISTENER = ActionListener.wrap(() -> {});
114113

115114
private final Settings settings;
116115
// a list of shards that failed during recovery
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.transport;
21+
22+
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
24+
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
import java.util.concurrent.ConcurrentMap;
28+
import java.util.function.BiConsumer;
29+
30+
/**
31+
* Deduplicator for {@link TransportRequest}s that keeps track of {@link TransportRequest}s that should
32+
* not be sent in parallel.
33+
* @param <T> Transport Request Class
34+
*/
35+
public final class TransportRequestDeduplicator<T extends TransportRequest> {
36+
37+
private final ConcurrentMap<T, CompositeListener> requests = ConcurrentCollections.newConcurrentMap();
38+
39+
/**
40+
* Ensures a given request not executed multiple times when another equal request is already in-flight.
41+
* If the request is not yet known to the deduplicator it will invoke the passed callback with an {@link ActionListener}
42+
* that must be completed by the caller when the request completes. Once that listener is completed the request will be removed from
43+
* the deduplicator's internal state. If the request is already known to the deduplicator it will keep
44+
* track of the given listener and invoke it when the listener passed to the callback on first invocation is completed.
45+
* @param request Request to deduplicate
46+
* @param listener Listener to invoke on request completion
47+
* @param callback Callback to be invoked with request and completion listener the first time the request is added to the deduplicator
48+
*/
49+
public void executeOnce(T request, ActionListener<Void> listener, BiConsumer<T, ActionListener<Void>> callback) {
50+
ActionListener<Void> completionListener = requests.computeIfAbsent(request, CompositeListener::new).addListener(listener);
51+
if (completionListener != null) {
52+
callback.accept(request, completionListener);
53+
}
54+
}
55+
56+
public int size() {
57+
return requests.size();
58+
}
59+
60+
private final class CompositeListener implements ActionListener<Void> {
61+
62+
private final List<ActionListener<Void>> listeners = new ArrayList<>();
63+
64+
private final T request;
65+
66+
private boolean isNotified;
67+
private Exception failure;
68+
69+
CompositeListener(T request) {
70+
this.request = request;
71+
}
72+
73+
CompositeListener addListener(ActionListener<Void> listener) {
74+
synchronized (this) {
75+
if (this.isNotified == false) {
76+
listeners.add(listener);
77+
return listeners.size() == 1 ? this : null;
78+
}
79+
}
80+
if (failure != null) {
81+
listener.onFailure(failure);
82+
} else {
83+
listener.onResponse(null);
84+
}
85+
return null;
86+
}
87+
88+
private void onCompleted(Exception failure) {
89+
synchronized (this) {
90+
this.failure = failure;
91+
this.isNotified = true;
92+
}
93+
try {
94+
if (failure == null) {
95+
ActionListener.onResponse(listeners, null);
96+
} else {
97+
ActionListener.onFailure(listeners, failure);
98+
}
99+
} finally {
100+
requests.remove(request);
101+
}
102+
}
103+
104+
@Override
105+
public void onResponse(final Void aVoid) {
106+
onCompleted(null);
107+
}
108+
109+
@Override
110+
public void onFailure(Exception failure) {
111+
onCompleted(failure);
112+
}
113+
}
114+
}

0 commit comments

Comments
 (0)