Skip to content

Commit

Permalink
Cross Cluster Search: do not use dedicated masters as gateways
Browse files Browse the repository at this point in the history
When we are connecting to a remote cluster we should never select
dedicated master nodes as gateway nodes, or we will end up loading them
with requests that should rather go to other type of nodes e.g. data
nodes or coord_only nodes.

This commit adds the selection based on the node role, to the existing
selection based on version and potential node attributes.

Closes elastic#30687
  • Loading branch information
javanna committed May 29, 2018
1 parent eaee530 commit c47686b
Show file tree
Hide file tree
Showing 2 changed files with 199 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@
*/
package org.elasticsearch.transport;

import org.elasticsearch.client.Client;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
Expand All @@ -36,6 +35,7 @@
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
Expand Down Expand Up @@ -97,6 +97,9 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
Setting.affixKeySetting("search.remote.", "skip_unavailable",
key -> boolSetting(key, false, Setting.Property.NodeScope, Setting.Property.Dynamic), REMOTE_CLUSTERS_SEEDS);

private static final Predicate<DiscoveryNode> DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion())
&& (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode());

private final TransportService transportService;
private final int numRemoteConnections;
private volatile Map<String, RemoteClusterConnection> remoteClusters = Collections.emptyMap();
Expand All @@ -121,13 +124,6 @@ private synchronized void updateRemoteClusters(Map<String, List<DiscoveryNode>>
connectionListener.onResponse(null);
} else {
CountDown countDown = new CountDown(seeds.size());
Predicate<DiscoveryNode> nodePredicate = (node) -> Version.CURRENT.isCompatible(node.getVersion());
if (REMOTE_NODE_ATTRIBUTE.exists(settings)) {
// nodes can be tagged with node.attr.remote_gateway: true to allow a node to be a gateway node for
// cross cluster search
String attribute = REMOTE_NODE_ATTRIBUTE.get(settings);
nodePredicate = nodePredicate.and((node) -> Booleans.parseBoolean(node.getAttributes().getOrDefault(attribute, "false")));
}
remoteClusters.putAll(this.remoteClusters);
for (Map.Entry<String, List<DiscoveryNode>> entry : seeds.entrySet()) {
RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey());
Expand All @@ -143,7 +139,7 @@ private synchronized void updateRemoteClusters(Map<String, List<DiscoveryNode>>

if (remote == null) { // this is a new cluster we have to add a new representation
remote = new RemoteClusterConnection(settings, entry.getKey(), entry.getValue(), transportService, numRemoteConnections,
nodePredicate);
getNodePredicate(settings));
remoteClusters.put(entry.getKey(), remote);
}

Expand All @@ -168,6 +164,15 @@ private synchronized void updateRemoteClusters(Map<String, List<DiscoveryNode>>
this.remoteClusters = Collections.unmodifiableMap(remoteClusters);
}

static Predicate<DiscoveryNode> getNodePredicate(Settings settings) {
if (REMOTE_NODE_ATTRIBUTE.exists(settings)) {
// nodes can be tagged with node.attr.remote_gateway: true to allow a node to be a gateway node for cross cluster search
String attribute = REMOTE_NODE_ATTRIBUTE.get(settings);
return DEFAULT_NODE_PREDICATE.and((node) -> Booleans.parseBoolean(node.getAttributes().getOrDefault(attribute, "false")));
}
return DEFAULT_NODE_PREDICATE;
}

/**
* Returns <code>true</code> if at least one remote cluster is configured
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.elasticsearch.transport;

import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
Expand All @@ -30,7 +29,9 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -40,6 +41,7 @@
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -50,6 +52,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Predicate;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
Expand Down Expand Up @@ -279,6 +282,75 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException {
}
}

public void testRemoteNodeRoles() throws IOException, InterruptedException {
final Settings settings = Settings.EMPTY;
final List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
final Settings data = Settings.builder().put("node.master", false).build();
final Settings dedicatedMaster = Settings.builder().put("node.data", false).put("node.ingest", "false").build();
try (MockTransportService c1N1 =
startTransport("cluster_1_node_1", knownNodes, Version.CURRENT, dedicatedMaster);
MockTransportService c1N2 =
startTransport("cluster_1_node_2", knownNodes, Version.CURRENT, data);
MockTransportService c2N1 =
startTransport("cluster_2_node_1", knownNodes, Version.CURRENT, dedicatedMaster);
MockTransportService c2N2 =
startTransport("cluster_2_node_2", knownNodes, Version.CURRENT, data)) {
final DiscoveryNode c1N1Node = c1N1.getLocalDiscoNode();
final DiscoveryNode c1N2Node = c1N2.getLocalDiscoNode();
final DiscoveryNode c2N1Node = c2N1.getLocalDiscoNode();
final DiscoveryNode c2N2Node = c2N2.getLocalDiscoNode();
knownNodes.add(c1N1Node);
knownNodes.add(c1N2Node);
knownNodes.add(c2N1Node);
knownNodes.add(c2N2Node);
Collections.shuffle(knownNodes, random());

try (MockTransportService transportService = MockTransportService.createNewService(
settings,
Version.CURRENT,
threadPool,
null)) {
transportService.start();
transportService.acceptIncomingRequests();
final Settings.Builder builder = Settings.builder();
builder.putList("search.remote.cluster_1.seeds", c1N1Node.getAddress().toString());
builder.putList("search.remote.cluster_2.seeds", c2N1Node.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
assertFalse(service.isCrossClusterSearchEnabled());

final InetSocketAddress c1N1Address = c1N1Node.getAddress().address();
final InetSocketAddress c1N2Address = c1N2Node.getAddress().address();
final InetSocketAddress c2N1Address = c2N1Node.getAddress().address();
final InetSocketAddress c2N2Address = c2N2Node.getAddress().address();

final CountDownLatch firstLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_1",
Arrays.asList(c1N1Address, c1N2Address),
connectionListener(firstLatch));
firstLatch.await();

final CountDownLatch secondLatch = new CountDownLatch(1);
service.updateRemoteCluster(
"cluster_2",
Arrays.asList(c2N1Address, c2N2Address),
connectionListener(secondLatch));
secondLatch.await();

assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
assertFalse(service.isRemoteNodeConnected("cluster_1", c1N1Node));
assertTrue(service.isRemoteNodeConnected("cluster_1", c1N2Node));
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
assertFalse(service.isRemoteNodeConnected("cluster_2", c2N1Node));
assertTrue(service.isRemoteNodeConnected("cluster_2", c2N2Node));
}
}
}
}

private ActionListener<Void> connectionListener(final CountDownLatch latch) {
return ActionListener.wrap(x -> latch.countDown(), x -> fail());
}
Expand Down Expand Up @@ -630,4 +702,115 @@ public void testRemoteClusterSkipIfDisconnectedSetting() {
}
}
}

public void testGetNodePredicateNodeRoles() {
TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0);
Predicate<DiscoveryNode> nodePredicate = RemoteClusterService.getNodePredicate(Settings.EMPTY);
{
DiscoveryNode all = new DiscoveryNode("id", address, Collections.emptyMap(),
new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class)), Version.CURRENT);
assertTrue(nodePredicate.test(all));
}
{
DiscoveryNode dataMaster = new DiscoveryNode("id", address, Collections.emptyMap(),
new HashSet<>(EnumSet.of(DiscoveryNode.Role.DATA, DiscoveryNode.Role.MASTER)), Version.CURRENT);
assertTrue(nodePredicate.test(dataMaster));
}
{
DiscoveryNode dedicatedMaster = new DiscoveryNode("id", address, Collections.emptyMap(),
new HashSet<>(EnumSet.of(DiscoveryNode.Role.MASTER)), Version.CURRENT);
assertFalse(nodePredicate.test(dedicatedMaster));
}
{
DiscoveryNode dedicatedIngest = new DiscoveryNode("id", address, Collections.emptyMap(),
new HashSet<>(EnumSet.of(DiscoveryNode.Role.INGEST)), Version.CURRENT);
assertTrue(nodePredicate.test(dedicatedIngest));
}
{
DiscoveryNode masterIngest = new DiscoveryNode("id", address, Collections.emptyMap(),
new HashSet<>(EnumSet.of(DiscoveryNode.Role.INGEST, DiscoveryNode.Role.MASTER)), Version.CURRENT);
assertTrue(nodePredicate.test(masterIngest));
}
{
DiscoveryNode dedicatedData = new DiscoveryNode("id", address, Collections.emptyMap(),
new HashSet<>(EnumSet.of(DiscoveryNode.Role.DATA)), Version.CURRENT);
assertTrue(nodePredicate.test(dedicatedData));
}
{
DiscoveryNode ingestData = new DiscoveryNode("id", address, Collections.emptyMap(),
new HashSet<>(EnumSet.of(DiscoveryNode.Role.DATA, DiscoveryNode.Role.INGEST)), Version.CURRENT);
assertTrue(nodePredicate.test(ingestData));
}
{
DiscoveryNode coordOnly = new DiscoveryNode("id", address, Collections.emptyMap(),
new HashSet<>(EnumSet.noneOf(DiscoveryNode.Role.class)), Version.CURRENT);
assertTrue(nodePredicate.test(coordOnly));
}
}

public void testGetNodePredicateNodeVersion() {
TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0);
Set<DiscoveryNode.Role> roles = new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class));
Predicate<DiscoveryNode> nodePredicate = RemoteClusterService.getNodePredicate(Settings.EMPTY);
Version version = VersionUtils.randomVersion(random());
DiscoveryNode node = new DiscoveryNode("id", address, Collections.emptyMap(), roles, version);
assertThat(nodePredicate.test(node), equalTo(Version.CURRENT.isCompatible(version)));
}

public void testGetNodePredicateNodeAttrs() {
TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0);
Set<DiscoveryNode.Role> roles = new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class));
Settings settings = Settings.builder().put("search.remote.node.attr", "gateway").build();
Predicate<DiscoveryNode> nodePredicate = RemoteClusterService.getNodePredicate(settings);
{
DiscoveryNode nonGatewayNode = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"),
roles, Version.CURRENT);
assertFalse(nodePredicate.test(nonGatewayNode));
assertTrue(RemoteClusterService.getNodePredicate(Settings.EMPTY).test(nonGatewayNode));
}
{
DiscoveryNode gatewayNode = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"),
roles, Version.CURRENT);
assertTrue(nodePredicate.test(gatewayNode));
assertTrue(RemoteClusterService.getNodePredicate(Settings.EMPTY).test(gatewayNode));
}
{
DiscoveryNode noAttrNode = new DiscoveryNode("id", address, Collections.emptyMap(), roles, Version.CURRENT);
assertFalse(nodePredicate.test(noAttrNode));
assertTrue(RemoteClusterService.getNodePredicate(Settings.EMPTY).test(noAttrNode));
}
}

public void testGetNodePredicatesCombination() {
TransportAddress address = new TransportAddress(TransportAddress.META_ADDRESS, 0);
Settings settings = Settings.builder().put("search.remote.node.attr", "gateway").build();
Predicate<DiscoveryNode> nodePredicate = RemoteClusterService.getNodePredicate(settings);
Set<DiscoveryNode.Role> allRoles = new HashSet<>(EnumSet.allOf(DiscoveryNode.Role.class));
Set<DiscoveryNode.Role> dedicatedMasterRoles = new HashSet<>(EnumSet.of(DiscoveryNode.Role.MASTER));
{
DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"),
dedicatedMasterRoles, Version.CURRENT);
assertFalse(nodePredicate.test(node));
}
{
DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"),
dedicatedMasterRoles, Version.CURRENT);
assertFalse(nodePredicate.test(node));
}
{
DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "false"),
dedicatedMasterRoles, Version.CURRENT);
assertFalse(nodePredicate.test(node));
}
{
DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"),
allRoles, Version.CURRENT);
assertTrue(nodePredicate.test(node));
}
{
DiscoveryNode node = new DiscoveryNode("id", address, Collections.singletonMap("gateway", "true"),
allRoles, Version.V_5_3_0);
assertFalse(nodePredicate.test(node));
}
}
}

0 comments on commit c47686b

Please sign in to comment.