From 6657459b845a7a038e1a7f3d902e55b9649da792 Mon Sep 17 00:00:00 2001 From: Marko Vukolic Date: Thu, 26 Jan 2017 15:18:33 +0100 Subject: [PATCH] [FAB-1840] Refactor SBFT blockcutter support This commit - renames Ordered to Validate in System, - uses a string -> bool map to store validated requests, - adds requests to that map in pending iteration (this was missing), - changes block cutter induced panics into viewchanges (for non-primary peers). Change-Id: Ie108f9cc9cc20a26253764d29745c281ad841357 Signed-off-by: Marko Vukolic Signed-off-by: Gabor Hosszu --- orderer/sbft/backend/backend.go | 2 +- orderer/sbft/simplebft/batch.go | 29 +++++++++++++ orderer/sbft/simplebft/connection.go | 2 +- orderer/sbft/simplebft/newview.go | 7 ++-- orderer/sbft/simplebft/preprepare.go | 56 ++++---------------------- orderer/sbft/simplebft/request.go | 23 ++++++----- orderer/sbft/simplebft/simplebft.go | 10 ++--- orderer/sbft/simplebft/testsys_test.go | 2 +- 8 files changed, 61 insertions(+), 70 deletions(-) diff --git a/orderer/sbft/backend/backend.go b/orderer/sbft/backend/backend.go index 25400778731..99a052e8f67 100644 --- a/orderer/sbft/backend/backend.go +++ b/orderer/sbft/backend/backend.go @@ -247,7 +247,7 @@ func (b *Backend) AddSbftPeer(chainID string, support multichain.ConsenterSuppor return s.New(b.GetMyId(), chainID, config, b) } -func (b *Backend) Ordered(chainID string, req *s.Request) ([][]*s.Request, [][]filter.Committer, bool) { +func (b *Backend) Validate(chainID string, req *s.Request) ([][]*s.Request, [][]filter.Committer, bool) { // ([][]*cb.Envelope, [][]filter.Committer, bool) { // If the message is a valid normal message and fills a batch, the batch, committers, true is returned // If the message is a valid special message (like a config message) it terminates the current batch diff --git a/orderer/sbft/simplebft/batch.go b/orderer/sbft/simplebft/batch.go index b1e5413e37a..1b7ec1873d2 100644 --- a/orderer/sbft/simplebft/batch.go +++ b/orderer/sbft/simplebft/batch.go @@ -21,6 +21,7 @@ import ( "reflect" "github.com/golang/protobuf/proto" + "github.com/hyperledger/fabric/orderer/common/filter" ) func (s *SBFT) makeBatch(seq uint64, prevHash []byte, data [][]byte) *Batch { @@ -90,3 +91,31 @@ func (b *Batch) DecodeHeader() *BatchHeader { return batchheader } + +func (s *SBFT) getCommittersFromBatch(reqBatch *Batch) (bool, []filter.Committer) { + reqs := make([]*Request, 0, len(reqBatch.Payloads)) + for _, pl := range reqBatch.Payloads { + req := &Request{Payload: pl} + reqs = append(reqs, req) + } + batches := make([][]*Request, 0, 1) + comms := [][]filter.Committer{} + for _, r := range reqs { + b, c, valid := s.sys.Validate(s.chainId, r) + if !valid { + return false, nil + } + batches = append(batches, b...) + comms = append(comms, c...) + } + if len(batches) > 1 || len(batches) != len(comms) { + return false, nil + } + + if len(batches) == 0 { + _, committer := s.sys.Cut(s.chainId) + return true, committer + } else { + return true, comms[0] + } +} diff --git a/orderer/sbft/simplebft/connection.go b/orderer/sbft/simplebft/connection.go index 36b66697fcd..e8bc816230f 100644 --- a/orderer/sbft/simplebft/connection.go +++ b/orderer/sbft/simplebft/connection.go @@ -75,7 +75,7 @@ func (s *SBFT) handleHello(h *Hello, src uint64) { if s.sys.LastBatch(s.chainId).DecodeHeader().Seq < bh.Seq { log.Debugf("replica %d: delivering batches %d after hello from replica %d", s.id, bh.Seq, src) - blockOK, committers := s.getCommittersFromBlockCutter(h.Batch) + blockOK, committers := s.getCommittersFromBatch(h.Batch) if blockOK { s.deliverBatch(h.Batch, committers) } else { diff --git a/orderer/sbft/simplebft/newview.go b/orderer/sbft/simplebft/newview.go index 1d7312e143a..da028f22bfd 100644 --- a/orderer/sbft/simplebft/newview.go +++ b/orderer/sbft/simplebft/newview.go @@ -159,7 +159,7 @@ func (s *SBFT) handleNewView(nv *NewView, src uint64) { } } // TODO we should not do this here, as prevBatch was already delivered - blockOK, committers := s.getCommittersFromBlockCutter(prevBatch) + blockOK, committers := s.getCommittersFromBatch(prevBatch) if !blockOK { log.Panic("Replica %d: our last checkpointed batch is erroneous (block cutter).", s.id) } @@ -180,9 +180,10 @@ func (s *SBFT) handleNewView(nv *NewView, src uint64) { Seq: &SeqView{Seq: nv.Batch.DecodeHeader().Seq, View: s.view}, Batch: nv.Batch, } - blockOK, committers := s.getCommittersFromBlockCutter(nv.Batch) + blockOK, committers := s.getCommittersFromBatch(nv.Batch) if !blockOK { - log.Panic("Replica %d: new view %d batch erroneous (block cutter).", s.id, nv.View) + log.Debugf("Replica %d: new view %d batch erroneous (block cutter).", s.id, nv.View) + s.sendViewChange() } s.handleCheckedPreprepare(pp, committers) diff --git a/orderer/sbft/simplebft/preprepare.go b/orderer/sbft/simplebft/preprepare.go index 7f0f10594f1..5d3cb97aff2 100644 --- a/orderer/sbft/simplebft/preprepare.go +++ b/orderer/sbft/simplebft/preprepare.go @@ -78,7 +78,13 @@ func (s *SBFT) handlePreprepare(pp *Preprepare, src uint64) { log.Infof("replica %d: preprepare batches prev hash does not match expected %s, got %s", s.id, hash2str(batchheader.PrevHash), hash2str(prevhash)) return } - committers := s.getCommitters(pp) + + blockOK, committers := s.getCommittersFromBatch(pp.Batch) + if !blockOK { + log.Debugf("Replica %d found Byzantine block in preprepare, Seq: %d View: %d", s.id, pp.Seq.Seq, pp.Seq.View) + s.sendViewChange() + return + } log.Infof("replica %d: handlePrepare", s.id) s.handleCheckedPreprepare(pp, committers) } @@ -100,26 +106,6 @@ func (s *SBFT) acceptPreprepare(pp *Preprepare, committers []filter.Committer) { } } -func (s *SBFT) getCommitters(pp *Preprepare) []filter.Committer { - // if we are the primary, we can be sure the block is OK - // and we also have the committers - // TODO what to do with the remaining ones??? - // how to mantain the mapping between batches and committers? - var committers []filter.Committer - - if !s.isPrimary() { - blockOK, allcommitters := s.getCommittersFromBlockCutter(pp.Batch) - if !blockOK { - log.Panicf("Replica %d found Byzantine block, Seq: %d View: %d", s.id, pp.Seq.Seq, pp.Seq.View) - } - committers = allcommitters - } else { - committers = s.primarycommitters[0] - s.primarycommitters = s.primarycommitters[1:] - } - return committers -} - func (s *SBFT) handleCheckedPreprepare(pp *Preprepare, committers []filter.Committer) { s.acceptPreprepare(pp, committers) if !s.isPrimary() { @@ -130,34 +116,6 @@ func (s *SBFT) handleCheckedPreprepare(pp *Preprepare, committers []filter.Commi s.maybeSendCommit() } -func (s *SBFT) getCommittersFromBlockCutter(reqBatch *Batch) (bool, []filter.Committer) { - reqs := make([]*Request, 0, len(reqBatch.Payloads)) - for _, pl := range reqBatch.Payloads { - req := &Request{Payload: pl} - reqs = append(reqs, req) - } - batches := make([][]*Request, 0, 1) - comms := [][]filter.Committer{} - for _, r := range reqs { - b, c, accepted := s.sys.Ordered(s.chainId, r) - if !accepted { - return false, nil - } - batches = append(batches, b...) - comms = append(comms, c...) - } - if len(batches) > 1 || len(batches) != len(comms) { - return false, nil - } - - if len(batches) == 0 { - _, committer := s.sys.Cut(s.chainId) - return true, committer - } else { - return true, comms[0] - } -} - //////////////////////////////////////////////// func (s *SBFT) requestTimeout() { diff --git a/orderer/sbft/simplebft/request.go b/orderer/sbft/simplebft/request.go index f3e6b17a1da..b41acb06dca 100644 --- a/orderer/sbft/simplebft/request.go +++ b/orderer/sbft/simplebft/request.go @@ -31,12 +31,13 @@ func (s *SBFT) handleRequest(req *Request, src uint64) { log.Infof("replica %d: inserting %x into pending", s.id, key) s.pending[key] = req if s.isPrimary() && s.activeView { - s.passedToBC[key] = req - batches, committers, accepted := s.sys.Ordered(s.chainId, req) - if !accepted { + batches, committers, valid := s.sys.Validate(s.chainId, req) + if !valid { // this one is problematic, lets skip it + delete(s.pending, key) return } + s.validated[key] = valid if len(batches) == 0 { s.startBatchTimer() } else { @@ -90,23 +91,25 @@ func (s *SBFT) maybeSendNextBatch() { if len(s.batches) == 0 { hasPending := len(s.pending) != 0 for k, req := range s.pending { - if s.passedToBC[k] == nil { - batches, committers, accepted := s.sys.Ordered(s.chainId, req) + if s.validated[k] == false { + batches, committers, valid := s.sys.Validate(s.chainId, req) s.batches = append(s.batches, batches...) s.primarycommitters = append(s.primarycommitters, committers...) - if !accepted { + if !valid { log.Panicf("Replica %d: one of our own pending requests is erroneous.", s.id) + delete(s.pending, k) + continue } + s.validated[k] = true } } if len(s.batches) == 0 { - // if we have no pending, every req was included - // in a batches + // if we have no pending, every req was included in batches if !hasPending { return } - // we have pending reqs that were just send to BC or - // were already sent (they are in passedToBC) + // we have pending reqs that were just sent for validation or + // were already sent (they are in s.validated) batch, committers := s.sys.Cut(s.chainId) s.batches = append(s.batches, batch) s.primarycommitters = append(s.primarycommitters, committers) diff --git a/orderer/sbft/simplebft/simplebft.go b/orderer/sbft/simplebft/simplebft.go index d35bf6c03aa..a426ab8c17a 100644 --- a/orderer/sbft/simplebft/simplebft.go +++ b/orderer/sbft/simplebft/simplebft.go @@ -52,7 +52,7 @@ type System interface { Sign(data []byte) []byte CheckSig(data []byte, src uint64, sig []byte) error Reconnect(chainId string, replica uint64) - Ordered(chainID string, req *Request) ([][]*Request, [][]filter.Committer, bool) + Validate(chainID string, req *Request) ([][]*Request, [][]filter.Committer, bool) Cut(chainID string) ([]*Request, []filter.Committer) } @@ -77,7 +77,7 @@ type SBFT struct { viewChangeTimer Canceller replicaState []replicaInfo pending map[string]*Request - passedToBC map[string]*Request + validated map[string]bool chainId string primarycommitters [][]filter.Committer } @@ -122,7 +122,7 @@ func New(id uint64, chainID string, config *Config, sys System) (*SBFT, error) { viewChangeTimer: dummyCanceller{}, replicaState: make([]replicaInfo, config.N), pending: make(map[string]*Request), - passedToBC: make(map[string]*Request), + validated: make(map[string]bool), batches: make([][]*Request, 0, 3), primarycommitters: make([][]filter.Committer, 0), } @@ -155,7 +155,7 @@ func New(id uint64, chainID string, config *Config, sys System) (*SBFT, error) { s.activeView = true if pp.Seq.Seq > s.seq() { // TODO double add to BC? - _, committers := s.getCommittersFromBlockCutter(pp.Batch) + _, committers := s.getCommittersFromBatch(pp.Batch) s.acceptPreprepare(pp, committers) } } @@ -288,6 +288,6 @@ func (s *SBFT) deliverBatch(batch *Batch, committers []filter.Committer) { key := hash2str(hash(req)) log.Infof("replica %d: attempting to remove %x from pending", s.id, key) delete(s.pending, key) - delete(s.passedToBC, key) + delete(s.validated, key) } } diff --git a/orderer/sbft/simplebft/testsys_test.go b/orderer/sbft/simplebft/testsys_test.go index 7904f5599a3..ccc2876539d 100644 --- a/orderer/sbft/simplebft/testsys_test.go +++ b/orderer/sbft/simplebft/testsys_test.go @@ -168,7 +168,7 @@ func (t *testSystemAdapter) Deliver(chainId string, batch *Batch, committer []fi t.batches[chainId] = append(t.batches[chainId], batch) } -func (t *testSystemAdapter) Ordered(chainID string, req *Request) ([][]*Request, [][]filter.Committer, bool) { +func (t *testSystemAdapter) Validate(chainID string, req *Request) ([][]*Request, [][]filter.Committer, bool) { r := t.reqs if t.reqs == nil || uint64(len(t.reqs)) == maxReqCount-uint64(1) { t.reqs = make([]*Request, 0, maxReqCount-1)