From cbd1ea0c148f51fd36cb85a36626f81d3e83eb17 Mon Sep 17 00:00:00 2001 From: Simon Schubert Date: Tue, 4 Oct 2016 15:19:56 +0200 Subject: [PATCH] sbft: refactor + document future directions Squashed commit of the following: commit ad540eeb38effe086851c6d9c58a0192e7034b10 Author: Simon Schubert Date: Tue Oct 4 15:09:38 2016 +0200 address golint concerns Change-Id: I59ae215d771a128cac740489a1d6308ebd1fb2a5 Signed-off-by: Simon Schubert commit d60d5f54bb5ca04e5857be13cd5dbfd3d67e5ac6 Author: Simon Schubert Date: Tue Oct 4 14:51:14 2016 +0200 add comments on backlog and state transfer strategy Change-Id: Iedc84f196c76e1cd12c88f2e74505cae64d80752 Signed-off-by: Simon Schubert commit 6169fd180918940e622a580a55d3be4243ccd919 Author: Simon Schubert Date: Mon Oct 3 15:35:00 2016 +0200 properly process queued checkpoint messages Change-Id: If6d8e58f8a02178a4a435542162cba52b233df92 Signed-off-by: Simon Schubert commit 017c174700e0a47015a6dfc8967576cd8872de79 Author: Simon Schubert Date: Mon Oct 3 15:33:57 2016 +0200 send hello message on connect Change-Id: Id76553a7403d730182ca5f8bd445053c28969f91 Signed-off-by: Simon Schubert commit b11ed71a695a96f103626d1db8629a39d39db7c4 Author: Simon Schubert Date: Mon Oct 3 14:28:52 2016 +0200 rename message Seq to SeqView Change-Id: I2cb6b4082b72642342ca09eeb4c0be7eb382356d Signed-off-by: Simon Schubert commit 12b1d77eb8e7e0e4e8f1a68e04c5183ddf6a40c0 Author: Simon Schubert Date: Mon Oct 3 14:24:22 2016 +0200 record signature origin in batch Change-Id: I9416e90ae01c35d548eeedb56ac6f345e9a0b63b Signed-off-by: Simon Schubert commit f18a7896183b94edd0ed1285fe2d8d827435e5bc Author: Simon Schubert Date: Mon Oct 3 12:50:47 2016 +0200 change info to warning Change-Id: If25917123731484784deec45af589e84a17d9c26 Signed-off-by: Simon Schubert commit 59e79929ffb18f7fd834f9bf9bd667109f6c9e5d Author: Simon Schubert Date: Mon Oct 3 12:50:28 2016 +0200 conform to protobuf style guide Change-Id: I851476b6449c17c88f31539c78221fc81338cadd Signed-off-by: Simon Schubert Change-Id: Iaa289455d6ebc7c31a1e4130b9d07f8759d05c88 Signed-off-by: Simon Schubert --- consensus/simplebft/backlog.go | 64 +++++++--- consensus/simplebft/batch.go | 1 + consensus/simplebft/checkpoint.go | 15 +-- consensus/simplebft/commit.go | 2 +- consensus/simplebft/connection.go | 42 +++++++ consensus/simplebft/crypto.go | 5 +- consensus/simplebft/newview.go | 8 +- consensus/simplebft/newview_test.go | 58 ++++----- consensus/simplebft/preprepare.go | 2 +- consensus/simplebft/request.go | 1 + consensus/simplebft/simplebft.go | 33 +++-- consensus/simplebft/simplebft.pb.go | 170 +++++++++++--------------- consensus/simplebft/simplebft.proto | 69 +++++------ consensus/simplebft/simplebft_test.go | 45 +++++++ consensus/simplebft/xset.go | 4 +- 15 files changed, 307 insertions(+), 212 deletions(-) create mode 100644 consensus/simplebft/connection.go diff --git a/consensus/simplebft/backlog.go b/consensus/simplebft/backlog.go index ebd23226f09..5c8d67a36d2 100644 --- a/consensus/simplebft/backlog.go +++ b/consensus/simplebft/backlog.go @@ -49,17 +49,33 @@ func (s *SBFT) recordBacklogMsg(m *Msg, src uint64) { if src == s.id { panic("should never have to backlog my own message") } - // TODO prevent DoS by limiting the number of messages per replica + // TODO + // + // Prevent DoS by limiting the number of messages per replica. + // + // If the backlog limit is exceeded, discard all messages with + // Seq before the replica's hello message (we can, because we + // can play forward to this batch via state transfer). If + // there is no hello message, we must be really slow or the + // replica must be byzantine. In this case we probably should + // re-establish the connection. + // + // After the connection has been re-established, we will + // receive a hello, and the following messages will trigger + // the pruning of old messages. If this pruning lead us not + // to make progress, the backlog processing algorithm as lined + // out below will take care of starting a state transfer, + // using the hello message we received on reconnect. s.backLog[src] = append(s.backLog[src], m) } func (s *SBFT) processBacklog() { processed := true + notReady := uint64(0) for processed { processed = false - notReady := uint64(0) - for src, _ := range s.backLog { + for src := range s.backLog { for len(s.backLog[src]) > 0 { m, rest := s.backLog[src][0], s.backLog[src][1:] if s.testBacklog2(m, src) { @@ -74,16 +90,36 @@ func (s *SBFT) processBacklog() { processed = true } } - - // all minus us - if notReady >= s.config.N-1 { - // This is a problem - we consider all other replicas - // too far ahead for us. We need to do a state transfer - // to get out of this rut. - for src := range s.backLog { - delete(s.backLog, src) - } - // TODO trigger state transfer - } } + + // TODO + // + // Detect when we need to reconsider our options. + // + // We arrived here because either all is fine, we're with the + // pack. Or we have messages in the backlog because we're + // connected asymmetrically, and a close replica already + // started talking about the next batch while we're still + // waiting for rounds to arrive for our current batch. That's + // still fine. + // + // We might also be here because we lost connectivity, and we + // either missed some messages, or our connection is bad and + // we should reconnect to get a working connection going + // again. + // + // If a noFaultyQuorum (-1, because we're not faulty, just + // were disconnected) is backlogged, we know that we need to + // perform a state transfer. Of course, f of these might be + // byzantine, and the remaining f that are not backlogged will + // allow us to get unstuck. To check against that, we need to + // only consider backlogged replicas of which we have a hello + // message that talks about a future Seq. + // + // We need to pick the highest Seq of all the hello messages + // we received, perform a state transfer to that Batch, and + // discard all backlogged messages that refer to a lower Seq. + // + // Do we need to detect that a connection is stuck and we + // should reconnect? } diff --git a/consensus/simplebft/batch.go b/consensus/simplebft/batch.go index a070fb24e4e..df54e6c4791 100644 --- a/consensus/simplebft/batch.go +++ b/consensus/simplebft/batch.go @@ -59,6 +59,7 @@ func (s *SBFT) checkBatch(b *Batch) (*BatchHeader, error) { //////////////////////////////////////// +// Hash returns the hash of the Batch. func (b *Batch) Hash() []byte { return hash(b.Header) } diff --git a/consensus/simplebft/checkpoint.go b/consensus/simplebft/checkpoint.go index 2abcd9319b3..abff06e049f 100644 --- a/consensus/simplebft/checkpoint.go +++ b/consensus/simplebft/checkpoint.go @@ -75,12 +75,10 @@ func (s *SBFT) handleCheckpoint(c *Checkpoint, src uint64) { // got a weak checkpoint - cpset := &CheckpointSet{make(map[uint64]*Checkpoint)} - var sigs [][]byte + cpset := make(map[uint64][]byte) for _, r := range replicas { cp := s.cur.checkpoint[r] - cpset.CheckpointSet[r] = cp - sigs = append(sigs, cp.Signature) + cpset[r] = cp.Signature } s.cur.checkpointDone = true @@ -93,11 +91,10 @@ func (s *SBFT) handleCheckpoint(c *Checkpoint, src uint64) { } // ignore null requests - if s.cur.preprep.Batch != nil { - batch := *s.cur.preprep.Batch - batch.Signatures = sigs - s.sys.Deliver(&batch) - } + batch := *s.cur.preprep.Batch + batch.Signatures = cpset + s.sys.Deliver(&batch) + s.cur.timeout.Cancel() log.Infof("request %s %s completed on %d", s.cur.subject.Seq, hash2str(s.cur.subject.Digest), s.id) diff --git a/consensus/simplebft/commit.go b/consensus/simplebft/commit.go index 65470b65ad8..eb448f7901c 100644 --- a/consensus/simplebft/commit.go +++ b/consensus/simplebft/commit.go @@ -39,7 +39,7 @@ func (s *SBFT) handleCommit(c *Subject, src uint64) { } if !reflect.DeepEqual(c, &s.cur.subject) { - log.Infof("commit does not match expected subject %v, got %v", &s.cur.subject, c) + log.Warningf("commit does not match expected subject %v, got %v", &s.cur.subject, c) return } if _, ok := s.cur.commit[src]; ok { diff --git a/consensus/simplebft/connection.go b/consensus/simplebft/connection.go new file mode 100644 index 00000000000..a6985816379 --- /dev/null +++ b/consensus/simplebft/connection.go @@ -0,0 +1,42 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simplebft + +// Connection is an event from system to notify a new connection with +// replica. +// On connection, we send our latest (weak) checkpoint, and we expect +// to receive one from replica. +func (s *SBFT) Connection(replica uint64) { + batch := *s.sys.LastBatch() + batch.Payloads = nil // don't send the big payload + s.sys.Send(&Msg{&Msg_Hello{&batch}}, replica) + + // TODO + // + // A reconnecting replica can play forward its blockchain to + // the batch listed in the hello message. However, the + // currently in-flight batch will not be reflected in the + // Hello message, nor will all messages be present to actually + // commit the in-flight batch at the reconnecting replica. + // + // Therefore we also send the most recent (pre)prepare, + // commit, checkpoint so that the reconnecting replica can + // catch up on the in-flight batch. +} + +func (s *SBFT) handleHello(h *Batch, src uint64) { +} diff --git a/consensus/simplebft/crypto.go b/consensus/simplebft/crypto.go index ca172b44346..574e133f267 100644 --- a/consensus/simplebft/crypto.go +++ b/consensus/simplebft/crypto.go @@ -61,11 +61,10 @@ func merkleHashDigests(digests [][]byte) []byte { digests = nextDigests } - if len(digests) > 0 { - return digests[0] - } else { + if len(digests) == 0 { return nil } + return digests[0] } //////////////////////////////////////////////// diff --git a/consensus/simplebft/newview.go b/consensus/simplebft/newview.go index 06cd1b1d5dc..857323b9b76 100644 --- a/consensus/simplebft/newview.go +++ b/consensus/simplebft/newview.go @@ -68,12 +68,12 @@ func (s *SBFT) maybeSendNewView() { } func (s *SBFT) handleNewView(nv *NewView, src uint64) { - if src != s.primaryIdView(nv.View) { + if src != s.primaryIDView(nv.View) { log.Warningf("invalid new view from %d for %d", src, nv.View) return } - if onv, ok := s.newview[s.primaryIdView(nv.View)]; ok && onv.View >= nv.View { + if onv, ok := s.newview[s.primaryIDView(nv.View)]; ok && onv.View >= nv.View { log.Debugf("discarding duplicate new view for %d", nv.View) return } @@ -128,7 +128,7 @@ func (s *SBFT) handleNewView(nv *NewView, src uint64) { return } - s.newview[s.primaryIdView(nv.View)] = nv + s.newview[s.primaryIDView(nv.View)] = nv s.processNewView() } @@ -138,7 +138,7 @@ func (s *SBFT) processNewView() { return } - nv, ok := s.newview[s.primaryIdView(s.seq.View)] + nv, ok := s.newview[s.primaryIDView(s.seq.View)] if !ok || nv.View != s.seq.View { return } diff --git a/consensus/simplebft/newview_test.go b/consensus/simplebft/newview_test.go index a312b0f4c3e..34b5b421070 100644 --- a/consensus/simplebft/newview_test.go +++ b/consensus/simplebft/newview_test.go @@ -22,26 +22,26 @@ import ( ) func TestXsetNoByz(t *testing.T) { - s := &SBFT{config: Config{N: 4, F: 1}, seq: Seq{3, 1}} + s := &SBFT{config: Config{N: 4, F: 1}, seq: SeqView{3, 1}} vcs := []*ViewChange{ &ViewChange{ View: 3, Pset: nil, - Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}, - &Subject{&Seq{2, 2}, []byte("val2")}}, + Qset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}, + &Subject{&SeqView{2, 2}, []byte("val2")}}, Executed: 1, }, &ViewChange{ View: 3, - Pset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}}, - Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}}, + Pset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}}, + Qset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}}, Executed: 1, }, &ViewChange{ View: 3, - Pset: []*Subject{&Subject{&Seq{2, 2}, []byte("val2")}}, - Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}, - &Subject{&Seq{2, 2}, []byte("val2")}}, + Pset: []*Subject{&Subject{&SeqView{2, 2}, []byte("val2")}}, + Qset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}, + &Subject{&SeqView{2, 2}, []byte("val2")}}, Executed: 1, }, } @@ -51,13 +51,13 @@ func TestXsetNoByz(t *testing.T) { t.Fatal("no xset") } - if !reflect.DeepEqual(xset, &Subject{&Seq{3, 2}, []byte("val2")}) { + if !reflect.DeepEqual(xset, &Subject{&SeqView{3, 2}, []byte("val2")}) { t.Error(xset) } } func TestXsetByz0(t *testing.T) { - s := &SBFT{config: Config{N: 4, F: 1}, seq: Seq{3, 1}} + s := &SBFT{config: Config{N: 4, F: 1}, seq: SeqView{3, 1}} vcs := []*ViewChange{ &ViewChange{ View: 3, @@ -67,15 +67,15 @@ func TestXsetByz0(t *testing.T) { }, &ViewChange{ View: 3, - Pset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}}, - Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}}, + Pset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}}, + Qset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}}, Executed: 1, }, &ViewChange{ View: 3, - Pset: []*Subject{&Subject{&Seq{2, 2}, []byte("val2")}}, - Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}, - &Subject{&Seq{2, 2}, []byte("val2")}}, + Pset: []*Subject{&Subject{&SeqView{2, 2}, []byte("val2")}}, + Qset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}, + &Subject{&SeqView{2, 2}, []byte("val2")}}, Executed: 1, }, } @@ -87,9 +87,9 @@ func TestXsetByz0(t *testing.T) { vcs = append(vcs, &ViewChange{ View: 3, - Pset: []*Subject{&Subject{&Seq{2, 2}, []byte("val2")}}, - Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}, - &Subject{&Seq{2, 2}, []byte("val2")}}, + Pset: []*Subject{&Subject{&SeqView{2, 2}, []byte("val2")}}, + Qset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}, + &Subject{&SeqView{2, 2}, []byte("val2")}}, Executed: 2, }) @@ -97,31 +97,31 @@ func TestXsetByz0(t *testing.T) { if !ok { t.Error("no xset") } - if !reflect.DeepEqual(xset, &Subject{&Seq{3, 2}, []byte("val2")}) { + if !reflect.DeepEqual(xset, &Subject{&SeqView{3, 2}, []byte("val2")}) { t.Error(xset) } } func TestXsetByz2(t *testing.T) { - s := &SBFT{config: Config{N: 4, F: 1}, seq: Seq{3, 1}} + s := &SBFT{config: Config{N: 4, F: 1}, seq: SeqView{3, 1}} vcs := []*ViewChange{ &ViewChange{ View: 3, Pset: nil, - Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}}, + Qset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}}, Executed: 1, }, &ViewChange{ View: 3, - Pset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}}, - Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}}, + Pset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}}, + Qset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}}, Executed: 1, }, &ViewChange{ View: 3, - Pset: []*Subject{&Subject{&Seq{2, 2}, []byte("val2")}}, - Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}, - &Subject{&Seq{2, 2}, []byte("val2")}}, + Pset: []*Subject{&Subject{&SeqView{2, 2}, []byte("val2")}}, + Qset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}, + &Subject{&SeqView{2, 2}, []byte("val2")}}, Executed: 1, }, } @@ -133,8 +133,8 @@ func TestXsetByz2(t *testing.T) { vcs = append(vcs, &ViewChange{ View: 3, - Pset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}}, - Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}}, + Pset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}}, + Qset: []*Subject{&Subject{&SeqView{1, 2}, []byte("val1")}}, Executed: 2, }) @@ -142,7 +142,7 @@ func TestXsetByz2(t *testing.T) { if !ok { t.Error("no xset") } - if !reflect.DeepEqual(xset, &Subject{&Seq{3, 2}, []byte("val1")}) { + if !reflect.DeepEqual(xset, &Subject{&SeqView{3, 2}, []byte("val1")}) { t.Error(xset) } } diff --git a/consensus/simplebft/preprepare.go b/consensus/simplebft/preprepare.go index 921c2891bc9..544185573d2 100644 --- a/consensus/simplebft/preprepare.go +++ b/consensus/simplebft/preprepare.go @@ -41,7 +41,7 @@ func (s *SBFT) sendPreprepare(batch []*Request) { } func (s *SBFT) handlePreprepare(pp *Preprepare, src uint64) { - if src != s.primaryId() { + if src != s.primaryID() { log.Infof("preprepare from non-primary %d", src) return } diff --git a/consensus/simplebft/request.go b/consensus/simplebft/request.go index 776fc6a4fdc..c1a269409cd 100644 --- a/consensus/simplebft/request.go +++ b/consensus/simplebft/request.go @@ -18,6 +18,7 @@ package simplebft import "time" +// Request proposes a new request to the BFT network. func (s *SBFT) Request(req []byte) { s.broadcast(&Msg{&Msg_Request{&Request{req}}}) } diff --git a/consensus/simplebft/simplebft.go b/consensus/simplebft/simplebft.go index d4f01ada4e3..b52561306cd 100644 --- a/consensus/simplebft/simplebft.go +++ b/consensus/simplebft/simplebft.go @@ -25,10 +25,14 @@ import ( "github.com/op/go-logging" ) +// Receiver defines the API that is exposed by SBFT to the system. type Receiver interface { Receive(msg *Msg, src uint64) + Request(req []byte) + Connection(replica uint64) } +// System defines the API that needs to be provided for SBFT. type System interface { Send(msg *Msg, dest uint64) Timer(d time.Duration, f func()) Canceller @@ -41,16 +45,18 @@ type System interface { CheckSig(data []byte, src uint64, sig []byte) error } +// Canceller allows cancelling of a scheduled timer event. type Canceller interface { Cancel() } +// SBFT is a simplified PBFT implementation. type SBFT struct { sys System config Config id uint64 - seq Seq + seq SeqView batch []*Request batchTimer Canceller cur reqInfo @@ -87,6 +93,7 @@ type dummyCanceller struct{} func (d dummyCanceller) Cancel() {} +// New creates a new SBFT instance. func New(id uint64, config *Config, sys System) (*SBFT, error) { if config.F*3+1 > config.N { return nil, fmt.Errorf("invalid combination of N and F") @@ -123,7 +130,7 @@ func New(id uint64, config *Config, sys System) (*SBFT, error) { if pp.Seq.Seq > bh.Seq { s.seq = *pp.Seq s.seq.Seq -= 1 - s.handlePreprepare(pp, s.primaryIdView(pp.Seq.View)) + s.handlePreprepare(pp, s.primaryIDView(pp.Seq.View)) } } c := &Subject{} @@ -146,19 +153,19 @@ func New(id uint64, config *Config, sys System) (*SBFT, error) { //////////////////////////////////////////////// -func (s *SBFT) primaryIdView(v uint64) uint64 { +func (s *SBFT) primaryIDView(v uint64) uint64 { return v % s.config.N } -func (s *SBFT) primaryId() uint64 { - return s.primaryIdView(s.seq.View) +func (s *SBFT) primaryID() uint64 { + return s.primaryIDView(s.seq.View) } func (s *SBFT) isPrimary() bool { - return s.primaryId() == s.id + return s.primaryID() == s.id } -func (s *SBFT) nextSeq() Seq { +func (s *SBFT) nextSeq() SeqView { seq := s.seq seq.Seq += 1 return seq @@ -184,6 +191,7 @@ func (s *SBFT) broadcast(m *Msg) { //////////////////////////////////////////////// +// Receive is the ingress method for SBFT messages. func (s *SBFT) Receive(m *Msg, src uint64) { log.Debugf("received message from %d: %s", src, m) @@ -199,9 +207,6 @@ func (s *SBFT) Receive(m *Msg, src uint64) { } else if vs := m.GetViewChange(); vs != nil { s.handleViewChange(vs, src) return - } else if c := m.GetCheckpoint(); c != nil { - s.handleCheckpoint(c, src) - return } else if nv := m.GetNewView(); nv != nil { s.handleNewView(nv, src) return @@ -217,7 +222,10 @@ func (s *SBFT) Receive(m *Msg, src uint64) { } func (s *SBFT) handleQueueableMessage(m *Msg, src uint64) { - if pp := m.GetPreprepare(); pp != nil { + if h := m.GetHello(); h != nil { + s.handleHello(h, src) + return + } else if pp := m.GetPreprepare(); pp != nil { s.handlePreprepare(pp, src) return } else if p := m.GetPrepare(); p != nil { @@ -226,6 +234,9 @@ func (s *SBFT) handleQueueableMessage(m *Msg, src uint64) { } else if c := m.GetCommit(); c != nil { s.handleCommit(c, src) return + } else if c := m.GetCheckpoint(); c != nil { + s.handleCheckpoint(c, src) + return } log.Warningf("received invalid message from %d", src) diff --git a/consensus/simplebft/simplebft.pb.go b/consensus/simplebft/simplebft.pb.go index 62b8c1040e2..0c863a8b3e4 100644 --- a/consensus/simplebft/simplebft.pb.go +++ b/consensus/simplebft/simplebft.pb.go @@ -12,9 +12,7 @@ It has these top-level messages: Config Msg Request - FetchRequest - QueryState - Seq + SeqView BatchHeader Batch Preprepare @@ -23,7 +21,6 @@ It has these top-level messages: Signed NewView Checkpoint - CheckpointSet */ package simplebft @@ -51,13 +48,13 @@ func (*Config) ProtoMessage() {} type Msg struct { // Types that are valid to be assigned to Type: // *Msg_Request - // *Msg_FetchRequest // *Msg_Preprepare // *Msg_Prepare // *Msg_Commit // *Msg_ViewChange // *Msg_NewView // *Msg_Checkpoint + // *Msg_Hello Type isMsg_Type `protobuf_oneof:"type"` } @@ -72,36 +69,36 @@ type isMsg_Type interface { type Msg_Request struct { Request *Request `protobuf:"bytes,1,opt,name=request,oneof"` } -type Msg_FetchRequest struct { - FetchRequest *FetchRequest `protobuf:"bytes,2,opt,name=fetch_request,oneof"` -} type Msg_Preprepare struct { - Preprepare *Preprepare `protobuf:"bytes,3,opt,name=preprepare,oneof"` + Preprepare *Preprepare `protobuf:"bytes,2,opt,name=preprepare,oneof"` } type Msg_Prepare struct { - Prepare *Subject `protobuf:"bytes,4,opt,name=prepare,oneof"` + Prepare *Subject `protobuf:"bytes,3,opt,name=prepare,oneof"` } type Msg_Commit struct { - Commit *Subject `protobuf:"bytes,5,opt,name=commit,oneof"` + Commit *Subject `protobuf:"bytes,4,opt,name=commit,oneof"` } type Msg_ViewChange struct { - ViewChange *Signed `protobuf:"bytes,6,opt,name=view_change,oneof"` + ViewChange *Signed `protobuf:"bytes,5,opt,name=view_change,oneof"` } type Msg_NewView struct { - NewView *NewView `protobuf:"bytes,7,opt,name=new_view,oneof"` + NewView *NewView `protobuf:"bytes,6,opt,name=new_view,oneof"` } type Msg_Checkpoint struct { - Checkpoint *Checkpoint `protobuf:"bytes,8,opt,name=checkpoint,oneof"` + Checkpoint *Checkpoint `protobuf:"bytes,7,opt,name=checkpoint,oneof"` +} +type Msg_Hello struct { + Hello *Batch `protobuf:"bytes,8,opt,name=hello,oneof"` } -func (*Msg_Request) isMsg_Type() {} -func (*Msg_FetchRequest) isMsg_Type() {} -func (*Msg_Preprepare) isMsg_Type() {} -func (*Msg_Prepare) isMsg_Type() {} -func (*Msg_Commit) isMsg_Type() {} -func (*Msg_ViewChange) isMsg_Type() {} -func (*Msg_NewView) isMsg_Type() {} -func (*Msg_Checkpoint) isMsg_Type() {} +func (*Msg_Request) isMsg_Type() {} +func (*Msg_Preprepare) isMsg_Type() {} +func (*Msg_Prepare) isMsg_Type() {} +func (*Msg_Commit) isMsg_Type() {} +func (*Msg_ViewChange) isMsg_Type() {} +func (*Msg_NewView) isMsg_Type() {} +func (*Msg_Checkpoint) isMsg_Type() {} +func (*Msg_Hello) isMsg_Type() {} func (m *Msg) GetType() isMsg_Type { if m != nil { @@ -117,13 +114,6 @@ func (m *Msg) GetRequest() *Request { return nil } -func (m *Msg) GetFetchRequest() *FetchRequest { - if x, ok := m.GetType().(*Msg_FetchRequest); ok { - return x.FetchRequest - } - return nil -} - func (m *Msg) GetPreprepare() *Preprepare { if x, ok := m.GetType().(*Msg_Preprepare); ok { return x.Preprepare @@ -166,17 +156,24 @@ func (m *Msg) GetCheckpoint() *Checkpoint { return nil } +func (m *Msg) GetHello() *Batch { + if x, ok := m.GetType().(*Msg_Hello); ok { + return x.Hello + } + return nil +} + // XXX_OneofFuncs is for the internal use of the proto package. func (*Msg) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), []interface{}) { return _Msg_OneofMarshaler, _Msg_OneofUnmarshaler, []interface{}{ (*Msg_Request)(nil), - (*Msg_FetchRequest)(nil), (*Msg_Preprepare)(nil), (*Msg_Prepare)(nil), (*Msg_Commit)(nil), (*Msg_ViewChange)(nil), (*Msg_NewView)(nil), (*Msg_Checkpoint)(nil), + (*Msg_Hello)(nil), } } @@ -189,41 +186,41 @@ func _Msg_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { if err := b.EncodeMessage(x.Request); err != nil { return err } - case *Msg_FetchRequest: - b.EncodeVarint(2<<3 | proto.WireBytes) - if err := b.EncodeMessage(x.FetchRequest); err != nil { - return err - } case *Msg_Preprepare: - b.EncodeVarint(3<<3 | proto.WireBytes) + b.EncodeVarint(2<<3 | proto.WireBytes) if err := b.EncodeMessage(x.Preprepare); err != nil { return err } case *Msg_Prepare: - b.EncodeVarint(4<<3 | proto.WireBytes) + b.EncodeVarint(3<<3 | proto.WireBytes) if err := b.EncodeMessage(x.Prepare); err != nil { return err } case *Msg_Commit: - b.EncodeVarint(5<<3 | proto.WireBytes) + b.EncodeVarint(4<<3 | proto.WireBytes) if err := b.EncodeMessage(x.Commit); err != nil { return err } case *Msg_ViewChange: - b.EncodeVarint(6<<3 | proto.WireBytes) + b.EncodeVarint(5<<3 | proto.WireBytes) if err := b.EncodeMessage(x.ViewChange); err != nil { return err } case *Msg_NewView: - b.EncodeVarint(7<<3 | proto.WireBytes) + b.EncodeVarint(6<<3 | proto.WireBytes) if err := b.EncodeMessage(x.NewView); err != nil { return err } case *Msg_Checkpoint: - b.EncodeVarint(8<<3 | proto.WireBytes) + b.EncodeVarint(7<<3 | proto.WireBytes) if err := b.EncodeMessage(x.Checkpoint); err != nil { return err } + case *Msg_Hello: + b.EncodeVarint(8<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Hello); err != nil { + return err + } case nil: default: return fmt.Errorf("Msg.Type has unexpected type %T", x) @@ -242,15 +239,7 @@ func _Msg_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (b err := b.DecodeMessage(msg) m.Type = &Msg_Request{msg} return true, err - case 2: // type.fetch_request - if wire != proto.WireBytes { - return true, proto.ErrInternalBadWireType - } - msg := new(FetchRequest) - err := b.DecodeMessage(msg) - m.Type = &Msg_FetchRequest{msg} - return true, err - case 3: // type.preprepare + case 2: // type.preprepare if wire != proto.WireBytes { return true, proto.ErrInternalBadWireType } @@ -258,7 +247,7 @@ func _Msg_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (b err := b.DecodeMessage(msg) m.Type = &Msg_Preprepare{msg} return true, err - case 4: // type.prepare + case 3: // type.prepare if wire != proto.WireBytes { return true, proto.ErrInternalBadWireType } @@ -266,7 +255,7 @@ func _Msg_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (b err := b.DecodeMessage(msg) m.Type = &Msg_Prepare{msg} return true, err - case 5: // type.commit + case 4: // type.commit if wire != proto.WireBytes { return true, proto.ErrInternalBadWireType } @@ -274,7 +263,7 @@ func _Msg_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (b err := b.DecodeMessage(msg) m.Type = &Msg_Commit{msg} return true, err - case 6: // type.view_change + case 5: // type.view_change if wire != proto.WireBytes { return true, proto.ErrInternalBadWireType } @@ -282,7 +271,7 @@ func _Msg_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (b err := b.DecodeMessage(msg) m.Type = &Msg_ViewChange{msg} return true, err - case 7: // type.new_view + case 6: // type.new_view if wire != proto.WireBytes { return true, proto.ErrInternalBadWireType } @@ -290,7 +279,7 @@ func _Msg_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (b err := b.DecodeMessage(msg) m.Type = &Msg_NewView{msg} return true, err - case 8: // type.checkpoint + case 7: // type.checkpoint if wire != proto.WireBytes { return true, proto.ErrInternalBadWireType } @@ -298,6 +287,14 @@ func _Msg_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (b err := b.DecodeMessage(msg) m.Type = &Msg_Checkpoint{msg} return true, err + case 8: // type.hello + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Batch) + err := b.DecodeMessage(msg) + m.Type = &Msg_Hello{msg} + return true, err default: return false, nil } @@ -311,29 +308,14 @@ func (m *Request) Reset() { *m = Request{} } func (m *Request) String() string { return proto.CompactTextString(m) } func (*Request) ProtoMessage() {} -type FetchRequest struct { - Digest []byte `protobuf:"bytes,1,opt,name=digest,proto3" json:"digest,omitempty"` -} - -func (m *FetchRequest) Reset() { *m = FetchRequest{} } -func (m *FetchRequest) String() string { return proto.CompactTextString(m) } -func (*FetchRequest) ProtoMessage() {} - -type QueryState struct { -} - -func (m *QueryState) Reset() { *m = QueryState{} } -func (m *QueryState) String() string { return proto.CompactTextString(m) } -func (*QueryState) ProtoMessage() {} - -type Seq struct { +type SeqView struct { View uint64 `protobuf:"varint,1,opt,name=view" json:"view,omitempty"` Seq uint64 `protobuf:"varint,2,opt,name=seq" json:"seq,omitempty"` } -func (m *Seq) Reset() { *m = Seq{} } -func (m *Seq) String() string { return proto.CompactTextString(m) } -func (*Seq) ProtoMessage() {} +func (m *SeqView) Reset() { *m = SeqView{} } +func (m *SeqView) String() string { return proto.CompactTextString(m) } +func (*SeqView) ProtoMessage() {} type BatchHeader struct { Seq uint64 `protobuf:"varint,1,opt,name=seq" json:"seq,omitempty"` @@ -346,25 +328,32 @@ func (m *BatchHeader) String() string { return proto.CompactTextString(m) } func (*BatchHeader) ProtoMessage() {} type Batch struct { - Header []byte `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"` - Payloads [][]byte `protobuf:"bytes,2,rep,name=payloads,proto3" json:"payloads,omitempty"` - Signatures [][]byte `protobuf:"bytes,3,rep,name=signatures,proto3" json:"signatures,omitempty"` + Header []byte `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"` + Payloads [][]byte `protobuf:"bytes,2,rep,name=payloads,proto3" json:"payloads,omitempty"` + Signatures map[uint64][]byte `protobuf:"bytes,3,rep,name=signatures" json:"signatures,omitempty" protobuf_key:"varint,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value,proto3"` } func (m *Batch) Reset() { *m = Batch{} } func (m *Batch) String() string { return proto.CompactTextString(m) } func (*Batch) ProtoMessage() {} +func (m *Batch) GetSignatures() map[uint64][]byte { + if m != nil { + return m.Signatures + } + return nil +} + type Preprepare struct { - Seq *Seq `protobuf:"bytes,1,opt,name=seq" json:"seq,omitempty"` - Batch *Batch `protobuf:"bytes,2,opt,name=batch" json:"batch,omitempty"` + Seq *SeqView `protobuf:"bytes,1,opt,name=seq" json:"seq,omitempty"` + Batch *Batch `protobuf:"bytes,2,opt,name=batch" json:"batch,omitempty"` } func (m *Preprepare) Reset() { *m = Preprepare{} } func (m *Preprepare) String() string { return proto.CompactTextString(m) } func (*Preprepare) ProtoMessage() {} -func (m *Preprepare) GetSeq() *Seq { +func (m *Preprepare) GetSeq() *SeqView { if m != nil { return m.Seq } @@ -379,15 +368,15 @@ func (m *Preprepare) GetBatch() *Batch { } type Subject struct { - Seq *Seq `protobuf:"bytes,1,opt,name=seq" json:"seq,omitempty"` - Digest []byte `protobuf:"bytes,2,opt,name=digest,proto3" json:"digest,omitempty"` + Seq *SeqView `protobuf:"bytes,1,opt,name=seq" json:"seq,omitempty"` + Digest []byte `protobuf:"bytes,2,opt,name=digest,proto3" json:"digest,omitempty"` } func (m *Subject) Reset() { *m = Subject{} } func (m *Subject) String() string { return proto.CompactTextString(m) } func (*Subject) ProtoMessage() {} -func (m *Subject) GetSeq() *Seq { +func (m *Subject) GetSeq() *SeqView { if m != nil { return m.Seq } @@ -469,18 +458,3 @@ type Checkpoint struct { func (m *Checkpoint) Reset() { *m = Checkpoint{} } func (m *Checkpoint) String() string { return proto.CompactTextString(m) } func (*Checkpoint) ProtoMessage() {} - -type CheckpointSet struct { - CheckpointSet map[uint64]*Checkpoint `protobuf:"bytes,1,rep,name=checkpoint_set" json:"checkpoint_set,omitempty" protobuf_key:"varint,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` -} - -func (m *CheckpointSet) Reset() { *m = CheckpointSet{} } -func (m *CheckpointSet) String() string { return proto.CompactTextString(m) } -func (*CheckpointSet) ProtoMessage() {} - -func (m *CheckpointSet) GetCheckpointSet() map[uint64]*Checkpoint { - if m != nil { - return m.CheckpointSet - } - return nil -} diff --git a/consensus/simplebft/simplebft.proto b/consensus/simplebft/simplebft.proto index c7228335f81..b5b8370e985 100644 --- a/consensus/simplebft/simplebft.proto +++ b/consensus/simplebft/simplebft.proto @@ -18,7 +18,7 @@ syntax = "proto3"; package simplebft; -message config { +message Config { uint64 n = 1; uint64 f = 2; uint64 batch_duration_nsec = 3; @@ -26,82 +26,71 @@ message config { uint64 request_timeout_nsec = 5; }; -message msg { +message Msg { oneof type { - request request = 1; - fetch_request fetch_request = 2; - preprepare preprepare = 3; - subject prepare = 4; - subject commit = 5; - signed view_change = 6; - new_view new_view = 7; - checkpoint checkpoint = 8; + Request request = 1; + Preprepare preprepare = 2; + Subject prepare = 3; + Subject commit = 4; + Signed view_change = 5; + NewView new_view = 6; + Checkpoint checkpoint = 7; + Batch hello = 8; }; }; -message request { +message Request { bytes payload = 1; }; -message fetch_request { - bytes digest = 1; -}; - -message query_state { -}; - -message seq { +message SeqView { uint64 view = 1; uint64 seq = 2; }; -message batch_header { +message BatchHeader { uint64 seq = 1; bytes prev_hash = 2; bytes data_hash = 3; }; -message batch { +message Batch { bytes header = 1; repeated bytes payloads = 2; - repeated bytes signatures = 3; + map signatures = 3; }; -message preprepare { - seq seq = 1; - batch batch = 2; +message Preprepare { + SeqView seq = 1; + Batch batch = 2; }; -message subject { - seq seq = 1; +message Subject { + SeqView seq = 1; bytes digest = 2; }; -message view_change { +message ViewChange { uint64 view = 1; - repeated subject pset = 2; - repeated subject qset = 3; + repeated Subject pset = 2; + repeated Subject qset = 3; uint64 executed = 4; }; -message signed { +message Signed { bytes data = 1; bytes signature = 2; }; -message new_view { +message NewView { uint64 view = 1; - map vset = 2; - subject xset = 3; - batch batch = 4; + map vset = 2; + Subject xset = 3; + Batch batch = 4; }; -message checkpoint { +message Checkpoint { uint64 seq = 1; bytes digest = 2; bytes signature = 3; }; - -message checkpoint_set { - map checkpoint_set = 1; -}; diff --git a/consensus/simplebft/simplebft_test.go b/consensus/simplebft/simplebft_test.go index be7d9531628..c509f54327d 100644 --- a/consensus/simplebft/simplebft_test.go +++ b/consensus/simplebft/simplebft_test.go @@ -33,6 +33,17 @@ func init() { logging.SetLevel(logging.DEBUG, "sbft") } +func connectAll(sys *testSystem) { + for _, a := range sys.adapters { + for _, b := range sys.adapters { + if a.id != b.id { + a.receiver.Connection(b.id) + } + } + } + sys.Run() +} + func TestSBFT(t *testing.T) { N := uint64(4) sys := newTestSystem(N) @@ -47,6 +58,7 @@ func TestSBFT(t *testing.T) { repls = append(repls, s) adapters = append(adapters, a) } + connectAll(sys) r1 := []byte{1, 2, 3} repls[0].Request(r1) sys.Run() @@ -89,6 +101,7 @@ func TestSBFTDelayed(t *testing.T) { adapters[3].arrivals[i] = 200 * time.Millisecond } + connectAll(sys) r1 := []byte{1, 2, 3} r2 := []byte{3, 1, 2} repls[0].Request(r1) @@ -122,6 +135,7 @@ func TestN1(t *testing.T) { repls = append(repls, s) adapters = append(adapters, a) } + connectAll(sys) r1 := []byte{1, 2, 3} repls[0].Request(r1) sys.Run() @@ -173,6 +187,7 @@ func TestByzPrimary(t *testing.T) { return e, true } + connectAll(sys) repls[0].Request(r1) sys.Run() for _, a := range adapters { @@ -210,6 +225,7 @@ func TestViewChange(t *testing.T) { return e, true } + connectAll(sys) r1 := []byte{1, 2, 3} repls[0].Request(r1) sys.Run() @@ -272,6 +288,7 @@ func TestViewChangeXset(t *testing.T) { return e, true } + connectAll(sys) r1 := []byte{1, 2, 3} repls[0].Request(r1) sys.Run() @@ -313,6 +330,7 @@ func TestRestart(t *testing.T) { adapters = append(adapters, a) } + connectAll(sys) // move to view 1 for _, r := range repls { r.sendViewChange() @@ -324,6 +342,12 @@ func TestRestart(t *testing.T) { testLog.Notice("restarting 0") repls[0], _ = New(0, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, adapters[0]) + for _, a := range sys.adapters { + if a.id != 0 { + a.receiver.Connection(0) + adapters[0].receiver.Connection(a.id) + } + } r2 := []byte{3, 1, 2} r3 := []byte{3, 5, 2} @@ -370,12 +394,19 @@ func TestRestartAfterPrepare(t *testing.T) { if p := msg.msg.GetPrepare(); p != nil && p.Seq.Seq == 3 && !restarted { restarted = true repls[0], _ = New(0, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, adapters[0]) + for _, a := range sys.adapters { + if a.id != 0 { + a.receiver.Connection(0) + adapters[0].receiver.Connection(a.id) + } + } } } return e, true } + connectAll(sys) // move to view 1 for _, r := range repls { r.sendViewChange() @@ -431,12 +462,19 @@ func TestRestartAfterCommit(t *testing.T) { restarted = true testLog.Notice("restarting 0") repls[0], _ = New(0, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, adapters[0]) + for _, a := range sys.adapters { + if a.id != 0 { + a.receiver.Connection(0) + adapters[0].receiver.Connection(a.id) + } + } } } return e, true } + connectAll(sys) // move to view 1 for _, r := range repls { r.sendViewChange() @@ -492,12 +530,19 @@ func TestRestartAfterCheckpoint(t *testing.T) { restarted = true testLog.Notice("restarting 0") repls[0], _ = New(0, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, adapters[0]) + for _, a := range sys.adapters { + if a.id != 0 { + a.receiver.Connection(0) + adapters[0].receiver.Connection(a.id) + } + } } } return e, true } + connectAll(sys) // move to view 1 for _, r := range repls { r.sendViewChange() diff --git a/consensus/simplebft/xset.go b/consensus/simplebft/xset.go index 2cb94a5a1b9..5d8441c68c5 100644 --- a/consensus/simplebft/xset.go +++ b/consensus/simplebft/xset.go @@ -128,7 +128,7 @@ nextm: log.Debugf("selecting %d with %x", next, mtuple.Digest) xset = &Subject{ - Seq: &Seq{Seq: next, View: s.seq.View}, + Seq: &SeqView{Seq: next, View: s.seq.View}, Digest: mtuple.Digest, } break nextm @@ -141,7 +141,7 @@ nextm: if emptycount >= s.noFaultyQuorum() { log.Debugf("selecting null request for %d", next) xset = &Subject{ - Seq: &Seq{Seq: next, View: s.seq.View}, + Seq: &SeqView{Seq: next, View: s.seq.View}, Digest: nil, } }