diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java index c86e80e289b7d..ff5cdd4e31e0c 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java @@ -281,16 +281,16 @@ assert isEnoughPendingJoins(getPendingMasterJoinsCount()) : "becoming a master b Map tasks = getPendingAsTasks(); final String source = "zen-disco-elected-as-master ([" + tasks.size() + "] nodes joined)"; - tasks.put(BECOME_MASTER_TASK, joinProcessedListener); + tasks.put(BECOME_MASTER_TASK, (source1, e) -> {}); // noop listener, the election finished listener determines result + tasks.put(FINISH_ELECTION_TASK, electionFinishedListener); clusterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor); } public synchronized void closeAndProcessPending(String reason) { innerClose(); Map tasks = getPendingAsTasks(); - final String source = "zen-disco-process-pending-joins [" + reason + "]"; - - tasks.put(FINISH_ELECTION_NOT_MASTER_TASK, joinProcessedListener); + final String source = "zen-disco-election-stop [" + reason + "]"; + tasks.put(FINISH_ELECTION_TASK, electionFinishedListener); clusterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor); } @@ -327,12 +327,15 @@ private void onFailure(Throwable t) { } } - private final ClusterStateTaskListener joinProcessedListener = new ClusterStateTaskListener() { + private final ClusterStateTaskListener electionFinishedListener = new ClusterStateTaskListener() { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - assert newState.nodes().isLocalNodeElectedMaster() : "should have become a master but isn't " + newState.prettyPrint(); - onElectedAsMaster(newState); + if 
(newState.nodes().isLocalNodeElectedMaster()) { + ElectionContext.this.onElectedAsMaster(newState); + } else { + onFailure(source, new NotMasterException("election stopped [" + source + "]")); + } } @Override @@ -379,7 +382,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } } - // a task indicated that the current node should become master, if no current master is known + /** + * a task indicated that the current node should become master, if no current master is known + */ private static final DiscoveryNode BECOME_MASTER_TASK = new DiscoveryNode("_BECOME_MASTER_TASK_", LocalTransportAddress.buildUnique(), Collections.emptyMap(), Collections.emptySet(), Version.CURRENT) { @Override @@ -388,9 +393,11 @@ public String toString() { } }; - // a task that is used to process pending joins without explicitly becoming a master and listening to the results - // this task is used when election is stop without the local node becoming a master per se (though it might - private static final DiscoveryNode FINISH_ELECTION_NOT_MASTER_TASK = new DiscoveryNode("_NOT_MASTER_TASK_", + /** + * a task that is used to signal the election is stopped and we should process pending joins. 
+ * it may be used in combination with {@link #BECOME_MASTER_TASK} + */ + private static final DiscoveryNode FINISH_ELECTION_TASK = new DiscoveryNode("_FINISH_ELECTION_", LocalTransportAddress.buildUnique(), Collections.emptyMap(), Collections.emptySet(), Version.CURRENT) { @Override public String toString() { @@ -402,31 +409,35 @@ class JoinTaskExecutor implements ClusterStateTaskExecutor { @Override public BatchResult execute(ClusterState currentState, List joiningNodes) throws Exception { - final DiscoveryNodes currentNodes = currentState.nodes(); final BatchResult.Builder results = BatchResult.builder(); + + final DiscoveryNodes currentNodes = currentState.nodes(); boolean nodesChanged = false; ClusterState.Builder newState = ClusterState.builder(currentState); DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentNodes); - if (currentNodes.getMasterNode() == null && joiningNodes.contains(BECOME_MASTER_TASK)) { + if (joiningNodes.size() == 1 && joiningNodes.get(0).equals(FINISH_ELECTION_TASK)) { + return results.successes(joiningNodes).build(currentState); + } else if (currentNodes.getMasterNode() == null && joiningNodes.contains(BECOME_MASTER_TASK)) { + assert joiningNodes.contains(FINISH_ELECTION_TASK) : "becoming a master but election is not finished " + joiningNodes; // use these joins to try and become the master. // Note that we don't have to do any validation of the amount of joining nodes - the commit // during the cluster state publishing guarantees that we have enough - nodesBuilder.masterNodeId(currentNodes.getLocalNodeId()); ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()) .removeGlobalBlock(discoverySettings.getNoMasterBlock()).build(); newState.blocks(clusterBlocks); nodesChanged = true; - } - - if (nodesBuilder.isLocalNodeElectedMaster() == false) { + } else if (nodesBuilder.isLocalNodeElectedMaster() == false) { logger.trace("processing node joins, but we are not the master. 
current master: {}", currentNodes.getMasterNode()); throw new NotMasterException("Node [" + currentNodes.getLocalNode() + "] not master for join request"); } + assert nodesBuilder.isLocalNodeElectedMaster(); + + // processing any joins for (final DiscoveryNode node : joiningNodes) { - if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_NOT_MASTER_TASK)) { + if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_TASK)) { // noop } else if (currentNodes.nodeExists(node)) { logger.debug("received a join request for an existing node [{}]", node);