Add CoordinatorTests for empty unicast hosts list (elastic#38209)
Today we have DiscoveryDisruptionIT tests for checking that discovery can still
work once the cluster has formed, even if the cluster is misconfigured and only
has a single master-eligible node in its unicast hosts list. In fact with Zen2
we can go one better: we do not need any nodes in the unicast hosts list,
because nodes also use the contents of the last-committed cluster state for
discovery. Additionally, the DiscoveryDisruptionIT tests were failing due to
the overenthusiastic fault-detection timeouts.
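
To make the last-committed-state point concrete, here is a minimal, hypothetical sketch of the idea (the class and method names are illustrative and do not exist in Elasticsearch): even with an empty unicast hosts list, discovery still has candidate addresses to probe, because the node remembers the master-eligible nodes of the last cluster state it accepted.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch only -- DiscoverySketch, Address and StateNode are made-up names.
// The point is the union below: an empty configured hosts list still leaves discovery
// with candidates, provided the node has previously accepted a cluster state.
public class DiscoverySketch {

    /** A node's publish address, e.g. "10.0.0.1:9300". */
    public record Address(String value) {}

    /** A node as recorded in the last-accepted cluster state. */
    public record StateNode(Address address, boolean masterEligible) {}

    /** Addresses a node probes while it is looking for a master. */
    public static Set<Address> discoveryTargets(List<Address> configuredUnicastHosts,
                                                List<StateNode> lastAcceptedNodes) {
        final Set<Address> targets = new LinkedHashSet<>(configuredUnicastHosts);
        for (StateNode node : lastAcceptedNodes) {
            // discovery is about finding a master, so this sketch only probes master-eligible nodes
            if (node.masterEligible()) {
                targets.add(node.address());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        // Misconfigured node: no unicast hosts, but it has previously accepted a cluster state.
        List<Address> configured = List.of();
        List<StateNode> lastState = List.of(
            new StateNode(new Address("10.0.0.1:9300"), true),
            new StateNode(new Address("10.0.0.2:9300"), false));
        System.out.println(discoveryTargets(configured, lastState)); // probes only 10.0.0.1:9300
    }
}
```

The new `testDiscoveryUsesNodesFromLastClusterState` in the diff below leans on exactly this behaviour: it empties the test cluster's unicast hosts list and still expects the cluster to stabilise.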

This commit replaces these tests with deterministic `CoordinatorTests` that
verify the same behaviour. It also removes some duplication by extracting a
test method called `testFollowerCheckerAfterMasterReelection()`.

Closes elastic#37687
DaveCTurner authored Feb 2, 2019
1 parent 80d3092 commit c311062
Showing 8 changed files with 106 additions and 149 deletions.
@@ -341,10 +341,10 @@ private void updateMaxTermSeen(final long term) {
                 // Bump our term. However if there is a publication in flight then doing so would cancel the publication, so don't do that
                 // since we check whether a term bump is needed at the end of the publication too.
                 if (publicationInProgress()) {
-                    logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump",
-                        maxTermSeen, currentTerm);
+                    logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump", maxTermSeen, currentTerm);
                 } else {
                     try {
+                        logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, bumping term", maxTermSeen, currentTerm);
                         ensureTermAtLeast(getLocalNode(), maxTermSeen);
                         startElection();
                     } catch (Exception e) {
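
The comment in the hunk above explains why the term bump is deferred while a publication is in flight. A minimal, hypothetical sketch of that pattern (illustrative names, not the real `Coordinator`):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch only: while a publication is in flight, a higher observed term is merely
// recorded; the bump (and the election it triggers) happens once the publication completes.
class TermBumpSketch {
    private final AtomicLong currentTerm = new AtomicLong();
    private final AtomicLong maxTermSeen = new AtomicLong();
    private final AtomicBoolean publicationInProgress = new AtomicBoolean();

    void updateMaxTermSeen(long term) {
        maxTermSeen.accumulateAndGet(term, Math::max);
        if (maxTermSeen.get() > currentTerm.get() && publicationInProgress.get() == false) {
            bumpTermAndStartElection();
        }
        // else: the publication's completion path re-checks maxTermSeen and bumps the term then
    }

    void onPublicationCompleted() {
        publicationInProgress.set(false);
        if (maxTermSeen.get() > currentTerm.get()) {
            bumpTermAndStartElection(); // the deferred bump
        }
    }

    private void bumpTermAndStartElection() {
        currentTerm.set(maxTermSeen.get());
        // ... start an election at the new term ...
    }
}
```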
@@ -953,9 +953,18 @@ public void onConnectionClosed(Transport.Connection connection) {
             responseHandlers.prune(h -> h.connection().getCacheKey().equals(connection.getCacheKey()));
             // callback that an exception happened, but on a different thread since we don't
             // want handlers to worry about stack overflows
-            getExecutorService().execute(() -> {
-                for (Transport.ResponseContext holderToNotify : pruned) {
-                    holderToNotify.handler().handleException(new NodeDisconnectedException(connection.getNode(), holderToNotify.action()));
+            getExecutorService().execute(new Runnable() {
+                @Override
+                public void run() {
+                    for (Transport.ResponseContext holderToNotify : pruned) {
+                        holderToNotify.handler().handleException(
+                            new NodeDisconnectedException(connection.getNode(), holderToNotify.action()));
+                    }
+                }
+
+                @Override
+                public String toString() {
+                    return "onConnectionClosed(" + connection.getNode() + ")";
                 }
             });
         } catch (EsRejectedExecutionException ex) {
@@ -44,6 +44,7 @@
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterApplierService;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -98,6 +99,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

+import static java.util.Collections.emptyList;
 import static java.util.Collections.emptySet;
 import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.BOOTSTRAP_PLACEHOLDER_PREFIX;
 import static org.elasticsearch.cluster.coordination.CoordinationStateTests.clusterState;
@@ -1046,7 +1048,7 @@ public void testCannotJoinClusterWithDifferentUUID() throws IllegalAccessExcepti
             Loggers.removeAppender(joinLogger, mockAppender);
             mockAppender.stop();
         }
-        assertTrue(newNode.getLastAppliedClusterState().version() == 0);
+        assertEquals(0, newNode.getLastAppliedClusterState().version());

         final ClusterNode detachedNode = newNode.restartedNode(
             metaData -> DetachClusterCommand.updateMetaData(metaData),
@@ -1055,6 +1057,27 @@ public void testCannotJoinClusterWithDifferentUUID() throws IllegalAccessExcepti
         cluster1.stabilise();
     }

+    public void testDiscoveryUsesNodesFromLastClusterState() {
+        final Cluster cluster = new Cluster(randomIntBetween(3, 5));
+        cluster.runRandomly();
+        cluster.stabilise();
+
+        final ClusterNode partitionedNode = cluster.getAnyNode();
+        if (randomBoolean()) {
+            logger.info("--> blackholing {}", partitionedNode);
+            partitionedNode.blackhole();
+        } else {
+            logger.info("--> disconnecting {}", partitionedNode);
+            partitionedNode.disconnect();
+        }
+        cluster.setEmptyUnicastHostsList();
+        cluster.stabilise();
+
+        partitionedNode.heal();
+        cluster.runRandomly(false);
+        cluster.stabilise();
+    }
+
     private static long defaultMillis(Setting<TimeValue> setting) {
         return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY;
     }
@@ -1094,6 +1117,8 @@ private static int defaultInt(Setting<Integer> setting) {
             * defaultInt(LEADER_CHECK_RETRY_COUNT_SETTING)
         // then wait for a follower to be promoted to leader
         + DEFAULT_ELECTION_DELAY
+        // perhaps there is an election collision requiring another publication (which times out) and a term bump
+        + defaultMillis(PUBLISH_TIMEOUT_SETTING) + DEFAULT_ELECTION_DELAY
         // then wait for the new leader to notice that the old leader is unresponsive
         + (defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING))
             * defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING)
@@ -1110,14 +1135,17 @@ class Cluster {
             // TODO does ThreadPool need a node name any more?
             Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build(), random());
         private boolean disruptStorage;

         private final VotingConfiguration initialConfiguration;

         private final Set<String> disconnectedNodes = new HashSet<>();
         private final Set<String> blackholedNodes = new HashSet<>();
         private final Map<Long, ClusterState> committedStatesByVersion = new HashMap<>();

-        private final Function<DiscoveryNode, MockPersistedState> defaultPersistedStateSupplier =
-            localNode -> new MockPersistedState(localNode);
+        private final Function<DiscoveryNode, MockPersistedState> defaultPersistedStateSupplier = MockPersistedState::new;
+
+        @Nullable // null means construct a list from all the current nodes
+        private List<TransportAddress> unicastHostsList;
+
         Cluster(int initialNodeCount) {
             this(initialNodeCount, true);
@@ -1177,6 +1205,10 @@ int size() {
         }

         void runRandomly() {
+            runRandomly(true);
+        }
+
+        void runRandomly(boolean allowReboots) {

             // TODO supporting (preserving?) existing disruptions needs implementing if needed, for now we just forbid it
             assertThat("may reconnect disconnected nodes, probably unexpected", disconnectedNodes, empty());
@@ -1223,7 +1255,7 @@ void runRandomly() {
                             thisStep, autoShrinkVotingConfiguration, clusterNode.getId());
                         clusterNode.submitSetAutoShrinkVotingConfiguration(autoShrinkVotingConfiguration);
                     }).run();
-                } else if (rarely()) {
+                } else if (allowReboots && rarely()) {
                     // reboot random node
                     final ClusterNode clusterNode = getAnyNode();
                     logger.debug("----> [runRandomly {}] rebooting [{}]", thisStep, clusterNode.getId());
@@ -1504,6 +1536,10 @@ ClusterNode getAnyNodePreferringLeaders() {
             return getAnyNode();
         }

+        void setEmptyUnicastHostsList() {
+            unicastHostsList = emptyList();
+        }
+
         class MockPersistedState implements PersistedState {
             private final PersistedState delegate;
             private final NodeEnvironment nodeEnvironment;
@@ -1678,13 +1714,15 @@ public void connectToNodes(DiscoveryNodes discoveryNodes) {
             }

             void close() {
-                logger.trace("taking down [{}]", localNode);
-                coordinator.stop();
-                clusterService.stop();
-                //transportService.stop(); // does blocking stuff :/
-                clusterService.close();
-                coordinator.close();
-                //transportService.close(); // does blocking stuff :/
+                onNode(() -> {
+                    logger.trace("taking down [{}]", localNode);
+                    coordinator.stop();
+                    clusterService.stop();
+                    //transportService.stop(); // does blocking stuff :/
+                    clusterService.close();
+                    coordinator.close();
+                    //transportService.close(); // does blocking stuff :/
+                });
             }

             ClusterNode restartedNode() {
@@ -1866,7 +1904,8 @@ private boolean isNotUsefullyBootstrapped() {
         }

         private List<TransportAddress> provideUnicastHosts(HostsResolver ignored) {
-            return clusterNodes.stream().map(ClusterNode::getLocalNode).map(DiscoveryNode::getAddress).collect(Collectors.toList());
+            return unicastHostsList != null ? unicastHostsList
+                : clusterNodes.stream().map(ClusterNode::getLocalNode).map(DiscoveryNode::getAddress).collect(Collectors.toList());
         }
     }

@@ -32,18 +32,15 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.disruption.SlowClusterStateProcessing;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

@@ -58,77 +55,6 @@
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class DiscoveryDisruptionIT extends AbstractDisruptionTestCase {

public void testIsolatedUnicastNodes() throws Exception {
internalCluster().setHostsListContainsOnlyFirstNode(true);
List<String> nodes = startCluster(4);
// Figure out what is the elected master node
final String unicastTarget = nodes.get(0);

Set<String> unicastTargetSide = new HashSet<>();
unicastTargetSide.add(unicastTarget);

Set<String> restOfClusterSide = new HashSet<>();
restOfClusterSide.addAll(nodes);
restOfClusterSide.remove(unicastTarget);

// Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list
// includes all the other nodes that have pinged it and the issue doesn't manifest
clearTemporalResponses();

// Simulate a network issue between the unicast target node and the rest of the cluster
NetworkDisruption networkDisconnect = new NetworkDisruption(new TwoPartitions(unicastTargetSide, restOfClusterSide),
new NetworkDisconnect());
setDisruptionScheme(networkDisconnect);
networkDisconnect.startDisrupting();
// Wait until elected master has removed that the unlucky node...
ensureStableCluster(3, nodes.get(1));

// The isolate master node must report no master, so it starts with pinging
assertNoMaster(unicastTarget);
networkDisconnect.stopDisrupting();
// Wait until the master node sees all 3 nodes again.
ensureStableCluster(4);
}

/**
* A 4 node cluster with m_m_n set to 3 and each node has one unicast endpoint. One node partitions from the master node.
* The temporal unicast responses is empty. When partition is solved the one ping response contains a master node.
* The rejoining node should take this master node and connect.
*/
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37687")
public void testUnicastSinglePingResponseContainsMaster() throws Exception {
internalCluster().setHostsListContainsOnlyFirstNode(true);
List<String> nodes = startCluster(4);
// Figure out what is the elected master node
final String masterNode = internalCluster().getMasterName();
logger.info("---> legit elected master node={}", masterNode);
List<String> otherNodes = new ArrayList<>(nodes);
otherNodes.remove(masterNode);
otherNodes.remove(nodes.get(0)); // <-- Don't isolate the node that is in the unicast endpoint for all the other nodes.
final String isolatedNode = otherNodes.get(0);

// Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list
// includes all the other nodes that have pinged it and the issue doesn't manifest
clearTemporalResponses();

// Simulate a network issue between the unlucky node and elected master node in both directions.
NetworkDisruption networkDisconnect = new NetworkDisruption(new TwoPartitions(masterNode, isolatedNode),
new NetworkDisconnect());
setDisruptionScheme(networkDisconnect);
networkDisconnect.startDisrupting();
// Wait until elected master has removed that the unlucky node...
ensureStableCluster(3, masterNode);

// The isolate master node must report no master, so it starts with pinging
assertNoMaster(isolatedNode);
networkDisconnect.stopDisrupting();
// Wait until the master node sees all 4 nodes again.
ensureStableCluster(4);
// The elected master shouldn't have changed, since the isolated node never could have elected himself as
// master since m_m_n of 3 could never be satisfied.
assertMaster(masterNode, nodes);
}

/**
* Test cluster join with issues in cluster state publishing *
*/
@@ -187,7 +113,7 @@ public void testClusterJoinDespiteOfPublishingIssues() throws Exception {
         internalCluster().stopRandomNonMasterNode();
     }

-    public void testClusterFormingWithASlowNode() throws Exception {
+    public void testClusterFormingWithASlowNode() {

         SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(random(), 0, 0, 1000, 2000);
