29
29
import java .util .ArrayList ;
30
30
import java .util .Arrays ;
31
31
import java .util .List ;
32
+ import java .util .Map ;
32
33
import java .util .Random ;
33
34
import java .util .concurrent .CompletableFuture ;
34
35
import java .util .concurrent .CountDownLatch ;
@@ -111,6 +112,13 @@ public CompletableFuture<HeartBeatResponse> handleHeartBeat(HeartBeatRequest req
111
112
return CompletableFuture .completedFuture (new HeartBeatResponse ().term (memberState .currTerm ()).code (DLedgerResponseCode .UNEXPECTED_MEMBER .getCode ()));
112
113
}
113
114
115
+ if (memberState .isCandidate () && request .isNeedCheckMemberState ()) {
116
+ logger .warn ("[CHECK_MEMBER_STATE] [HandleHeartBeat] remoteId={} need check member state" , request .getLeaderId ());
117
+ if (request .getTerm () < memberState .currTerm ()) {
118
+ memberState .recoveryToFollower (request .getTerm (), request .getLeaderId ());
119
+ }
120
+ }
121
+
114
122
if (request .getTerm () < memberState .currTerm ()) {
115
123
return CompletableFuture .completedFuture (new HeartBeatResponse ().term (memberState .currTerm ()).code (DLedgerResponseCode .EXPIRED_TERM .getCode ()));
116
124
} else if (request .getTerm () == memberState .currTerm ()) {
@@ -283,10 +291,12 @@ private void sendHeartbeats(long term, String leaderId) throws Exception {
283
291
break ;
284
292
}
285
293
286
- if (x .getCode () == DLedgerResponseCode .NETWORK_ERROR .getCode ())
294
+ if (x .getCode () == DLedgerResponseCode .NETWORK_ERROR .getCode ()) {
287
295
memberState .getPeersLiveTable ().put (id , Boolean .FALSE );
288
- else
296
+ } else {
289
297
memberState .getPeersLiveTable ().put (id , Boolean .TRUE );
298
+ memberState .getPeersTermTable ().put (id , x .getTerm ());
299
+ }
290
300
291
301
if (memberState .isQuorum (succNum .get ())
292
302
|| memberState .isQuorum (succNum .get () + notReadyNum .get ())) {
@@ -305,6 +315,7 @@ private void sendHeartbeats(long term, String leaderId) throws Exception {
305
315
beatLatch .await (heartBeatTimeIntervalMs , TimeUnit .MILLISECONDS );
306
316
if (memberState .isQuorum (succNum .get ())) {
307
317
lastSuccHeartBeatTime = System .currentTimeMillis ();
318
+ checkPeersTermTable ();
308
319
} else {
309
320
logger .info ("[{}] Parse heartbeat responses in cost={} term={} allNum={} succNum={} notReadyNum={} inconsistLeader={} maxTerm={} peerSize={} lastSuccHeartBeatTime={}" ,
310
321
memberState .getSelfId (), DLedgerUtils .elapsed (startHeartbeatTimeMs ), term , allNum .get (), succNum .get (), notReadyNum .get (), inconsistLeader .get (), maxTerm .get (), memberState .peerSize (), new Timestamp (lastSuccHeartBeatTime ));
@@ -320,6 +331,28 @@ private void sendHeartbeats(long term, String leaderId) throws Exception {
320
331
}
321
332
}
322
333
334
+ private void checkPeersTermTable () throws Exception {
335
+ if (memberState .getSelfId ().equals (memberState .getLeaderId ())) {
336
+ long leaderTerm = memberState .getPeersTermTable ().getOrDefault (memberState .getLeaderId (), -1L );
337
+ for (Map .Entry <String , Long > entryTerm : memberState .getPeersTermTable ().entrySet ()) {
338
+ if (entryTerm .getKey ().equals (memberState .getSelfId ())) {
339
+ continue ;
340
+ }
341
+
342
+ if (entryTerm .getValue () > leaderTerm ) {
343
+ HeartBeatRequest heartBeatRequest = new HeartBeatRequest ();
344
+ heartBeatRequest .setGroup (memberState .getGroup ());
345
+ heartBeatRequest .setLocalId (memberState .getSelfId ());
346
+ heartBeatRequest .setRemoteId (memberState .getSelfId ());
347
+ heartBeatRequest .setLeaderId (memberState .getLeaderId ());
348
+ heartBeatRequest .setNeedCheckMemberState (true );
349
+ heartBeatRequest .setTerm (leaderTerm );
350
+ dLedgerRpcService .heartBeat (heartBeatRequest );
351
+ }
352
+ }
353
+ }
354
+ }
355
+
323
356
private void maintainAsLeader () throws Exception {
324
357
if (DLedgerUtils .elapsed (lastSendHeartBeatTime ) > heartBeatTimeIntervalMs ) {
325
358
long term ;
0 commit comments