From e6f7b4deb68e887ac001217191cab3c4068cabf5 Mon Sep 17 00:00:00 2001 From: Matej Pavlovic Date: Wed, 29 Jun 2022 11:40:33 +0200 Subject: [PATCH] Keep resending critical messages in ISS Signed-off-by: Matej Pavlovic --- pkg/iss/checkpoint.go | 21 ++++++++++++++++++--- pkg/iss/config.go | 4 ++++ pkg/iss/iss.go | 9 +++++++-- pkg/iss/pbftgoodcase.go | 10 ++++++++-- pkg/iss/pbftsegmentchkp.go | 1 + pkg/iss/pbftviewchange.go | 7 ++++++- pkg/iss/pbftviewchangestate.go | 10 ++++++++-- 7 files changed, 52 insertions(+), 10 deletions(-) diff --git a/pkg/iss/checkpoint.go b/pkg/iss/checkpoint.go index 01e7e7b18..a846768c1 100644 --- a/pkg/iss/checkpoint.go +++ b/pkg/iss/checkpoint.go @@ -13,6 +13,7 @@ package iss import ( "bytes" + "github.com/filecoin-project/mir/pkg/pb/eventpb" "github.com/filecoin-project/mir/pkg/events" "github.com/filecoin-project/mir/pkg/logging" @@ -57,15 +58,25 @@ type checkpointTracker struct { // Set of Checkpoint messages that were received ahead of time. pendingMessages map[t.NodeID]*isspb.Checkpoint + + // Time interval for repeated retransmission of checkpoint messages. + resendPeriod t.TimeDuration } // newCheckpointTracker allocates and returns a new instance of a checkpointTracker associated with sequence number sn. -func newCheckpointTracker(ownID t.NodeID, sn t.SeqNr, epoch t.EpochNr, logger logging.Logger) *checkpointTracker { +func newCheckpointTracker( + ownID t.NodeID, + sn t.SeqNr, + epoch t.EpochNr, + resendPeriod t.TimeDuration, + logger logging.Logger, +) *checkpointTracker { return &checkpointTracker{ Logger: logger, ownID: ownID, seqNr: sn, epoch: epoch, + resendPeriod: resendPeriod, signatures: make(map[t.NodeID][]byte), confirmations: make(map[t.NodeID]struct{}), pendingMessages: make(map[t.NodeID]*isspb.Checkpoint), @@ -125,9 +136,13 @@ func (ct *checkpointTracker) ProcessCheckpointSignResult(signature []byte) *even walEvent := events.WALAppend(walModuleName, persistEvent, t.WALRetIndex(ct.epoch)) // Send a checkpoint message to all nodes after persisting checkpoint to the WAL. - // TODO: Implement checkpoint message retransmission. m := CheckpointMessage(ct.epoch, ct.seqNr, ct.appSnapshotHash, signature) - walEvent.FollowUp(events.SendMessage(netModuleName, m, ct.membership)) + walEvent.FollowUp(events.TimerRepeat( + "timer", + []*eventpb.Event{events.SendMessage(netModuleName, m, ct.membership)}, + ct.resendPeriod, + t.TimerRetIndex(ct.epoch)), + ) // Apply pending Checkpoint messages for s, m := range ct.pendingMessages { diff --git a/pkg/iss/config.go b/pkg/iss/config.go index c30c8749d..bc1cc6904 100644 --- a/pkg/iss/config.go +++ b/pkg/iss/config.go @@ -95,6 +95,9 @@ type Config struct { // and, if so, sends them the latest state. CatchUpTimerPeriod time.Duration + // Time interval for repeated retransmission of checkpoint messages. + CheckpointResendPeriod time.Duration + // View change timeout for the PBFT sub-protocol, in ticks. // TODO: Separate this in a sub-group of the ISS config, maybe even use a field of type PBFTConfig in Config. 
PBFTDoneResendPeriod time.Duration @@ -194,6 +197,7 @@ func DefaultConfig(membership []t.NodeID) *Config { CatchUpTimerPeriod: maxProposeDelay, // maxProposeDelay is picked quite arbitrarily, could be anything PBFTDoneResendPeriod: maxProposeDelay, PBFTCatchUpDelay: maxProposeDelay, // maxProposeDelay is picked quite arbitrarily, could be anything + CheckpointResendPeriod: maxProposeDelay, // maxProposeDelay is picked quite arbitrarily, could be anything PBFTViewChangeBatchTimeout: 4 * maxProposeDelay, PBFTViewChangeSegmentTimeout: 2 * time.Duration(segmentLength) * maxProposeDelay, PBFTViewChangeResendPeriod: maxProposeDelay, // maxProposeDelay is picked quite arbitrarily, could be anything diff --git a/pkg/iss/iss.go b/pkg/iss/iss.go index b9ce97934..315cc6c0b 100644 --- a/pkg/iss/iss.go +++ b/pkg/iss/iss.go @@ -784,8 +784,13 @@ func (iss *ISS) initEpoch(newEpoch t.EpochNr) { epoch := &epochInfo{ Nr: newEpoch, Membership: iss.config.Membership, // TODO: Make a proper copy once reconfiguration is supported. - Checkpoint: newCheckpointTracker(iss.ownID, iss.nextDeliveredSN, newEpoch, - logging.Decorate(iss.logger, "CT: ", "epoch", newEpoch)), + Checkpoint: newCheckpointTracker( + iss.ownID, + iss.nextDeliveredSN, + newEpoch, + t.TimeDuration(iss.config.CheckpointResendPeriod), + logging.Decorate(iss.logger, "CT: ", "epoch", newEpoch), + ), } iss.epochs[newEpoch] = epoch iss.epoch = epoch diff --git a/pkg/iss/pbftgoodcase.go b/pkg/iss/pbftgoodcase.go index 300ec5ded..fe4920c14 100644 --- a/pkg/iss/pbftgoodcase.go +++ b/pkg/iss/pbftgoodcase.go @@ -134,6 +134,8 @@ func (pbft *pbftInstance) propose(batch *requestpb.Batch) *events.EventList { preprepare := pbftPreprepareMsg(sn, pbft.view, batch, false) // Create a Preprepare message send Event. + // No need for periodic re-transmission. + // In the worst case, dropping of these messages may result in a view change, but will not compromise correctness. msgSendEvent := pbft.eventService.SendMessage( PbftPreprepareSBMessage(preprepare), pbft.segment.Membership, @@ -234,7 +236,9 @@ func (pbft *pbftInstance) sendPrepare(prepare *isspbftpb.Prepare) *events.EventL // Create persist event. persistEvent := pbft.eventService.WALAppend(PbftPersistPrepare(prepare)) - // Append send event as a follow-up + // Append send event as a follow-up. + // No need for periodic re-transmission. + // In the worst case, dropping of these messages may result in a view change, but will not compromise correctness. persistEvent.FollowUp(pbft.eventService.SendMessage( PbftPrepareSBMessage(prepare), pbft.segment.Membership, @@ -279,7 +283,9 @@ func (pbft *pbftInstance) sendCommit(commit *isspbftpb.Commit) *events.EventList // Create persist event. persistEvent := pbft.eventService.WALAppend(PbftPersistCommit(commit)) - // Append send event as a follow-up + // Append send event as a follow-up. + // No need for periodic re-transmission. + // In the worst case, dropping of these messages may result in a view change, but will not compromise correctness. 
persistEvent.FollowUp(pbft.eventService.SendMessage( PbftCommitSBMessage(commit), pbft.segment.Membership, diff --git a/pkg/iss/pbftsegmentchkp.go b/pkg/iss/pbftsegmentchkp.go index 08cdec583..a3b9dd838 100644 --- a/pkg/iss/pbftsegmentchkp.go +++ b/pkg/iss/pbftsegmentchkp.go @@ -224,6 +224,7 @@ func (pbft *pbftInstance) applyMsgCatchUpRequest( if preprepare := pbft.lookUpPreprepare(t.SeqNr(catchUpReq.Sn), catchUpReq.Digest); preprepare != nil { // If the requested Preprepare message is available, send it to the originator of the request. + // No need for periodic re-transmission. The requester will re-transmit the request if needed. return events.ListOf(pbft.eventService.SendMessage(PbftCatchUpResponseSBMessage(preprepare), []t.NodeID{from})) } diff --git a/pkg/iss/pbftviewchange.go b/pkg/iss/pbftviewchange.go index 6563b57dd..5f3ac28e7 100644 --- a/pkg/iss/pbftviewchange.go +++ b/pkg/iss/pbftviewchange.go @@ -216,8 +216,8 @@ func (pbft *pbftInstance) applyEmptyPreprepareHashResult(digests [][]byte, view return pbft.sendNewView(view, state) } - pbft.logger.Log(logging.LevelDebug, "Some Preprepares missing. Asking for retransmission.") // If some Preprepares for re-proposing are still missing, fetch them from other nodes. + pbft.logger.Log(logging.LevelDebug, "Some Preprepares missing. Asking for retransmission.") return state.askForMissingPreprepares(pbft.eventService) } @@ -228,6+228,9 @@ func (pbft *pbftInstance) applyMsgPreprepareRequest( if preprepare := pbft.lookUpPreprepare(t.SeqNr(preprepareRequest.Sn), preprepareRequest.Digest); preprepare != nil { // If the requested Preprepare message is available, send it to the originator of the request. + // No need for periodic re-transmission. + // In the worst case, dropping of these messages may result in another view change, + // but will not compromise correctness. return events.ListOf( pbft.eventService.SendMessage(PbftMissingPreprepareSBMessage(preprepare), []t.NodeID{from}), ) @@ -313,6 +316,8 @@ func (pbft *pbftInstance) sendNewView(view t.PBFTViewNr, vcState *pbftViewChange }) // Construct, persist and send the NewView message. + // No need for periodic re-transmission. + // In the worst case, dropping of these messages may result in a view change, but will not compromise correctness. newView := pbftNewViewMsg(view, viewChangeSenders, signedViewChanges, preprepareSeqNrs, preprepares) persistEvent := pbft.eventService.WALAppend(PbftPersistNewView(newView)) persistEvent.FollowUp(pbft.eventService.SendMessage(PbftNewViewSBMessage(newView), pbft.segment.Membership)) diff --git a/pkg/iss/pbftviewchangestate.go b/pkg/iss/pbftviewchangestate.go index dd8949465..ff8a3c037 100644 --- a/pkg/iss/pbftviewchangestate.go +++ b/pkg/iss/pbftviewchangestate.go @@ -146,10 +146,16 @@ func (vcState *pbftViewChangeState) SetLocalPreprepares(pbft *pbftInstance, view } } +// askForMissingPreprepares requests the Preprepare messages that are part of a new view. +// The new primary might have received a prepare certificate from other nodes in the ViewChange messages they sent +// and thus the new primary has to re-propose the corresponding batch by including the corresponding Preprepare message +// in the NewView message. However, the new primary might not have all the corresponding Preprepare messages, +// in which case it calls this function. +// Note that requests for missing Preprepare messages need not be periodically re-transmitted.
+// If they are dropped, the new primary will simply never send a NewView message +// and will be succeeded by another primary after another view change. func (vcState *pbftViewChangeState) askForMissingPreprepares(eventService *sbEventService) *events.EventList { - // TODO: Do this periodically, not just once. Messages might get lost! - eventsOut := events.EmptyList() for sn, digest := range vcState.reproposals { if len(digest) > 0 && vcState.preprepares[sn] == nil {
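
Note on the retransmission pattern used above (illustrative sketch, not part of the patch): the checkpoint.go change replaces a one-shot SendMessage follow-up with a TimerRepeat event, so the timer module keeps re-emitting the SendMessage event every CheckpointResendPeriod until the timer entries tagged with the epoch's retention index are garbage-collected. Below is a minimal sketch of this wrapping, assuming only the event constructors and types that appear in the diff; the "timer" module name is taken from checkpoint.go, while the package and helper names are made up for illustration.

package retransmission // hypothetical package, for illustration only

import (
	"github.com/filecoin-project/mir/pkg/events"
	"github.com/filecoin-project/mir/pkg/pb/eventpb"
	t "github.com/filecoin-project/mir/pkg/types"
)

// withPeriodicResend wraps an already-constructed event (typically a SendMessage event)
// in a TimerRepeat event addressed to the timer module. The timer module then re-emits
// the wrapped event every 'period' until the entries associated with the retention index
// 'retIndex' (e.g. the epoch number) are garbage-collected, at which point retransmission stops.
func withPeriodicResend(
	sendEvent *eventpb.Event,
	period t.TimeDuration,
	retIndex t.TimerRetIndex,
) *eventpb.Event {
	return events.TimerRepeat(
		"timer", // name of the timer module, as used in checkpoint.go above
		[]*eventpb.Event{sendEvent},
		period,
		retIndex,
	)
}

Used this way, a message that must eventually reach all correct nodes (such as a Checkpoint message) keeps being re-sent over an unreliable network, while messages whose loss only delays progress (Preprepare, Prepare, Commit, NewView, and the catch-up and missing-Preprepare responses) are still sent once, as the comments added in pbftgoodcase.go, pbftsegmentchkp.go, and pbftviewchange.go explain.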