From 52c8407ed9da43767561a9a6356ad11cddef6c42 Mon Sep 17 00:00:00 2001 From: Simon Schubert Date: Tue, 27 Sep 2016 15:46:17 +0200 Subject: [PATCH] Basic implementation of simple BFT Simple BFT is a variant of Castro's PBFT, with as little parallelism as possible. This package also defines a deterministic system interface and a deterministic event based testing framework. Squashed commit of the following: commit f7b20b2f0a65a59e91d332e13e5dd2016264b7d1 Author: Simon Schubert Date: Tue Sep 27 15:33:05 2016 +0200 checkpoint: fix comment Signed-off-by: Simon Schubert commit 3f370c36f61daf0e5845d8e82dacb43ea546377f Author: Simon Schubert Date: Mon Sep 26 15:02:22 2016 +0200 add performance benchmark for rsa Signed-off-by: Simon Schubert commit b247320d7448dc4c6a7985c1b08c011a1eb4d260 Author: Simon Schubert Date: Fri Sep 23 15:02:11 2016 +0200 turn null request into empty batch Signed-off-by: Simon Schubert commit 830ed7b080dc04c2b31c079b36f7a87c4b98d4ae Author: Simon Schubert Date: Fri Sep 23 13:10:51 2016 +0200 on restart, remove future events to old instance Signed-off-by: Simon Schubert commit 2fbbe1d6d25a72bd9a184c0badf66a46df574857 Author: Simon Schubert Date: Thu Sep 22 18:06:28 2016 +0200 test for various restart situations Signed-off-by: Simon Schubert commit 5df10ae03eb861ed68238aa8bafc3b2e0147fa82 Author: Simon Schubert Date: Wed Sep 21 17:00:18 2016 +0200 work off existing chain state Signed-off-by: Simon Schubert commit 1ba6905539fc94411273608a72c689ca6c0afb91 Author: Simon Schubert Date: Wed Sep 21 17:00:07 2016 +0200 add crypto test Signed-off-by: Simon Schubert commit 75cea80716ff56216a418cb65fb70431f9793c09 Author: Simon Schubert Date: Wed Sep 21 16:32:13 2016 +0200 remove timerFunc Spotted-by: Gabor Hosszu Signed-off-by: Simon Schubert commit b5a71f9f98cb7862c1d725639490f8c8ac8912a4 Author: Simon Schubert Date: Wed Sep 21 16:31:14 2016 +0200 add ecdsa benchmark Signed-off-by: Simon Schubert commit d063c4487b4cd6f008480bdee0ef8280775b021f Author: Simon Schubert Date: Wed Sep 21 14:50:44 2016 +0200 hash is no method Signed-off-by: Simon Schubert commit d6988a2b22b02adaa64351482d17c8a5849eec9b Author: Simon Schubert Date: Wed Sep 21 12:57:34 2016 +0200 transmit batch instead of separate items, use merkle tree hash Signed-off-by: Simon Schubert commit a64d3875399b935df3bbbdfdafc4c9e736efc099 Author: Simon Schubert Date: Wed Sep 21 11:52:55 2016 +0200 deliver takes batch Signed-off-by: Simon Schubert commit 8faee7e4bde2471b6f88d50d09fa86f944b4fdfc Author: Simon Schubert Date: Tue Sep 20 18:07:48 2016 +0200 sign over batch Signed-off-by: Simon Schubert commit 865966071d3726f54e46d76d35b1abcd9a725d84 Author: Simon Schubert Date: Tue Sep 20 13:32:09 2016 +0200 resend messages when coming back up Signed-off-by: Simon Schubert commit f22d84eed8cc5f0441dd59ee46539ebcd9c97a5d Author: Simon Schubert Date: Tue Sep 20 12:49:06 2016 +0200 deliver on checkpoint Signed-off-by: Simon Schubert commit 766ee7131cd5a3423258d267a57cbe496c3419c2 Author: Simon Schubert Date: Mon Sep 19 15:42:05 2016 +0200 implement signatures in system Signed-off-by: Simon Schubert commit 4826f7c00c81ccb94582aaa83b69aa0a0b988e5c Author: Simon Schubert Date: Fri Sep 16 16:34:36 2016 +0200 convert checkpoint to signed Signed-off-by: Simon Schubert commit 36c96039d059bd94592aa7741091f288457d5a64 Author: Simon Schubert Date: Fri Sep 16 16:12:42 2016 +0200 start adding persistence Signed-off-by: Simon Schubert commit cb4ce421a3194d7799bba703d062fe06807aab8f Author: Simon Schubert Date: Thu Sep 8 11:37:35 2016 +0200 make sure we don't get stuck in backlog Signed-off-by: Simon Schubert commit ad425301d7a4f926ec91822e5b6f463e107be071 Author: Simon Schubert Date: Wed Sep 7 16:16:25 2016 +0200 add comment about DoS Signed-off-by: Simon Schubert commit c76035a6d2b75f348be26333180f31d5496d55b3 Author: Simon Schubert Date: Wed Sep 7 15:16:46 2016 +0200 keep backlog of messages referring to the future Signed-off-by: Simon Schubert commit 15c6d7260227df829fe84854faefca6b50483cdd Author: Simon Schubert Date: Wed Sep 7 13:47:21 2016 +0200 send next batch only after previous round has finished Signed-off-by: Simon Schubert commit fde05a7aca7a710ab4e1ff02b4d108c9a5794318 Author: Simon Schubert Date: Tue Sep 6 10:46:21 2016 +0200 add network viewchange test Signed-off-by: Simon Schubert commit 3883f44bff742df0ff34ef79fd4183ca81d1db5e Author: Simon Schubert Date: Tue Sep 6 10:46:04 2016 +0200 add/fix status logging Signed-off-by: Simon Schubert commit 99775383247bec76c4d3e700c1b6b8f55ab57614 Author: Simon Schubert Date: Mon Sep 5 10:48:04 2016 +0200 newview progress Signed-off-by: Simon Schubert commit 7ecdf27fb89f14feb166726034b32e91a4132ff6 Author: Simon Schubert Date: Mon Sep 5 10:37:48 2016 +0200 move benchmarks to separate file Signed-off-by: Simon Schubert commit e9f5d6be14a739de154c9454efff4d8a4954f4a7 Author: Simon Schubert Date: Mon Sep 5 10:34:50 2016 +0200 switch hash result to slice Signed-off-by: Simon Schubert commit 1063637861c24e50dc9017f1cf5e50f0b542c68a Author: Simon Schubert Date: Fri Sep 2 16:09:45 2016 +0200 properly reset view change timer Signed-off-by: Simon Schubert commit 927059b467fc51bc5679675e76e411779c1dc584 Author: Simon Schubert Date: Fri Sep 2 15:57:29 2016 +0200 reduce logging Signed-off-by: Simon Schubert commit 884b81998643d2e4310010bb0c404051a06c20f9 Author: Simon Schubert Date: Fri Sep 2 15:47:28 2016 +0200 add view change timeout Signed-off-by: Simon Schubert commit 2e9084b6ef86b8a2bccbfe0017a8970a6c5bdc63 Author: Simon Schubert Date: Fri Sep 2 15:35:11 2016 +0200 more new view work Signed-off-by: Simon Schubert commit cd1e2df54ce4a1739d461963ced2b950b2f5ee00 Author: Simon Schubert Date: Fri Sep 2 11:10:42 2016 +0200 start newview Signed-off-by: Simon Schubert commit 6f3ef498a37baf2b52ccf0ca929f6040ea100b52 Author: Simon Schubert Date: Tue Aug 23 16:55:36 2016 +0200 implement calendar queue for simulation Signed-off-by: Simon Schubert commit df7b4ba25d00aacce677292a0452abdc64b54d8c Author: Simon Schubert Date: Tue Aug 23 11:51:23 2016 +0200 more newview, large network benchmark Signed-off-by: Simon Schubert commit f7ceb7849c94b9c6c7697f599eb6b5c793f0fdb2 Author: Simon Schubert Date: Fri Aug 19 18:51:27 2016 +0200 split into smaller files Signed-off-by: Simon Schubert commit b1cbfe3850f53f297891406cfcd4bb1de421d67c Author: Simon Schubert Date: Fri Aug 19 18:09:03 2016 +0200 more simple bft Signed-off-by: Simon Schubert commit 0e24a238ae574f9aa292d88f278fde6c606eab0d Author: Simon Schubert Date: Tue Aug 16 10:09:54 2016 +0200 basic simplebft Signed-off-by: Simon Schubert commit f0fb9e212d8eb86f95b20433d06c019d3ac2d8b5 Author: Simon Schubert Date: Fri Aug 12 17:00:37 2016 +0200 start simplebft Signed-off-by: Simon Schubert Change-Id: Id3e7ff1a67ca3d64011bce4e149e35be09f15f34 Signed-off-by: Simon Schubert --- consensus/simplebft/backlog.go | 89 ++++ consensus/simplebft/batch.go | 64 +++ consensus/simplebft/calendarqueue_test.go | 134 +++++ consensus/simplebft/checkpoint.go | 106 ++++ consensus/simplebft/commit.go | 51 ++ consensus/simplebft/crypto.go | 96 ++++ consensus/simplebft/crypto_test.go | 32 ++ consensus/simplebft/execute.go | 30 ++ consensus/simplebft/newview.go | 163 ++++++ consensus/simplebft/newview_test.go | 148 ++++++ consensus/simplebft/prepare.go | 42 ++ consensus/simplebft/preprepare.go | 98 ++++ consensus/simplebft/request.go | 73 +++ consensus/simplebft/simplebft.go | 232 +++++++++ consensus/simplebft/simplebft.pb.go | 486 ++++++++++++++++++ consensus/simplebft/simplebft.proto | 107 ++++ consensus/simplebft/simplebft_bench_test.go | 84 ++++ consensus/simplebft/simplebft_test.go | 526 ++++++++++++++++++++ consensus/simplebft/testsys_test.go | 280 +++++++++++ consensus/simplebft/testsys_test_test.go | 96 ++++ consensus/simplebft/viewchange.go | 101 ++++ consensus/simplebft/xset.go | 154 ++++++ 22 files changed, 3192 insertions(+) create mode 100644 consensus/simplebft/backlog.go create mode 100644 consensus/simplebft/batch.go create mode 100644 consensus/simplebft/calendarqueue_test.go create mode 100644 consensus/simplebft/checkpoint.go create mode 100644 consensus/simplebft/commit.go create mode 100644 consensus/simplebft/crypto.go create mode 100644 consensus/simplebft/crypto_test.go create mode 100644 consensus/simplebft/execute.go create mode 100644 consensus/simplebft/newview.go create mode 100644 consensus/simplebft/newview_test.go create mode 100644 consensus/simplebft/prepare.go create mode 100644 consensus/simplebft/preprepare.go create mode 100644 consensus/simplebft/request.go create mode 100644 consensus/simplebft/simplebft.go create mode 100644 consensus/simplebft/simplebft.pb.go create mode 100644 consensus/simplebft/simplebft.proto create mode 100644 consensus/simplebft/simplebft_bench_test.go create mode 100644 consensus/simplebft/simplebft_test.go create mode 100644 consensus/simplebft/testsys_test.go create mode 100644 consensus/simplebft/testsys_test_test.go create mode 100644 consensus/simplebft/viewchange.go create mode 100644 consensus/simplebft/xset.go diff --git a/consensus/simplebft/backlog.go b/consensus/simplebft/backlog.go new file mode 100644 index 00000000000..ebd23226f09 --- /dev/null +++ b/consensus/simplebft/backlog.go @@ -0,0 +1,89 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simplebft + +func (s *SBFT) testBacklog(m *Msg, src uint64) bool { + if len(s.backLog[src]) > 0 { + return true + } + + return s.testBacklog2(m, src) +} + +func (s *SBFT) testBacklog2(m *Msg, src uint64) bool { + record := func(seq uint64) bool { + if seq > s.cur.subject.Seq.Seq { + return true + } + return false + } + + if pp := m.GetPreprepare(); pp != nil && !s.cur.executed { + return true + } else if p := m.GetPrepare(); p != nil { + return record(p.Seq.Seq) + } else if c := m.GetCommit(); c != nil { + return record(c.Seq.Seq) + } else if cs := m.GetCheckpoint(); cs != nil { + c := &Checkpoint{} + return record(c.Seq) + } + return false +} + +func (s *SBFT) recordBacklogMsg(m *Msg, src uint64) { + if src == s.id { + panic("should never have to backlog my own message") + } + // TODO prevent DoS by limiting the number of messages per replica + s.backLog[src] = append(s.backLog[src], m) +} + +func (s *SBFT) processBacklog() { + processed := true + + for processed { + processed = false + notReady := uint64(0) + for src, _ := range s.backLog { + for len(s.backLog[src]) > 0 { + m, rest := s.backLog[src][0], s.backLog[src][1:] + if s.testBacklog2(m, src) { + notReady++ + break + } + s.backLog[src] = rest + + log.Debugf("processing stored message from %d: %s", src, m) + + s.handleQueueableMessage(m, src) + processed = true + } + } + + // all minus us + if notReady >= s.config.N-1 { + // This is a problem - we consider all other replicas + // too far ahead for us. We need to do a state transfer + // to get out of this rut. + for src := range s.backLog { + delete(s.backLog, src) + } + // TODO trigger state transfer + } + } +} diff --git a/consensus/simplebft/batch.go b/consensus/simplebft/batch.go new file mode 100644 index 00000000000..a070fb24e4e --- /dev/null +++ b/consensus/simplebft/batch.go @@ -0,0 +1,64 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simplebft + +import ( + "fmt" + "reflect" + + "github.com/golang/protobuf/proto" +) + +func (s *SBFT) makeBatch(seq uint64, prevHash []byte, data [][]byte) *Batch { + datahash := merkleHashData(data) + + batchhead := &BatchHeader{ + Seq: seq, + PrevHash: prevHash, + DataHash: datahash, + } + rawHeader, err := proto.Marshal(batchhead) + if err != nil { + panic(err) + } + return &Batch{ + Header: rawHeader, + Payloads: data, + } +} + +func (s *SBFT) checkBatch(b *Batch) (*BatchHeader, error) { + datahash := merkleHashData(b.Payloads) + + batchheader := &BatchHeader{} + err := proto.Unmarshal(b.Header, batchheader) + if err != nil { + return nil, err + } + + if !reflect.DeepEqual(datahash, batchheader.DataHash) { + return nil, fmt.Errorf("malformed batch: invalid hash") + } + + return batchheader, nil +} + +//////////////////////////////////////// + +func (b *Batch) Hash() []byte { + return hash(b.Header) +} diff --git a/consensus/simplebft/calendarqueue_test.go b/consensus/simplebft/calendarqueue_test.go new file mode 100644 index 00000000000..cd2635b5821 --- /dev/null +++ b/consensus/simplebft/calendarqueue_test.go @@ -0,0 +1,134 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simplebft + +import ( + "sort" + "time" +) + +type calendarQueue struct { + dayLength time.Duration + yearLength time.Duration + today time.Duration + nextYear time.Duration + slots [][]testElem + maxLen int +} + +func newCalendarQueue(dayLength time.Duration, days int) *calendarQueue { + return &calendarQueue{ + dayLength: dayLength, + yearLength: dayLength * time.Duration(days), + nextYear: dayLength * time.Duration(days), + slots: make([][]testElem, days), + } +} + +func (t *calendarQueue) slot(d time.Duration) int { + return int(d/t.dayLength) % len(t.slots) +} + +func (t *calendarQueue) Add(e testElem) { + sl := t.slot(e.at) + t.slots[sl] = append(t.slots[sl], e) + l := len(t.slots[sl]) + if l > t.maxLen { + t.maxLen = l + } + sort.Sort(testElemQueue(t.slots[sl])) +} + +func (t *calendarQueue) Pop() (testElem, bool) { + var lowest *time.Duration + sl := t.slot(t.today) + start := sl + today := t.today + for ; today < t.nextYear; today, sl = today+t.dayLength, sl+1 { + if len(t.slots[sl]) == 0 { + continue + } + e := t.slots[sl][0] + if e.at >= t.nextYear { + if lowest == nil || *lowest > e.at { + lowest = &e.at + } + continue + } + t.slots[sl] = t.slots[sl][1:] + t.today = today + return e, true + } + + // next deadline is after this year, but we only + // searched part of the calendar so far. Search the + // remaining prefix. + for i := 0; i < start; i++ { + if len(t.slots[i]) == 0 { + continue + } + e := t.slots[i][0] + if e.at >= t.nextYear { + if lowest == nil || *lowest > e.at { + lowest = &e.at + } + } + } + + if lowest == nil { + return testElem{}, false + } + + t.today = *lowest / t.dayLength * t.dayLength + t.nextYear = (t.today/t.yearLength + 1) * t.yearLength + return t.Pop() // retry! +} + +func (t *calendarQueue) filter(fn func(testElem) bool) { + for sli, sl := range t.slots { + var del []int + for i, e := range sl { + if !fn(e) { + del = append(del, i) + } + } + + // now delete + for i, e := range del { + correctedPos := e - i + // in-place remove + sl = sl[:correctedPos+copy(sl[correctedPos:], sl[correctedPos+1:])] + } + t.slots[sli] = sl + } +} + +///////////////////////////////////////// + +type testElemQueue []testElem + +func (q testElemQueue) Len() int { + return len(q) +} + +func (q testElemQueue) Less(i, j int) bool { + return q[i].at < q[j].at +} + +func (q testElemQueue) Swap(i, j int) { + q[i], q[j] = q[j], q[i] +} diff --git a/consensus/simplebft/checkpoint.go b/consensus/simplebft/checkpoint.go new file mode 100644 index 00000000000..2abcd9319b3 --- /dev/null +++ b/consensus/simplebft/checkpoint.go @@ -0,0 +1,106 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simplebft + +import ( + "fmt" + "reflect" +) + +func (s *SBFT) sendCheckpoint() { + sig := s.sys.Sign(s.cur.subject.Digest) + c := &Checkpoint{ + Seq: s.cur.subject.Seq.Seq, + Digest: s.cur.subject.Digest, + Signature: sig, + } + s.broadcast(&Msg{&Msg_Checkpoint{c}}) +} + +func (s *SBFT) handleCheckpoint(c *Checkpoint, src uint64) { + if s.cur.checkpointDone { + return + } + + if c.Seq < s.cur.subject.Seq.Seq { + // old message + return + } + + err := s.checkBytesSig(c.Digest, src, c.Signature) + if err != nil { + log.Infof("checkpoint signature invalid for %d from %d", c.Seq, src) + return + } + + // TODO should we always accept checkpoints? + if c.Seq != s.cur.subject.Seq.Seq { + log.Infof("checkpoint does not match expected subject %v, got %v", &s.cur.subject, c) + return + } + if _, ok := s.cur.checkpoint[src]; ok { + log.Infof("duplicate checkpoint for %d from %d", c.Seq, src) + } + s.cur.checkpoint[src] = c + + max := "_" + sums := make(map[string][]uint64) + for csrc, c := range s.cur.checkpoint { + sum := fmt.Sprintf("%x", c.Digest) + sums[sum] = append(sums[sum], csrc) + + if len(sums[sum]) >= s.oneCorrectQuorum() { + max = sum + } + } + + replicas, ok := sums[max] + if !ok { + return + } + + // got a weak checkpoint + + cpset := &CheckpointSet{make(map[uint64]*Checkpoint)} + var sigs [][]byte + for _, r := range replicas { + cp := s.cur.checkpoint[r] + cpset.CheckpointSet[r] = cp + sigs = append(sigs, cp.Signature) + } + s.cur.checkpointDone = true + + c = s.cur.checkpoint[replicas[0]] + + if !reflect.DeepEqual(c.Digest, s.cur.subject.Digest) { + log.Fatalf("weak checkpoint %x does not match our state %x", + c.Digest, s.cur.subject.Digest) + // NOT REACHED + } + + // ignore null requests + if s.cur.preprep.Batch != nil { + batch := *s.cur.preprep.Batch + batch.Signatures = sigs + s.sys.Deliver(&batch) + } + s.cur.timeout.Cancel() + log.Infof("request %s %s completed on %d", s.cur.subject.Seq, hash2str(s.cur.subject.Digest), s.id) + + s.maybeSendNextBatch() + s.processBacklog() +} diff --git a/consensus/simplebft/commit.go b/consensus/simplebft/commit.go new file mode 100644 index 00000000000..65470b65ad8 --- /dev/null +++ b/consensus/simplebft/commit.go @@ -0,0 +1,51 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simplebft + +import "reflect" + +func (s *SBFT) maybeSendCommit() { + if s.cur.sentCommit || len(s.cur.prep) < s.noFaultyQuorum()-1 { + return + } + s.sendCommit() +} + +func (s *SBFT) sendCommit() { + s.cur.sentCommit = true + c := s.cur.subject + s.sys.Persist("commit", &c) + s.broadcast(&Msg{&Msg_Commit{&c}}) +} + +func (s *SBFT) handleCommit(c *Subject, src uint64) { + if c.Seq.Seq < s.cur.subject.Seq.Seq { + // old message + return + } + + if !reflect.DeepEqual(c, &s.cur.subject) { + log.Infof("commit does not match expected subject %v, got %v", &s.cur.subject, c) + return + } + if _, ok := s.cur.commit[src]; ok { + log.Infof("duplicate commit for %v from %d", *c.Seq, src) + return + } + s.cur.commit[src] = c + s.maybeExecute() +} diff --git a/consensus/simplebft/crypto.go b/consensus/simplebft/crypto.go new file mode 100644 index 00000000000..ca172b44346 --- /dev/null +++ b/consensus/simplebft/crypto.go @@ -0,0 +1,96 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simplebft + +import ( + "crypto/sha256" + "encoding/base64" + + "github.com/golang/protobuf/proto" +) + +func hash2str(h []byte) string { + return base64.RawStdEncoding.EncodeToString(h) +} + +func hash(data []byte) []byte { + h := sha256.Sum256(data) + return h[:] +} + +func merkleHashData(data [][]byte) []byte { + var digests [][]byte + for _, d := range data { + digests = append(digests, hash(d)) + } + return merkleHashDigests(digests) +} + +func merkleHashDigests(digests [][]byte) []byte { + for len(digests) > 1 { + var nextDigests [][]byte + var prev []byte + for _, d := range digests { + if prev == nil { + prev = d + } else { + h := sha256.New() + h.Write(prev) + h.Write(d) + nextDigests = append(nextDigests, h.Sum(nil)) + prev = nil + } + } + if prev != nil { + nextDigests = append(nextDigests, prev) + } + digests = nextDigests + } + + if len(digests) > 0 { + return digests[0] + } else { + return nil + } +} + +//////////////////////////////////////////////// + +func (s *SBFT) sign(msg proto.Message) *Signed { + bytes, err := proto.Marshal(msg) + if err != nil { + panic(err) + } + sig := s.sys.Sign(bytes) + return &Signed{Data: bytes, Signature: []byte(sig)} +} + +func (s *SBFT) checkSig(sig *Signed, signer uint64, msg proto.Message) error { + err := s.checkBytesSig(sig.Data, signer, sig.Signature) + if err != nil { + return err + } + err = proto.Unmarshal(sig.Data, msg) + if err != nil { + return err + } + return nil +} + +func (s *SBFT) checkBytesSig(digest []byte, signer uint64, sig []byte) error { + return s.sys.CheckSig(digest, signer, sig) +} diff --git a/consensus/simplebft/crypto_test.go b/consensus/simplebft/crypto_test.go new file mode 100644 index 00000000000..966a216f571 --- /dev/null +++ b/consensus/simplebft/crypto_test.go @@ -0,0 +1,32 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simplebft + +import ( + "encoding/base64" + "testing" +) + +func TestMerkleHash(t *testing.T) { + data := [][]byte{[]byte("A"), []byte("B"), []byte("C")} + digest := "2+EeNqqJqWMQPef4rQnBEAwGzNXFrUJMp0HvsGidxCc=" + h := merkleHashData(data) + hs := base64.StdEncoding.EncodeToString(h) + if hs != digest { + t.Errorf("wrong digest, expected %s, got %s", digest, hs) + } +} diff --git a/consensus/simplebft/execute.go b/consensus/simplebft/execute.go new file mode 100644 index 00000000000..0ffe4134bef --- /dev/null +++ b/consensus/simplebft/execute.go @@ -0,0 +1,30 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simplebft + +func (s *SBFT) maybeExecute() { + if s.cur.executed || len(s.cur.commit) < s.noFaultyQuorum() { + return + } + s.cur.executed = true + s.seq = *s.cur.subject.Seq + log.Noticef("executing %v", s.seq) + + s.sys.Persist("execute", &s.cur.subject) + + s.sendCheckpoint() +} diff --git a/consensus/simplebft/newview.go b/consensus/simplebft/newview.go new file mode 100644 index 00000000000..06cd1b1d5dc --- /dev/null +++ b/consensus/simplebft/newview.go @@ -0,0 +1,163 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simplebft + +import ( + "fmt" + "reflect" +) + +func (s *SBFT) maybeSendNewView() { + if s.lastNewViewSent == s.seq.View { + return + } + + vset := make(map[uint64]*Signed) + var vcs []*ViewChange + + for src, vc := range s.viewchange { + if vc.vc.View == s.seq.View { + vset[src] = vc.svc + vcs = append(vcs, vc.vc) + } + } + + xset, ok := s.makeXset(vcs) + if !ok { + log.Debug("xset not yet sufficient") + return + } + + var batch *Batch + if xset.Digest != nil { + if reflect.DeepEqual(s.cur.subject.Digest, xset.Digest) { + batch = s.cur.preprep.Batch + } else { + log.Warningf("forfeiting primary - do not have request in store for %d %x", xset.Seq.Seq, xset.Digest) + xset = nil + } + } else { + batch = s.makeBatch(xset.Seq.Seq, s.sys.LastBatch().Hash(), nil) + xset.Digest = batch.Hash() + } + + nv := &NewView{ + View: s.seq.View, + Vset: vset, + Xset: xset, + Batch: batch, + } + + log.Noticef("sending new view for %d", nv.View) + s.lastNewViewSent = nv.View + s.broadcast(&Msg{&Msg_NewView{nv}}) +} + +func (s *SBFT) handleNewView(nv *NewView, src uint64) { + if src != s.primaryIdView(nv.View) { + log.Warningf("invalid new view from %d for %d", src, nv.View) + return + } + + if onv, ok := s.newview[s.primaryIdView(nv.View)]; ok && onv.View >= nv.View { + log.Debugf("discarding duplicate new view for %d", nv.View) + return + } + + var vcs []*ViewChange + for vcsrc, svc := range nv.Vset { + vc := &ViewChange{} + err := s.checkSig(svc, vcsrc, vc) + if err == nil { + if vc.View != nv.View { + err = fmt.Errorf("view does not match") + } + } + if err != nil { + log.Warningf("invalid new view from %d: view change for %d: %s", src, vcsrc, err) + s.sendViewChange() + return + } + vcs = append(vcs, vc) + } + + xset, ok := s.makeXset(vcs) + if xset.Digest == nil { + // null request special treatment + xset.Digest = s.makeBatch(nv.Xset.Seq.Seq, s.sys.LastBatch().Hash(), nil).Hash() + } + + if !ok || !reflect.DeepEqual(nv.Xset, xset) { + log.Warningf("invalid new view from %d: xset incorrect: %v, %v", src, nv.Xset, xset) + s.sendViewChange() + return + } + + if nv.Batch == nil { + log.Warningf("invalid new view from %d: batch empty", src) + s.sendViewChange() + return + } + + if !reflect.DeepEqual(hash(nv.Batch.Header), nv.Xset.Digest) { + log.Warningf("invalid new view from %d: batch head hash does not match xset: %x, %x, %v", + src, hash(nv.Batch.Header), nv.Xset.Digest, nv) + s.sendViewChange() + return + } + + _, err := s.checkBatch(nv.Batch) + if err != nil { + log.Warningf("invalid new view from %d: invalid batch, %s", + src, err) + s.sendViewChange() + return + } + + s.newview[s.primaryIdView(nv.View)] = nv + + s.processNewView() +} + +func (s *SBFT) processNewView() { + if s.activeView { + return + } + + nv, ok := s.newview[s.primaryIdView(s.seq.View)] + if !ok || nv.View != s.seq.View { + return + } + + nextSeq := s.nextSeq() + if *nv.Xset.Seq != nextSeq { + log.Infof("we are outdated") + return + } + + pp := &Preprepare{ + Seq: nv.Xset.Seq, + Batch: nv.Batch, + } + + s.activeView = true + var h []byte + if nv.Batch != nil { + h = hash(nv.Batch.Header) + } + s.acceptPreprepare(Subject{Seq: &nextSeq, Digest: h}, pp) +} diff --git a/consensus/simplebft/newview_test.go b/consensus/simplebft/newview_test.go new file mode 100644 index 00000000000..a312b0f4c3e --- /dev/null +++ b/consensus/simplebft/newview_test.go @@ -0,0 +1,148 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simplebft + +import ( + "reflect" + "testing" +) + +func TestXsetNoByz(t *testing.T) { + s := &SBFT{config: Config{N: 4, F: 1}, seq: Seq{3, 1}} + vcs := []*ViewChange{ + &ViewChange{ + View: 3, + Pset: nil, + Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}, + &Subject{&Seq{2, 2}, []byte("val2")}}, + Executed: 1, + }, + &ViewChange{ + View: 3, + Pset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}}, + Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}}, + Executed: 1, + }, + &ViewChange{ + View: 3, + Pset: []*Subject{&Subject{&Seq{2, 2}, []byte("val2")}}, + Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}, + &Subject{&Seq{2, 2}, []byte("val2")}}, + Executed: 1, + }, + } + + xset, ok := s.makeXset(vcs) + if !ok { + t.Fatal("no xset") + } + + if !reflect.DeepEqual(xset, &Subject{&Seq{3, 2}, []byte("val2")}) { + t.Error(xset) + } +} + +func TestXsetByz0(t *testing.T) { + s := &SBFT{config: Config{N: 4, F: 1}, seq: Seq{3, 1}} + vcs := []*ViewChange{ + &ViewChange{ + View: 3, + Pset: nil, + Qset: nil, + Executed: 1, + }, + &ViewChange{ + View: 3, + Pset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}}, + Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}}, + Executed: 1, + }, + &ViewChange{ + View: 3, + Pset: []*Subject{&Subject{&Seq{2, 2}, []byte("val2")}}, + Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}, + &Subject{&Seq{2, 2}, []byte("val2")}}, + Executed: 1, + }, + } + + xset, ok := s.makeXset(vcs) + if ok { + t.Error("should not have received an xset") + } + + vcs = append(vcs, &ViewChange{ + View: 3, + Pset: []*Subject{&Subject{&Seq{2, 2}, []byte("val2")}}, + Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}, + &Subject{&Seq{2, 2}, []byte("val2")}}, + Executed: 2, + }) + + xset, ok = s.makeXset(vcs) + if !ok { + t.Error("no xset") + } + if !reflect.DeepEqual(xset, &Subject{&Seq{3, 2}, []byte("val2")}) { + t.Error(xset) + } +} + +func TestXsetByz2(t *testing.T) { + s := &SBFT{config: Config{N: 4, F: 1}, seq: Seq{3, 1}} + vcs := []*ViewChange{ + &ViewChange{ + View: 3, + Pset: nil, + Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}}, + Executed: 1, + }, + &ViewChange{ + View: 3, + Pset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}}, + Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}}, + Executed: 1, + }, + &ViewChange{ + View: 3, + Pset: []*Subject{&Subject{&Seq{2, 2}, []byte("val2")}}, + Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}, + &Subject{&Seq{2, 2}, []byte("val2")}}, + Executed: 1, + }, + } + + xset, ok := s.makeXset(vcs) + if ok { + t.Error("should not have received an xset") + } + + vcs = append(vcs, &ViewChange{ + View: 3, + Pset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}}, + Qset: []*Subject{&Subject{&Seq{1, 2}, []byte("val1")}}, + Executed: 2, + }) + + xset, ok = s.makeXset(vcs) + if !ok { + t.Error("no xset") + } + if !reflect.DeepEqual(xset, &Subject{&Seq{3, 2}, []byte("val1")}) { + t.Error(xset) + } +} diff --git a/consensus/simplebft/prepare.go b/consensus/simplebft/prepare.go new file mode 100644 index 00000000000..4ab4e1b2343 --- /dev/null +++ b/consensus/simplebft/prepare.go @@ -0,0 +1,42 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simplebft + +import "reflect" + +func (s *SBFT) sendPrepare() { + p := s.cur.subject + s.broadcast(&Msg{&Msg_Prepare{&p}}) +} + +func (s *SBFT) handlePrepare(p *Subject, src uint64) { + if p.Seq.Seq < s.cur.subject.Seq.Seq { + // old message + return + } + + if !reflect.DeepEqual(p, &s.cur.subject) { + log.Infof("prepare does not match expected subject %v, got %v", &s.cur.subject, p) + return + } + if _, ok := s.cur.prep[src]; ok { + log.Infof("duplicate prepare for %v from %d", *p.Seq, src) + return + } + s.cur.prep[src] = p + s.maybeSendCommit() +} diff --git a/consensus/simplebft/preprepare.go b/consensus/simplebft/preprepare.go new file mode 100644 index 00000000000..921c2891bc9 --- /dev/null +++ b/consensus/simplebft/preprepare.go @@ -0,0 +1,98 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simplebft + +import ( + "bytes" + "time" +) + +func (s *SBFT) sendPreprepare(batch []*Request) { + seq := s.nextSeq() + + data := make([][]byte, len(batch)) + for i, req := range batch { + data[i] = req.Payload + } + + lasthash := hash(s.sys.LastBatch().Header) + + m := &Preprepare{ + Seq: &seq, + Batch: s.makeBatch(seq.Seq, lasthash, data), + } + + s.sys.Persist("preprepare", m) + s.broadcast(&Msg{&Msg_Preprepare{m}}) +} + +func (s *SBFT) handlePreprepare(pp *Preprepare, src uint64) { + if src != s.primaryId() { + log.Infof("preprepare from non-primary %d", src) + return + } + nextSeq := s.nextSeq() + if *pp.Seq != nextSeq { + log.Infof("preprepare does not match expected %v, got %v", nextSeq, *pp.Seq) + return + } + var blockhash []byte + if pp.Batch != nil { + blockhash = hash(pp.Batch.Header) + + batchheader, err := s.checkBatch(pp.Batch) + if err != nil || batchheader.Seq != pp.Seq.Seq { + log.Infof("preprepare %v batch head inconsistent from %d", pp.Seq, src) + return + } + + prevhash := hash(s.sys.LastBatch().Header) + if !bytes.Equal(batchheader.PrevHash, prevhash) { + log.Infof("preprepare batch prev hash does not match expected %s, got %s", hash2str(batchheader.PrevHash), hash2str(prevhash)) + return + } + } + + s.acceptPreprepare(Subject{Seq: &nextSeq, Digest: blockhash}, pp) +} + +func (s *SBFT) acceptPreprepare(sub Subject, pp *Preprepare) { + s.cur = reqInfo{ + subject: sub, + timeout: s.sys.Timer(time.Duration(s.config.RequestTimeoutNsec)*time.Nanosecond, s.requestTimeout), + preprep: pp, + prep: make(map[uint64]*Subject), + commit: make(map[uint64]*Subject), + checkpoint: make(map[uint64]*Checkpoint), + } + + log.Infof("accepting preprepare for %v, %x", sub.Seq, sub.Digest) + s.sys.Persist("preprepare", pp) + s.cancelViewChangeTimer() + if !s.isPrimary() { + s.sendPrepare() + } + + s.maybeSendCommit() +} + +//////////////////////////////////////////////// + +func (s *SBFT) requestTimeout() { + log.Infof("request timed out: %s", s.cur.subject.Seq) + s.sendViewChange() +} diff --git a/consensus/simplebft/request.go b/consensus/simplebft/request.go new file mode 100644 index 00000000000..776fc6a4fdc --- /dev/null +++ b/consensus/simplebft/request.go @@ -0,0 +1,73 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simplebft + +import "time" + +func (s *SBFT) Request(req []byte) { + s.broadcast(&Msg{&Msg_Request{&Request{req}}}) +} + +func (s *SBFT) handleRequest(req *Request, src uint64) { + if s.isPrimary() { + s.batch = append(s.batch, req) + if s.batchSize() >= s.config.BatchSizeBytes { + s.maybeSendNextBatch() + } else { + s.startBatchTimer() + } + } +} + +//////////////////////////////////////////////// + +func (s *SBFT) startBatchTimer() { + if s.batchTimer == nil { + s.batchTimer = s.sys.Timer(time.Duration(s.config.BatchDurationNsec), s.maybeSendNextBatch) + } +} + +func (s *SBFT) batchSize() uint64 { + size := uint64(0) + for _, req := range s.batch { + size += uint64(len(req.Payload)) + } + return size +} + +func (s *SBFT) maybeSendNextBatch() { + if s.batchTimer != nil { + s.batchTimer.Cancel() + s.batchTimer = nil + } + + if !s.isPrimary() { + return + } + + if !s.cur.executed { + return + } + + if len(s.batch) == 0 { + return + } + + batch := s.batch + s.batch = nil + s.sendPreprepare(batch) +} diff --git a/consensus/simplebft/simplebft.go b/consensus/simplebft/simplebft.go new file mode 100644 index 00000000000..d4f01ada4e3 --- /dev/null +++ b/consensus/simplebft/simplebft.go @@ -0,0 +1,232 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simplebft + +import ( + "fmt" + "reflect" + "time" + + "github.com/golang/protobuf/proto" + "github.com/op/go-logging" +) + +type Receiver interface { + Receive(msg *Msg, src uint64) +} + +type System interface { + Send(msg *Msg, dest uint64) + Timer(d time.Duration, f func()) Canceller + Deliver(batch *Batch) + SetReceiver(receiver Receiver) + Persist(key string, data proto.Message) + Restore(key string, out proto.Message) bool + LastBatch() *Batch + Sign(data []byte) []byte + CheckSig(data []byte, src uint64, sig []byte) error +} + +type Canceller interface { + Cancel() +} + +type SBFT struct { + sys System + + config Config + id uint64 + seq Seq + batch []*Request + batchTimer Canceller + cur reqInfo + activeView bool + viewchange map[uint64]*viewChangeInfo + newview map[uint64]*NewView + lastNewViewSent uint64 + viewChangeTimeout time.Duration + viewChangeTimer Canceller + + backLog map[uint64][]*Msg +} + +type reqInfo struct { + subject Subject + timeout Canceller + preprep *Preprepare + prep map[uint64]*Subject + commit map[uint64]*Subject + sentCommit bool + executed bool + checkpoint map[uint64]*Checkpoint + checkpointDone bool +} + +type viewChangeInfo struct { + svc *Signed + vc *ViewChange +} + +var log = logging.MustGetLogger("sbft") + +type dummyCanceller struct{} + +func (d dummyCanceller) Cancel() {} + +func New(id uint64, config *Config, sys System) (*SBFT, error) { + if config.F*3+1 > config.N { + return nil, fmt.Errorf("invalid combination of N and F") + } + + s := &SBFT{ + config: *config, + sys: sys, + id: id, + viewchange: make(map[uint64]*viewChangeInfo), + newview: make(map[uint64]*NewView), + viewChangeTimer: dummyCanceller{}, + backLog: make(map[uint64][]*Msg), + } + s.sys.SetReceiver(s) + + lastBatch := s.sys.LastBatch() + bh, err := s.checkBatch(lastBatch) + if err != nil { + panic(err) + } + + s.seq.View = 0 + s.seq.Seq = bh.Seq + s.cur.subject.Seq = &s.seq + s.cur.sentCommit = true + s.cur.executed = true + s.cur.checkpointDone = true + s.cur.timeout = dummyCanceller{} + + pp := &Preprepare{} + if s.sys.Restore("preprepare", pp) { + s.seq.View = pp.Seq.View + if pp.Seq.Seq > bh.Seq { + s.seq = *pp.Seq + s.seq.Seq -= 1 + s.handlePreprepare(pp, s.primaryIdView(pp.Seq.View)) + } + } + c := &Subject{} + if s.sys.Restore("commit", c) && reflect.DeepEqual(c, &s.cur.subject) { + s.cur.sentCommit = true + s.sendCommit() + } + ex := &Subject{} + if s.sys.Restore("execute", ex) && reflect.DeepEqual(c, &s.cur.subject) { + s.cur.executed = true + s.sendCheckpoint() + } + + // XXX set active after checking with the network + s.activeView = true + + s.cancelViewChangeTimer() + return s, nil +} + +//////////////////////////////////////////////// + +func (s *SBFT) primaryIdView(v uint64) uint64 { + return v % s.config.N +} + +func (s *SBFT) primaryId() uint64 { + return s.primaryIdView(s.seq.View) +} + +func (s *SBFT) isPrimary() bool { + return s.primaryId() == s.id +} + +func (s *SBFT) nextSeq() Seq { + seq := s.seq + seq.Seq += 1 + return seq +} + +func (s *SBFT) nextView() uint64 { + return s.seq.View + 1 +} + +func (s *SBFT) noFaultyQuorum() int { + return int(s.config.N - s.config.F) +} + +func (s *SBFT) oneCorrectQuorum() int { + return int(s.config.F + 1) +} + +func (s *SBFT) broadcast(m *Msg) { + for i := uint64(0); i < s.config.N; i++ { + s.sys.Send(m, i) + } +} + +//////////////////////////////////////////////// + +func (s *SBFT) Receive(m *Msg, src uint64) { + log.Debugf("received message from %d: %s", src, m) + + if s.testBacklog(m, src) { + log.Debugf("message for future seq, storing for later") + s.recordBacklogMsg(m, src) + return + } + + if req := m.GetRequest(); req != nil { + s.handleRequest(req, src) + return + } else if vs := m.GetViewChange(); vs != nil { + s.handleViewChange(vs, src) + return + } else if c := m.GetCheckpoint(); c != nil { + s.handleCheckpoint(c, src) + return + } else if nv := m.GetNewView(); nv != nil { + s.handleNewView(nv, src) + return + } + + if !s.activeView { + log.Infof("we are not active in view %d, discarding message from %d", + s.seq.View, src) + return + } + + s.handleQueueableMessage(m, src) +} + +func (s *SBFT) handleQueueableMessage(m *Msg, src uint64) { + if pp := m.GetPreprepare(); pp != nil { + s.handlePreprepare(pp, src) + return + } else if p := m.GetPrepare(); p != nil { + s.handlePrepare(p, src) + return + } else if c := m.GetCommit(); c != nil { + s.handleCommit(c, src) + return + } + + log.Warningf("received invalid message from %d", src) +} diff --git a/consensus/simplebft/simplebft.pb.go b/consensus/simplebft/simplebft.pb.go new file mode 100644 index 00000000000..62b8c1040e2 --- /dev/null +++ b/consensus/simplebft/simplebft.pb.go @@ -0,0 +1,486 @@ +// Code generated by protoc-gen-go. +// source: simplebft.proto +// DO NOT EDIT! + +/* +Package simplebft is a generated protocol buffer package. + +It is generated from these files: + simplebft.proto + +It has these top-level messages: + Config + Msg + Request + FetchRequest + QueryState + Seq + BatchHeader + Batch + Preprepare + Subject + ViewChange + Signed + NewView + Checkpoint + CheckpointSet +*/ +package simplebft + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +type Config struct { + N uint64 `protobuf:"varint,1,opt,name=n" json:"n,omitempty"` + F uint64 `protobuf:"varint,2,opt,name=f" json:"f,omitempty"` + BatchDurationNsec uint64 `protobuf:"varint,3,opt,name=batch_duration_nsec" json:"batch_duration_nsec,omitempty"` + BatchSizeBytes uint64 `protobuf:"varint,4,opt,name=batch_size_bytes" json:"batch_size_bytes,omitempty"` + RequestTimeoutNsec uint64 `protobuf:"varint,5,opt,name=request_timeout_nsec" json:"request_timeout_nsec,omitempty"` +} + +func (m *Config) Reset() { *m = Config{} } +func (m *Config) String() string { return proto.CompactTextString(m) } +func (*Config) ProtoMessage() {} + +type Msg struct { + // Types that are valid to be assigned to Type: + // *Msg_Request + // *Msg_FetchRequest + // *Msg_Preprepare + // *Msg_Prepare + // *Msg_Commit + // *Msg_ViewChange + // *Msg_NewView + // *Msg_Checkpoint + Type isMsg_Type `protobuf_oneof:"type"` +} + +func (m *Msg) Reset() { *m = Msg{} } +func (m *Msg) String() string { return proto.CompactTextString(m) } +func (*Msg) ProtoMessage() {} + +type isMsg_Type interface { + isMsg_Type() +} + +type Msg_Request struct { + Request *Request `protobuf:"bytes,1,opt,name=request,oneof"` +} +type Msg_FetchRequest struct { + FetchRequest *FetchRequest `protobuf:"bytes,2,opt,name=fetch_request,oneof"` +} +type Msg_Preprepare struct { + Preprepare *Preprepare `protobuf:"bytes,3,opt,name=preprepare,oneof"` +} +type Msg_Prepare struct { + Prepare *Subject `protobuf:"bytes,4,opt,name=prepare,oneof"` +} +type Msg_Commit struct { + Commit *Subject `protobuf:"bytes,5,opt,name=commit,oneof"` +} +type Msg_ViewChange struct { + ViewChange *Signed `protobuf:"bytes,6,opt,name=view_change,oneof"` +} +type Msg_NewView struct { + NewView *NewView `protobuf:"bytes,7,opt,name=new_view,oneof"` +} +type Msg_Checkpoint struct { + Checkpoint *Checkpoint `protobuf:"bytes,8,opt,name=checkpoint,oneof"` +} + +func (*Msg_Request) isMsg_Type() {} +func (*Msg_FetchRequest) isMsg_Type() {} +func (*Msg_Preprepare) isMsg_Type() {} +func (*Msg_Prepare) isMsg_Type() {} +func (*Msg_Commit) isMsg_Type() {} +func (*Msg_ViewChange) isMsg_Type() {} +func (*Msg_NewView) isMsg_Type() {} +func (*Msg_Checkpoint) isMsg_Type() {} + +func (m *Msg) GetType() isMsg_Type { + if m != nil { + return m.Type + } + return nil +} + +func (m *Msg) GetRequest() *Request { + if x, ok := m.GetType().(*Msg_Request); ok { + return x.Request + } + return nil +} + +func (m *Msg) GetFetchRequest() *FetchRequest { + if x, ok := m.GetType().(*Msg_FetchRequest); ok { + return x.FetchRequest + } + return nil +} + +func (m *Msg) GetPreprepare() *Preprepare { + if x, ok := m.GetType().(*Msg_Preprepare); ok { + return x.Preprepare + } + return nil +} + +func (m *Msg) GetPrepare() *Subject { + if x, ok := m.GetType().(*Msg_Prepare); ok { + return x.Prepare + } + return nil +} + +func (m *Msg) GetCommit() *Subject { + if x, ok := m.GetType().(*Msg_Commit); ok { + return x.Commit + } + return nil +} + +func (m *Msg) GetViewChange() *Signed { + if x, ok := m.GetType().(*Msg_ViewChange); ok { + return x.ViewChange + } + return nil +} + +func (m *Msg) GetNewView() *NewView { + if x, ok := m.GetType().(*Msg_NewView); ok { + return x.NewView + } + return nil +} + +func (m *Msg) GetCheckpoint() *Checkpoint { + if x, ok := m.GetType().(*Msg_Checkpoint); ok { + return x.Checkpoint + } + return nil +} + +// XXX_OneofFuncs is for the internal use of the proto package. +func (*Msg) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), []interface{}) { + return _Msg_OneofMarshaler, _Msg_OneofUnmarshaler, []interface{}{ + (*Msg_Request)(nil), + (*Msg_FetchRequest)(nil), + (*Msg_Preprepare)(nil), + (*Msg_Prepare)(nil), + (*Msg_Commit)(nil), + (*Msg_ViewChange)(nil), + (*Msg_NewView)(nil), + (*Msg_Checkpoint)(nil), + } +} + +func _Msg_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { + m := msg.(*Msg) + // type + switch x := m.Type.(type) { + case *Msg_Request: + b.EncodeVarint(1<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Request); err != nil { + return err + } + case *Msg_FetchRequest: + b.EncodeVarint(2<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.FetchRequest); err != nil { + return err + } + case *Msg_Preprepare: + b.EncodeVarint(3<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Preprepare); err != nil { + return err + } + case *Msg_Prepare: + b.EncodeVarint(4<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Prepare); err != nil { + return err + } + case *Msg_Commit: + b.EncodeVarint(5<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Commit); err != nil { + return err + } + case *Msg_ViewChange: + b.EncodeVarint(6<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.ViewChange); err != nil { + return err + } + case *Msg_NewView: + b.EncodeVarint(7<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.NewView); err != nil { + return err + } + case *Msg_Checkpoint: + b.EncodeVarint(8<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Checkpoint); err != nil { + return err + } + case nil: + default: + return fmt.Errorf("Msg.Type has unexpected type %T", x) + } + return nil +} + +func _Msg_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { + m := msg.(*Msg) + switch tag { + case 1: // type.request + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Request) + err := b.DecodeMessage(msg) + m.Type = &Msg_Request{msg} + return true, err + case 2: // type.fetch_request + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(FetchRequest) + err := b.DecodeMessage(msg) + m.Type = &Msg_FetchRequest{msg} + return true, err + case 3: // type.preprepare + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Preprepare) + err := b.DecodeMessage(msg) + m.Type = &Msg_Preprepare{msg} + return true, err + case 4: // type.prepare + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Subject) + err := b.DecodeMessage(msg) + m.Type = &Msg_Prepare{msg} + return true, err + case 5: // type.commit + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Subject) + err := b.DecodeMessage(msg) + m.Type = &Msg_Commit{msg} + return true, err + case 6: // type.view_change + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Signed) + err := b.DecodeMessage(msg) + m.Type = &Msg_ViewChange{msg} + return true, err + case 7: // type.new_view + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(NewView) + err := b.DecodeMessage(msg) + m.Type = &Msg_NewView{msg} + return true, err + case 8: // type.checkpoint + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Checkpoint) + err := b.DecodeMessage(msg) + m.Type = &Msg_Checkpoint{msg} + return true, err + default: + return false, nil + } +} + +type Request struct { + Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` +} + +func (m *Request) Reset() { *m = Request{} } +func (m *Request) String() string { return proto.CompactTextString(m) } +func (*Request) ProtoMessage() {} + +type FetchRequest struct { + Digest []byte `protobuf:"bytes,1,opt,name=digest,proto3" json:"digest,omitempty"` +} + +func (m *FetchRequest) Reset() { *m = FetchRequest{} } +func (m *FetchRequest) String() string { return proto.CompactTextString(m) } +func (*FetchRequest) ProtoMessage() {} + +type QueryState struct { +} + +func (m *QueryState) Reset() { *m = QueryState{} } +func (m *QueryState) String() string { return proto.CompactTextString(m) } +func (*QueryState) ProtoMessage() {} + +type Seq struct { + View uint64 `protobuf:"varint,1,opt,name=view" json:"view,omitempty"` + Seq uint64 `protobuf:"varint,2,opt,name=seq" json:"seq,omitempty"` +} + +func (m *Seq) Reset() { *m = Seq{} } +func (m *Seq) String() string { return proto.CompactTextString(m) } +func (*Seq) ProtoMessage() {} + +type BatchHeader struct { + Seq uint64 `protobuf:"varint,1,opt,name=seq" json:"seq,omitempty"` + PrevHash []byte `protobuf:"bytes,2,opt,name=prev_hash,proto3" json:"prev_hash,omitempty"` + DataHash []byte `protobuf:"bytes,3,opt,name=data_hash,proto3" json:"data_hash,omitempty"` +} + +func (m *BatchHeader) Reset() { *m = BatchHeader{} } +func (m *BatchHeader) String() string { return proto.CompactTextString(m) } +func (*BatchHeader) ProtoMessage() {} + +type Batch struct { + Header []byte `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"` + Payloads [][]byte `protobuf:"bytes,2,rep,name=payloads,proto3" json:"payloads,omitempty"` + Signatures [][]byte `protobuf:"bytes,3,rep,name=signatures,proto3" json:"signatures,omitempty"` +} + +func (m *Batch) Reset() { *m = Batch{} } +func (m *Batch) String() string { return proto.CompactTextString(m) } +func (*Batch) ProtoMessage() {} + +type Preprepare struct { + Seq *Seq `protobuf:"bytes,1,opt,name=seq" json:"seq,omitempty"` + Batch *Batch `protobuf:"bytes,2,opt,name=batch" json:"batch,omitempty"` +} + +func (m *Preprepare) Reset() { *m = Preprepare{} } +func (m *Preprepare) String() string { return proto.CompactTextString(m) } +func (*Preprepare) ProtoMessage() {} + +func (m *Preprepare) GetSeq() *Seq { + if m != nil { + return m.Seq + } + return nil +} + +func (m *Preprepare) GetBatch() *Batch { + if m != nil { + return m.Batch + } + return nil +} + +type Subject struct { + Seq *Seq `protobuf:"bytes,1,opt,name=seq" json:"seq,omitempty"` + Digest []byte `protobuf:"bytes,2,opt,name=digest,proto3" json:"digest,omitempty"` +} + +func (m *Subject) Reset() { *m = Subject{} } +func (m *Subject) String() string { return proto.CompactTextString(m) } +func (*Subject) ProtoMessage() {} + +func (m *Subject) GetSeq() *Seq { + if m != nil { + return m.Seq + } + return nil +} + +type ViewChange struct { + View uint64 `protobuf:"varint,1,opt,name=view" json:"view,omitempty"` + Pset []*Subject `protobuf:"bytes,2,rep,name=pset" json:"pset,omitempty"` + Qset []*Subject `protobuf:"bytes,3,rep,name=qset" json:"qset,omitempty"` + Executed uint64 `protobuf:"varint,4,opt,name=executed" json:"executed,omitempty"` +} + +func (m *ViewChange) Reset() { *m = ViewChange{} } +func (m *ViewChange) String() string { return proto.CompactTextString(m) } +func (*ViewChange) ProtoMessage() {} + +func (m *ViewChange) GetPset() []*Subject { + if m != nil { + return m.Pset + } + return nil +} + +func (m *ViewChange) GetQset() []*Subject { + if m != nil { + return m.Qset + } + return nil +} + +type Signed struct { + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` + Signature []byte `protobuf:"bytes,2,opt,name=signature,proto3" json:"signature,omitempty"` +} + +func (m *Signed) Reset() { *m = Signed{} } +func (m *Signed) String() string { return proto.CompactTextString(m) } +func (*Signed) ProtoMessage() {} + +type NewView struct { + View uint64 `protobuf:"varint,1,opt,name=view" json:"view,omitempty"` + Vset map[uint64]*Signed `protobuf:"bytes,2,rep,name=vset" json:"vset,omitempty" protobuf_key:"varint,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + Xset *Subject `protobuf:"bytes,3,opt,name=xset" json:"xset,omitempty"` + Batch *Batch `protobuf:"bytes,4,opt,name=batch" json:"batch,omitempty"` +} + +func (m *NewView) Reset() { *m = NewView{} } +func (m *NewView) String() string { return proto.CompactTextString(m) } +func (*NewView) ProtoMessage() {} + +func (m *NewView) GetVset() map[uint64]*Signed { + if m != nil { + return m.Vset + } + return nil +} + +func (m *NewView) GetXset() *Subject { + if m != nil { + return m.Xset + } + return nil +} + +func (m *NewView) GetBatch() *Batch { + if m != nil { + return m.Batch + } + return nil +} + +type Checkpoint struct { + Seq uint64 `protobuf:"varint,1,opt,name=seq" json:"seq,omitempty"` + Digest []byte `protobuf:"bytes,2,opt,name=digest,proto3" json:"digest,omitempty"` + Signature []byte `protobuf:"bytes,3,opt,name=signature,proto3" json:"signature,omitempty"` +} + +func (m *Checkpoint) Reset() { *m = Checkpoint{} } +func (m *Checkpoint) String() string { return proto.CompactTextString(m) } +func (*Checkpoint) ProtoMessage() {} + +type CheckpointSet struct { + CheckpointSet map[uint64]*Checkpoint `protobuf:"bytes,1,rep,name=checkpoint_set" json:"checkpoint_set,omitempty" protobuf_key:"varint,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` +} + +func (m *CheckpointSet) Reset() { *m = CheckpointSet{} } +func (m *CheckpointSet) String() string { return proto.CompactTextString(m) } +func (*CheckpointSet) ProtoMessage() {} + +func (m *CheckpointSet) GetCheckpointSet() map[uint64]*Checkpoint { + if m != nil { + return m.CheckpointSet + } + return nil +} diff --git a/consensus/simplebft/simplebft.proto b/consensus/simplebft/simplebft.proto new file mode 100644 index 00000000000..c7228335f81 --- /dev/null +++ b/consensus/simplebft/simplebft.proto @@ -0,0 +1,107 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +syntax = "proto3"; + +package simplebft; + +message config { + uint64 n = 1; + uint64 f = 2; + uint64 batch_duration_nsec = 3; + uint64 batch_size_bytes = 4; + uint64 request_timeout_nsec = 5; +}; + +message msg { + oneof type { + request request = 1; + fetch_request fetch_request = 2; + preprepare preprepare = 3; + subject prepare = 4; + subject commit = 5; + signed view_change = 6; + new_view new_view = 7; + checkpoint checkpoint = 8; + }; +}; + +message request { + bytes payload = 1; +}; + +message fetch_request { + bytes digest = 1; +}; + +message query_state { +}; + +message seq { + uint64 view = 1; + uint64 seq = 2; +}; + +message batch_header { + uint64 seq = 1; + bytes prev_hash = 2; + bytes data_hash = 3; +}; + +message batch { + bytes header = 1; + repeated bytes payloads = 2; + repeated bytes signatures = 3; +}; + +message preprepare { + seq seq = 1; + batch batch = 2; +}; + +message subject { + seq seq = 1; + bytes digest = 2; +}; + +message view_change { + uint64 view = 1; + repeated subject pset = 2; + repeated subject qset = 3; + uint64 executed = 4; +}; + +message signed { + bytes data = 1; + bytes signature = 2; +}; + +message new_view { + uint64 view = 1; + map vset = 2; + subject xset = 3; + batch batch = 4; +}; + +message checkpoint { + uint64 seq = 1; + bytes digest = 2; + bytes signature = 3; +}; + +message checkpoint_set { + map checkpoint_set = 1; +}; diff --git a/consensus/simplebft/simplebft_bench_test.go b/consensus/simplebft/simplebft_bench_test.go new file mode 100644 index 00000000000..d1902809052 --- /dev/null +++ b/consensus/simplebft/simplebft_bench_test.go @@ -0,0 +1,84 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simplebft + +import ( + "testing" + + "github.com/op/go-logging" +) + +func BenchmarkRequestN1(b *testing.B) { + logging.SetLevel(logging.WARNING, "sbft") + + sys := newTestSystem(1) + s, _ := New(0, &Config{N: 1, F: 0, BatchDurationNsec: 2000000000, BatchSizeBytes: 1, RequestTimeoutNsec: 20000000000}, sys.NewAdapter(0)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.Request([]byte{byte(i), byte(i >> 8), byte(i >> 16)}) + sys.Run() + } + logging.SetLevel(logging.NOTICE, "sbft") +} + +func BenchmarkRequestN4(b *testing.B) { + logging.SetLevel(logging.WARNING, "sbft") + + N := uint64(4) + var repls []*SBFT + var adapters []*testSystemAdapter + sys := newTestSystem(N) + for i := uint64(0); i < N; i++ { + a := sys.NewAdapter(i) + s, err := New(i, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 11, RequestTimeoutNsec: 20000000000}, a) + if err != nil { + b.Fatal(err) + } + repls = append(repls, s) + adapters = append(adapters, a) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + repls[0].Request([]byte{byte(i), byte(i >> 8), byte(i >> 16)}) + sys.Run() + } + logging.SetLevel(logging.NOTICE, "sbft") +} + +func BenchmarkRequestN80(b *testing.B) { + logging.SetLevel(logging.WARNING, "sbft") + + N := uint64(80) + var repls []*SBFT + var adapters []*testSystemAdapter + sys := newTestSystem(N) + for i := uint64(0); i < N; i++ { + a := sys.NewAdapter(i) + s, err := New(i, &Config{N: N, F: (N - 1) / 3, BatchDurationNsec: 2000000000, BatchSizeBytes: 11, RequestTimeoutNsec: 20000000000}, a) + if err != nil { + b.Fatal(err) + } + repls = append(repls, s) + adapters = append(adapters, a) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + repls[0].Request([]byte{byte(i), byte(i >> 8), byte(i >> 16)}) + sys.Run() + } + logging.SetLevel(logging.NOTICE, "sbft") +} diff --git a/consensus/simplebft/simplebft_test.go b/consensus/simplebft/simplebft_test.go new file mode 100644 index 00000000000..be7d9531628 --- /dev/null +++ b/consensus/simplebft/simplebft_test.go @@ -0,0 +1,526 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simplebft + +import ( + "reflect" + "testing" + "time" + + "github.com/golang/protobuf/proto" + "github.com/op/go-logging" +) + +var testLog = logging.MustGetLogger("test") + +func init() { + logging.SetLevel(logging.NOTICE, "") + logging.SetLevel(logging.NOTICE, "test") + logging.SetLevel(logging.DEBUG, "sbft") +} + +func TestSBFT(t *testing.T) { + N := uint64(4) + sys := newTestSystem(N) + var repls []*SBFT + var adapters []*testSystemAdapter + for i := uint64(0); i < N; i++ { + a := sys.NewAdapter(i) + s, err := New(i, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, a) + if err != nil { + t.Fatal(err) + } + repls = append(repls, s) + adapters = append(adapters, a) + } + r1 := []byte{1, 2, 3} + repls[0].Request(r1) + sys.Run() + r2 := []byte{3, 1, 2} + r3 := []byte{3, 5, 2} + repls[1].Request(r2) + repls[1].Request(r3) + sys.Run() + for _, a := range adapters { + if len(a.batches) != 2 { + t.Fatal("expected execution of 2 batches") + } + if !reflect.DeepEqual([][]byte{r1}, a.batches[0].Payloads) { + t.Error("wrong request executed (1)") + } + if !reflect.DeepEqual([][]byte{r2, r3}, a.batches[1].Payloads) { + t.Error("wrong request executed (2)") + } + } +} + +func TestSBFTDelayed(t *testing.T) { + N := uint64(4) + sys := newTestSystem(N) + var repls []*SBFT + var adapters []*testSystemAdapter + for i := uint64(0); i < N; i++ { + a := sys.NewAdapter(i) + s, err := New(i, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 1, RequestTimeoutNsec: 20000000000}, a) + if err != nil { + t.Fatal(err) + } + repls = append(repls, s) + adapters = append(adapters, a) + } + + // make replica 3 lag out against 1 and 2 + for i := uint64(1); i < 3; i++ { + adapters[i].arrivals[3] = 200 * time.Millisecond + adapters[3].arrivals[i] = 200 * time.Millisecond + } + + r1 := []byte{1, 2, 3} + r2 := []byte{3, 1, 2} + repls[0].Request(r1) + repls[1].Request(r2) + sys.Run() + for i, a := range adapters { + if len(a.batches) != 2 { + t.Errorf("expected execution of 2 batches on %d", i) + continue + } + if !reflect.DeepEqual([][]byte{r1}, a.batches[0].Payloads) { + t.Error("wrong request executed (1)") + } + if !reflect.DeepEqual([][]byte{r2}, a.batches[1].Payloads) { + t.Error("wrong request executed (2)") + } + } +} + +func TestN1(t *testing.T) { + N := uint64(1) + sys := newTestSystem(N) + var repls []*SBFT + var adapters []*testSystemAdapter + for i := uint64(0); i < N; i++ { + a := sys.NewAdapter(i) + s, err := New(i, &Config{N: N, F: 0, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, a) + if err != nil { + t.Fatal(err) + } + repls = append(repls, s) + adapters = append(adapters, a) + } + r1 := []byte{1, 2, 3} + repls[0].Request(r1) + sys.Run() + for _, a := range adapters { + if len(a.batches) != 1 { + t.Fatal("expected execution of 1 batch") + } + if !reflect.DeepEqual([][]byte{r1}, a.batches[0].Payloads) { + t.Error("wrong request executed (1)") + } + } +} + +func TestByzPrimary(t *testing.T) { + N := uint64(4) + sys := newTestSystem(N) + var repls []*SBFT + var adapters []*testSystemAdapter + for i := uint64(0); i < N; i++ { + a := sys.NewAdapter(i) + s, err := New(i, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 1, RequestTimeoutNsec: 20000000000}, a) + if err != nil { + t.Fatal(err) + } + repls = append(repls, s) + adapters = append(adapters, a) + } + + r1 := []byte{1, 2, 3} + r2 := []byte{5, 6, 7} + + // change preprepare to 2, 3 + sys.filterFn = func(e testElem) (testElem, bool) { + if msg, ok := e.ev.(*testMsgEvent); ok { + if pp := msg.msg.GetPreprepare(); pp != nil && msg.src == 0 && msg.dst >= 2 { + pp := *pp + batch := *pp.Batch + batch.Payloads = [][]byte{r2} + pp.Batch = &batch + h := merkleHashData(batch.Payloads) + bh := &BatchHeader{} + proto.Unmarshal(pp.Batch.Header, bh) + bh.DataHash = h + bhraw, _ := proto.Marshal(bh) + pp.Batch.Header = bhraw + msg.msg = &Msg{&Msg_Preprepare{&pp}} + } + } + return e, true + } + + repls[0].Request(r1) + sys.Run() + for _, a := range adapters { + if len(a.batches) != 1 { + t.Fatal("expected execution of 1 batch") + } + if !reflect.DeepEqual([][]byte{r2}, a.batches[0].Payloads) { + t.Error("wrong request executed") + } + } +} + +func TestViewChange(t *testing.T) { + N := uint64(4) + sys := newTestSystem(N) + var repls []*SBFT + var adapters []*testSystemAdapter + for i := uint64(0); i < N; i++ { + a := sys.NewAdapter(i) + s, err := New(i, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 1, RequestTimeoutNsec: 20000000000}, a) + if err != nil { + t.Fatal(err) + } + repls = append(repls, s) + adapters = append(adapters, a) + } + + // network outage after prepares are received + sys.filterFn = func(e testElem) (testElem, bool) { + if msg, ok := e.ev.(*testMsgEvent); ok { + if c := msg.msg.GetCommit(); c != nil && c.Seq.View == 0 { + return e, false + } + } + return e, true + } + + r1 := []byte{1, 2, 3} + repls[0].Request(r1) + sys.Run() + for _, a := range adapters { + if len(a.batches) != 1 { + t.Fatal("expected execution of 1 batch") + } + if !reflect.DeepEqual([][]byte{r1}, a.batches[0].Payloads) { + t.Error("wrong request executed (1)") + } + } +} + +func TestViewChangeXset(t *testing.T) { + N := uint64(4) + sys := newTestSystem(N) + var repls []*SBFT + var adapters []*testSystemAdapter + for i := uint64(0); i < N; i++ { + a := sys.NewAdapter(i) + s, err := New(i, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 1, RequestTimeoutNsec: 20000000000}, a) + if err != nil { + t.Fatal(err) + } + repls = append(repls, s) + adapters = append(adapters, a) + } + + phase := 1 + + // network outage after prepares are received + sys.filterFn = func(e testElem) (testElem, bool) { + if msg, ok := e.ev.(*testMsgEvent); ok { + if msg.src == msg.dst { + return e, true + } + + switch phase { + case 1: + if p := msg.msg.GetPrepare(); p != nil && p.Seq.View == 0 { + return e, false + } + case 2: + if nv := msg.msg.GetNewView(); nv != nil { + phase = 3 + return e, true + } + if msg.src == 3 || msg.dst == 3 { + return e, false + } + if c := msg.msg.GetCommit(); c != nil && c.Seq.View == 1 { + return e, false + } + case 3: + if msg.src == 3 || msg.dst == 3 { + return e, false + } + } + } + return e, true + } + + r1 := []byte{1, 2, 3} + repls[0].Request(r1) + sys.Run() + phase = 2 + + r2 := []byte{5, 6, 7} + repls[1].Request(r2) + sys.Run() + + for i, a := range adapters { + // 3 is disconnected + if i == 3 { + continue + } + if len(a.batches) != 2 { + t.Fatal("expected execution of 1 null request + 1 batch") + } + if len(a.batches[0].Payloads) != 0 { + t.Error("not a null request") + } + if !reflect.DeepEqual([][]byte{r2}, a.batches[1].Payloads) { + t.Error("wrong request executed") + } + } +} + +func TestRestart(t *testing.T) { + N := uint64(4) + sys := newTestSystem(N) + var repls []*SBFT + var adapters []*testSystemAdapter + for i := uint64(0); i < N; i++ { + a := sys.NewAdapter(i) + s, err := New(i, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, a) + if err != nil { + t.Fatal(err) + } + repls = append(repls, s) + adapters = append(adapters, a) + } + + // move to view 1 + for _, r := range repls { + r.sendViewChange() + } + + r1 := []byte{1, 2, 3} + repls[0].Request(r1) + sys.Run() + + testLog.Notice("restarting 0") + repls[0], _ = New(0, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, adapters[0]) + + r2 := []byte{3, 1, 2} + r3 := []byte{3, 5, 2} + repls[1].Request(r2) + repls[1].Request(r3) + sys.Run() + for _, a := range adapters { + if len(a.batches) != 3 { + t.Fatal("expected execution of 3 batches") + } + if !reflect.DeepEqual([][]byte{r1}, a.batches[1].Payloads) { + t.Error("wrong request executed (1)") + } + if !reflect.DeepEqual([][]byte{r2, r3}, a.batches[2].Payloads) { + t.Error("wrong request executed (2)") + } + } +} + +func TestRestartAfterPrepare(t *testing.T) { + N := uint64(4) + sys := newTestSystem(N) + var repls []*SBFT + var adapters []*testSystemAdapter + for i := uint64(0); i < N; i++ { + a := sys.NewAdapter(i) + s, err := New(i, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, a) + if err != nil { + t.Fatal(err) + } + repls = append(repls, s) + adapters = append(adapters, a) + } + + restarted := false + + // network outage after prepares are received + sys.filterFn = func(e testElem) (testElem, bool) { + if msg, ok := e.ev.(*testMsgEvent); ok { + if msg.src == msg.dst || msg.src != 0 { + return e, true + } + + if p := msg.msg.GetPrepare(); p != nil && p.Seq.Seq == 3 && !restarted { + restarted = true + repls[0], _ = New(0, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, adapters[0]) + } + } + + return e, true + } + + // move to view 1 + for _, r := range repls { + r.sendViewChange() + } + + r1 := []byte{1, 2, 3} + repls[0].Request(r1) + sys.Run() + + r2 := []byte{3, 1, 2} + r3 := []byte{3, 5, 2} + repls[1].Request(r2) + repls[1].Request(r3) + sys.Run() + for _, a := range adapters { + if len(a.batches) != 3 { + t.Fatal("expected execution of 3 batches") + } + if !reflect.DeepEqual([][]byte{r1}, a.batches[1].Payloads) { + t.Error("wrong request executed (1)") + } + if !reflect.DeepEqual([][]byte{r2, r3}, a.batches[2].Payloads) { + t.Error("wrong request executed (2)") + } + } +} + +func TestRestartAfterCommit(t *testing.T) { + N := uint64(4) + sys := newTestSystem(N) + var repls []*SBFT + var adapters []*testSystemAdapter + for i := uint64(0); i < N; i++ { + a := sys.NewAdapter(i) + s, err := New(i, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, a) + if err != nil { + t.Fatal(err) + } + repls = append(repls, s) + adapters = append(adapters, a) + } + + restarted := false + + // network outage after prepares are received + sys.filterFn = func(e testElem) (testElem, bool) { + if msg, ok := e.ev.(*testMsgEvent); ok { + if msg.src == msg.dst || msg.src != 0 { + return e, true + } + + if c := msg.msg.GetCommit(); c != nil && c.Seq.Seq == 3 && !restarted { + restarted = true + testLog.Notice("restarting 0") + repls[0], _ = New(0, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, adapters[0]) + } + } + + return e, true + } + + // move to view 1 + for _, r := range repls { + r.sendViewChange() + } + + r1 := []byte{1, 2, 3} + repls[0].Request(r1) + sys.Run() + + r2 := []byte{3, 1, 2} + r3 := []byte{3, 5, 2} + repls[1].Request(r2) + repls[1].Request(r3) + sys.Run() + for _, a := range adapters { + if len(a.batches) != 3 { + t.Fatal("expected execution of 3 batches") + } + if !reflect.DeepEqual([][]byte{r1}, a.batches[1].Payloads) { + t.Error("wrong request executed (1)") + } + if !reflect.DeepEqual([][]byte{r2, r3}, a.batches[2].Payloads) { + t.Error("wrong request executed (2)") + } + } +} + +func TestRestartAfterCheckpoint(t *testing.T) { + N := uint64(4) + sys := newTestSystem(N) + var repls []*SBFT + var adapters []*testSystemAdapter + for i := uint64(0); i < N; i++ { + a := sys.NewAdapter(i) + s, err := New(i, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, a) + if err != nil { + t.Fatal(err) + } + repls = append(repls, s) + adapters = append(adapters, a) + } + + restarted := false + + // network outage after prepares are received + sys.filterFn = func(e testElem) (testElem, bool) { + if msg, ok := e.ev.(*testMsgEvent); ok { + if msg.src == msg.dst || msg.src != 0 { + return e, true + } + + if c := msg.msg.GetCheckpoint(); c != nil && c.Seq == 3 && !restarted { + restarted = true + testLog.Notice("restarting 0") + repls[0], _ = New(0, &Config{N: N, F: 1, BatchDurationNsec: 2000000000, BatchSizeBytes: 10, RequestTimeoutNsec: 20000000000}, adapters[0]) + } + } + + return e, true + } + + // move to view 1 + for _, r := range repls { + r.sendViewChange() + } + + r1 := []byte{1, 2, 3} + repls[0].Request(r1) + sys.Run() + + r2 := []byte{3, 1, 2} + r3 := []byte{3, 5, 2} + repls[1].Request(r2) + repls[1].Request(r3) + sys.Run() + for _, a := range adapters { + if len(a.batches) != 3 { + t.Fatal("expected execution of 3 batches") + } + if !reflect.DeepEqual([][]byte{r1}, a.batches[1].Payloads) { + t.Error("wrong request executed (1)") + } + if !reflect.DeepEqual([][]byte{r2, r3}, a.batches[2].Payloads) { + t.Error("wrong request executed (2)") + } + } +} diff --git a/consensus/simplebft/testsys_test.go b/consensus/simplebft/testsys_test.go new file mode 100644 index 00000000000..bfbdaab84d7 --- /dev/null +++ b/consensus/simplebft/testsys_test.go @@ -0,0 +1,280 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simplebft + +import ( + "crypto/ecdsa" + "crypto/elliptic" + crand "crypto/rand" + "crypto/sha256" + "encoding/asn1" + "fmt" + "math/big" + "math/rand" + "reflect" + "runtime" + "time" + + "github.com/golang/protobuf/proto" +) + +type testSystemAdapter struct { + id uint64 + sys *testSystem + receiver Receiver + + batches []*Batch + arrivals map[uint64]time.Duration + persistence map[string][]byte + + key *ecdsa.PrivateKey +} + +func (t *testSystemAdapter) SetReceiver(recv Receiver) { + if t.receiver != nil { + // remove all events for us + t.sys.queue.filter(func(e testElem) bool { + switch e := e.ev.(type) { + case *testTimer: + if e.id == t.id { + return false + } + case *testMsgEvent: + if e.dst == t.id { + return false + } + } + return true + }) + } + + t.receiver = recv +} + +func (t *testSystemAdapter) Send(msg *Msg, dest uint64) { + // XXX for now, define fixed variance per destination + arr, ok := t.arrivals[dest] + if !ok { + inflight := 20 * time.Millisecond + variance := 1 * time.Millisecond + if dest == t.id { + inflight = 0 + } + variance = time.Duration(t.sys.rand.Int31n(int32(variance))) + arr = inflight + variance + t.arrivals[dest] = arr + } + + ev := &testMsgEvent{ + inflight: arr, + src: t.id, + dst: dest, + msg: msg, + } + // simulate time for marshalling (and unmarshalling) + bytes, _ := proto.Marshal(msg) + m2 := &Msg{} + _ = proto.Unmarshal(bytes, m2) + t.sys.enqueue(arr, ev) +} + +type testMsgEvent struct { + inflight time.Duration + src, dst uint64 + msg *Msg +} + +func (ev *testMsgEvent) Exec(t *testSystem) { + r := t.adapters[ev.dst] + if r == nil { + testLog.Errorf("message to non-existing %s", ev) + return + } + r.receiver.Receive(ev.msg, ev.src) +} + +func (ev *testMsgEvent) String() string { + return fmt.Sprintf("Message", t.id, t.cancelled, fun) +} + +func (t *testSystemAdapter) Timer(d time.Duration, tf func()) Canceller { + tt := &testTimer{id: t.id, tf: tf} + t.sys.enqueue(d, tt) + return tt +} + +func (t *testSystemAdapter) Deliver(batch *Batch) { + t.batches = append(t.batches, batch) +} + +func (t *testSystemAdapter) Persist(key string, data proto.Message) { + if data == nil { + delete(t.persistence, key) + } else { + bytes, err := proto.Marshal(data) + if err != nil { + panic(err) + } + t.persistence[key] = bytes + } +} + +func (t *testSystemAdapter) Restore(key string, out proto.Message) bool { + val, ok := t.persistence[key] + if !ok { + return false + } + err := proto.Unmarshal(val, out) + return (err == nil) +} + +func (t *testSystemAdapter) LastBatch() *Batch { + if len(t.batches) == 0 { + return t.receiver.(*SBFT).makeBatch(0, []byte("ROOTHASH"), nil) + } else { + return t.batches[len(t.batches)-1] + } +} + +func (t *testSystemAdapter) Sign(data []byte) []byte { + hash := sha256.Sum256(data) + r, s, err := ecdsa.Sign(crand.Reader, t.key, hash[:]) + if err != nil { + panic(err) + } + sig, err := asn1.Marshal(struct{ R, S *big.Int }{r, s}) + if err != nil { + panic(err) + } + return sig +} + +func (t *testSystemAdapter) CheckSig(data []byte, src uint64, sig []byte) error { + rs := struct{ R, S *big.Int }{} + rest, err := asn1.Unmarshal(sig, &rs) + if err != nil { + return err + } + if len(rest) != 0 { + return fmt.Errorf("invalid signature") + } + hash := sha256.Sum256(data) + ok := ecdsa.Verify(&t.sys.adapters[src].key.PublicKey, hash[:], rs.R, rs.S) + if !ok { + return fmt.Errorf("invalid signature") + } + return nil +} + +// ============================================== + +type testEvent interface { + Exec(t *testSystem) +} + +// ============================================== + +type testSystem struct { + rand *rand.Rand + now time.Duration + queue *calendarQueue + adapters map[uint64]*testSystemAdapter + filterFn func(testElem) (testElem, bool) +} + +type testElem struct { + at time.Duration + ev testEvent +} + +func (t testElem) String() string { + return fmt.Sprintf("Event<%s: %s>", t.at, t.ev) +} + +func newTestSystem(n uint64) *testSystem { + return &testSystem{ + rand: rand.New(rand.NewSource(0)), + adapters: make(map[uint64]*testSystemAdapter), + queue: newCalendarQueue(time.Millisecond/time.Duration(n*n), int(n*n)), + } +} + +func (t *testSystem) NewAdapter(id uint64) *testSystemAdapter { + key, err := ecdsa.GenerateKey(elliptic.P256(), crand.Reader) + if err != nil { + panic(err) + } + a := &testSystemAdapter{ + id: id, + sys: t, + arrivals: make(map[uint64]time.Duration), + persistence: make(map[string][]byte), + key: key, + } + t.adapters[id] = a + return a +} + +func (t *testSystem) enqueue(d time.Duration, ev testEvent) { + e := testElem{at: t.now + d, ev: ev} + if t.filterFn != nil { + var keep bool + e, keep = t.filterFn(e) + if !keep { + return + } + } + testLog.Debugf("enqueuing %s\n", e) + t.queue.Add(e) +} + +func (t *testSystem) Run() { + for { + e, ok := t.queue.Pop() + if !ok { + break + } + t.now = e.at + testLog.Debugf("executing %s\n", e) + e.ev.Exec(t) + } + + testLog.Debugf("max len: %d", t.queue.maxLen) + t.queue.maxLen = 0 +} diff --git a/consensus/simplebft/testsys_test_test.go b/consensus/simplebft/testsys_test_test.go new file mode 100644 index 00000000000..e222aa25ad1 --- /dev/null +++ b/consensus/simplebft/testsys_test_test.go @@ -0,0 +1,96 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simplebft + +import ( + "crypto" + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "crypto/rsa" + "crypto/sha256" + "testing" + "time" +) + +func TestSys(t *testing.T) { + s := newTestSystem(1) + called := false + s.enqueue(1*time.Second, &testTimer{tf: func() { + called = true + }}) + s.Run() + if !called { + t.Fatal("expected execution") + } +} + +// func TestMsg(t *testing.T) { +// s := newTestSystem() +// a := s.NewAdapter(0) +// called := false +// a.Send(nil, 0) +// s.enqueue(1*time.Second, &testTimer{tf: func() { +// called = true +// }}) +// s.Run() +// if !called { +// t.Fatal("expected execution") +// } +// } + +func BenchmarkEcdsaSign(b *testing.B) { + key, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + val := sha256.Sum256(make([]byte, 32, 32)) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + ecdsa.Sign(rand.Reader, key, val[:]) + } +} + +func BenchmarkRsaSign(b *testing.B) { + key, _ := rsa.GenerateKey(rand.Reader, 2048) + val := sha256.Sum256(make([]byte, 32, 32)) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + rsa.SignPSS(rand.Reader, key, crypto.SHA256, val[:], nil) + } +} + +func BenchmarkEcdsaVerify(b *testing.B) { + key, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + val := sha256.Sum256(make([]byte, 32, 32)) + r, s, _ := ecdsa.Sign(rand.Reader, key, val[:]) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + ecdsa.Verify(&key.PublicKey, val[:], r, s) + } +} + +func BenchmarkRsaVerify(b *testing.B) { + key, _ := rsa.GenerateKey(rand.Reader, 2048) + val := sha256.Sum256(make([]byte, 32, 32)) + sig, _ := rsa.SignPSS(rand.Reader, key, crypto.SHA256, val[:], nil) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + rsa.VerifyPSS(&key.PublicKey, crypto.SHA256, val[:], sig, nil) + } +} diff --git a/consensus/simplebft/viewchange.go b/consensus/simplebft/viewchange.go new file mode 100644 index 00000000000..fea01ee5658 --- /dev/null +++ b/consensus/simplebft/viewchange.go @@ -0,0 +1,101 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simplebft + +import "time" + +func (s *SBFT) sendViewChange() { + s.seq.View = s.nextView() + s.cur.timeout.Cancel() + s.activeView = false + for r, vs := range s.viewchange { + if vs.vc.View < s.seq.View { + delete(s.viewchange, r) + } + } + log.Noticef("sending viewchange for view %d", s.seq.View) + + var q, p []*Subject + if s.cur.sentCommit { + p = append(p, &s.cur.subject) + } + if s.cur.preprep != nil { + q = append(q, &s.cur.subject) + } + + vc := &ViewChange{ + View: s.seq.View, + Qset: q, + Pset: p, + Executed: s.seq.Seq, + } + svc := s.sign(vc) + s.viewChangeTimer.Cancel() + s.viewChangeTimer = s.sys.Timer(s.viewChangeTimeout, func() { + s.viewChangeTimeout *= 2 + log.Notice("view change timed out, sending next") + s.sendViewChange() + }) + s.broadcast(&Msg{&Msg_ViewChange{svc}}) + + s.processNewView() +} + +func (s *SBFT) cancelViewChangeTimer() { + s.viewChangeTimer.Cancel() + s.viewChangeTimeout = time.Duration(s.config.RequestTimeoutNsec) * 2 +} + +func (s *SBFT) handleViewChange(svc *Signed, src uint64) { + vc := &ViewChange{} + err := s.checkSig(svc, src, vc) + if err != nil { + log.Noticef("invalid viewchange: %s", err) + return + } + if vc.View < s.seq.View { + log.Debugf("old view change from %s for view %d, we are in view %d", src, vc.View, s.seq.View) + return + } + if ovc, ok := s.viewchange[src]; ok && vc.View <= ovc.vc.View { + log.Noticef("duplicate view change for %d from %d", vc.View, src) + return + } + + log.Infof("viewchange from %d for view %d", src, vc.View) + s.viewchange[src] = &viewChangeInfo{svc: svc, vc: vc} + + if len(s.viewchange) == s.oneCorrectQuorum() { + min := vc.View + for _, vc := range s.viewchange { + if vc.vc.View < min { + min = vc.vc.View + } + } + // catch up to the minimum view + if s.seq.View < min { + log.Notice("we are behind on view change, resending for newer view") + s.seq.View = min - 1 + s.sendViewChange() + return + } + } + + if s.isPrimary() { + s.maybeSendNewView() + } +} diff --git a/consensus/simplebft/xset.go b/consensus/simplebft/xset.go new file mode 100644 index 00000000000..2cb94a5a1b9 --- /dev/null +++ b/consensus/simplebft/xset.go @@ -0,0 +1,154 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simplebft + +import "reflect" + +func (s *SBFT) makeXset(vcs []*ViewChange) (*Subject, bool) { + // first select base commit (equivalent to checkpoint/low water mark) + // 1. need weak quorum + quora := make(map[uint64]int) + for _, vc := range vcs { + quora[vc.Executed] += 1 + } + best := uint64(0) + found := false + for seq, count := range quora { + if count < s.oneCorrectQuorum() { + continue + } + // 2. need 2f+1 from S below (or equal to) seq + sum := 0 + for seq2, count2 := range quora { + if seq2 <= seq { + sum += count2 + } + } + if sum < s.noFaultyQuorum() { + continue + } + found = true + if seq > best { + best = seq + } + } + if !found { + return nil, false + } + + log.Debugf("xset starts at commit %d", best) + + // now determine which request could have executed for best+1 + next := best + 1 + var xset *Subject + + // find some message m in S, + emptycount := 0 +nextm: + for _, m := range vcs { + notfound := true + // which has in its Pset + for _, mtuple := range m.Pset { + log.Debugf("trying %v", mtuple) + if mtuple.Seq.Seq < next { + continue + } + + // we found an entry for next + notfound = false + + // A1. where 2f+1 messages mp from S + count := 0 + nextmp: + for _, mp := range vcs { + // "low watermark" is less than n + if mp.Executed > mtuple.Seq.Seq { + continue + } + // and all in its Pset + for _, mptuple := range mp.Pset { + log.Debugf(" matching %v", mptuple) + if mptuple.Seq.Seq != mtuple.Seq.Seq { + continue + } + + // either v' < v or (v' == v and d' == d) + if mptuple.Seq.View < mtuple.Seq.View || + (mptuple.Seq.View == mtuple.Seq.View && reflect.DeepEqual(mptuple.Digest, mtuple.Digest)) { + continue + } else { + continue nextmp + } + } + count += 1 + } + if count < s.noFaultyQuorum() { + continue + } + log.Debugf("found %d replicas for Pset %d/%d", count, mtuple.Seq.Seq, mtuple.Seq.View) + + // A2. f+1 messages mp from S + count = 0 + for _, mp := range vcs { + // and all in its Qset + for _, mptuple := range mp.Qset { + if mptuple.Seq.Seq != mtuple.Seq.Seq { + continue + } + if mptuple.Seq.View < mtuple.Seq.View { + continue + } + // d' == d + if !reflect.DeepEqual(mptuple.Digest, mtuple.Digest) { + continue + } + count += 1 + // there exists one ... + break + } + } + if count < s.oneCorrectQuorum() { + continue + } + log.Debugf("found %d replicas for Qset %d", count, mtuple.Seq.Seq) + + log.Debugf("selecting %d with %x", next, mtuple.Digest) + xset = &Subject{ + Seq: &Seq{Seq: next, View: s.seq.View}, + Digest: mtuple.Digest, + } + break nextm + } + + if notfound { + emptycount += 1 + } + } + if emptycount >= s.noFaultyQuorum() { + log.Debugf("selecting null request for %d", next) + xset = &Subject{ + Seq: &Seq{Seq: next, View: s.seq.View}, + Digest: nil, + } + } + + if xset == nil { + return nil, false + } + + return xset, true +}