Skip to content

Commit

Permalink
Fix viewchange threshold
Browse files Browse the repository at this point in the history
This implements the fix described in #2267

Also removes code from viewchange that made it go forward too fast.

Closes #2267
  • Loading branch information
ineiti committed May 13, 2020
1 parent 38ff4b7 commit 9b9c9a5
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 82 deletions.
5 changes: 3 additions & 2 deletions byzcoin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1732,7 +1732,8 @@ func (s *Service) updateTrieCallback(sbID skipchain.SkipBlockID) error {
// Start viewchange monitor that will fire if we don't get updates in time.
log.Lvlf2("%s started viewchangeMonitor for %x", s.ServerIdentity(), sb.SkipChainID())
s.viewChangeMan.add(s.sendViewChangeReq, s.sendNewView, s.isLeader, string(sb.SkipChainID()))
s.viewChangeMan.start(s.ServerIdentity().ID, sb.SkipChainID(), initialDur, s.getFaultThreshold(sb.Hash))
s.viewChangeMan.start(s.ServerIdentity().ID, sb.SkipChainID(), initialDur,
s.getSignatureThreshold(sb.Hash))
}
} else {
if s.heartbeats.exists(scIDstr) {
Expand Down Expand Up @@ -2822,7 +2823,7 @@ func (s *Service) startChain(genesisID skipchain.SkipBlockID) error {
s.viewChangeMan.add(s.sendViewChangeReq, s.sendNewView, s.isLeader,
string(genesisID))
s.viewChangeMan.start(s.ServerIdentity().ID, genesisID, initialDur,
s.getFaultThreshold(latest.Hash))
s.getSignatureThreshold(latest.Hash))

return nil
}
Expand Down
19 changes: 12 additions & 7 deletions byzcoin/viewchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,16 @@ func (m *viewChangeManager) add(SendInitReq viewchange.SendInitReqFunc,

// Actually starts the viewchange monitor. This must always be called after
// `add`, otherwise `started` will not work.
func (m *viewChangeManager) start(myID network.ServerIdentityID, scID skipchain.SkipBlockID, initialDuration time.Duration, f int) {
func (m *viewChangeManager) start(myID network.ServerIdentityID,
scID skipchain.SkipBlockID, initialDuration time.Duration, threshold int) {
k := string(scID)
m.Lock()
defer m.Unlock()
c, ok := m.controllers[k]
if !ok {
panic("never start without add first: " + log.Stack())
}
go c.Start(myID, scID, initialDuration, f)
go c.Start(myID, scID, initialDuration, threshold)
}

// started returns true if the monitor is started. This supposes that `start`
Expand Down Expand Up @@ -214,9 +215,9 @@ func (s *Service) computeInitialDuration(scID skipchain.SkipBlockID) (time.Durat
return s.rotationWindow * interval, nil
}

func (s *Service) getFaultThreshold(sbID skipchain.SkipBlockID) int {
func (s *Service) getSignatureThreshold(sbID skipchain.SkipBlockID) int {
sb := s.db().GetByID(sbID)
return (len(sb.Roster.List) - 1) / 3
return protocol.DefaultThreshold(len(sb.Roster.List))
}

// handleViewChangeReq should be registered as a handler for viewchange.InitReq
Expand Down Expand Up @@ -324,6 +325,7 @@ func (s *Service) verifyViewChange(msg []byte, data []byte) bool {
log.Error(s.ServerIdentity(), "digest doesn't verify")
return false
}

// Check that we know about the view and the new roster in the request
// matches the view-change proofs.
sb := s.db().GetByID(req.GetView().ID)
Expand All @@ -336,6 +338,7 @@ func (s *Service) verifyViewChange(msg []byte, data []byte) bool {
log.Error(s.ServerIdentity(), "invalid roster in request")
return false
}

// Check the signers are unique, they are in the roster and the
// signatures are correct.
uniqueSigners, uniqueViews := func() (int, int) {
Expand All @@ -354,16 +357,18 @@ func (s *Service) verifyViewChange(msg []byte, data []byte) bool {
}
return len(signers), len(views)
}()
f := s.getFaultThreshold(sb.Hash)
if uniqueSigners <= 2*f {

thr := s.getSignatureThreshold(sb.Hash)
if uniqueSigners < thr {
log.Errorf("%s: not enough proofs: %v < %v",
s.ServerIdentity(), uniqueSigners, 2*f+1)
s.ServerIdentity(), uniqueSigners, thr)
return false
}
if uniqueViews != 1 {
log.Error(s.ServerIdentity(), "conflicting views")
return false
}

// Put the roster in a map so that it's more efficient to search.
rosterMap := make(map[network.ServerIdentityID]*network.ServerIdentity)
for _, sid := range sb.Roster.List {
Expand Down
36 changes: 18 additions & 18 deletions byzcoin/viewchange/viewchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ func (c *Controller) Stop() {
}

// Start begins a blocking process that processes incoming view-requests.
func (c *Controller) Start(myID network.ServerIdentityID, genesis skipchain.SkipBlockID, initialDuration time.Duration, f int) {
func (c *Controller) Start(myID network.ServerIdentityID,
genesis skipchain.SkipBlockID, initialDuration time.Duration,
threshold int) {
meta := newStateLogs()
// Timer is only valid for the current ctr. It starts in the stopped
// state, we can't set it to nil because it gets de-referenced in
Expand All @@ -142,30 +144,28 @@ func (c *Controller) Start(myID network.ServerIdentityID, genesis skipchain.Skip
select {
case req := <-c.reqChan:
// Transition: received a view-change request, start
// the timer and move the state if there are 2f+1 valid
// the timer and move the state if there are 'threshold' valid
// requests.
if myID.Equal(req.SignerID) {
log.Lvl4("adding anomaly:", req.View.LeaderIndex, req.SignerID.String())
log.Lvl4("adding anomaly:", req.View.LeaderIndex,
req.SignerID.String())
meta.add(req)
ctr = c.processAnomaly(req, &meta, ctr)
} else {
log.Lvl4("adding req:", req.View.LeaderIndex, req.SignerID.String())
log.Lvl4("adding req:", req.View.LeaderIndex,
req.SignerID.String())
meta.add(req)
if meta.highest() > ctr && meta.countOf(meta.highest()) > f {
// To avoid starting view-change too late, if
// another honest node detects an anomaly,
// we'll report it too.
stopTimer(timer, c.stopTimerChan, ctr)
reqNew := InitReq{
View: req.View,
SignerID: myID,
}
ctr = c.processAnomaly(reqNew, &meta, ctr)
}
// Previously the node started to propagate view-changes
// already if enough other nodes sent a view-change.
// But this means that a node would participate in a view
// -change, even if it thinks it's not correct.
// So this code has been removed.
// But it might be breaking stuff :(
}
log.Lvlf2("counter: %d, f: %d, meta (ctr/state): %d/%d, "+
"req: %+v", ctr, f, meta.countOf(ctr), meta.stateOf(ctr), req)
if meta.countOf(ctr) > 2*f && meta.stateOf(ctr) < startedTimerState && meta.acceptOf(ctr) {
log.Lvlf2("counter: %d, thr: %d, meta[ctr] (#/state): %d/%d, "+
"req: %+v", ctr, threshold, meta.countOf(ctr), meta.stateOf(ctr), req)
if meta.countOf(ctr) >= threshold && meta.stateOf(
ctr) < startedTimerState && meta.acceptOf(ctr) {
// To avoid starting the next view-change too
// soon, start the view-change timer only after
// receiving 'threshold' view-change messages.
Expand Down
45 changes: 1 addition & 44 deletions byzcoin/viewchange/viewchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ func TestViewChange_Timeout2(t *testing.T) {
testTimeout(t, 2)
}

// TestViewChange_AutoStart1 runs the auto-start scenario with f = 1
// (view-change triggered by received messages, not a local anomaly).
func TestViewChange_AutoStart1(t *testing.T) {
	testAutoStart(t, 1)
}

// TestViewChange_AutoStart2 runs the auto-start scenario with f = 2
// (view-change triggered by received messages, not a local anomaly).
func TestViewChange_AutoStart2(t *testing.T) {
	testAutoStart(t, 2)
}

// testSetupViewChangeF1 sets up the view-change log and sends f view-change
// messages. If anomaly is set then it sends one more message to the anomaly
// channel.
Expand All @@ -50,7 +42,7 @@ func testSetupViewChangeF1(t *testing.T, signerID [16]byte, dur time.Duration, f
nvChan <- true
}
vcl := NewController(vcF, nvF, func(v View) bool { return true })
go vcl.Start(signerID, []byte{}, dur, f)
go vcl.Start(signerID, []byte{}, dur, 2*f+1)

// We receive view-change requests and send our own because we detected
// an anomaly.
Expand Down Expand Up @@ -178,38 +170,3 @@ func testTimeout(t *testing.T, f int) {
require.Fail(t, "expected timer to expire")
}
}

// testAutoStart checks that we can start a view-change not from sending in an
// anomaly (e.g., detected a timeout) but from receiving many view-change
// messages from other nodes.
func testAutoStart(t *testing.T, f int) {
	dur := 100 * time.Millisecond
	mySignerID := [16]byte{byte(255)}
	vcChan, nvChan, view, vcl := testSetupViewChangeF1(t, mySignerID, dur, f, false)
	defer vcl.Stop()
	// Sending a regular request instead of an anomaly should still trigger
	// the anomaly case, because the controller also reacts to requests
	// coming from other signers.
	vcl.AddReq(InitReq{
		SignerID: [16]byte{byte(f)},
		View:     view,
	})
	select {
	case <-vcChan:
	case <-time.After(10 * time.Millisecond):
		require.Fail(t, "view change function should have been called")
	}

	// Then add more messages until we have 2f+1, which should trigger a
	// new-view message.
	for i := f + 1; i < 2*f+1; i++ {
		vcl.AddReq(InitReq{
			SignerID: [16]byte{byte(i)},
			View:     view,
		})
	}
	select {
	case <-nvChan:
	case <-time.After(10 * time.Millisecond):
		require.Fail(t, "new view function should have been called")
	}
}
35 changes: 24 additions & 11 deletions byzcoin/viewchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,17 @@ func TestViewChange_LeaderIndex(t *testing.T) {
require.Error(t, err)
require.Equal(t, "leader index must be positive", err.Error())

view := viewchange.View{
ID: s.genesis.SkipChainID(),
Gen: s.genesis.SkipChainID(),
LeaderIndex: 7,
}
for i := 0; i < 5; i++ {
err := s.services[i].sendViewChangeReq(viewchange.View{
ID: s.genesis.SkipChainID(),
Gen: s.genesis.SkipChainID(),
LeaderIndex: 7,
s.services[i].viewChangeMan.addReq(viewchange.InitReq{
SignerID: s.services[i].ServerIdentity().ID,
View: view,
})
err := s.services[i].sendViewChangeReq(view)
require.NoError(t, err)
}

Expand Down Expand Up @@ -208,7 +213,7 @@ func TestViewChange_LostSync(t *testing.T) {
require.NoError(t, err)
require.NotEqual(t, sb.Hash, s.genesis.Hash)

// A new view change starts with a block ID different..
// Start a new view change with a different block ID
req = &viewchange.InitReq{
SignerID: s.services[0].ServerIdentity().ID,
View: viewchange.View{
Expand All @@ -230,15 +235,23 @@ func TestViewChange_LostSync(t *testing.T) {
log.OutputToOs()

// make sure a view change can still happen later
for i := 0; i < 2; i++ {
err := s.services[i].sendViewChangeReq(viewchange.View{
ID: sb.Hash,
Gen: s.genesis.SkipChainID(),
LeaderIndex: 3,
})
view := viewchange.View{
ID: sb.Hash,
Gen: s.genesis.SkipChainID(),
LeaderIndex: 3,
}
for i := 0; i < 4; i++ {
err := s.services[i].sendViewChangeReq(view)
require.NoError(t, err)
}
for i := 0; i < 4; i++ {
s.services[i].viewChangeMan.addReq(viewchange.InitReq{
SignerID: s.services[i].ServerIdentity().ID,
View: view,
})
}

log.Lvl1("Waiting for the new block to be propagated")
s.waitPropagation(t, 2)
for _, service := range s.services {
// everyone should have the same leader after the genesis block is stored
Expand Down

0 comments on commit 9b9c9a5

Please sign in to comment.