diff --git a/consensus/pbft/pbft-core_test.go b/consensus/pbft/pbft-core_test.go index 6cb656ff696..f42dd095720 100644 --- a/consensus/pbft/pbft-core_test.go +++ b/consensus/pbft/pbft-core_test.go @@ -1179,6 +1179,73 @@ func TestReplicaCrash3(t *testing.T) { } } +// TestReplicaCrash4 simulates the restart with no checkpoints +// in the store because they have been garbage collected +// the bug occurs because the low watermark is incorrectly set to +// be zero +func TestReplicaCrash4(t *testing.T) { + validatorCount := 4 + config := loadConfig() + config.Set("general.K", 2) + config.Set("general.logmultiplier", 2) + net := makePBFTNetwork(validatorCount, config) + defer net.stop() + + twoOffline := false + threeOffline := true + net.filterFn = func(src int, dst int, msg []byte) []byte { + if twoOffline && dst == 2 { // 2 is 'offline' + return nil + } + if threeOffline && dst == 3 { // 3 is 'offline' + return nil + } + return msg + } + + for i := int64(1); i <= 8; i++ { + net.pbftEndpoints[0].manager.Queue() <- createPbftReqBatch(i, uint64(generateBroadcaster(validatorCount))) + } + net.process() // vp0,1,2 should have a stable checkpoint for seqNo 8 + net.process() // this second time is necessary for garbage collection it seams + + // Now vp0,1,2 should be in sync with 8 executions in view 0, and vp4 should be offline + for i, pep := range net.pbftEndpoints { + + if i == 3 { + // 3 is offline for this test + continue + } + + if pep.pbft.view != 0 { + t.Errorf("Expected replica %d to be in view 1, got %d", pep.id, pep.pbft.view) + } + + expectedExecutions := uint64(8) + if pep.sc.executions != expectedExecutions { + t.Errorf("Expected %d executions on replica %d, got %d", expectedExecutions, pep.id, pep.sc.executions) + } + } + + // Create new pbft instances to restore from persistence + for id := 0; id < 3; id++ { + pe := net.pbftEndpoints[id] + config := loadConfig() + config.Set("general.K", "2") + pe.pbft.close() + pe.pbft = newPbftCore(uint64(id), config, pe.sc, events.NewTimerFactoryImpl(pe.manager)) + pe.manager.SetReceiver(pe.pbft) + pe.pbft.N = 4 + pe.pbft.f = (4 - 1) / 3 + pe.pbft.requestTimeout = 200 * time.Millisecond + + expected := uint64(8) + if pe.pbft.h != expected { + t.Errorf("Low watermark should have been %d, got %d", expected, pe.pbft.h) + } + } + +} func TestReplicaPersistQSet(t *testing.T) { persist := make(map[string][]byte) diff --git a/consensus/pbft/pbft-persist.go b/consensus/pbft/pbft-persist.go index 52e0f6dfbcd..a5b672ea26d 100644 --- a/consensus/pbft/pbft-persist.go +++ b/consensus/pbft/pbft-persist.go @@ -148,9 +148,11 @@ func (instance *pbftCore) restoreState() { logger.Warningf("Replica %d could not restore reqBatchStore: %s", instance.id, err) } + instance.restoreLastSeqNo() + chkpts, err := instance.consumer.ReadStateSet("chkpt.") if err == nil { - highSeq := uint64(0) + lowWatermark := instance.lastExec // This is safe because we will round down in moveWatermarks for key, id := range chkpts { var seqNo uint64 if _, err = fmt.Sscanf(key, "chkpt.%d", &seqNo); err != nil { @@ -159,20 +161,18 @@ func (instance *pbftCore) restoreState() { idAsString := base64.StdEncoding.EncodeToString(id) logger.Debugf("Replica %d found checkpoint %s for seqNo %d", instance.id, idAsString, seqNo) instance.chkpts[seqNo] = idAsString - if seqNo > highSeq { - highSeq = seqNo + if seqNo < lowWatermark { + lowWatermark = seqNo } } } - instance.moveWatermarks(highSeq) + instance.moveWatermarks(lowWatermark) } else { logger.Warningf("Replica %d could not restore checkpoints: %s", instance.id, err) } - instance.restoreLastSeqNo() - - logger.Infof("Replica %d restored state: view: %d, seqNo: %d, pset: %d, qset: %d, reqBatches: %d, chkpts: %d", - instance.id, instance.view, instance.seqNo, len(instance.pset), len(instance.qset), len(instance.reqBatchStore), len(instance.chkpts)) + logger.Infof("Replica %d restored state: view: %d, seqNo: %d, pset: %d, qset: %d, reqBatches: %d, chkpts: %d h: %d", + instance.id, instance.view, instance.seqNo, len(instance.pset), len(instance.qset), len(instance.reqBatchStore), len(instance.chkpts), instance.h) } func (instance *pbftCore) restoreLastSeqNo() {