Skip to content

Commit

Permalink
streamline sbft new-view and commit handling
Browse files Browse the repository at this point in the history
streamlining and refactoring code.

Change-Id: Ib32a66ce9345ebfa0050bdf08aa12ba6b8ed7ba4
Signed-off-by: Marko Vukolic <mvu@zurich.ibm.com>
  • Loading branch information
Marko Vukolic committed Dec 17, 2016
1 parent ccb94c5 commit 21d471b
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 118 deletions.
19 changes: 15 additions & 4 deletions orderer/sbft/simplebft/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ package simplebft
import "reflect"

func (s *SBFT) maybeSendCommit() {
if s.cur.sentCommit || len(s.cur.prep) < s.noFaultyQuorum()-1 {
if s.cur.prepared || len(s.cur.prep) < s.noFaultyQuorum()-1 {
return
}
s.sendCommit()
s.processBacklog()
}

func (s *SBFT) sendCommit() {
s.cur.sentCommit = true
s.cur.prepared = true
c := s.cur.subject
s.sys.Persist("commit", &c)
s.sys.Persist(prepared, &c)
s.broadcast(&Msg{&Msg_Commit{&c}})
}

Expand All @@ -50,5 +50,16 @@ func (s *SBFT) handleCommit(c *Subject, src uint64) {
}
s.cur.commit[src] = c
s.cancelViewChangeTimer()
s.maybeExecute()

//maybe mark as comitted
if s.cur.committed || len(s.cur.commit) < s.noFaultyQuorum() {
return
}
s.cur.committed = true
log.Noticef("replica %d: executing %v %x", s.id, s.cur.subject.Seq, s.cur.subject.Digest)

s.sys.Persist(committed, &s.cur.subject)

s.sendCheckpoint()
s.processBacklog()
}
29 changes: 3 additions & 26 deletions orderer/sbft/simplebft/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ func (s *SBFT) Connection(replica uint64) {
} else {
s.sys.Send(&Msg{&Msg_Prepare{&s.cur.subject}}, replica)
}
if s.cur.sentCommit {
if s.cur.prepared {
s.sys.Send(&Msg{&Msg_Commit{&s.cur.subject}}, replica)
}
if s.cur.executed {
if s.cur.committed {
s.sys.Send(&Msg{&Msg_Checkpoint{s.makeCheckpoint()}}, replica)
}
}
Expand All @@ -78,32 +78,9 @@ func (s *SBFT) handleHello(h *Hello, src uint64) {
s.deliverBatch(h.Batch)
}

if h.NewView != nil && s.view <= h.NewView.View {
if s.primaryIDView(h.NewView.View) != src {
log.Warningf("replica %d: invalid hello with new view from non-primary %d", s.id, src)
return
}

vcs, err := s.checkNewViewSignatures(h.NewView)
if err != nil {
log.Warningf("replica %d: invalid hello new view from %d: %s", s.id, src, err)
return
}

_, _, ok := s.makeXset(vcs)
if !ok {
log.Warningf("replica %d: invalid hello new view xset from %d", s.id, src)
return
}

s.view = h.NewView.View
s.activeView = true

s.maybeDeliverUsingXset(h.NewView)
}
s.handleNewView(h.NewView, src)

s.replicaState[src].hello = h

s.discardBacklog(src)
s.processBacklog()
}
30 changes: 0 additions & 30 deletions orderer/sbft/simplebft/execute.go

This file was deleted.

56 changes: 15 additions & 41 deletions orderer/sbft/simplebft/newview.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func (s *SBFT) checkNewViewSignatures(nv *NewView) ([]*ViewChange, error) {
}

func (s *SBFT) handleNewView(nv *NewView, src uint64) {
if nv == nil {
return
}

if nv.View < s.view {
log.Debugf("replica %d: discarding old new view from %d for %d, we are in %d", s.id, src, nv.View, s.view)
return
Expand All @@ -108,7 +112,7 @@ func (s *SBFT) handleNewView(nv *NewView, src uint64) {
return
}

xset, _, ok := s.makeXset(vcs)
xset, prevBatch, ok := s.makeXset(vcs)

if !ok || !reflect.DeepEqual(nv.Xset, xset) {
log.Warningf("replica %d: invalid new view from %d: xset incorrect: %v, %v", s.id, src, nv.Xset, xset)
Expand Down Expand Up @@ -139,36 +143,26 @@ func (s *SBFT) handleNewView(nv *NewView, src uint64) {
}
}

s.replicaState[s.primaryIDView(nv.View)].newview = nv
s.view = nv.View
s.activeView = false

s.processNewView()
}

func (s *SBFT) processNewView() {
if s.activeView {
return
}
s.discardBacklog(s.primaryID())

nv := s.replicaState[s.primaryIDView(s.view)].newview
if nv == nil || nv.View != s.view {
return
// maybe deliver using xset
if s.sys.LastBatch().DecodeHeader().Seq < prevBatch.DecodeHeader().Seq {
if prevBatch.DecodeHeader().Seq == s.cur.subject.Seq.Seq {
// we just received a signature set for a request which we preprepared, but never delivered.
prevBatch.Payloads = s.cur.preprep.Batch.Payloads
}
s.deliverBatch(prevBatch)
}

// after a new-view message, prepare to accept new requests.
s.activeView = true
s.discardBacklog(s.primaryID())

s.maybeDeliverUsingXset(nv)

// By now we cannot be waiting for any more outstanding
// messages. after a new-view message, by definition all
// activity has acquiesced. Prepare to accept a new request.
s.cur.checkpointDone = true
s.cur.subject.Seq.Seq = 0

log.Infof("replica %d now active in view %d; primary: %v", s.id, s.view, s.isPrimary())

//process pre-prepare if piggybacked to new-view
if nv.Batch != nil {
pp := &Preprepare{
Seq: &SeqView{Seq: nv.Batch.DecodeHeader().Seq, View: s.view},
Expand All @@ -184,23 +178,3 @@ func (s *SBFT) processNewView() {

s.processBacklog()
}

func (s *SBFT) maybeDeliverUsingXset(nv *NewView) {
// TODO we could cache vcs in replicaState
vcs, err := s.checkNewViewSignatures(nv)
if err != nil {
panic(err)
}

_, prevBatch, ok := s.makeXset(vcs)
if !ok {
panic("invalid newview")
}
if s.sys.LastBatch().DecodeHeader().Seq < prevBatch.DecodeHeader().Seq {
if prevBatch.DecodeHeader().Seq == s.cur.subject.Seq.Seq {
// we just received a signature set for a request which we preprepared, but never delivered.
prevBatch.Payloads = s.cur.preprep.Batch.Payloads
}
s.deliverBatch(prevBatch)
}
}
4 changes: 2 additions & 2 deletions orderer/sbft/simplebft/preprepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (s *SBFT) sendPreprepare(batch []*Request) {
Batch: s.makeBatch(seq.Seq, lasthash, data),
}

s.sys.Persist("preprepare", m)
s.sys.Persist(preprepared, m)
s.broadcast(&Msg{&Msg_Preprepare{m}})
s.handleCheckedPreprepare(m)
}
Expand Down Expand Up @@ -83,7 +83,7 @@ func (s *SBFT) acceptPreprepare(pp *Preprepare) {
sub := Subject{Seq: pp.Seq, Digest: pp.Batch.Hash()}

log.Infof("replica %d: accepting preprepare for %v, %x", s.id, sub.Seq, sub.Digest)
s.sys.Persist("preprepare", pp)
s.sys.Persist(preprepared, pp)

s.cur = reqInfo{
subject: sub,
Expand Down
26 changes: 15 additions & 11 deletions orderer/sbft/simplebft/simplebft.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ import (
"github.com/op/go-logging"
)

const preprepared string = "preprepared"
const prepared string = "prepared"
const committed string = "committed"
const viewchange string = "viewchange"

// Receiver defines the API that is exposed by SBFT to the system.
type Receiver interface {
Receive(msg *Msg, src uint64)
Expand Down Expand Up @@ -75,9 +80,9 @@ type reqInfo struct {
preprep *Preprepare
prep map[uint64]*Subject
commit map[uint64]*Subject
sentCommit bool
executed bool
checkpoint map[uint64]*Checkpoint
prepared bool
committed bool
checkpointDone bool
}

Expand All @@ -86,7 +91,6 @@ type replicaInfo struct {
hello *Hello
signedViewchange *Signed
viewchange *ViewChange
newview *NewView
}

var log = logging.MustGetLogger("sbft")
Expand All @@ -113,14 +117,14 @@ func New(id uint64, config *Config, sys System) (*SBFT, error) {

s.view = 0
s.cur.subject.Seq = &SeqView{}
s.cur.sentCommit = true
s.cur.executed = true
s.cur.prepared = true
s.cur.committed = true
s.cur.checkpointDone = true
s.cur.timeout = dummyCanceller{}
s.activeView = true

svc := &Signed{}
if s.sys.Restore("viewchange", svc) {
if s.sys.Restore(viewchange, svc) {
vc := &ViewChange{}
err := proto.Unmarshal(svc.Data, vc)
if err != nil {
Expand All @@ -132,20 +136,20 @@ func New(id uint64, config *Config, sys System) (*SBFT, error) {
}

pp := &Preprepare{}
if s.sys.Restore("preprepare", pp) && pp.Seq.View >= s.view {
if s.sys.Restore(preprepared, pp) && pp.Seq.View >= s.view {
s.view = pp.Seq.View
s.activeView = true
if pp.Seq.Seq > s.seq() {
s.acceptPreprepare(pp)
}
}
c := &Subject{}
if s.sys.Restore("commit", c) && reflect.DeepEqual(c, &s.cur.subject) && c.Seq.View >= s.view {
s.cur.sentCommit = true
if s.sys.Restore(prepared, c) && reflect.DeepEqual(c, &s.cur.subject) && c.Seq.View >= s.view {
s.cur.prepared = true
}
ex := &Subject{}
if s.sys.Restore("execute", ex) && reflect.DeepEqual(c, &s.cur.subject) && ex.Seq.View >= s.view {
s.cur.executed = true
if s.sys.Restore(committed, ex) && reflect.DeepEqual(c, &s.cur.subject) && ex.Seq.View >= s.view {
s.cur.committed = true
}

s.cancelViewChangeTimer()
Expand Down
6 changes: 2 additions & 4 deletions orderer/sbft/simplebft/viewchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (s *SBFT) sendViewChange() {
log.Noticef("replica %d: sending viewchange for view %d", s.id, s.view)

var q, p []*Subject
if s.cur.sentCommit {
if s.cur.prepared {
p = append(p, &s.cur.subject)
}
if s.cur.preprep != nil {
Expand All @@ -51,10 +51,8 @@ func (s *SBFT) sendViewChange() {
s.viewChangeTimer.Cancel()
s.cur.timeout.Cancel()

s.sys.Persist("viewchange", svc)
s.sys.Persist(viewchange, svc)
s.broadcast(&Msg{&Msg_ViewChange{svc}})

s.processNewView()
}

func (s *SBFT) cancelViewChangeTimer() {
Expand Down

0 comments on commit 21d471b

Please sign in to comment.