Skip to content

Commit

Permalink
Merge changes I3e3bd029,Iaff9f6de,I6e1c70b5,Icab2222c
Browse files Browse the repository at this point in the history
* changes:
  sbft: only send next batch once previous is done
  sbft: send view change message on reconnect
  sbft: do not blindly go active in view on hello message
  sbft: don't act as primary if we're not active in view
  • Loading branch information
Simon Schubert authored and Gerrit Code Review committed Nov 22, 2016
2 parents 12c4dd4 + 1fd9f62 commit fbb06ba
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 5 deletions.
9 changes: 7 additions & 2 deletions orderer/sbft/simplebft/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ func (s *SBFT) Connection(replica uint64) {
}
s.sys.Send(&Msg{&Msg_Hello{hello}}, replica)

svc := s.replicaState[s.id].signedViewchange
if svc != nil {
s.sys.Send(&Msg{&Msg_ViewChange{svc}}, replica)
}

// A reconnecting replica can play forward its blockchain to
// the batch listed in the hello message. However, the
// currently in-flight batch will not be reflected in the
Expand Down Expand Up @@ -88,10 +93,10 @@ func (s *SBFT) handleHello(h *Hello, src uint64) {
return
}

if s.view < h.NewView.View {
if s.view <= h.NewView.View {
s.view = h.NewView.View
s.activeView = true
}
s.activeView = true
}

s.replicaState[src].hello = h
Expand Down
6 changes: 3 additions & 3 deletions orderer/sbft/simplebft/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (s *SBFT) Request(req []byte) {
}

func (s *SBFT) handleRequest(req *Request, src uint64) {
if s.isPrimary() {
if s.isPrimary() && s.activeView {
s.batch = append(s.batch, req)
if s.batchSize() >= s.config.BatchSizeBytes {
s.maybeSendNextBatch()
Expand Down Expand Up @@ -56,11 +56,11 @@ func (s *SBFT) maybeSendNextBatch() {
s.batchTimer = nil
}

if !s.isPrimary() {
if !s.isPrimary() || !s.activeView {
return
}

if !s.cur.executed {
if !s.cur.checkpointDone {
return
}

Expand Down
65 changes: 65 additions & 0 deletions orderer/sbft/simplebft/simplebft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ func TestRestart(t *testing.T) {
for _, r := range repls {
r.sendViewChange()
}
sys.Run()

r1 := []byte{1, 2, 3}
repls[0].Request(r1)
Expand Down Expand Up @@ -429,6 +430,7 @@ func TestRestartAfterPrepare(t *testing.T) {
for _, r := range repls {
r.sendViewChange()
}
sys.Run()

r1 := []byte{1, 2, 3}
repls[0].Request(r1)
Expand Down Expand Up @@ -497,6 +499,7 @@ func TestRestartAfterCommit(t *testing.T) {
for _, r := range repls {
r.sendViewChange()
}
sys.Run()

r1 := []byte{1, 2, 3}
repls[0].Request(r1)
Expand Down Expand Up @@ -565,6 +568,7 @@ func TestRestartAfterCheckpoint(t *testing.T) {
for _, r := range repls {
r.sendViewChange()
}
sys.Run()

r1 := []byte{1, 2, 3}
repls[0].Request(r1)
Expand Down Expand Up @@ -653,6 +657,7 @@ func TestErroneousViewChange(t *testing.T) {
for _, r := range repls {
r.sendViewChange()
}
sys.Run()

r1 := []byte{1, 2, 3}
repls[0].Request(r1)
Expand Down Expand Up @@ -717,6 +722,7 @@ func TestRestartMissedViewChange(t *testing.T) {
r.sendViewChange()
}
}
sys.Run()

r2 := []byte{3, 1, 2}
repls[1].Request(r2)
Expand Down Expand Up @@ -879,3 +885,62 @@ func TestViewChangeTimer(t *testing.T) {
}
}
}

func TestResendViewChange(t *testing.T) {
N := uint64(4)
sys := newTestSystem(N)
var repls []*SBFT
var adapters []*testSystemAdapter
for i := uint64(0); i < N; i++ {
a := sys.NewAdapter(i)
s, err := New(i, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, a)
if err != nil {
t.Fatal(err)
}
repls = append(repls, s)
adapters = append(adapters, a)
}

phase := make(map[uint64]int)

// prevent first view change from being delivered
sys.filterFn = func(e testElem) (testElem, bool) {
if msg, ok := e.ev.(*testMsgEvent); ok {
if msg.dst == msg.src {
return e, true
} else if phase[msg.src] == 0 && msg.msg.GetViewChange() != nil {
return e, false
} else if msg.msg.GetHello() != nil {
phase[msg.src] = 1
}
}

return e, true
}

for _, r := range repls {
r.sendViewChange()
}
sys.Run()

connectAll(sys)
r1 := []byte{1, 2, 3}
repls[0].Request(r1)
sys.Run()
r2 := []byte{3, 1, 2}
r3 := []byte{3, 5, 2}
repls[1].Request(r2)
repls[1].Request(r3)
sys.Run()
for _, a := range adapters {
if len(a.batches) != 3 {
t.Fatal("expected execution of 2 batches")
}
if !reflect.DeepEqual([][]byte{r1}, a.batches[1].Payloads) {
t.Error("wrong request executed (1)")
}
if !reflect.DeepEqual([][]byte{r2, r3}, a.batches[2].Payloads) {
t.Error("wrong request executed (2)")
}
}
}

0 comments on commit fbb06ba

Please sign in to comment.