Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -281,16 +281,16 @@ assert isEnoughPendingJoins(getPendingMasterJoinsCount()) : "becoming a master b
Map<DiscoveryNode, ClusterStateTaskListener> tasks = getPendingAsTasks();
final String source = "zen-disco-elected-as-master ([" + tasks.size() + "] nodes joined)";

tasks.put(BECOME_MASTER_TASK, joinProcessedListener);
tasks.put(BECOME_MASTER_TASK, (source1, e) -> {}); // noop listener, the election finished listener determines result
tasks.put(FINISH_ELECTION_TASK, electionFinishedListener);
clusterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
}

public synchronized void closeAndProcessPending(String reason) {
innerClose();
Map<DiscoveryNode, ClusterStateTaskListener> tasks = getPendingAsTasks();
final String source = "zen-disco-process-pending-joins [" + reason + "]";

tasks.put(FINISH_ELECTION_NOT_MASTER_TASK, joinProcessedListener);
final String source = "zen-disco-election-stop [" + reason + "]";
tasks.put(FINISH_ELECTION_TASK, electionFinishedListener);
clusterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
}

Expand Down Expand Up @@ -327,12 +327,15 @@ private void onFailure(Throwable t) {
}
}

private final ClusterStateTaskListener joinProcessedListener = new ClusterStateTaskListener() {
private final ClusterStateTaskListener electionFinishedListener = new ClusterStateTaskListener() {

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
assert newState.nodes().isLocalNodeElectedMaster() : "should have become a master but isn't " + newState.prettyPrint();
onElectedAsMaster(newState);
if (newState.nodes().isLocalNodeElectedMaster()) {
ElectionContext.this.onElectedAsMaster(newState);
} else {
onFailure(source, new NotMasterException("election stopped [" + source + "]"));
}
}

@Override
Expand Down Expand Up @@ -379,7 +382,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
}

// a task indicated that the current node should become master, if no current master is known
/**
* a task indicated that the current node should become master, if no current master is known
*/
private static final DiscoveryNode BECOME_MASTER_TASK = new DiscoveryNode("_BECOME_MASTER_TASK_", LocalTransportAddress.buildUnique(),
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT) {
@Override
Expand All @@ -388,9 +393,11 @@ public String toString() {
}
};

// a task that is used to process pending joins without explicitly becoming a master and listening to the results
// this task is used when election is stop without the local node becoming a master per se (though it might
private static final DiscoveryNode FINISH_ELECTION_NOT_MASTER_TASK = new DiscoveryNode("_NOT_MASTER_TASK_",
/**
* a task that is used to signal the election is stopped and we should process pending joins.
* it may be use in combination with {@link #BECOME_MASTER_TASK}
*/
private static final DiscoveryNode FINISH_ELECTION_TASK = new DiscoveryNode("_FINISH_ELECTION_",
LocalTransportAddress.buildUnique(), Collections.emptyMap(), Collections.emptySet(), Version.CURRENT) {
@Override
public String toString() {
Expand All @@ -402,31 +409,35 @@ class JoinTaskExecutor implements ClusterStateTaskExecutor<DiscoveryNode> {

@Override
public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<DiscoveryNode> joiningNodes) throws Exception {
final DiscoveryNodes currentNodes = currentState.nodes();
final BatchResult.Builder<DiscoveryNode> results = BatchResult.builder();

final DiscoveryNodes currentNodes = currentState.nodes();
boolean nodesChanged = false;
ClusterState.Builder newState = ClusterState.builder(currentState);
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentNodes);

if (currentNodes.getMasterNode() == null && joiningNodes.contains(BECOME_MASTER_TASK)) {
if (joiningNodes.size() == 1 && joiningNodes.get(0).equals(FINISH_ELECTION_TASK)) {
return results.successes(joiningNodes).build(currentState);
} else if (currentNodes.getMasterNode() == null && joiningNodes.contains(BECOME_MASTER_TASK)) {
assert joiningNodes.contains(FINISH_ELECTION_TASK) : "becoming a master but election is not finished " + joiningNodes;
// use these joins to try and become the master.
// Note that we don't have to do any validation of the amount of joining nodes - the commit
// during the cluster state publishing guarantees that we have enough

nodesBuilder.masterNodeId(currentNodes.getLocalNodeId());
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks())
.removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
newState.blocks(clusterBlocks);
nodesChanged = true;
}

if (nodesBuilder.isLocalNodeElectedMaster() == false) {
} else if (nodesBuilder.isLocalNodeElectedMaster() == false) {
logger.trace("processing node joins, but we are not the master. current master: {}", currentNodes.getMasterNode());
throw new NotMasterException("Node [" + currentNodes.getLocalNode() + "] not master for join request");
}

assert nodesBuilder.isLocalNodeElectedMaster();

// processing any joins
for (final DiscoveryNode node : joiningNodes) {
if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_NOT_MASTER_TASK)) {
if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_TASK)) {
// noop
} else if (currentNodes.nodeExists(node)) {
logger.debug("received a join request for an existing node [{}]", node);
Expand Down