@@ -89,22 +89,26 @@ public void activate(final DiscoveryNodes lastAcceptedNodes) {
8989 logger .trace ("activating with {}" , lastAcceptedNodes );
9090
9191 synchronized (mutex ) {
92- assert active == false ;
92+ assert assertInactiveWithNoKnownPeers () ;
9393 active = true ;
9494 this .lastAcceptedNodes = lastAcceptedNodes ;
9595 leader = Optional .empty ();
96- handleWakeUp ();
96+ handleWakeUp (); // return value discarded: there are no known peers, so none can be disconnected
9797 }
9898 }
9999
100100 public void deactivate (DiscoveryNode leader ) {
101+ final boolean peersRemoved ;
101102 synchronized (mutex ) {
102103 logger .trace ("deactivating and setting leader to {}" , leader );
103104 active = false ;
104- handleWakeUp ();
105+ peersRemoved = handleWakeUp ();
105106 this .leader = Optional .of (leader );
106107 assert assertInactiveWithNoKnownPeers ();
107108 }
109+ if (peersRemoved ) {
110+ onFoundPeersUpdated ();
111+ }
108112 }
109113
110114 // exposed to subclasses for testing
@@ -114,7 +118,7 @@ protected final boolean holdsLock() {
114118
115119 boolean assertInactiveWithNoKnownPeers () {
116120 assert active == false ;
117- assert peersByAddress .isEmpty ();
121+ assert peersByAddress .isEmpty () : peersByAddress . keySet () ;
118122 return true ;
119123 }
120124
@@ -142,10 +146,20 @@ private DiscoveryNode getLocalNode() {
142146 }
143147
144148 /**
145- * Called on receipt of a PeersResponse from a node that believes it's an active leader, which this node should therefore try and join.
149+ * Invoked on receipt of a PeersResponse from a node that believes it's an active leader, which this node should therefore try and join.
150+ * Note that invocations of this method are not synchronised. By the time it is called we may have been deactivated.
146151 */
147152 protected abstract void onActiveMasterFound (DiscoveryNode masterNode , long term );
148153
154+ /**
155+ * Invoked when the set of found peers changes. Note that invocations of this method are not fully synchronised, so we only guarantee
156+ * that the change to the set of found peers happens before this method is invoked. If there are multiple concurrent changes then there
157+ * will be multiple concurrent invocations of this method, with no guarantee as to their order. For this reason we do not pass the
158+ * updated set of peers as an argument to this method, leaving it to the implementation to call getFoundPeers() with appropriate
159+ * synchronisation to avoid lost updates. Also, by the time this method is invoked we may have been deactivated.
160+ */
161+ protected abstract void onFoundPeersUpdated ();
162+
149163 public interface TransportAddressConnector {
150164 /**
151165 * Identify the node at the given address and, if it is a master node and not the local node then establish a full connection to it.
@@ -170,7 +184,6 @@ public Iterable<DiscoveryNode> getFoundPeers() {
170184 }
171185
172186 private List <DiscoveryNode > getFoundPeersUnderLock () {
173- assert active ;
174187 assert holdsLock () : "PeerFinder mutex not held" ;
175188 return peersByAddress .values ().stream ().map (Peer ::getDiscoveryNode ).filter (Objects ::nonNull ).collect (Collectors .toList ());
176189 }
@@ -181,16 +194,21 @@ private Peer createConnectingPeer(TransportAddress transportAddress) {
181194 return peer ;
182195 }
183196
184- private void handleWakeUp () {
197+ /**
198+ * @return whether any peers were removed due to disconnection
199+ */
200+ private boolean handleWakeUp () {
185201 assert holdsLock () : "PeerFinder mutex not held" ;
186202
203+ boolean peersRemoved = false ;
204+
187205 for (final Peer peer : peersByAddress .values ()) {
188- peer .handleWakeUp ();
206+ peersRemoved = peer .handleWakeUp () || peersRemoved ; // care: avoid short-circuiting, each peer needs waking up
189207 }
190208
191209 if (active == false ) {
192210 logger .trace ("not active" );
193- return ;
211+ return peersRemoved ;
194212 }
195213
196214 logger .trace ("probing master nodes from cluster state: {}" , lastAcceptedNodes );
@@ -220,15 +238,20 @@ public void onFailure(Exception e) {
220238 @ Override
221239 protected void doRun () {
222240 synchronized (mutex ) {
223- handleWakeUp ();
241+ if (handleWakeUp () == false ) {
242+ return ;
243+ }
224244 }
245+ onFoundPeersUpdated ();
225246 }
226247
227248 @ Override
228249 public String toString () {
229250 return "PeerFinder::handleWakeUp" ;
230251 }
231252 });
253+
254+ return peersRemoved ;
232255 }
233256
234257 private void startProbe (TransportAddress transportAddress ) {
@@ -260,12 +283,12 @@ DiscoveryNode getDiscoveryNode() {
260283 return discoveryNode .get ();
261284 }
262285
263- void handleWakeUp () {
286+ boolean handleWakeUp () {
264287 assert holdsLock () : "PeerFinder mutex not held" ;
265288
266289 if (active == false ) {
267290 removePeer ();
268- return ;
291+ return true ;
269292 }
270293
271294 final DiscoveryNode discoveryNode = getDiscoveryNode ();
@@ -279,8 +302,11 @@ void handleWakeUp() {
279302 } else {
280303 logger .trace ("{} no longer connected" , this );
281304 removePeer ();
305+ return true ;
282306 }
283307 }
308+
309+ return false ;
284310 }
285311
286312 void establishConnection () {
@@ -295,12 +321,17 @@ public void onResponse(DiscoveryNode remoteNode) {
295321 assert remoteNode .isMasterNode () : remoteNode + " is not master-eligible" ;
296322 assert remoteNode .equals (getLocalNode ()) == false : remoteNode + " is the local node" ;
297323 synchronized (mutex ) {
298- if (active ) {
299- assert discoveryNode .get () == null : "discoveryNode unexpectedly already set to " + discoveryNode .get ();
300- discoveryNode .set (remoteNode );
301- requestPeers ();
324+ if (active == false ) {
325+ return ;
302326 }
327+
328+ assert discoveryNode .get () == null : "discoveryNode unexpectedly already set to " + discoveryNode .get ();
329+ discoveryNode .set (remoteNode );
330+ requestPeers ();
303331 }
332+
333+ assert holdsLock () == false : "PeerFinder mutex is held in error" ;
334+ onFoundPeersUpdated ();
304335 }
305336
306337 @ Override
0 commit comments