Skip to content

Commit

Permalink
[FAB-1840] Refactor SBFT blockcutter support
Browse files Browse the repository at this point in the history
This commit
- renames Ordered to Validate in System,
- uses a string -> bool map to store
  validated requests,
- adds requests to that map in pending
  iteration (this was missing),
- changes block cutter induced panics into
  viewchanges (for non-primary peers).

Change-Id: Ie108f9cc9cc20a26253764d29745c281ad841357
Signed-off-by: Marko Vukolic <mvu@zurich.ibm.com>
Signed-off-by: Gabor Hosszu <gabor@digitalasset.com>
  • Loading branch information
Marko Vukolic authored and gaborh-da committed Feb 2, 2017
1 parent dffcaf4 commit 6657459
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 70 deletions.
2 changes: 1 addition & 1 deletion orderer/sbft/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (b *Backend) AddSbftPeer(chainID string, support multichain.ConsenterSuppor
return s.New(b.GetMyId(), chainID, config, b)
}

func (b *Backend) Ordered(chainID string, req *s.Request) ([][]*s.Request, [][]filter.Committer, bool) {
func (b *Backend) Validate(chainID string, req *s.Request) ([][]*s.Request, [][]filter.Committer, bool) {
// ([][]*cb.Envelope, [][]filter.Committer, bool) {
// If the message is a valid normal message and fills a batch, the batch, committers, true is returned
// If the message is a valid special message (like a config message) it terminates the current batch
Expand Down
29 changes: 29 additions & 0 deletions orderer/sbft/simplebft/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"reflect"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/orderer/common/filter"
)

func (s *SBFT) makeBatch(seq uint64, prevHash []byte, data [][]byte) *Batch {
Expand Down Expand Up @@ -90,3 +91,31 @@ func (b *Batch) DecodeHeader() *BatchHeader {

return batchheader
}

func (s *SBFT) getCommittersFromBatch(reqBatch *Batch) (bool, []filter.Committer) {
reqs := make([]*Request, 0, len(reqBatch.Payloads))
for _, pl := range reqBatch.Payloads {
req := &Request{Payload: pl}
reqs = append(reqs, req)
}
batches := make([][]*Request, 0, 1)
comms := [][]filter.Committer{}
for _, r := range reqs {
b, c, valid := s.sys.Validate(s.chainId, r)
if !valid {
return false, nil
}
batches = append(batches, b...)
comms = append(comms, c...)
}
if len(batches) > 1 || len(batches) != len(comms) {
return false, nil
}

if len(batches) == 0 {
_, committer := s.sys.Cut(s.chainId)
return true, committer
} else {
return true, comms[0]
}
}
2 changes: 1 addition & 1 deletion orderer/sbft/simplebft/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *SBFT) handleHello(h *Hello, src uint64) {

if s.sys.LastBatch(s.chainId).DecodeHeader().Seq < bh.Seq {
log.Debugf("replica %d: delivering batches %d after hello from replica %d", s.id, bh.Seq, src)
blockOK, committers := s.getCommittersFromBlockCutter(h.Batch)
blockOK, committers := s.getCommittersFromBatch(h.Batch)
if blockOK {
s.deliverBatch(h.Batch, committers)
} else {
Expand Down
7 changes: 4 additions & 3 deletions orderer/sbft/simplebft/newview.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (s *SBFT) handleNewView(nv *NewView, src uint64) {
}
}
// TODO we should not do this here, as prevBatch was already delivered
blockOK, committers := s.getCommittersFromBlockCutter(prevBatch)
blockOK, committers := s.getCommittersFromBatch(prevBatch)
if !blockOK {
log.Panic("Replica %d: our last checkpointed batch is erroneous (block cutter).", s.id)
}
Expand All @@ -180,9 +180,10 @@ func (s *SBFT) handleNewView(nv *NewView, src uint64) {
Seq: &SeqView{Seq: nv.Batch.DecodeHeader().Seq, View: s.view},
Batch: nv.Batch,
}
blockOK, committers := s.getCommittersFromBlockCutter(nv.Batch)
blockOK, committers := s.getCommittersFromBatch(nv.Batch)
if !blockOK {
log.Panic("Replica %d: new view %d batch erroneous (block cutter).", s.id, nv.View)
log.Debugf("Replica %d: new view %d batch erroneous (block cutter).", s.id, nv.View)
s.sendViewChange()
}

s.handleCheckedPreprepare(pp, committers)
Expand Down
56 changes: 7 additions & 49 deletions orderer/sbft/simplebft/preprepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,13 @@ func (s *SBFT) handlePreprepare(pp *Preprepare, src uint64) {
log.Infof("replica %d: preprepare batches prev hash does not match expected %s, got %s", s.id, hash2str(batchheader.PrevHash), hash2str(prevhash))
return
}
committers := s.getCommitters(pp)

blockOK, committers := s.getCommittersFromBatch(pp.Batch)
if !blockOK {
log.Debugf("Replica %d found Byzantine block in preprepare, Seq: %d View: %d", s.id, pp.Seq.Seq, pp.Seq.View)
s.sendViewChange()
return
}
log.Infof("replica %d: handlePrepare", s.id)
s.handleCheckedPreprepare(pp, committers)
}
Expand All @@ -100,26 +106,6 @@ func (s *SBFT) acceptPreprepare(pp *Preprepare, committers []filter.Committer) {
}
}

func (s *SBFT) getCommitters(pp *Preprepare) []filter.Committer {
// if we are the primary, we can be sure the block is OK
// and we also have the committers
// TODO what to do with the remaining ones???
// how to mantain the mapping between batches and committers?
var committers []filter.Committer

if !s.isPrimary() {
blockOK, allcommitters := s.getCommittersFromBlockCutter(pp.Batch)
if !blockOK {
log.Panicf("Replica %d found Byzantine block, Seq: %d View: %d", s.id, pp.Seq.Seq, pp.Seq.View)
}
committers = allcommitters
} else {
committers = s.primarycommitters[0]
s.primarycommitters = s.primarycommitters[1:]
}
return committers
}

func (s *SBFT) handleCheckedPreprepare(pp *Preprepare, committers []filter.Committer) {
s.acceptPreprepare(pp, committers)
if !s.isPrimary() {
Expand All @@ -130,34 +116,6 @@ func (s *SBFT) handleCheckedPreprepare(pp *Preprepare, committers []filter.Commi
s.maybeSendCommit()
}

func (s *SBFT) getCommittersFromBlockCutter(reqBatch *Batch) (bool, []filter.Committer) {
reqs := make([]*Request, 0, len(reqBatch.Payloads))
for _, pl := range reqBatch.Payloads {
req := &Request{Payload: pl}
reqs = append(reqs, req)
}
batches := make([][]*Request, 0, 1)
comms := [][]filter.Committer{}
for _, r := range reqs {
b, c, accepted := s.sys.Ordered(s.chainId, r)
if !accepted {
return false, nil
}
batches = append(batches, b...)
comms = append(comms, c...)
}
if len(batches) > 1 || len(batches) != len(comms) {
return false, nil
}

if len(batches) == 0 {
_, committer := s.sys.Cut(s.chainId)
return true, committer
} else {
return true, comms[0]
}
}

////////////////////////////////////////////////

func (s *SBFT) requestTimeout() {
Expand Down
23 changes: 13 additions & 10 deletions orderer/sbft/simplebft/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ func (s *SBFT) handleRequest(req *Request, src uint64) {
log.Infof("replica %d: inserting %x into pending", s.id, key)
s.pending[key] = req
if s.isPrimary() && s.activeView {
s.passedToBC[key] = req
batches, committers, accepted := s.sys.Ordered(s.chainId, req)
if !accepted {
batches, committers, valid := s.sys.Validate(s.chainId, req)
if !valid {
// this one is problematic, lets skip it
delete(s.pending, key)
return
}
s.validated[key] = valid
if len(batches) == 0 {
s.startBatchTimer()
} else {
Expand Down Expand Up @@ -90,23 +91,25 @@ func (s *SBFT) maybeSendNextBatch() {
if len(s.batches) == 0 {
hasPending := len(s.pending) != 0
for k, req := range s.pending {
if s.passedToBC[k] == nil {
batches, committers, accepted := s.sys.Ordered(s.chainId, req)
if s.validated[k] == false {
batches, committers, valid := s.sys.Validate(s.chainId, req)
s.batches = append(s.batches, batches...)
s.primarycommitters = append(s.primarycommitters, committers...)
if !accepted {
if !valid {
log.Panicf("Replica %d: one of our own pending requests is erroneous.", s.id)
delete(s.pending, k)
continue
}
s.validated[k] = true
}
}
if len(s.batches) == 0 {
// if we have no pending, every req was included
// in a batches
// if we have no pending, every req was included in batches
if !hasPending {
return
}
// we have pending reqs that were just send to BC or
// were already sent (they are in passedToBC)
// we have pending reqs that were just sent for validation or
// were already sent (they are in s.validated)
batch, committers := s.sys.Cut(s.chainId)
s.batches = append(s.batches, batch)
s.primarycommitters = append(s.primarycommitters, committers)
Expand Down
10 changes: 5 additions & 5 deletions orderer/sbft/simplebft/simplebft.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type System interface {
Sign(data []byte) []byte
CheckSig(data []byte, src uint64, sig []byte) error
Reconnect(chainId string, replica uint64)
Ordered(chainID string, req *Request) ([][]*Request, [][]filter.Committer, bool)
Validate(chainID string, req *Request) ([][]*Request, [][]filter.Committer, bool)
Cut(chainID string) ([]*Request, []filter.Committer)
}

Expand All @@ -77,7 +77,7 @@ type SBFT struct {
viewChangeTimer Canceller
replicaState []replicaInfo
pending map[string]*Request
passedToBC map[string]*Request
validated map[string]bool
chainId string
primarycommitters [][]filter.Committer
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func New(id uint64, chainID string, config *Config, sys System) (*SBFT, error) {
viewChangeTimer: dummyCanceller{},
replicaState: make([]replicaInfo, config.N),
pending: make(map[string]*Request),
passedToBC: make(map[string]*Request),
validated: make(map[string]bool),
batches: make([][]*Request, 0, 3),
primarycommitters: make([][]filter.Committer, 0),
}
Expand Down Expand Up @@ -155,7 +155,7 @@ func New(id uint64, chainID string, config *Config, sys System) (*SBFT, error) {
s.activeView = true
if pp.Seq.Seq > s.seq() {
// TODO double add to BC?
_, committers := s.getCommittersFromBlockCutter(pp.Batch)
_, committers := s.getCommittersFromBatch(pp.Batch)
s.acceptPreprepare(pp, committers)
}
}
Expand Down Expand Up @@ -288,6 +288,6 @@ func (s *SBFT) deliverBatch(batch *Batch, committers []filter.Committer) {
key := hash2str(hash(req))
log.Infof("replica %d: attempting to remove %x from pending", s.id, key)
delete(s.pending, key)
delete(s.passedToBC, key)
delete(s.validated, key)
}
}
2 changes: 1 addition & 1 deletion orderer/sbft/simplebft/testsys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (t *testSystemAdapter) Deliver(chainId string, batch *Batch, committer []fi
t.batches[chainId] = append(t.batches[chainId], batch)
}

func (t *testSystemAdapter) Ordered(chainID string, req *Request) ([][]*Request, [][]filter.Committer, bool) {
func (t *testSystemAdapter) Validate(chainID string, req *Request) ([][]*Request, [][]filter.Committer, bool) {
r := t.reqs
if t.reqs == nil || uint64(len(t.reqs)) == maxReqCount-uint64(1) {
t.reqs = make([]*Request, 0, maxReqCount-1)
Expand Down

0 comments on commit 6657459

Please sign in to comment.