From 5588309c324a5975b57f48d46537eeb3416bf7b0 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Mon, 21 Jan 2019 14:04:18 +0100 Subject: [PATCH 1/5] Streamline skip_unavailable handling so it is available when calling other API through RemoteClusterAwareClient --- .../action/LatchedActionListener.java | 2 +- .../shards/ClusterSearchShardsResponse.java | 4 - .../action/search/TransportSearchAction.java | 98 +++++-- .../transport/CCSActionListener.java | 28 ++ .../transport/RemoteClusterAwareClient.java | 39 ++- .../transport/RemoteClusterConnection.java | 59 +--- .../transport/RemoteClusterService.java | 66 +---- .../search/TransportSearchActionTests.java | 210 ++++++++++--- .../RemoteClusterAwareClientTests.java | 276 ++++++++++++++++++ .../RemoteClusterConnectionTests.java | 203 +------------ .../transport/RemoteClusterServiceTests.java | 171 ----------- 11 files changed, 593 insertions(+), 563 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/transport/CCSActionListener.java create mode 100644 server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareClientTests.java diff --git a/server/src/main/java/org/elasticsearch/action/LatchedActionListener.java b/server/src/main/java/org/elasticsearch/action/LatchedActionListener.java index e5e0af9307211..9050134199a56 100644 --- a/server/src/main/java/org/elasticsearch/action/LatchedActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/LatchedActionListener.java @@ -28,7 +28,7 @@ public class LatchedActionListener implements ActionListener { private final ActionListener delegate; - private final CountDownLatch latch; + protected final CountDownLatch latch; public LatchedActionListener(ActionListener delegate, CountDownLatch latch) { this.delegate = delegate; diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java index 57407bd61fb82..c8889c86c1df7 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java @@ -29,15 +29,11 @@ import java.io.IOException; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.Map; public class ClusterSearchShardsResponse extends ActionResponse implements ToXContentObject { - public static final ClusterSearchShardsResponse EMPTY = new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0], - new DiscoveryNode[0], Collections.emptyMap()); - private final ClusterSearchShardsGroup[] groups; private final DiscoveryNode[] nodes; private final Map indicesAndFilters; diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 3f03c521df52a..43031d55f59ac 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -22,10 +22,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -39,6 +41,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.Rewriteable; import org.elasticsearch.index.shard.ShardId; @@ -48,8 +51,10 @@ import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.CCSActionListener; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; @@ -60,8 +65,11 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.LongSupplier; @@ -195,17 +203,22 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, Collections.emptyList(), (clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, SearchResponse.Clusters.EMPTY); } else { - remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), - searchRequest.routing(), remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> { - List remoteShardIterators = new ArrayList<>(); - Map remoteAliasFilters = new HashMap<>(); - BiFunction clusterNodeLookup = processRemoteShards(searchShardsResponses, - remoteClusterIndices, remoteShardIterators, remoteAliasFilters); - SearchResponse.Clusters clusters = buildClusters(localIndices, remoteClusterIndices, searchShardsResponses); - executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, - remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, - clusters); - }, listener::onFailure)); + AtomicInteger skippedClusters = new AtomicInteger(0); + collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(), skippedClusters, + remoteClusterIndices, remoteClusterService, threadPool, + ActionListener.wrap( + searchShardsResponses -> { + List remoteShardIterators = new ArrayList<>(); + Map remoteAliasFilters = new HashMap<>(); + BiFunction clusterNodeLookup = processRemoteShards( + searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters); + int total = remoteClusterIndices.size() + (localIndices == null ? 0 : 1); + int successful = searchShardsResponses.size(); + executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, + remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, + new SearchResponse.Clusters(total, successful, skippedClusters.get())); + }, + listener::onFailure)); } }, listener::onFailure); if (searchRequest.source() == null) { @@ -216,18 +229,59 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< } } - static SearchResponse.Clusters buildClusters(OriginalIndices localIndices, Map remoteIndices, - Map searchShardsResponses) { - int localClusters = localIndices == null ? 0 : 1; - int totalClusters = remoteIndices.size() + localClusters; - int successfulClusters = localClusters; - for (ClusterSearchShardsResponse searchShardsResponse : searchShardsResponses.values()) { - if (searchShardsResponse != ClusterSearchShardsResponse.EMPTY) { - successfulClusters++; - } + static void collectSearchShards(IndicesOptions indicesOptions, String preference, String routing, AtomicInteger skippedClusters, + Map remoteIndicesByCluster, RemoteClusterService remoteClusterService, + ThreadPool threadPool, ActionListener> listener) { + final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size()); + final Map searchShardsResponses = new ConcurrentHashMap<>(); + final AtomicReference transportException = new AtomicReference<>(); + for (Map.Entry entry : remoteIndicesByCluster.entrySet()) { + final String clusterAlias = entry.getKey(); + Client clusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias); + final String[] indices = entry.getValue().indices(); + ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices) + .indicesOptions(indicesOptions).local(true).preference(preference).routing(routing); + clusterClient.admin().cluster().searchShards(searchShardsRequest, new CCSActionListener() { + @Override + public void onSkippedFailure(Exception e) { + skippedClusters.incrementAndGet(); + maybeFinish(); + } + + @Override + public void onResponse(ClusterSearchShardsResponse response) { + searchShardsResponses.put(clusterAlias, response); + maybeFinish(); + } + + private void maybeFinish() { + if (responsesCountDown.countDown()) { + RemoteTransportException exception = transportException.get(); + if (exception == null) { + listener.onResponse(searchShardsResponses); + } else { + listener.onFailure(transportException.get()); + } + } + } + + @Override + public void onFailure(Exception e) { + RemoteTransportException exception = + new RemoteTransportException("error while communicating with remote cluster [" + clusterAlias + "]", e); + if (transportException.compareAndSet(null, exception) == false) { + exception = transportException.accumulateAndGet(exception, (previous, current) -> { + current.addSuppressed(previous); + return current; + }); + } + if (responsesCountDown.countDown()) { + listener.onFailure(exception); + } + } + } + ); } - int skippedClusters = totalClusters - successfulClusters; - return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters); } static BiFunction processRemoteShards(Map searchShardsResponses, diff --git a/server/src/main/java/org/elasticsearch/transport/CCSActionListener.java b/server/src/main/java/org/elasticsearch/transport/CCSActionListener.java new file mode 100644 index 0000000000000..d2d60d54cd5fd --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/CCSActionListener.java @@ -0,0 +1,28 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.action.ActionListener; + +public interface CCSActionListener extends ActionListener { + + void onSkippedFailure(Exception e); + +} diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java index 2ca42ff85abdf..935a349fc3e8e 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java @@ -29,6 +29,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; +import java.util.function.Consumer; + final class RemoteClusterAwareClient extends AbstractClient { private final TransportService service; @@ -45,18 +47,31 @@ final class RemoteClusterAwareClient extends AbstractClient { @Override protected void doExecute(Action action, Request request, ActionListener listener) { - remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(res -> { - Transport.Connection connection; - if (request instanceof RemoteClusterAwareRequest) { - DiscoveryNode preferredTargetNode = ((RemoteClusterAwareRequest) request).getPreferredTargetNode(); - connection = remoteClusterService.getConnection(preferredTargetNode, clusterAlias); - } else { - connection = remoteClusterService.getConnection(clusterAlias); - } - service.sendRequest(connection, action.name(), request, TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(listener, action.getResponseReader())); - }, - listener::onFailure)); + RemoteClusterConnection remoteClusterConnection = remoteClusterService.getRemoteClusterConnection(clusterAlias); + final ActionListener responseListener; + final Consumer onConnectFailure; + if (listener instanceof CCSActionListener && remoteClusterConnection.isSkipUnavailable()) { + CCSActionListener skipUnavailableActionListener = (CCSActionListener) listener; + onConnectFailure = skipUnavailableActionListener::onSkippedFailure; + responseListener = ActionListener.wrap(listener::onResponse, skipUnavailableActionListener::onSkippedFailure); + } else { + onConnectFailure = listener::onFailure; + responseListener = listener; + } + // in case we have no connected nodes we try to connect and if we fail we either notify the listener + // or not depending on the skip_unavailable setting and whether a CCSActionListener was provided + remoteClusterConnection.ensureConnected(ActionListener.wrap(res -> { + Transport.Connection connection; + if (request instanceof RemoteClusterAwareRequest) { + DiscoveryNode preferredTargetNode = ((RemoteClusterAwareRequest) request).getPreferredTargetNode(); + connection = remoteClusterService.getConnection(preferredTargetNode, clusterAlias); + } else { + connection = remoteClusterService.getConnection(clusterAlias); + } + service.sendRequest(connection, action.name(), request, TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(responseListener, action.getResponseReader())); + }, + onConnectFailure)); } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 7ea55925262ff..175be0903b795 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -25,9 +25,6 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -62,7 +59,6 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -172,6 +168,10 @@ void updateSkipUnavailable(boolean skipUnavailable) { this.skipUnavailable = skipUnavailable; } + boolean isSkipUnavailable() { + return skipUnavailable; + } + @Override public void onNodeDisconnected(DiscoveryNode node) { boolean remove = connectedNodes.remove(node); @@ -181,31 +181,11 @@ public void onNodeDisconnected(DiscoveryNode node) { } } - /** - * Fetches all shards for the search request from this remote connection. This is used to later run the search on the remote end. - */ - public void fetchSearchShards(ClusterSearchShardsRequest searchRequest, - ActionListener listener) { - - final ActionListener searchShardsListener; - final Consumer onConnectFailure; - if (skipUnavailable) { - onConnectFailure = (exception) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY); - searchShardsListener = ActionListener.wrap(listener::onResponse, (e) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY)); - } else { - onConnectFailure = listener::onFailure; - searchShardsListener = listener; - } - // in case we have no connected nodes we try to connect and if we fail we either notify the listener or not depending on - // the skip_unavailable setting - ensureConnected(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, searchShardsListener), onConnectFailure)); - } - /** * Ensures that this cluster is connected. If the cluster is connected this operation * will invoke the listener immediately. */ - public void ensureConnected(ActionListener voidActionListener) { + void ensureConnected(ActionListener voidActionListener) { if (connectedNodes.size() == 0) { connectHandler.connect(voidActionListener); } else { @@ -213,35 +193,6 @@ public void ensureConnected(ActionListener voidActionListener) { } } - private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest, - final ActionListener listener) { - final DiscoveryNode node = getAnyConnectedNode(); - Transport.Connection connection = connectionManager.getConnection(node); - transportService.sendRequest(connection, ClusterSearchShardsAction.NAME, searchShardsRequest, TransportRequestOptions.EMPTY, - new TransportResponseHandler() { - - @Override - public ClusterSearchShardsResponse read(StreamInput in) throws IOException { - return new ClusterSearchShardsResponse(in); - } - - @Override - public void handleResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) { - listener.onResponse(clusterSearchShardsResponse); - } - - @Override - public void handleException(TransportException e) { - listener.onFailure(e); - } - - @Override - public String executor() { - return ThreadPool.Names.SEARCH; - } - }); - } - /** * Collects all nodes on the connected cluster and returns / passes a nodeID to {@link DiscoveryNode} lookup function * that returns null if the node ID is not found. diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index d9fcb01df4ce8..fcc028efd5e70 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -24,8 +24,6 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; @@ -50,10 +48,8 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; @@ -287,7 +283,7 @@ public Map groupIndices(IndicesOptions indicesOptions, String clusterAlias = entry.getKey(); List originalIndices = entry.getValue(); originalIndicesMap.put(clusterAlias, - new OriginalIndices(originalIndices.toArray(new String[originalIndices.size()]), indicesOptions)); + new OriginalIndices(originalIndices.toArray(new String[0]), indicesOptions)); } } } else { @@ -311,55 +307,6 @@ public Set getRegisteredRemoteClusterNames() { return remoteClusters.keySet(); } - public void collectSearchShards(IndicesOptions indicesOptions, String preference, String routing, - Map remoteIndicesByCluster, - ActionListener> listener) { - final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size()); - final Map searchShardsResponses = new ConcurrentHashMap<>(); - final AtomicReference transportException = new AtomicReference<>(); - for (Map.Entry entry : remoteIndicesByCluster.entrySet()) { - final String clusterName = entry.getKey(); - RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterName); - if (remoteClusterConnection == null) { - throw new IllegalArgumentException("no such remote cluster: " + clusterName); - } - final String[] indices = entry.getValue().indices(); - ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices) - .indicesOptions(indicesOptions).local(true).preference(preference) - .routing(routing); - remoteClusterConnection.fetchSearchShards(searchShardsRequest, - new ActionListener() { - @Override - public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) { - searchShardsResponses.put(clusterName, clusterSearchShardsResponse); - if (responsesCountDown.countDown()) { - RemoteTransportException exception = transportException.get(); - if (exception == null) { - listener.onResponse(searchShardsResponses); - } else { - listener.onFailure(transportException.get()); - } - } - } - - @Override - public void onFailure(Exception e) { - RemoteTransportException exception = - new RemoteTransportException("error while communicating with remote cluster [" + clusterName + "]", e); - if (transportException.compareAndSet(null, exception) == false) { - exception = transportException.accumulateAndGet(exception, (previous, current) -> { - current.addSuppressed(previous); - return current; - }); - } - if (responsesCountDown.countDown()) { - listener.onFailure(exception); - } - } - }); - } - } - /** * Returns a connection to the given node on the given remote cluster * @throws IllegalArgumentException if the remote cluster is unknown @@ -368,14 +315,6 @@ public Transport.Connection getConnection(DiscoveryNode node, String cluster) { return getRemoteClusterConnection(cluster).getConnection(node); } - /** - * Ensures that the given cluster alias is connected. If the cluster is connected this operation - * will invoke the listener immediately. - */ - void ensureConnected(String clusterAlias, ActionListener listener) { - getRemoteClusterConnection(clusterAlias).ensureConnected(listener); - } - public Transport.Connection getConnection(String cluster) { return getRemoteClusterConnection(cluster).getConnection(); } @@ -399,7 +338,7 @@ public void listenForUpdates(ClusterSettings clusterSettings) { clusterSettings.addAffixUpdateConsumer(SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE, this::updateSkipUnavailable, (alias, value) -> {}); } - synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) { + private synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) { RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias); if (remote != null) { remote.updateSkipUnavailable(skipUnavailable); @@ -510,5 +449,4 @@ public Client getRemoteClusterClient(ThreadPool threadPool, String clusterAlias) Collection getConnections() { return remoteClusters.values(); } - } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 16ff4389d7c4a..850e8d9a442bd 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; @@ -38,13 +39,19 @@ import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.NodeDisconnectedException; +import org.elasticsearch.transport.RemoteClusterAwareClientTests; +import org.elasticsearch.transport.RemoteClusterConnectionTests; import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; @@ -53,13 +60,22 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.awaitLatch; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.startsWith; public class TransportSearchActionTests extends ESTestCase { @@ -304,41 +320,169 @@ public void close() { } } - public void testBuildClusters() { - OriginalIndices localIndices = randomBoolean() ? null : randomOriginalIndices(); - Map remoteIndices = new HashMap<>(); - Map searchShardsResponses = new HashMap<>(); - int numRemoteClusters = randomIntBetween(0, 10); - boolean onlySuccessful = randomBoolean(); - int localClusters = localIndices == null ? 0 : 1; - int total = numRemoteClusters + localClusters; - int successful = localClusters; - int skipped = 0; - for (int i = 0; i < numRemoteClusters; i++) { - String cluster = randomAlphaOfLengthBetween(5, 10); - remoteIndices.put(cluster, randomOriginalIndices()); - if (onlySuccessful || randomBoolean()) { - //whatever response counts as successful as long as it's not the empty placeholder - searchShardsResponses.put(cluster, new ClusterSearchShardsResponse(null, null, null)); - successful++; - } else { - searchShardsResponses.put(cluster, ClusterSearchShardsResponse.EMPTY); - skipped++; - } - } - SearchResponse.Clusters clusters = TransportSearchAction.buildClusters(localIndices, remoteIndices, searchShardsResponses); - assertEquals(total, clusters.getTotal()); - assertEquals(successful, clusters.getSuccessful()); - assertEquals(skipped, clusters.getSkipped()); + private MockTransportService startTransport(String id, List knownNodes) { + return RemoteClusterConnectionTests.startTransport(id, knownNodes, Version.CURRENT, threadPool); } - private static OriginalIndices randomOriginalIndices() { - int numLocalIndices = randomIntBetween(0, 5); - String[] localIndices = new String[numLocalIndices]; - for (int i = 0; i < numLocalIndices; i++) { - localIndices[i] = randomAlphaOfLengthBetween(3, 10); + public void testCollectSearchShards() throws Exception { + int numClusters = randomIntBetween(2, 10); + MockTransportService[] mockTransportServices = new MockTransportService[numClusters]; + DiscoveryNode[] nodes = new DiscoveryNode[numClusters]; + Map remoteIndicesByCluster = new HashMap<>(); + Settings.Builder builder = Settings.builder(); + for (int i = 0; i < numClusters; i++) { + List knownNodes = new CopyOnWriteArrayList<>(); + MockTransportService remoteSeedTransport = startTransport("node_remote" + i, knownNodes); + mockTransportServices[i] = remoteSeedTransport; + DiscoveryNode remoteSeedNode = remoteSeedTransport.getLocalDiscoNode(); + knownNodes.add(remoteSeedNode); + nodes[i] = remoteSeedNode; + builder.put("cluster.remote.remote" + i + ".seeds", remoteSeedNode.getAddress().toString()); + remoteIndicesByCluster.put("remote" + i, new OriginalIndices(new String[]{"index"}, IndicesOptions.lenientExpandOpen())); + } + Settings settings = builder.build(); + + try { + try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + RemoteClusterService remoteClusterService = service.getRemoteClusterService(); + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicReference> response = new AtomicReference<>(); + AtomicInteger skippedClusters = new AtomicInteger(); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertEquals(0, skippedClusters.get()); + assertNotNull(response.get()); + Map map = response.get(); + assertEquals(numClusters, map.size()); + for (int i = 0; i < numClusters; i++) { + String clusterAlias = "remote" + i; + assertTrue(map.containsKey(clusterAlias)); + ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias); + assertEquals(1, shardsResponse.getNodes().length); + } + } + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicReference failure = new AtomicReference<>(); + AtomicInteger skippedClusters = new AtomicInteger(0); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), "index_not_found", null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertEquals(0, skippedClusters.get()); + assertNotNull(failure.get()); + assertThat(failure.get(), instanceOf(RemoteTransportException.class)); + RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get(); + assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status()); + } + + int numDisconnectedClusters = randomIntBetween(1, numClusters); + Set disconnectedNodes = new HashSet<>(numDisconnectedClusters); + Set disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters); + while (disconnectedNodes.size() < numDisconnectedClusters) { + int i = randomIntBetween(0, numClusters - 1); + if (disconnectedNodes.add(nodes[i])) { + assertTrue(disconnectedNodesIndices.add(i)); + } + } + + CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters); + RemoteClusterAwareClientTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() { + @Override + public void onNodeDisconnected(DiscoveryNode node) { + if (disconnectedNodes.remove(node)) { + disconnectedLatch.countDown(); + } + } + }); + for (DiscoveryNode disconnectedNode : disconnectedNodes) { + service.addFailToSendNoConnectRule(disconnectedNode.getAddress()); + } + + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicInteger skippedClusters = new AtomicInteger(0); + AtomicReference failure = new AtomicReference<>(); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(r -> fail("no response expected"), failure::set), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertEquals(0, skippedClusters.get()); + assertNotNull(failure.get()); + assertThat(failure.get(), instanceOf(RemoteTransportException.class)); + assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster [")); + assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class)); + } + + //setting skip_unavailable to true for all the disconnected clusters will make the request succeed again + for (int i : disconnectedNodesIndices) { + RemoteClusterAwareClientTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); + } + + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicInteger skippedClusters = new AtomicInteger(0); + AtomicReference> response = new AtomicReference<>(); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertNotNull(response.get()); + Map map = response.get(); + assertEquals(numClusters - disconnectedNodesIndices.size(), map.size()); + assertEquals(skippedClusters.get(), disconnectedNodesIndices.size()); + for (int i = 0; i < numClusters; i++) { + String clusterAlias = "remote" + i; + if (disconnectedNodesIndices.contains(i)) { + assertFalse(map.containsKey(clusterAlias)); + } else { + assertNotNull(map.get(clusterAlias)); + } + } + } + + //give transport service enough time to realize that the node is down, and to notify the connection listeners + //so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next + assertTrue(disconnectedLatch.await(5, TimeUnit.SECONDS)); + + service.clearAllRules(); + if (randomBoolean()) { + for (int i : disconnectedNodesIndices) { + if (randomBoolean()) { + RemoteClusterAwareClientTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); + } + + } + } + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicInteger skippedClusters = new AtomicInteger(0); + AtomicReference> response = new AtomicReference<>(); + TransportSearchAction.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, skippedClusters, + remoteIndicesByCluster, remoteClusterService, threadPool, + new LatchedActionListener<>(ActionListener.wrap(response::set, e -> fail("no failures expected")), latch)); + awaitLatch(latch, 5, TimeUnit.SECONDS); + assertEquals(0, skippedClusters.get()); + assertNotNull(response.get()); + Map map = response.get(); + assertEquals(numClusters, map.size()); + for (int i = 0; i < numClusters; i++) { + String clusterAlias = "remote" + i; + assertTrue(map.containsKey(clusterAlias)); + assertNotNull(map.get(clusterAlias)); + } + } + assertEquals(0, service.getConnectionManager().size()); + } + } finally { + for (MockTransportService mockTransportService : mockTransportServices) { + mockTransportService.close(); + } } - return new OriginalIndices(localIndices, IndicesOptions.fromOptions(randomBoolean(), - randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())); } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareClientTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareClientTests.java new file mode 100644 index 0000000000000..f5722d446acae --- /dev/null +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareClientTests.java @@ -0,0 +1,276 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.instanceOf; + +public class RemoteClusterAwareClientTests extends ESTestCase { + + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + private MockTransportService startTransport(String id, List knownNodes) { + return RemoteClusterConnectionTests.startTransport(id, knownNodes, Version.CURRENT, threadPool); + } + + public void testSearchShards() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes)) { + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalDiscoNode().getAddress().toString()); + try (MockTransportService service = MockTransportService.createNewService(builder.build(), Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + + try (RemoteClusterAwareClient client = new RemoteClusterAwareClient(Settings.EMPTY, threadPool, service, "cluster1")) { + SearchRequest request = new SearchRequest("test-index"); + CountDownLatch responseLatch = new CountDownLatch(1); + AtomicReference reference = new AtomicReference<>(); + ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") + .indicesOptions(request.indicesOptions()).local(true).preference(request.preference()) + .routing(request.routing()); + client.admin().cluster().searchShards(searchShardsRequest, + new LatchedCCSActionListener<>(ActionListener.wrap(reference::set, e -> fail("no failures expected")), + e -> fail("no skipped failures expected"), responseLatch)); + responseLatch.await(); + assertNotNull(reference.get()); + ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get(); + assertEquals(knownNodes, Arrays.asList(clusterSearchShardsResponse.getNodes())); + } + } + } + } + + public void testSearchShardsThreadContextHeader() { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes); + MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes)) { + knownNodes.add(seedTransport.getLocalDiscoNode()); + knownNodes.add(discoverableTransport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalDiscoNode().getAddress().toString()); + try (MockTransportService service = MockTransportService.createNewService(builder.build(), Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + + try (RemoteClusterAwareClient client = new RemoteClusterAwareClient(Settings.EMPTY, threadPool, service, "cluster1")) { + SearchRequest request = new SearchRequest("test-index"); + int numThreads = 10; + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + for (int i = 0; i < numThreads; i++) { + final String threadId = Integer.toString(i); + executorService.submit(() -> { + ThreadContext threadContext = seedTransport.threadPool.getThreadContext(); + threadContext.putHeader("threadId", threadId); + AtomicReference reference = new AtomicReference<>(); + final ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") + .indicesOptions(request.indicesOptions()).local(true).preference(request.preference()) + .routing(request.routing()); + CountDownLatch responseLatch = new CountDownLatch(1); + client.admin().cluster().searchShards(searchShardsRequest, + new LatchedCCSActionListener<>(ActionListener.wrap( + resp -> { + reference.set(resp); + assertEquals(threadId, seedTransport.threadPool.getThreadContext().getHeader("threadId")); + }, + e -> fail("no failures expected")), e -> fail("no skipped failures expected"), responseLatch)); + try { + responseLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertNotNull(reference.get()); + ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get(); + assertEquals(knownNodes, Arrays.asList(clusterSearchShardsResponse.getNodes())); + }); + } + ThreadPool.terminate(executorService, 5, TimeUnit.SECONDS); + } + } + } + } + + public static void addConnectionListener(RemoteClusterService service, String clusterAlias, TransportConnectionListener listener) { + RemoteClusterConnection connection = service.getRemoteClusterConnection(clusterAlias); + ConnectionManager connectionManager = connection.getConnectionManager(); + connectionManager.addListener(listener); + } + + public static void addConnectionListener(RemoteClusterService service, TransportConnectionListener listener) { + for (RemoteClusterConnection connection : service.getConnections()) { + ConnectionManager connectionManager = connection.getConnectionManager(); + connectionManager.addListener(listener); + } + } + + public static void updateSkipUnavailable(RemoteClusterService service, String clusterAlias, boolean skipUnavailable) { + RemoteClusterConnection connection = service.getRemoteClusterConnection(clusterAlias); + connection.updateSkipUnavailable(skipUnavailable); + } + + public void testSearchShardsSkipUnavailable() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + knownNodes.add(seedNode); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalDiscoNode().getAddress().toString()); + try (MockTransportService service = MockTransportService.createNewService(builder.build(), Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterAwareClient client = new RemoteClusterAwareClient(Settings.EMPTY, threadPool, service, "cluster1")) { + + SearchRequest request = new SearchRequest("test-index"); + ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") + .indicesOptions(request.indicesOptions()).local(true).preference(request.preference()) + .routing(request.routing()); + { + CountDownLatch responseLatch = new CountDownLatch(1); + AtomicReference reference = new AtomicReference<>(); + client.admin().cluster().searchShards(searchShardsRequest, + new LatchedCCSActionListener<>(ActionListener.wrap(reference::set, + e -> fail("no failures expected")), e -> fail("no skipped failures expected"), responseLatch)); + assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); + assertNotNull(reference.get()); + assertEquals(knownNodes, Arrays.asList(reference.get().getNodes())); + } + + CountDownLatch disconnectedLatch = new CountDownLatch(1); + addConnectionListener(service.getRemoteClusterService(), "cluster1", new TransportConnectionListener() { + @Override + public void onNodeDisconnected(DiscoveryNode node) { + if (node.equals(seedNode)) { + disconnectedLatch.countDown(); + } + } + }); + + service.addFailToSendNoConnectRule(seedTransport); + + if (randomBoolean()) { + updateSkipUnavailable(service.getRemoteClusterService(), "cluster1", false); + } + { + CountDownLatch responseLatch = new CountDownLatch(1); + AtomicReference reference = new AtomicReference<>(); + AtomicReference failReference = new AtomicReference<>(); + client.admin().cluster().searchShards(searchShardsRequest, + new LatchedCCSActionListener<>(ActionListener.wrap(reference::set, failReference::set), + e -> fail("no skipped failures expected"), responseLatch)); + assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); + assertNotNull(failReference.get()); + assertThat(failReference.get(), instanceOf(TransportException.class)); + assertNull(reference.get()); + } + + updateSkipUnavailable(service.getRemoteClusterService(), "cluster1", true); + { + CountDownLatch responseLatch = new CountDownLatch(1); + AtomicReference reference = new AtomicReference<>(); + AtomicReference skippedFailureReference = new AtomicReference<>(); + client.admin().cluster().searchShards(searchShardsRequest, + new LatchedCCSActionListener<>(ActionListener.wrap(reference::set, + e -> fail("no failures expected")), skippedFailureReference::set, responseLatch)); + assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); + assertNull(reference.get()); + assertNotNull(skippedFailureReference.get()); + assertThat(skippedFailureReference.get(), instanceOf(TransportException.class)); + } + + //give transport service enough time to realize that the node is down, and to notify the connection listeners + //so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next + assertTrue(disconnectedLatch.await(10, TimeUnit.SECONDS)); + + if (randomBoolean()) { + updateSkipUnavailable(service.getRemoteClusterService(), "cluster1", false); + } + + service.clearAllRules(); + //check that we reconnect once the node is back up + { + CountDownLatch responseLatch = new CountDownLatch(1); + AtomicReference reference = new AtomicReference<>(); + client.admin().cluster().searchShards(searchShardsRequest, + new LatchedCCSActionListener<>(ActionListener.wrap(reference::set, + e -> fail("no failures expected")), e -> fail("no skipped failures expected"), responseLatch)); + assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); + assertNotNull(reference.get()); + assertEquals(knownNodes, Arrays.asList(reference.get().getNodes())); + } + } + } + } + } + + private static class LatchedCCSActionListener extends LatchedActionListener + implements CCSActionListener { + + private final Consumer skippedFailureConsumer; + + LatchedCCSActionListener(ActionListener delegate, Consumer consumer, CountDownLatch latch) { + super(delegate, latch); + this.skippedFailureConsumer = consumer; + } + + @Override + public void onSkippedFailure(Exception e) { + try { + skippedFailureConsumer.accept(e); + } finally { + latch.countDown(); + } + } + } +} diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 02e701ed4bc86..308d330d54f61 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -43,7 +42,6 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CancellableThreads; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.core.internal.io.IOUtils; @@ -558,7 +556,7 @@ public void run() { } } - private List>> seedNodes(final DiscoveryNode... seedNodes) { + private static List>> seedNodes(final DiscoveryNode... seedNodes) { if (seedNodes.length == 0) { return Collections.emptyList(); } else if (seedNodes.length == 1) { @@ -570,205 +568,6 @@ private List>> seedNodes(final DiscoveryNo } } - public void testFetchShards() throws Exception { - List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); - MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - knownNodes.add(seedTransport.getLocalDiscoNode()); - knownNodes.add(discoverableTransport.getLocalDiscoNode()); - Collections.shuffle(knownNodes, random()); - try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { - service.start(); - service.acceptIncomingRequests(); - final List>> seedNodes = seedNodes(seedNode); - try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { - if (randomBoolean()) { - updateSeedNodes(connection, seedNodes); - } - if (randomBoolean()) { - connection.updateSkipUnavailable(randomBoolean()); - } - SearchRequest request = new SearchRequest("test-index"); - CountDownLatch responseLatch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); - AtomicReference failReference = new AtomicReference<>(); - ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") - .indicesOptions(request.indicesOptions()).local(true).preference(request.preference()) - .routing(request.routing()); - connection.fetchSearchShards(searchShardsRequest, - new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch)); - responseLatch.await(); - assertNull(failReference.get()); - assertNotNull(reference.get()); - ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get(); - assertEquals(knownNodes, Arrays.asList(clusterSearchShardsResponse.getNodes())); - assertTrue(connection.assertNoRunningConnections()); - } - } - } - } - - public void testFetchShardsThreadContextHeader() throws Exception { - List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); - MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - knownNodes.add(seedTransport.getLocalDiscoNode()); - knownNodes.add(discoverableTransport.getLocalDiscoNode()); - Collections.shuffle(knownNodes, random()); - try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { - service.start(); - service.acceptIncomingRequests(); - final List>> seedNodes = seedNodes(seedNode); - try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes, service, Integer.MAX_VALUE, n -> true, null)) { - SearchRequest request = new SearchRequest("test-index"); - Thread[] threads = new Thread[10]; - for (int i = 0; i < threads.length; i++) { - final String threadId = Integer.toString(i); - threads[i] = new Thread(() -> { - ThreadContext threadContext = seedTransport.threadPool.getThreadContext(); - threadContext.putHeader("threadId", threadId); - AtomicReference reference = new AtomicReference<>(); - AtomicReference failReference = new AtomicReference<>(); - final ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") - .indicesOptions(request.indicesOptions()).local(true).preference(request.preference()) - .routing(request.routing()); - CountDownLatch responseLatch = new CountDownLatch(1); - connection.fetchSearchShards(searchShardsRequest, - new LatchedActionListener<>(ActionListener.wrap( - resp -> { - reference.set(resp); - assertEquals(threadId, seedTransport.threadPool.getThreadContext().getHeader("threadId")); - }, - failReference::set), responseLatch)); - try { - responseLatch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - assertNull(failReference.get()); - assertNotNull(reference.get()); - ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get(); - assertEquals(knownNodes, Arrays.asList(clusterSearchShardsResponse.getNodes())); - }); - } - for (int i = 0; i < threads.length; i++) { - threads[i].start(); - } - - for (int i = 0; i < threads.length; i++) { - threads[i].join(); - } - assertTrue(connection.assertNoRunningConnections()); - } - } - } - } - - public void testFetchShardsSkipUnavailable() throws Exception { - List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - knownNodes.add(seedNode); - try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { - service.start(); - service.acceptIncomingRequests(); - try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - seedNodes(seedNode), service, Integer.MAX_VALUE, n -> true, null)) { - ConnectionManager connectionManager = connection.getConnectionManager(); - - SearchRequest request = new SearchRequest("test-index"); - ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") - .indicesOptions(request.indicesOptions()).local(true).preference(request.preference()) - .routing(request.routing()); - { - CountDownLatch responseLatch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); - AtomicReference failReference = new AtomicReference<>(); - connection.fetchSearchShards(searchShardsRequest, - new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch)); - assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); - assertNull(failReference.get()); - assertNotNull(reference.get()); - ClusterSearchShardsResponse response = reference.get(); - assertTrue(response != ClusterSearchShardsResponse.EMPTY); - assertEquals(knownNodes, Arrays.asList(response.getNodes())); - } - - CountDownLatch disconnectedLatch = new CountDownLatch(1); - connectionManager.addListener(new TransportConnectionListener() { - @Override - public void onNodeDisconnected(DiscoveryNode node) { - if (node.equals(seedNode)) { - disconnectedLatch.countDown(); - } - } - }); - - service.addFailToSendNoConnectRule(seedTransport); - - if (randomBoolean()) { - connection.updateSkipUnavailable(false); - } - { - CountDownLatch responseLatch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); - AtomicReference failReference = new AtomicReference<>(); - connection.fetchSearchShards(searchShardsRequest, - new LatchedActionListener<>(ActionListener.wrap((s) -> { - reference.set(s); - }, failReference::set), responseLatch)); - assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); - assertNotNull(failReference.get()); - assertNull(reference.get()); - assertThat(failReference.get(), instanceOf(TransportException.class)); - } - - connection.updateSkipUnavailable(true); - { - CountDownLatch responseLatch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); - AtomicReference failReference = new AtomicReference<>(); - connection.fetchSearchShards(searchShardsRequest, - new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch)); - assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); - assertNull(failReference.get()); - assertNotNull(reference.get()); - ClusterSearchShardsResponse response = reference.get(); - assertTrue(response == ClusterSearchShardsResponse.EMPTY); - } - - //give transport service enough time to realize that the node is down, and to notify the connection listeners - //so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next - assertTrue(disconnectedLatch.await(10, TimeUnit.SECONDS)); - - if (randomBoolean()) { - connection.updateSkipUnavailable(false); - } - - service.clearAllRules(); - //check that we reconnect once the node is back up - { - CountDownLatch responseLatch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); - AtomicReference failReference = new AtomicReference<>(); - connection.fetchSearchShards(searchShardsRequest, - new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch)); - assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); - assertNull(failReference.get()); - assertNotNull(reference.get()); - ClusterSearchShardsResponse response = reference.get(); - assertTrue(response != ClusterSearchShardsResponse.EMPTY); - assertEquals(knownNodes, Arrays.asList(response.getNodes())); - } - } - } - } - } - public void testTriggerUpdatesConcurrently() throws IOException, InterruptedException { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index d5671eec21961..e10cd1fe608c1 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -20,9 +20,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; @@ -33,7 +31,6 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.transport.MockTransportService; @@ -60,9 +57,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.awaitLatch; import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -711,172 +706,6 @@ public void onFailure(Exception e) { } } - public void testCollectSearchShards() throws Exception { - int numClusters = randomIntBetween(2, 10); - MockTransportService[] mockTransportServices = new MockTransportService[numClusters]; - DiscoveryNode[] nodes = new DiscoveryNode[numClusters]; - Map remoteIndicesByCluster = new HashMap<>(); - Settings.Builder builder = Settings.builder(); - for (int i = 0; i < numClusters; i++) { - List knownNodes = new CopyOnWriteArrayList<>(); - MockTransportService remoteSeedTransport = startTransport("node_remote" + i, knownNodes, Version.CURRENT); - mockTransportServices[i] = remoteSeedTransport; - DiscoveryNode remoteSeedNode = remoteSeedTransport.getLocalDiscoNode(); - knownNodes.add(remoteSeedNode); - nodes[i] = remoteSeedNode; - builder.put("cluster.remote.remote" + i + ".seeds", remoteSeedNode.getAddress().toString()); - remoteIndicesByCluster.put("remote" + i, new OriginalIndices(new String[]{"index"}, IndicesOptions.lenientExpandOpen())); - } - Settings settings = builder.build(); - - try { - try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { - service.start(); - service.acceptIncomingRequests(); - try (RemoteClusterService remoteClusterService = new RemoteClusterService(settings, service)) { - assertFalse(remoteClusterService.isCrossClusterSearchEnabled()); - remoteClusterService.initializeRemoteClusters(); - assertTrue(remoteClusterService.isCrossClusterSearchEnabled()); - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicReference> response = new AtomicReference<>(); - AtomicReference failure = new AtomicReference<>(); - remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster, - new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertNull(failure.get()); - assertNotNull(response.get()); - Map map = response.get(); - assertEquals(numClusters, map.size()); - for (int i = 0; i < numClusters; i++) { - String clusterAlias = "remote" + i; - assertTrue(map.containsKey(clusterAlias)); - ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias); - assertEquals(1, shardsResponse.getNodes().length); - } - } - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicReference> response = new AtomicReference<>(); - AtomicReference failure = new AtomicReference<>(); - remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), "index_not_found", - null, remoteIndicesByCluster, - new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertNull(response.get()); - assertNotNull(failure.get()); - assertThat(failure.get(), instanceOf(RemoteTransportException.class)); - RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get(); - assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status()); - } - int numDisconnectedClusters = randomIntBetween(1, numClusters); - Set disconnectedNodes = new HashSet<>(numDisconnectedClusters); - Set disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters); - while(disconnectedNodes.size() < numDisconnectedClusters) { - int i = randomIntBetween(0, numClusters - 1); - if (disconnectedNodes.add(nodes[i])) { - assertTrue(disconnectedNodesIndices.add(i)); - } - } - - CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters); - for (RemoteClusterConnection connection : remoteClusterService.getConnections()) { - connection.getConnectionManager().addListener(new TransportConnectionListener() { - @Override - public void onNodeDisconnected(DiscoveryNode node) { - if (disconnectedNodes.remove(node)) { - disconnectedLatch.countDown(); - } - } - }); - } - - for (DiscoveryNode disconnectedNode : disconnectedNodes) { - service.addFailToSendNoConnectRule(disconnectedNode.getAddress()); - } - - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicReference> response = new AtomicReference<>(); - AtomicReference failure = new AtomicReference<>(); - remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster, - new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertNull(response.get()); - assertNotNull(failure.get()); - assertThat(failure.get(), instanceOf(RemoteTransportException.class)); - assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster [")); - assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class)); - } - - //setting skip_unavailable to true for all the disconnected clusters will make the request succeed again - for (int i : disconnectedNodesIndices) { - remoteClusterService.updateSkipUnavailable("remote" + i, true); - } - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicReference> response = new AtomicReference<>(); - AtomicReference failure = new AtomicReference<>(); - remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster, - new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertNull(failure.get()); - assertNotNull(response.get()); - Map map = response.get(); - assertEquals(numClusters, map.size()); - for (int i = 0; i < numClusters; i++) { - String clusterAlias = "remote" + i; - assertTrue(map.containsKey(clusterAlias)); - ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias); - if (disconnectedNodesIndices.contains(i)) { - assertTrue(shardsResponse == ClusterSearchShardsResponse.EMPTY); - } else { - assertTrue(shardsResponse != ClusterSearchShardsResponse.EMPTY); - } - } - } - - //give transport service enough time to realize that the node is down, and to notify the connection listeners - //so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next - assertTrue(disconnectedLatch.await(5, TimeUnit.SECONDS)); - - service.clearAllRules(); - if (randomBoolean()) { - for (int i : disconnectedNodesIndices) { - if (randomBoolean()) { - remoteClusterService.updateSkipUnavailable("remote" + i, true); - } - - } - } - { - final CountDownLatch latch = new CountDownLatch(1); - AtomicReference> response = new AtomicReference<>(); - AtomicReference failure = new AtomicReference<>(); - remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster, - new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch)); - awaitLatch(latch, 5, TimeUnit.SECONDS); - assertNull(failure.get()); - assertNotNull(response.get()); - Map map = response.get(); - assertEquals(numClusters, map.size()); - for (int i = 0; i < numClusters; i++) { - String clusterAlias = "remote" + i; - assertTrue(map.containsKey(clusterAlias)); - ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias); - assertNotSame(ClusterSearchShardsResponse.EMPTY, shardsResponse); - } - } - assertEquals(0, service.getConnectionManager().size()); - } - } - } finally { - for (MockTransportService mockTransportService : mockTransportServices) { - mockTransportService.close(); - } - } - } - public void testRemoteClusterSkipIfDisconnectedSetting() { { Settings settings = Settings.builder() From 543c9aa01788ceb74a2787d041a7c9f966391c48 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Tue, 22 Jan 2019 10:39:51 +0100 Subject: [PATCH 2/5] fix successful clusters counting --- .../elasticsearch/action/search/TransportSearchAction.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 43031d55f59ac..a4c1c79f04e30 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -212,11 +212,12 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< Map remoteAliasFilters = new HashMap<>(); BiFunction clusterNodeLookup = processRemoteShards( searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters); - int total = remoteClusterIndices.size() + (localIndices == null ? 0 : 1); - int successful = searchShardsResponses.size(); + int localClusters = localIndices == null ? 0 : 1; + int totalClusters = remoteClusterIndices.size() + localClusters; + int successfulClusters = searchShardsResponses.size() + localClusters; executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, - new SearchResponse.Clusters(total, successful, skippedClusters.get())); + new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get())); }, listener::onFailure)); } From f64574fc82d37cbebbc97202747f01817e36da83 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Tue, 22 Jan 2019 14:50:26 +0100 Subject: [PATCH 3/5] iter --- .../action/search/TransportSearchAction.java | 38 +++-- .../transport/CCSActionListener.java | 28 ---- .../transport/RemoteClusterAwareClient.java | 19 +-- .../transport/RemoteClusterConnection.java | 3 + .../transport/RemoteClusterService.java | 15 ++ .../search/TransportSearchActionTests.java | 8 +- .../RemoteClusterAwareClientTests.java | 142 +----------------- .../transport/RemoteClusterServiceTests.java | 38 ++++- 8 files changed, 82 insertions(+), 209 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/transport/CCSActionListener.java diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index a4c1c79f04e30..30e030eca7376 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -51,7 +51,6 @@ import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.CCSActionListener; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.RemoteTransportException; @@ -238,20 +237,32 @@ static void collectSearchShards(IndicesOptions indicesOptions, String preference final AtomicReference transportException = new AtomicReference<>(); for (Map.Entry entry : remoteIndicesByCluster.entrySet()) { final String clusterAlias = entry.getKey(); + boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias); Client clusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias); final String[] indices = entry.getValue().indices(); ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices) .indicesOptions(indicesOptions).local(true).preference(preference).routing(routing); - clusterClient.admin().cluster().searchShards(searchShardsRequest, new CCSActionListener() { + clusterClient.admin().cluster().searchShards(searchShardsRequest, new ActionListener() { @Override - public void onSkippedFailure(Exception e) { - skippedClusters.incrementAndGet(); + public void onResponse(ClusterSearchShardsResponse response) { + searchShardsResponses.put(clusterAlias, response); maybeFinish(); } @Override - public void onResponse(ClusterSearchShardsResponse response) { - searchShardsResponses.put(clusterAlias, response); + public void onFailure(Exception e) { + if (skipUnavailable) { + skippedClusters.incrementAndGet(); + } else { + RemoteTransportException exception = + new RemoteTransportException("error while communicating with remote cluster [" + clusterAlias + "]", e); + if (transportException.compareAndSet(null, exception) == false) { + transportException.accumulateAndGet(exception, (previous, current) -> { + current.addSuppressed(previous); + return current; + }); + } + } maybeFinish(); } @@ -265,21 +276,6 @@ private void maybeFinish() { } } } - - @Override - public void onFailure(Exception e) { - RemoteTransportException exception = - new RemoteTransportException("error while communicating with remote cluster [" + clusterAlias + "]", e); - if (transportException.compareAndSet(null, exception) == false) { - exception = transportException.accumulateAndGet(exception, (previous, current) -> { - current.addSuppressed(previous); - return current; - }); - } - if (responsesCountDown.countDown()) { - listener.onFailure(exception); - } - } } ); } diff --git a/server/src/main/java/org/elasticsearch/transport/CCSActionListener.java b/server/src/main/java/org/elasticsearch/transport/CCSActionListener.java deleted file mode 100644 index d2d60d54cd5fd..0000000000000 --- a/server/src/main/java/org/elasticsearch/transport/CCSActionListener.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.transport; - -import org.elasticsearch.action.ActionListener; - -public interface CCSActionListener extends ActionListener { - - void onSkippedFailure(Exception e); - -} diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java index 935a349fc3e8e..2156a93b128af 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java @@ -29,8 +29,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; -import java.util.function.Consumer; - final class RemoteClusterAwareClient extends AbstractClient { private final TransportService service; @@ -47,20 +45,9 @@ final class RemoteClusterAwareClient extends AbstractClient { @Override protected void doExecute(Action action, Request request, ActionListener listener) { - RemoteClusterConnection remoteClusterConnection = remoteClusterService.getRemoteClusterConnection(clusterAlias); - final ActionListener responseListener; - final Consumer onConnectFailure; - if (listener instanceof CCSActionListener && remoteClusterConnection.isSkipUnavailable()) { - CCSActionListener skipUnavailableActionListener = (CCSActionListener) listener; - onConnectFailure = skipUnavailableActionListener::onSkippedFailure; - responseListener = ActionListener.wrap(listener::onResponse, skipUnavailableActionListener::onSkippedFailure); - } else { - onConnectFailure = listener::onFailure; - responseListener = listener; - } // in case we have no connected nodes we try to connect and if we fail we either notify the listener // or not depending on the skip_unavailable setting and whether a CCSActionListener was provided - remoteClusterConnection.ensureConnected(ActionListener.wrap(res -> { + remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(res -> { Transport.Connection connection; if (request instanceof RemoteClusterAwareRequest) { DiscoveryNode preferredTargetNode = ((RemoteClusterAwareRequest) request).getPreferredTargetNode(); @@ -69,9 +56,9 @@ void doExecute(Action action, Request request, ActionListener(responseListener, action.getResponseReader())); + new ActionListenerResponseHandler<>(listener, action.getResponseReader())); }, - onConnectFailure)); + listener::onFailure)); } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 175be0903b795..d7e3de92e4028 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -168,6 +168,9 @@ void updateSkipUnavailable(boolean skipUnavailable) { this.skipUnavailable = skipUnavailable; } + /** + * Returns whether this cluster is configured to be skipped when unavailable + */ boolean isSkipUnavailable() { return skipUnavailable; } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index fcc028efd5e70..7d19b2eebcb1d 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -315,6 +315,21 @@ public Transport.Connection getConnection(DiscoveryNode node, String cluster) { return getRemoteClusterConnection(cluster).getConnection(node); } + /** + * Ensures that the given cluster alias is connected. If the cluster is connected this operation + * will invoke the listener immediately. + */ + void ensureConnected(String clusterAlias, ActionListener listener) { + getRemoteClusterConnection(clusterAlias).ensureConnected(listener); + } + + /** + * Returns whether the cluster identified by the provided alias is configured to be skipped when unavailable + */ + public boolean isSkipUnavailable(String clusterAlias) { + return getRemoteClusterConnection(clusterAlias).isSkipUnavailable(); + } + public Transport.Connection getConnection(String cluster) { return getRemoteClusterConnection(cluster).getConnection(); } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 850e8d9a442bd..1b99beee65e81 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -46,9 +46,9 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.NodeDisconnectedException; -import org.elasticsearch.transport.RemoteClusterAwareClientTests; import org.elasticsearch.transport.RemoteClusterConnectionTests; import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.RemoteClusterServiceTests; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportConnectionListener; @@ -392,7 +392,7 @@ public void testCollectSearchShards() throws Exception { } CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters); - RemoteClusterAwareClientTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() { + RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() { @Override public void onNodeDisconnected(DiscoveryNode node) { if (disconnectedNodes.remove(node)) { @@ -421,7 +421,7 @@ public void onNodeDisconnected(DiscoveryNode node) { //setting skip_unavailable to true for all the disconnected clusters will make the request succeed again for (int i : disconnectedNodesIndices) { - RemoteClusterAwareClientTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); + RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); } { @@ -454,7 +454,7 @@ public void onNodeDisconnected(DiscoveryNode node) { if (randomBoolean()) { for (int i : disconnectedNodesIndices) { if (randomBoolean()) { - RemoteClusterAwareClientTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); + RemoteClusterServiceTests.updateSkipUnavailable(remoteClusterService, "remote" + i, true); } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareClientTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareClientTests.java index f5722d446acae..1a6eaff9e5a2e 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareClientTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterAwareClientTests.java @@ -42,9 +42,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; - -import static org.hamcrest.Matchers.instanceOf; public class RemoteClusterAwareClientTests extends ESTestCase { @@ -81,8 +78,7 @@ public void testSearchShards() throws Exception { .indicesOptions(request.indicesOptions()).local(true).preference(request.preference()) .routing(request.routing()); client.admin().cluster().searchShards(searchShardsRequest, - new LatchedCCSActionListener<>(ActionListener.wrap(reference::set, e -> fail("no failures expected")), - e -> fail("no skipped failures expected"), responseLatch)); + new LatchedActionListener<>(ActionListener.wrap(reference::set, e -> fail("no failures expected")), responseLatch)); responseLatch.await(); assertNotNull(reference.get()); ClusterSearchShardsResponse clusterSearchShardsResponse = reference.get(); @@ -120,12 +116,12 @@ public void testSearchShardsThreadContextHeader() { .routing(request.routing()); CountDownLatch responseLatch = new CountDownLatch(1); client.admin().cluster().searchShards(searchShardsRequest, - new LatchedCCSActionListener<>(ActionListener.wrap( + new LatchedActionListener<>(ActionListener.wrap( resp -> { reference.set(resp); assertEquals(threadId, seedTransport.threadPool.getThreadContext().getHeader("threadId")); }, - e -> fail("no failures expected")), e -> fail("no skipped failures expected"), responseLatch)); + e -> fail("no failures expected")), responseLatch)); try { responseLatch.await(); } catch (InterruptedException e) { @@ -141,136 +137,4 @@ public void testSearchShardsThreadContextHeader() { } } } - - public static void addConnectionListener(RemoteClusterService service, String clusterAlias, TransportConnectionListener listener) { - RemoteClusterConnection connection = service.getRemoteClusterConnection(clusterAlias); - ConnectionManager connectionManager = connection.getConnectionManager(); - connectionManager.addListener(listener); - } - - public static void addConnectionListener(RemoteClusterService service, TransportConnectionListener listener) { - for (RemoteClusterConnection connection : service.getConnections()) { - ConnectionManager connectionManager = connection.getConnectionManager(); - connectionManager.addListener(listener); - } - } - - public static void updateSkipUnavailable(RemoteClusterService service, String clusterAlias, boolean skipUnavailable) { - RemoteClusterConnection connection = service.getRemoteClusterConnection(clusterAlias); - connection.updateSkipUnavailable(skipUnavailable); - } - - public void testSearchShardsSkipUnavailable() throws Exception { - List knownNodes = new CopyOnWriteArrayList<>(); - try (MockTransportService seedTransport = startTransport("seed_node", knownNodes)) { - DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); - knownNodes.add(seedNode); - Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalDiscoNode().getAddress().toString()); - try (MockTransportService service = MockTransportService.createNewService(builder.build(), Version.CURRENT, threadPool, null)) { - service.start(); - service.acceptIncomingRequests(); - try (RemoteClusterAwareClient client = new RemoteClusterAwareClient(Settings.EMPTY, threadPool, service, "cluster1")) { - - SearchRequest request = new SearchRequest("test-index"); - ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") - .indicesOptions(request.indicesOptions()).local(true).preference(request.preference()) - .routing(request.routing()); - { - CountDownLatch responseLatch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); - client.admin().cluster().searchShards(searchShardsRequest, - new LatchedCCSActionListener<>(ActionListener.wrap(reference::set, - e -> fail("no failures expected")), e -> fail("no skipped failures expected"), responseLatch)); - assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); - assertNotNull(reference.get()); - assertEquals(knownNodes, Arrays.asList(reference.get().getNodes())); - } - - CountDownLatch disconnectedLatch = new CountDownLatch(1); - addConnectionListener(service.getRemoteClusterService(), "cluster1", new TransportConnectionListener() { - @Override - public void onNodeDisconnected(DiscoveryNode node) { - if (node.equals(seedNode)) { - disconnectedLatch.countDown(); - } - } - }); - - service.addFailToSendNoConnectRule(seedTransport); - - if (randomBoolean()) { - updateSkipUnavailable(service.getRemoteClusterService(), "cluster1", false); - } - { - CountDownLatch responseLatch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); - AtomicReference failReference = new AtomicReference<>(); - client.admin().cluster().searchShards(searchShardsRequest, - new LatchedCCSActionListener<>(ActionListener.wrap(reference::set, failReference::set), - e -> fail("no skipped failures expected"), responseLatch)); - assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); - assertNotNull(failReference.get()); - assertThat(failReference.get(), instanceOf(TransportException.class)); - assertNull(reference.get()); - } - - updateSkipUnavailable(service.getRemoteClusterService(), "cluster1", true); - { - CountDownLatch responseLatch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); - AtomicReference skippedFailureReference = new AtomicReference<>(); - client.admin().cluster().searchShards(searchShardsRequest, - new LatchedCCSActionListener<>(ActionListener.wrap(reference::set, - e -> fail("no failures expected")), skippedFailureReference::set, responseLatch)); - assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); - assertNull(reference.get()); - assertNotNull(skippedFailureReference.get()); - assertThat(skippedFailureReference.get(), instanceOf(TransportException.class)); - } - - //give transport service enough time to realize that the node is down, and to notify the connection listeners - //so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next - assertTrue(disconnectedLatch.await(10, TimeUnit.SECONDS)); - - if (randomBoolean()) { - updateSkipUnavailable(service.getRemoteClusterService(), "cluster1", false); - } - - service.clearAllRules(); - //check that we reconnect once the node is back up - { - CountDownLatch responseLatch = new CountDownLatch(1); - AtomicReference reference = new AtomicReference<>(); - client.admin().cluster().searchShards(searchShardsRequest, - new LatchedCCSActionListener<>(ActionListener.wrap(reference::set, - e -> fail("no failures expected")), e -> fail("no skipped failures expected"), responseLatch)); - assertTrue(responseLatch.await(10, TimeUnit.SECONDS)); - assertNotNull(reference.get()); - assertEquals(knownNodes, Arrays.asList(reference.get().getNodes())); - } - } - } - } - } - - private static class LatchedCCSActionListener extends LatchedActionListener - implements CCSActionListener { - - private final Consumer skippedFailureConsumer; - - LatchedCCSActionListener(ActionListener delegate, Consumer consumer, CountDownLatch latch) { - super(delegate, latch); - this.skippedFailureConsumer = consumer; - } - - @Override - public void onSkippedFailure(Exception e) { - try { - skippedFailureConsumer.accept(e); - } finally { - latch.countDown(); - } - } - } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index e10cd1fe608c1..60f3ece86bcbe 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -908,7 +908,7 @@ public void testRemoteClusterWithProxy() throws Exception { } } - private void updateRemoteCluster(RemoteClusterService service, String clusterAlias, List addresses, String proxyAddress) + private static void updateRemoteCluster(RemoteClusterService service, String clusterAlias, List addresses, String proxyAddress) throws Exception { CountDownLatch latch = new CountDownLatch(1); AtomicReference exceptionAtomicReference = new AtomicReference<>(); @@ -922,4 +922,40 @@ private void updateRemoteCluster(RemoteClusterService service, String clusterAli throw exceptionAtomicReference.get(); } } + + public static void updateSkipUnavailable(RemoteClusterService service, String clusterAlias, boolean skipUnavailable) { + RemoteClusterConnection connection = service.getRemoteClusterConnection(clusterAlias); + connection.updateSkipUnavailable(skipUnavailable); + } + + public static void addConnectionListener(RemoteClusterService service, TransportConnectionListener listener) { + for (RemoteClusterConnection connection : service.getConnections()) { + ConnectionManager connectionManager = connection.getConnectionManager(); + connectionManager.addListener(listener); + } + } + + public void testSkipUnavailable() { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + knownNodes.add(seedNode); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalDiscoNode().getAddress().toString()); + try (MockTransportService service = MockTransportService.createNewService(builder.build(), Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + + assertFalse(service.getRemoteClusterService().isSkipUnavailable("cluster1")); + + if (randomBoolean()) { + updateSkipUnavailable(service.getRemoteClusterService(), "cluster1", false); + assertFalse(service.getRemoteClusterService().isSkipUnavailable("cluster1")); + } + + updateSkipUnavailable(service.getRemoteClusterService(), "cluster1", true); + assertTrue(service.getRemoteClusterService().isSkipUnavailable("cluster1")); + } + } + } } From 87b40216d91c345ae6fc1637ea93884f9eea7bc2 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Tue, 22 Jan 2019 14:51:22 +0100 Subject: [PATCH 4/5] revert --- .../java/org/elasticsearch/action/LatchedActionListener.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/LatchedActionListener.java b/server/src/main/java/org/elasticsearch/action/LatchedActionListener.java index 9050134199a56..e5e0af9307211 100644 --- a/server/src/main/java/org/elasticsearch/action/LatchedActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/LatchedActionListener.java @@ -28,7 +28,7 @@ public class LatchedActionListener implements ActionListener { private final ActionListener delegate; - protected final CountDownLatch latch; + private final CountDownLatch latch; public LatchedActionListener(ActionListener delegate, CountDownLatch latch) { this.delegate = delegate; From 73d5e0c3bdcc215751c2a43cd6b987189f456d22 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Tue, 22 Jan 2019 14:53:25 +0100 Subject: [PATCH 5/5] revert --- .../transport/RemoteClusterAwareClient.java | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java index 2156a93b128af..2ca42ff85abdf 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java @@ -45,20 +45,18 @@ final class RemoteClusterAwareClient extends AbstractClient { @Override protected void doExecute(Action action, Request request, ActionListener listener) { - // in case we have no connected nodes we try to connect and if we fail we either notify the listener - // or not depending on the skip_unavailable setting and whether a CCSActionListener was provided remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(res -> { - Transport.Connection connection; - if (request instanceof RemoteClusterAwareRequest) { - DiscoveryNode preferredTargetNode = ((RemoteClusterAwareRequest) request).getPreferredTargetNode(); - connection = remoteClusterService.getConnection(preferredTargetNode, clusterAlias); - } else { - connection = remoteClusterService.getConnection(clusterAlias); - } - service.sendRequest(connection, action.name(), request, TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(listener, action.getResponseReader())); - }, - listener::onFailure)); + Transport.Connection connection; + if (request instanceof RemoteClusterAwareRequest) { + DiscoveryNode preferredTargetNode = ((RemoteClusterAwareRequest) request).getPreferredTargetNode(); + connection = remoteClusterService.getConnection(preferredTargetNode, clusterAlias); + } else { + connection = remoteClusterService.getConnection(clusterAlias); + } + service.sendRequest(connection, action.name(), request, TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(listener, action.getResponseReader())); + }, + listener::onFailure)); } @Override