[FAB-816] Clean multichain integration interfaces
The multichain integration in https://gerrit.hyperledger.org/r/#/c/2721/
hooked the multi-chain manager into the default solo path.  This was
attempted in a way which kept the diff minimal, but correspondingly
produced some unpleasant artifacts in the code.

In particular, the integration used one catch-all interface,
multichain.Manager, to supply support to the various components.  This
forced components to drag in unnecessary imports, and it complicated
mock testing with many panicking unimplemented functions.

This changeset breaks that interface into pieces, and pushes each
interface definition back into the component which depends on it.
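
For illustration, the pattern looks like the following minimal,
self-contained sketch (the Envelope and mockSupport types are invented
for the example; only the shape of the Support interface mirrors the
new broadcast package).  The consuming component declares the narrow
interface it actually calls, the concrete multichain manager satisfies
it implicitly, and a test can stand in for it with a few lines instead
of a catch-all mock full of panicking stubs.

package main

import "fmt"

// Envelope stands in for cb.Envelope in this sketch.
type Envelope struct{ Payload []byte }

// Support is the narrow, consumer-owned interface: only what this
// component actually calls, so no import of multichain is needed.
type Support interface {
	// Enqueue accepts a message and returns true on acceptance, or
	// false on shutdown.
	Enqueue(env *Envelope) bool
}

// mockSupport is all a unit test needs to provide.
type mockSupport struct{ queue chan *Envelope }

func (m *mockSupport) Enqueue(env *Envelope) bool {
	select {
	case m.queue <- env:
		return true
	default:
		return false // a full queue models shutdown in this sketch
	}
}

func main() {
	var s Support = &mockSupport{queue: make(chan *Envelope, 1)}
	fmt.Println(s.Enqueue(&Envelope{Payload: []byte("tx")})) // true
	fmt.Println(s.Enqueue(&Envelope{Payload: []byte("tx")})) // false: queue full
}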

Also included in this changeset is a substantial cleanup of the
solo/consenter_test.go file.  This set of tests was originally written
in a way which depended on the end-to-end flow of the system, but as
the common components have been factored out, and as the solo
'consensus' has been reduced to its simplest component, more targeted
tests are now needed.
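
A sketch of what such a targeted test can look like once the solo
consenter depends only on a small interface (the mockChain type and
its one-message-per-batch behavior below are assumptions made for the
illustration, not the actual solo test code):

package solo_test

import (
	"testing"
	"time"
)

// mockChain models the minimal contract this sketch exercises: a
// consenter that accepts messages and emits them as one-item batches,
// standing in for solo's trivial ordering.
type mockChain struct {
	in  chan string
	out chan []string
}

func (c *mockChain) run() {
	for m := range c.in {
		c.out <- []string{m}
	}
}

func TestSoloStyleOrdering(t *testing.T) {
	c := &mockChain{in: make(chan string, 1), out: make(chan []string, 1)}
	go c.run()
	c.in <- "tx1"
	select {
	case batch := <-c.out:
		if len(batch) != 1 || batch[0] != "tx1" {
			t.Fatalf("unexpected batch: %v", batch)
		}
	case <-time.After(time.Second):
		t.Fatal("timed out waiting for batch")
	}
	close(c.in)
}

Such a test exercises the ordering behavior directly, with no broadcast
or deliver machinery involved.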

Also included are some assorted linting fixes.

Change-Id: I7c5e20cd7b8c66eb51cc56ad539177ce81cbcbfc
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
Jason Yellick committed Dec 5, 2016
1 parent da16559 commit ae9f2f2
Showing 18 changed files with 399 additions and 357 deletions.
6 changes: 3 additions & 3 deletions bddtests/docker-compose-with-orderer.yml
@@ -12,7 +12,7 @@ services:
- ORDERER_GENERAL_MAXWINDOWSIZE=1000
- ORDERER_GENERAL_ORDERERTYPE=solo
- ORDERER_GENERAL_LISTENADDRESS=0.0.0.0
- ORDERER_GENERAL_LISTENPORT=5005
- ORDERER_GENERAL_LISTENPORT=7050
- ORDERER_RAMLEDGER_HISTORY_SIZE=100
working_dir: /opt/gopath/src/github.com/hyperledger/fabric/orderer
command: orderer
@@ -29,7 +29,7 @@ services:
- CORE_NEXT=true
- CORE_PEER_ENDORSER_ENABLED=true
- CORE_PEER_COMMITTER_ENABLED=true
- CORE_PEER_COMMITTER_LEDGER_ORDERER=orderer:5005
- CORE_PEER_COMMITTER_LEDGER_ORDERER=orderer:7050
volumes:
- /var/run/:/host/var/run/
networks:
@@ -67,7 +67,7 @@ services:
environment:
- CORE_PEER_ID=vp2
- CORE_PEER_PROFILE_ENABLED=true
- CORE_PEER_COMMITTER_LEDGER_ORDERER=orderer:5005
- CORE_PEER_COMMITTER_LEDGER_ORDERER=orderer:7050
- CORE_PEER_GOSSIP_BOOTSTRAP=vp0:10000
- CORE_PEER_GOSSIP_ORGLEADER=false
command: peer node start
7 changes: 5 additions & 2 deletions bddtests/steps/orderer_util.py
@@ -188,6 +188,8 @@ def getABStubForComposeService(self, context, composeService):
self.atomicBroadcastStubsDict[composeService] = newABStub
return newABStub

# The default chain ID when the system is statically bootstrapped for testing
TEST_CHAIN_ID = "**TEST_CHAINID**".encode()

# Registers a user on a specific composeService
def registerUser(context, secretMsg, composeService):
@@ -217,7 +219,7 @@ def getUserRegistration(context, enrollId):
def createDeliverUpdateMsg(Start, SpecifiedNumber, WindowSize):
seek = ab_pb2.SeekInfo()
startVal = seek.__getattribute__(Start)
seekInfo = ab_pb2.SeekInfo(Start = startVal, SpecifiedNumber = SpecifiedNumber, WindowSize = WindowSize)
seekInfo = ab_pb2.SeekInfo(Start = startVal, SpecifiedNumber = SpecifiedNumber, WindowSize = WindowSize, ChainID = TEST_CHAIN_ID)
deliverUpdateMsg = ab_pb2.DeliverUpdate(Seek = seekInfo)
return deliverUpdateMsg

@@ -227,7 +229,8 @@ def generateBroadcastMessages(numToGenerate = 1, timeToHoldOpen = 1):
for i in range(0, numToGenerate):
envelope = common_pb2.Envelope()
payload = common_pb2.Payload(header = common_pb2.Header(chainHeader = common_pb2.ChainHeader()))
# TODO, appropriately set the header type
payload.header.chainHeader.chainID = TEST_CHAIN_ID
payload.header.chainHeader.type = common_pb2.ENDORSER_TRANSACTION
payload.data = str("BDD test: {0}".format(datetime.datetime.utcnow()))
envelope.payload = payload.SerializeToString()
messages.append(envelope)
4 changes: 2 additions & 2 deletions orderer/common/blockcutter/blockcutter.go
@@ -52,6 +52,7 @@ type receiver struct {
curBatch []*cb.Envelope
}

// NewReceiverImpl creates a Receiver implementation based on the given batchsize, filters, and configtx manager
func NewReceiverImpl(batchSize int, filters *broadcastfilter.RuleSet, configManager configtx.Manager) Receiver {
return &receiver{
batchSize: batchSize,
@@ -106,9 +107,8 @@ func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, bool) {
secondBatch := []*cb.Envelope{msg}
if firstBatch == nil {
return [][]*cb.Envelope{secondBatch}, true
} else {
return [][]*cb.Envelope{firstBatch, secondBatch}, true
}
return [][]*cb.Envelope{firstBatch, secondBatch}, true
case broadcastfilter.Reject:
logger.Debugf("Rejecting message")
return nil, false
17 changes: 14 additions & 3 deletions orderer/common/bootstrap/static/static.go
@@ -26,7 +26,10 @@ import (
ab "github.com/hyperledger/fabric/protos/orderer"
)

var TestChainID = "**TEST_CHAINID**"
// TestChainID is the default chain ID which is used by all statically bootstrapped networks
// This is necessary to allow test clients to connect without being rejected for targeting
// a chain which does not exist
const TestChainID = "**TEST_CHAINID**"

const msgVersion = int32(1)

@@ -38,12 +41,20 @@ type bootstrapper struct {
batchSize int32
}

const (
// DefaultBatchSize is the default value of BatchSizeKey
DefaultBatchSize = 10

// DefaultConsensusType is the default value of ConsensusTypeKey
DefaultConsensusType = "solo"
)

// New returns a new static bootstrap helper.
func New() bootstrap.Helper {
return &bootstrapper{
chainID: TestChainID,
consensusType: "solo",
batchSize: 10,
consensusType: DefaultConsensusType,
batchSize: DefaultBatchSize,
}
}

66 changes: 39 additions & 27 deletions orderer/common/broadcast/broadcast.go
@@ -18,7 +18,6 @@ package broadcast

import (
"github.com/hyperledger/fabric/orderer/common/broadcastfilter"
"github.com/hyperledger/fabric/orderer/multichain"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/op/go-logging"
@@ -38,18 +37,30 @@ type Handler interface {
Handle(srv ab.AtomicBroadcast_BroadcastServer) error
}

// SupportManager provides a way for the Handler to look up the Support for a chain
type SupportManager interface {
GetChain(chainID string) (Support, bool)
}

// Support provides the backing resources needed to support broadcast on a chain
type Support interface {
// Enqueue accepts a message and returns true on acceptance, or false on shutdown
Enqueue(env *cb.Envelope) bool

// Filters returns the set of broadcast filters for this chain
Filters() *broadcastfilter.RuleSet
}

type handlerImpl struct {
queueSize int
ml multichain.Manager
exitChan chan struct{}
sm SupportManager
}

// NewHandlerImpl constructs a new implementation of the Handler interface
func NewHandlerImpl(queueSize int, ml multichain.Manager) Handler {
func NewHandlerImpl(sm SupportManager, queueSize int) Handler {
return &handlerImpl{
queueSize: queueSize,
ml: ml,
exitChan: make(chan struct{}),
sm: sm,
}
}

@@ -61,44 +72,45 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
return b.queueEnvelopes(srv)
}

type msgAndChainSupport struct {
msg *cb.Envelope
chainSupport multichain.ChainSupport
type msgAndSupport struct {
msg *cb.Envelope
support Support
}

type broadcaster struct {
bs *handlerImpl
queue chan *msgAndChainSupport
bs *handlerImpl
queue chan *msgAndSupport
exitChan chan struct{}
}

func newBroadcaster(bs *handlerImpl) *broadcaster {
b := &broadcaster{
bs: bs,
queue: make(chan *msgAndChainSupport, bs.queueSize),
bs: bs,
queue: make(chan *msgAndSupport, bs.queueSize),
exitChan: make(chan struct{}),
}
return b
}

func (b *broadcaster) drainQueue() {
for {
select {
case msgAndChainSupport, ok := <-b.queue:
if ok {
if !msgAndChainSupport.chainSupport.Chain().Enqueue(msgAndChainSupport.msg) {
return
}
} else {
return
}
case <-b.bs.exitChan:
defer close(b.exitChan)
for msgAndSupport := range b.queue {
if !msgAndSupport.support.Enqueue(msgAndSupport.msg) {
logger.Debugf("Consenter instructed us to shut down")
return
}
}
logger.Debugf("Exiting because the queue channel closed")
}

func (b *broadcaster) queueEnvelopes(srv ab.AtomicBroadcast_BroadcastServer) error {

for {
select {
case <-b.exitChan:
return nil
default:
}
msg, err := srv.Recv()
if err != nil {
return err
@@ -111,20 +123,20 @@ func (b *broadcaster) queueEnvelopes(srv ab.AtomicBroadcast_BroadcastServer) err
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}

chainSupport, ok := b.bs.ml.GetChain(payload.Header.ChainHeader.ChainID)
support, ok := b.bs.sm.GetChain(payload.Header.ChainHeader.ChainID)
if !ok {
// XXX Hook in chain creation logic here
panic("Unimplemented")
}

action, _ := chainSupport.Filters().Apply(msg)
action, _ := support.Filters().Apply(msg)

switch action {
case broadcastfilter.Reconfigure:
fallthrough
case broadcastfilter.Accept:
select {
case b.queue <- &msgAndChainSupport{msg: msg, chainSupport: chainSupport}:
case b.queue <- &msgAndSupport{msg: msg, support: support}:
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
default:
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
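The rewritten drainQueue above relies on a standard Go shutdown
handshake: the consumer ranges over the queue channel until it is
closed or the consenter rejects a message, and signals its own exit by
closing a second channel via defer.  A standalone sketch of the idiom
(all names invented for illustration):

package main

import "fmt"

// consume drains items until the queue closes or the sink rejects one,
// closing done on the way out so the producer can stop early.
func consume(queue <-chan int, done chan<- struct{}, sink func(int) bool) {
	defer close(done)
	for item := range queue {
		if !sink(item) {
			fmt.Println("sink shut down")
			return
		}
	}
	fmt.Println("queue closed")
}

func main() {
	queue := make(chan int, 4)
	done := make(chan struct{})
	go consume(queue, done, func(i int) bool { return i < 3 })

	for i := 0; i < 5; i++ {
		select {
		case <-done: // consumer exited; stop producing
			return
		case queue <- i:
		}
	}
	close(queue)
	<-done
}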
[Diff truncated; the remaining 13 changed files are not shown.]
