Skip to content

Commit

Permalink
sbft basic request retransmission
Browse files Browse the repository at this point in the history
sbft was so far losing request in some cases during view change (e.g., new
test TestViewChangeWithRetransmission in simplebft_test.go). Two tests were
modified as they were expecting lost requests.

This is a basic implementation that will be replaced by a fix to FAB-474.

Change-Id: I366881d71fdd4ac69b7ab6dc88229d2bc2ec18c3
Signed-off-by: Marko Vukolic <mvu@zurich.ibm.com>
  • Loading branch information
Marko Vukolic committed Nov 26, 2016
1 parent f046f3c commit eb71cfe
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 10 deletions.
2 changes: 1 addition & 1 deletion orderer/sbft/simplebft/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (s *SBFT) handleCheckpoint(c *Checkpoint, src uint64) {
// ignore null requests
batch := *s.cur.preprep.Batch
batch.Signatures = cpset
s.sys.Deliver(&batch)
s.deliverBatch(&batch)

s.cur.timeout.Cancel()
log.Infof("request %s %s completed on %d", s.cur.subject.Seq, hash2str(s.cur.subject.Digest), s.id)
Expand Down
2 changes: 1 addition & 1 deletion orderer/sbft/simplebft/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (s *SBFT) handleHello(h *Hello, src uint64) {
}

if s.sys.LastBatch().DecodeHeader().Seq < bh.Seq {
s.sys.Deliver(h.Batch)
s.deliverBatch(h.Batch)
}

if h.NewView != nil {
Expand Down
2 changes: 1 addition & 1 deletion orderer/sbft/simplebft/newview.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,6 @@ func (s *SBFT) maybeDeliverUsingXset(nv *NewView) {
prevBatch.Payloads = s.cur.preprep.Batch.Payloads
}
s.cur.checkpointDone = true
s.sys.Deliver(prevBatch)
s.deliverBatch(prevBatch)
}
}
10 changes: 9 additions & 1 deletion orderer/sbft/simplebft/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ func (s *SBFT) Request(req []byte) {
}

func (s *SBFT) handleRequest(req *Request, src uint64) {
key := hash2str(hash(req.Payload))
log.Infof("replica %d inserting %x into pending", s.id, key)
s.pending[key] = req
if s.isPrimary() && s.activeView {
s.batch = append(s.batch, req)
if s.batchSize() >= s.config.BatchSizeBytes {
Expand Down Expand Up @@ -65,7 +68,12 @@ func (s *SBFT) maybeSendNextBatch() {
}

if len(s.batch) == 0 {
return
for _, req := range s.pending {
s.batch = append(s.batch, req)
}
if len(s.batch) == 0 {
return
}
}

batch := s.batch
Expand Down
12 changes: 12 additions & 0 deletions orderer/sbft/simplebft/simplebft.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type SBFT struct {
viewChangeTimeout time.Duration
viewChangeTimer Canceller
replicaState []replicaInfo
pending map[string]*Request
}

type reqInfo struct {
Expand Down Expand Up @@ -106,6 +107,7 @@ func New(id uint64, config *Config, sys System) (*SBFT, error) {
id: id,
viewChangeTimer: dummyCanceller{},
replicaState: make([]replicaInfo, config.N),
pending: make(map[string]*Request),
}
s.sys.SetReceiver(s)

Expand Down Expand Up @@ -226,3 +228,13 @@ func (s *SBFT) handleQueueableMessage(m *Msg, src uint64) {

log.Warningf("received invalid message from %d", src)
}

func (s *SBFT) deliverBatch(batch *Batch) {
s.sys.Deliver(batch)

for _, req := range batch.Payloads {
key := hash2str(hash(req))
log.Infof("replica %d attempting to remove %x from pending", s.id, key)
delete(s.pending, key)
}
}
57 changes: 51 additions & 6 deletions orderer/sbft/simplebft/simplebft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,14 @@ func TestByzPrimary(t *testing.T) {
repls[0].Request(r1)
sys.Run()
for _, a := range adapters {
if len(a.batches) != 1 {
t.Fatal("expected execution of 1 batch")
if len(a.batches) != 2 {
t.Fatal("expected execution of 2 batches")
}
if !reflect.DeepEqual([][]byte{r2}, a.batches[0].Payloads) {
t.Error("wrong request executed")
t.Error("wrong request executed first")
}
if !reflect.DeepEqual([][]byte{r1}, a.batches[1].Payloads) {
t.Error("wrong request executed second")
}
}
}
Expand Down Expand Up @@ -256,6 +259,45 @@ func TestViewChange(t *testing.T) {
}
}

func TestViewChangeWithRetransmission(t *testing.T) {
N := uint64(4)
sys := newTestSystem(N)
var repls []*SBFT
var adapters []*testSystemAdapter
for i := uint64(0); i < N; i++ {
a := sys.NewAdapter(i)
s, err := New(i, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 1, RequestTimeoutNsec: 20000000000}, a)
if err != nil {
t.Fatal(err)
}
repls = append(repls, s)
adapters = append(adapters, a)
}

// network outage after prepares are received
sys.filterFn = func(e testElem) (testElem, bool) {
if msg, ok := e.ev.(*testMsgEvent); ok {
if c := msg.msg.GetPrepare(); c != nil && c.Seq.View == 0 {
return e, false
}
}
return e, true
}

connectAll(sys)
r1 := []byte{1, 2, 3}
repls[0].Request(r1)
sys.Run()
for _, a := range adapters {
if len(a.batches) != 1 {
t.Fatal("expected execution of 1 batch")
}
if !reflect.DeepEqual([][]byte{r1}, a.batches[0].Payloads) {
t.Error("wrong request executed (1)")
}
}
}

func TestViewChangeXset(t *testing.T) {
N := uint64(4)
sys := newTestSystem(N)
Expand Down Expand Up @@ -320,11 +362,14 @@ func TestViewChangeXset(t *testing.T) {
if i == 3 {
continue
}
if len(a.batches) != 1 {
if len(a.batches) != 2 {
t.Fatalf("expected execution of 1 batch: %v", a.batches)
}
if !reflect.DeepEqual([][]byte{r2}, a.batches[0].Payloads) {
t.Error("wrong request executed")
if !reflect.DeepEqual([][]byte{r1}, a.batches[0].Payloads) {
t.Error("wrong request executed first")
}
if !reflect.DeepEqual([][]byte{r2}, a.batches[1].Payloads) {
t.Error("wrong request executed second")
}
}
}
Expand Down

0 comments on commit eb71cfe

Please sign in to comment.