Skip to content

Commit

Permalink
Implement end-of-segment checkpoints in PBFT
Browse files Browse the repository at this point in the history
These checkpoints are necessary for liveness.
Each SB instance must be live independently of the other instances.
In particular, even instances that delivered all slots in a segment
must keep running until a global stable checkpoint is reached.
Otherwise the system might get stuck.

Signed-off-by: Matej Pavlovic <matopavlovic@gmail.com>
  • Loading branch information
matejpavlovic committed Jun 23, 2022
1 parent 3f24aee commit 6089c25
Show file tree
Hide file tree
Showing 13 changed files with 1,072 additions and 475 deletions.
4 changes: 2 additions & 2 deletions mir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ var _ = Describe("Basic test", func() {
Directory: "",
Duration: 4 * time.Second,
}),
Entry("Does nothing with 4 nodes", &deploytest.TestConfig{
Entry("Does nothing with 4 nodes, one of them slow", &deploytest.TestConfig{
NumReplicas: 4,
Transport: "fake",
Directory: "",
Expand All @@ -185,7 +185,7 @@ var _ = Describe("Basic test", func() {
Directory: "mirbft-deployment-test",
Duration: 4 * time.Second,
}),
Entry("Submits 100 fake requests with 4 nodes", &deploytest.TestConfig{
Entry("Submits 100 fake requests with 4 nodes, one of them slow", &deploytest.TestConfig{
NumReplicas: 4,
NumClients: 0,
Transport: "fake",
Expand Down
4 changes: 4 additions & 0 deletions pkg/iss/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ type Config struct {

// View change timeout for the PBFT sub-protocol, in ticks.
// TODO: Separate this in a sub-group of the ISS config, maybe even use a field of type PBFTConfig in Config.
PBFTDoneResendPeriod time.Duration
PBFTCatchUpDelay time.Duration
PBFTViewChangeBatchTimeout time.Duration
PBFTViewChangeSegmentTimeout time.Duration
PBFTViewChangeResendPeriod time.Duration
Expand Down Expand Up @@ -177,6 +179,8 @@ func DefaultConfig(membership []t.NodeID) *Config {
LeaderPolicy: &SimpleLeaderPolicy{Membership: membership},
RequestNAckTimeout: 16,
MsgBufCapacity: 32 * 1024 * 1024, // 32 MiB
PBFTDoneResendPeriod: maxProposeDelay,
PBFTCatchUpDelay: maxProposeDelay, // maxProposeDelay is picked quite arbitrarily, could be anything
PBFTViewChangeBatchTimeout: 4 * maxProposeDelay,
PBFTViewChangeSegmentTimeout: 2 * time.Duration(segmentLength) * maxProposeDelay,
PBFTViewChangeResendPeriod: maxProposeDelay, // maxProposeDelay is picked quite arbitrarily, could be anything
Expand Down
2 changes: 2 additions & 0 deletions pkg/iss/iss.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,8 @@ func newPBFTConfig(issConfig *Config) *PBFTConfig {
MaxProposeDelay: issConfig.MaxProposeDelay,
MsgBufCapacity: issConfig.MsgBufCapacity,
MaxBatchSize: issConfig.MaxBatchSize,
DoneResendPeriod: issConfig.PBFTDoneResendPeriod,
CatchUpDelay: issConfig.PBFTCatchUpDelay,
ViewChangeBatchTimeout: issConfig.PBFTViewChangeBatchTimeout,
ViewChangeSegmentTimeout: issConfig.PBFTViewChangeSegmentTimeout,
ViewChangeResendPeriod: issConfig.PBFTViewChangeResendPeriod,
Expand Down
20 changes: 16 additions & 4 deletions pkg/iss/pbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ type pbftInstance struct {
// Each slot tracks the state of the agreement protocol for one sequence number.
slots map[t.PBFTViewNr]map[t.SeqNr]*pbftSlot

// Tracks the state of the segment-local checkpoint.
segmentCheckpoint *pbftSegmentChkp

// Logger for outputting debugging messages.
logger logging.Logger

Expand Down Expand Up @@ -95,10 +98,11 @@ func newPbftInstance(

// Set all the necessary fields of the new instance and return it.
return &pbftInstance{
ownID: ownID,
segment: segment,
config: config,
slots: make(map[t.PBFTViewNr]map[t.SeqNr]*pbftSlot),
ownID: ownID,
segment: segment,
config: config,
slots: make(map[t.PBFTViewNr]map[t.SeqNr]*pbftSlot),
segmentCheckpoint: newPbftSegmentChkp(),
proposal: pbftProposalState{
proposalsMade: 0,
numPendingRequests: numPendingRequests,
Expand Down Expand Up @@ -174,6 +178,8 @@ func (pbft *pbftInstance) applyHashResult(result *isspb.SBHashResult) *events.Ev
return pbft.applyMissingPreprepareHashResult(result.Digests[0], origin.PbftMissingPreprepare)
case *isspb.SBInstanceHashOrigin_PbftNewView:
return pbft.applyNewViewHashResult(result.Digests, origin.PbftNewView)
case *isspb.SBInstanceHashOrigin_PbftCatchUpResponse:
return pbft.applyCatchUpResponseHashResult(result.Digests[0], origin.PbftCatchUpResponse)
default:
panic(fmt.Sprintf("unknown hash origin type: %T", origin))
}
Expand Down Expand Up @@ -231,6 +237,12 @@ func (pbft *pbftInstance) applyMessageReceived(message *isspb.SBInstanceMessage,
return pbft.applyMsgMissingPreprepare(msg.PbftMissingPreprepare, from)
case *isspb.SBInstanceMessage_PbftNewView:
return pbft.applyMsgNewView(msg.PbftNewView, from)
case *isspb.SBInstanceMessage_PbftDone:
return pbft.applyMsgDone(msg.PbftDone, from)
case *isspb.SBInstanceMessage_PbftCatchUpRequest:
return pbft.applyMsgCatchUpRequest(msg.PbftCatchUpRequest, from)
case *isspb.SBInstanceMessage_PbftCatchUpResponse:
return pbft.applyMsgCatchUpResponse(msg.PbftCatchUpResponse, from)
default:
panic(fmt.Sprintf("unknown ISS PBFT message type: %T", message.Type))
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/iss/pbftconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ type PBFTConfig struct {
// Must not be negative.
MaxProposeDelay time.Duration

// When a node has committed all batches in a segment, it will periodically send the Done message
// in intervals of DoneResendPeriod.
DoneResendPeriod time.Duration

// After a node learns about a quorum of other nodes finishing a segment,
// it waits for CatchUpDelay before requesting missing committed batches from other nodes.
CatchUpDelay time.Duration

// Maximal number of bytes used for message backlogging buffers
// (only message payloads are counted towards MsgBufCapacity).
// Same as Config.MsgBufCapacity, but used only for one instance of PBFT.
Expand Down
29 changes: 29 additions & 0 deletions pkg/iss/pbftprotobufs.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,29 @@ func PbftNewViewSBMessage(newView *isspbftpb.NewView) *isspb.SBInstanceMessage {
}}
}

func PbftDoneSBMessage(digests [][]byte) *isspb.SBInstanceMessage {
return &isspb.SBInstanceMessage{Type: &isspb.SBInstanceMessage_PbftDone{
PbftDone: &isspbftpb.Done{
Digests: digests,
},
}}
}

func PbftCatchUpRequestSBMessage(sn t.SeqNr, digest []byte) *isspb.SBInstanceMessage {
return &isspb.SBInstanceMessage{Type: &isspb.SBInstanceMessage_PbftCatchUpRequest{
PbftCatchUpRequest: &isspbftpb.CatchUpRequest{
Digest: digest,
Sn: sn.Pb(),
},
}}
}

func PbftCatchUpResponseSBMessage(preprepare *isspbftpb.Preprepare) *isspb.SBInstanceMessage {
return &isspb.SBInstanceMessage{Type: &isspb.SBInstanceMessage_PbftCatchUpResponse{
PbftCatchUpResponse: preprepare,
}}
}

// ============================================================
// PBFT Message
// ============================================================
Expand Down Expand Up @@ -252,6 +275,12 @@ func newViewHashOrigin(newView *isspbftpb.NewView) *isspb.SBInstanceHashOrigin {
}}
}

func catchUpResponseHashOrigin(preprepare *isspbftpb.Preprepare) *isspb.SBInstanceHashOrigin {
return &isspb.SBInstanceHashOrigin{Type: &isspb.SBInstanceHashOrigin_PbftCatchUpResponse{
PbftCatchUpResponse: preprepare,
}}
}

// ============================================================
// Serialization
// ============================================================
Expand Down
Loading

0 comments on commit 6089c25

Please sign in to comment.