Skip to content

Commit 2c98dd7

Browse files
committed
Handle publish requests without attached joins
1 parent e11e929 commit 2c98dd7

File tree

3 files changed

+28
-4
lines changed

3 files changed

+28
-4
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -887,7 +887,17 @@ protected void onJoin(Join join) {
887887
if (join.getTerm() == getCurrentTerm()) {
888888
handleJoin(join);
889889
}
890-
// TODO: what to do on missing join?
890+
}
891+
892+
@Override
893+
protected void onMissingJoin(DiscoveryNode discoveryNode) {
894+
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
895+
// The remote node did not include a join vote in its publish response. We do not persist joins, so it could be that the remote
896+
// node voted for us and then rebooted, or it could be that it voted for a different node in this term. If we don't have a copy
897+
// of a join from this node then we assume the latter and bump our term to obtain a vote from this node.
898+
if (hasJoinVoteFrom(discoveryNode) == false) {
899+
updateMaxTermSeen(publishRequest.getAcceptedState().term() + 1);
900+
}
891901
}
892902

893903
@Override

server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ private void onPossibleCommitFailure() {
172172

173173
protected abstract void onJoin(Join join);
174174

175+
protected abstract void onMissingJoin(DiscoveryNode discoveryNode);
176+
175177
protected abstract void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
176178
ActionListener<PublishWithJoinResponse> responseActionListener);
177179

@@ -301,10 +303,14 @@ public void onResponse(PublishWithJoinResponse response) {
301303
return;
302304
}
303305

304-
response.getJoin().ifPresent(join -> {
306+
if (response.getJoin().isPresent()) {
307+
final Join join = response.getJoin().get();
305308
assert discoveryNode.equals(join.getSourceNode());
309+
assert join.getTerm() == response.getPublishResponse().getTerm() : response;
306310
onJoin(join);
307-
});
311+
} else {
312+
onMissingJoin(discoveryNode);
313+
}
308314

309315
assert state == PublicationTargetState.SENT_PUBLISH_REQUEST : state + " -> " + PublicationTargetState.WAITING_FOR_QUORUM;
310316
state = PublicationTargetState.WAITING_FOR_QUORUM;

server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ abstract class MockPublication extends Publication {
101101
Map<DiscoveryNode, ActionListener<PublishWithJoinResponse>> pendingPublications = new HashMap<>();
102102
Map<DiscoveryNode, ActionListener<TransportResponse.Empty>> pendingCommits = new HashMap<>();
103103
Map<DiscoveryNode, Join> joins = new HashMap<>();
104+
Set<DiscoveryNode> missingJoins = new HashSet<>();
104105

105106
MockPublication(Settings settings, PublishRequest publishRequest, Discovery.AckListener ackListener,
106107
LongSupplier currentTimeSupplier) {
@@ -120,6 +121,11 @@ protected void onJoin(Join join) {
120121
assertNull(joins.put(join.getSourceNode(), join));
121122
}
122123

124+
@Override
125+
protected void onMissingJoin(DiscoveryNode discoveryNode) {
126+
assertTrue(missingJoins.add(discoveryNode));
127+
}
128+
123129
@Override
124130
protected void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
125131
ActionListener<PublishWithJoinResponse> responseActionListener) {
@@ -182,14 +188,16 @@ public void testSimpleClusterStatePublishing() throws InterruptedException {
182188
assertNotEquals(processedNode1PublishResponse.get(), publication.pendingCommits.isEmpty());
183189
assertFalse(publication.joins.containsKey(e.getKey()));
184190
PublishWithJoinResponse publishWithJoinResponse = new PublishWithJoinResponse(publishResponse,
185-
randomBoolean() ? Optional.empty() : Optional.of(new Join(e.getKey(), randomFrom(n1, n2, n3), randomNonNegativeLong(),
191+
randomBoolean() ? Optional.empty() : Optional.of(new Join(e.getKey(), randomFrom(n1, n2, n3), publishResponse.getTerm(),
186192
randomNonNegativeLong(), randomNonNegativeLong())));
187193
e.getValue().onResponse(publishWithJoinResponse);
188194
if (publishWithJoinResponse.getJoin().isPresent()) {
189195
assertTrue(publication.joins.containsKey(e.getKey()));
196+
assertFalse(publication.missingJoins.contains(e.getKey()));
190197
assertEquals(publishWithJoinResponse.getJoin().get(), publication.joins.get(e.getKey()));
191198
} else {
192199
assertFalse(publication.joins.containsKey(e.getKey()));
200+
assertTrue(publication.missingJoins.contains(e.getKey()));
193201
}
194202
if (e.getKey().equals(n1)) {
195203
processedNode1PublishResponse.set(true);

0 commit comments

Comments
 (0)