Skip to content

Commit

Permalink
FAB-18204 Ch.Part.API: decouple multichannel and etcdraft (#1862)
Browse files Browse the repository at this point in the history
The etcdraft package imports the multichannel package.
This prevents the multichannel package from using the onboarding
and cluster packages, which depend on etcdraft.

Here etcdraft is decoupled from multichannel by declaring an
interface that capture the dependecy on the Registrar.

This also eliminates some of the opaque function pointers to the
Registrar that the etcdraft package was using.

Signed-off-by: Yoav Tock <tock@il.ibm.com>
Change-Id: If8112edd7a8fe08a151bb3ca19bf0eeeedc2fb31
  • Loading branch information
tock-ibm authored Sep 10, 2020
1 parent 20bd103 commit 331d950
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 100 deletions.
13 changes: 13 additions & 0 deletions orderer/common/multichannel/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,19 @@ func (r *Registrar) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader
return chdr, isConfig, cs, nil
}

// GetConsensusChain retrieves the consensus.Chain of the channel, if it exists.
func (r *Registrar) GetConsensusChain(chainID string) consensus.Chain {
r.lock.RLock()
defer r.lock.RUnlock()

cs, exists := r.chains[chainID]
if !exists {
return nil
}

return cs.Chain
}

// GetChain retrieves the chain support for a chain if it exists.
func (r *Registrar) GetChain(chainID string) *ChainSupport {
r.lock.RLock()
Expand Down
41 changes: 16 additions & 25 deletions orderer/consensus/etcdraft/consenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/hyperledger/fabric/internal/pkg/comm"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/common/multichannel"
"github.com/hyperledger/fabric/orderer/consensus"
"github.com/hyperledger/fabric/orderer/consensus/inactive"
"github.com/hyperledger/fabric/protoutil"
Expand All @@ -43,14 +42,13 @@ type InactiveChainRegistry interface {
TrackChain(chainName string, genesisBlock *common.Block, createChain func())
}

//go:generate mockery -dir . -name ChainGetter -case underscore -output mocks
//go:generate mockery -dir . -name ChainManager -case underscore -output mocks

// ChainGetter obtains instances of ChainSupport for the given channel
type ChainGetter interface {
// GetChain obtains the ChainSupport for the given channel.
// Returns nil, false when the ChainSupport for the given channel
// isn't found.
GetChain(chainID string) *multichannel.ChainSupport
// ChainManager defines the methods from multichannel.Registrar needed by the Consenter.
type ChainManager interface {
GetConsensusChain(channelID string) consensus.Chain
CreateChain(channelID string)
SwitchChainToFollower(channelID string)
}

// Config contains etcdraft configurations
Expand All @@ -62,13 +60,11 @@ type Config struct {

// Consenter implements etcdraft consenter
type Consenter struct {
CreateChain func(chainName string) //TODO FAB-18204 convert function pointers to interface
SwitchToFollower func(chainName string) //TODO FAB-18204 convert function pointers to interface
ChainManager ChainManager
InactiveChainRegistry InactiveChainRegistry
Dialer *cluster.PredicateDialer
Communication cluster.Communicator
*Dispatcher
Chains ChainGetter //TODO FAB-18204 use a single interface for multichannel.Registrar functions
Logger *flogging.FabricLogger
EtcdRaftConfig Config
OrdererConfig localconfig.TopLevel
Expand All @@ -93,17 +89,14 @@ func (c *Consenter) TargetChannel(message proto.Message) string {
// ReceiverByChain returns the MessageReceiver for the given channelID or nil
// if not found.
func (c *Consenter) ReceiverByChain(channelID string) MessageReceiver {
cs := c.Chains.GetChain(channelID)
if cs == nil {
chain := c.ChainManager.GetConsensusChain(channelID)
if chain == nil {
return nil
}
if cs.Chain == nil {
c.Logger.Panicf("Programming error - Chain %s is nil although it exists in the mapping", channelID)
}
if etcdRaftChain, isEtcdRaftChain := cs.Chain.(*Chain); isEtcdRaftChain {
if etcdRaftChain, isEtcdRaftChain := chain.(*Chain); isEtcdRaftChain {
return etcdRaftChain
}
c.Logger.Warningf("Chain %s is of type %v and not etcdraft.Chain", channelID, reflect.TypeOf(cs.Chain))
c.Logger.Warningf("Chain %s is of type %v and not etcdraft.Chain", channelID, reflect.TypeOf(chain))
return nil
}

Expand Down Expand Up @@ -165,7 +158,7 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
if c.InactiveChainRegistry != nil {
// There is a system channel, use the InactiveChainRegistry to track the future config updates of application channel.
c.InactiveChainRegistry.TrackChain(support.ChannelID(), support.Block(0), func() {
c.CreateChain(support.ChannelID())
c.ChainManager.CreateChain(support.ChannelID())
})
return &inactive.Chain{Err: errors.Errorf("channel %s is not serviced by me", support.ChannelID())}, nil
}
Expand Down Expand Up @@ -227,12 +220,12 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
// when we have a system channel, we use the InactiveChainRegistry to track membership upon eviction.
c.Logger.Info("With system channel: after eviction InactiveChainRegistry.TrackChain will be called")
haltCallback = func() {
c.InactiveChainRegistry.TrackChain(support.ChannelID(), nil, func() { c.CreateChain(support.ChannelID()) })
c.InactiveChainRegistry.TrackChain(support.ChannelID(), nil, func() { c.ChainManager.CreateChain(support.ChannelID()) })
}
} else {
// when we do NOT have a system channel, we switch to a follower.Chain upon eviction.
c.Logger.Info("Without system channel: after eviction Registrar.SwitchToFollower will be called")
haltCallback = func() { c.SwitchToFollower(support.ChannelID()) }
haltCallback = func() { c.ChainManager.SwitchChainToFollower(support.ChannelID()) }
}

return NewChain(
Expand Down Expand Up @@ -314,7 +307,7 @@ func New(
conf *localconfig.TopLevel,
srvConf comm.ServerConfig,
srv *comm.GRPCServer,
r *multichannel.Registrar,
registrar ChainManager,
icr InactiveChainRegistry,
metricsProvider metrics.Provider,
bccsp bccsp.BCCSP,
Expand All @@ -328,11 +321,9 @@ func New(
}

consenter := &Consenter{
CreateChain: r.CreateChain,
SwitchToFollower: r.SwitchChainToFollower,
ChainManager: registrar,
Cert: srvConf.SecOpts.Certificate,
Logger: logger,
Chains: r,
EtcdRaftConfig: cfg,
OrdererConfig: *conf,
Dialer: clusterDialer,
Expand Down
68 changes: 27 additions & 41 deletions orderer/consensus/etcdraft/consenter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ import (
"github.com/hyperledger/fabric/internal/pkg/comm"
"github.com/hyperledger/fabric/orderer/common/cluster"
clustermocks "github.com/hyperledger/fabric/orderer/common/cluster/mocks"
"github.com/hyperledger/fabric/orderer/common/multichannel"
"github.com/hyperledger/fabric/orderer/consensus/etcdraft"
"github.com/hyperledger/fabric/orderer/consensus/etcdraft/mocks"
"github.com/hyperledger/fabric/orderer/consensus/inactive"
consensusmocks "github.com/hyperledger/fabric/orderer/consensus/mocks"
"github.com/hyperledger/fabric/protoutil"
. "github.com/onsi/ginkgo"
"github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"
"github.com/pkg/errors"
"github.com/stretchr/testify/mock"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -59,7 +60,7 @@ type ordererConfig interface {

var _ = Describe("Consenter", func() {
var (
chainGetter *mocks.ChainGetter
chainManager *mocks.ChainManager
support *consensusmocks.FakeConsenterSupport
dataDir string
snapDir string
Expand All @@ -76,7 +77,7 @@ var _ = Describe("Consenter", func() {
if certAsPEM == nil {
certAsPEM = kp.Cert
}
chainGetter = &mocks.ChainGetter{}
chainManager = &mocks.ChainManager{}
support = &consensusmocks.FakeConsenterSupport{}
dataDir, err = ioutil.TempDir("", "consenter-")
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -113,17 +114,17 @@ var _ = Describe("Consenter", func() {

When("the consenter is extracting the channel", func() {
It("extracts successfully from step requests", func() {
consenter := newConsenter(chainGetter)
consenter := newConsenter(chainManager)
ch := consenter.TargetChannel(&orderer.ConsensusRequest{Channel: "mychannel"})
Expect(ch).To(BeIdenticalTo("mychannel"))
})
It("extracts successfully from submit requests", func() {
consenter := newConsenter(chainGetter)
consenter := newConsenter(chainManager)
ch := consenter.TargetChannel(&orderer.SubmitRequest{Channel: "mychannel"})
Expect(ch).To(BeIdenticalTo("mychannel"))
})
It("returns an empty string for the rest of the messages", func() {
consenter := newConsenter(chainGetter)
consenter := newConsenter(chainManager)
ch := consenter.TargetChannel(&common.Block{})
Expect(ch).To(BeEmpty())
})
Expand All @@ -132,7 +133,7 @@ var _ = Describe("Consenter", func() {
When("the consenter is asked about join-block membership", func() {
table.DescribeTable("identifies a bad block",
func(block *common.Block, errExpected string) {
consenter := newConsenter(chainGetter)
consenter := newConsenter(chainManager)
isMem, err := consenter.IsChannelMember(block)
Expect(isMem).To(BeFalse())
Expect(err).To(MatchError(errExpected))
Expand Down Expand Up @@ -162,7 +163,7 @@ var _ = Describe("Consenter", func() {
})

It("identifies a member block", func() {
consenter := newConsenter(chainGetter)
consenter := newConsenter(chainManager)
for i := 0; i < len(serverCertificates); i++ {
consenter.Cert = serverCertificates[i]
isMem, err := consenter.IsChannelMember(genesisBlockApp)
Expand All @@ -172,7 +173,7 @@ var _ = Describe("Consenter", func() {
})

It("identifies a non-member block", func() {
consenter := newConsenter(chainGetter)
consenter := newConsenter(chainManager)
consenter.Cert = certAsPEM
isMem, err := consenter.IsChannelMember(genesisBlockApp)
Expect(isMem).To(BeFalse())
Expand All @@ -183,48 +184,33 @@ var _ = Describe("Consenter", func() {
When("the consenter is asked for a chain", func() {
cryptoProvider, _ := sw.NewDefaultSecurityLevelWithKeystore(sw.NewDummyKeyStore())
chainInstance := &etcdraft.Chain{CryptoProvider: cryptoProvider}
cs := &multichannel.ChainSupport{
Chain: chainInstance,
BCCSP: cryptoProvider,
}
BeforeEach(func() {
chainGetter.On("GetChain", "mychannel").Return(cs)
chainGetter.On("GetChain", "badChainObject").Return(&multichannel.ChainSupport{})
chainGetter.On("GetChain", "notmychannel").Return(nil)
chainGetter.On("GetChain", "notraftchain").Return(&multichannel.ChainSupport{
Chain: &multichannel.ChainSupport{},
})
chainManager.On("GetConsensusChain", "mychannel").Return(chainInstance)
chainManager.On("GetConsensusChain", "notmychannel").Return(nil)
chainManager.On("GetConsensusChain", "notraftchain").Return(&inactive.Chain{Err: errors.New("not a raft chain")})
})
It("calls the chain getter and returns the reference when it is found", func() {
consenter := newConsenter(chainGetter)
It("calls the chain manager and returns the reference when it is found", func() {
consenter := newConsenter(chainManager)
Expect(consenter).NotTo(BeNil())

chain := consenter.ReceiverByChain("mychannel")
Expect(chain).NotTo(BeNil())
Expect(chain).To(BeIdenticalTo(chainInstance))
})
It("calls the chain getter and returns nil when it's not found", func() {
consenter := newConsenter(chainGetter)
It("calls the chain manager and returns nil when it's not found", func() {
consenter := newConsenter(chainManager)
Expect(consenter).NotTo(BeNil())

chain := consenter.ReceiverByChain("notmychannel")
Expect(chain).To(BeNil())
})
It("calls the chain getter and returns nil when it's not a raft chain", func() {
consenter := newConsenter(chainGetter)
It("calls the chain manager and returns nil when it's not a raft chain", func() {
consenter := newConsenter(chainManager)
Expect(consenter).NotTo(BeNil())

chain := consenter.ReceiverByChain("notraftchain")
Expect(chain).To(BeNil())
})
It("calls the chain getter and panics when the chain has a bad internal state", func() {
consenter := newConsenter(chainGetter)
Expect(consenter).NotTo(BeNil())

Expect(func() {
consenter.ReceiverByChain("badChainObject")
}).To(Panic())
})
})

It("successfully constructs a Chain", func() {
Expand All @@ -251,7 +237,7 @@ var _ = Describe("Consenter", func() {
)
support.SharedConfigReturns(mockOrderer)

consenter := newConsenter(chainGetter)
consenter := newConsenter(chainManager)
consenter.EtcdRaftConfig.WALDir = walDir
consenter.EtcdRaftConfig.SnapDir = snapDir
// consenter.EtcdRaftConfig.EvictionSuspicion is missing
Expand Down Expand Up @@ -302,7 +288,7 @@ var _ = Describe("Consenter", func() {
)
support.SharedConfigReturns(mockOrderer)

consenter := newConsenter(chainGetter)
consenter := newConsenter(chainManager)
consenter.EtcdRaftConfig.WALDir = walDir
consenter.EtcdRaftConfig.SnapDir = snapDir
//without a system channel, the InactiveChainRegistry is nil
Expand Down Expand Up @@ -357,7 +343,7 @@ var _ = Describe("Consenter", func() {
support.SharedConfigReturns(mockOrderer)
support.ChannelIDReturns("foo")

consenter := newConsenter(chainGetter)
consenter := newConsenter(chainManager)

chain, err := consenter.HandleChain(support, &common.Metadata{})
Expect(chain).To(Not(BeNil()))
Expand All @@ -382,7 +368,7 @@ var _ = Describe("Consenter", func() {
)
support.SharedConfigReturns(mockOrderer)

consenter := newConsenter(chainGetter)
consenter := newConsenter(chainManager)

chain, err := consenter.HandleChain(support, nil)
Expect(chain).To(BeNil())
Expand Down Expand Up @@ -412,7 +398,7 @@ var _ = Describe("Consenter", func() {
mockOrderer.CapabilitiesReturns(&mocks.OrdererCapabilities{})
support.SharedConfigReturns(mockOrderer)

consenter := newConsenter(chainGetter)
consenter := newConsenter(chainManager)

chain, err := consenter.HandleChain(support, nil)
Expect(chain).To(BeNil())
Expand Down Expand Up @@ -443,7 +429,7 @@ var _ = Describe("Consenter", func() {
support.SharedConfigReturns(mockOrderer)
support.ChannelIDReturns("foo")

consenter := newConsenter(chainGetter)
consenter := newConsenter(chainManager)
//without a system channel, the InactiveChainRegistry is nil
consenter.InactiveChainRegistry = nil
consenter.icr = nil
Expand All @@ -459,7 +445,7 @@ type consenter struct {
icr *mocks.InactiveChainRegistry
}

func newConsenter(chainGetter *mocks.ChainGetter) *consenter {
func newConsenter(chainManager *mocks.ChainManager) *consenter {
communicator := &clustermocks.Communicator{}
ca, err := tlsgen.NewCA()
Expect(err).NotTo(HaveOccurred())
Expand All @@ -471,11 +457,11 @@ func newConsenter(chainGetter *mocks.ChainGetter) *consenter {
Expect(err).NotTo(HaveOccurred())

c := &etcdraft.Consenter{
ChainManager: chainManager,
InactiveChainRegistry: icr,
Communication: communicator,
Cert: certAsPEM,
Logger: flogging.MustGetLogger("test"),
Chains: chainGetter,
Dispatcher: &etcdraft.Dispatcher{
Logger: flogging.MustGetLogger("test"),
ChainSelector: &mocks.ReceiverGetter{},
Expand Down
13 changes: 8 additions & 5 deletions orderer/consensus/etcdraft/initialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/hyperledger/fabric/internal/pkg/comm"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/common/multichannel"
"github.com/hyperledger/fabric/orderer/consensus/etcdraft"
"github.com/hyperledger/fabric/orderer/consensus/etcdraft/mocks"
"github.com/stretchr/testify/require"
Expand All @@ -33,7 +32,9 @@ func TestNewEtcdRaftConsenter(t *testing.T) {
SecOpts: comm.SecureOptions{
Certificate: []byte{1, 2, 3},
},
}, srv, &multichannel.Registrar{},
},
srv,
&mocks.ChainManager{},
&mocks.InactiveChainRegistry{},
&disabled.Provider{},
cryptoProvider,
Expand All @@ -43,7 +44,7 @@ func TestNewEtcdRaftConsenter(t *testing.T) {
require.Equal(t, []byte{1, 2, 3}, consenter.Cert)
// Assert that all dependencies for the consenter were populated
require.NotNil(t, consenter.Communication)
require.NotNil(t, consenter.Chains)
require.NotNil(t, consenter.ChainManager)
require.NotNil(t, consenter.ChainSelector)
require.NotNil(t, consenter.Dispatcher)
require.NotNil(t, consenter.Logger)
Expand All @@ -63,7 +64,9 @@ func TestNewEtcdRaftConsenterNoSystemChannel(t *testing.T) {
SecOpts: comm.SecureOptions{
Certificate: []byte{1, 2, 3},
},
}, srv, &multichannel.Registrar{},
},
srv,
&mocks.ChainManager{},
nil, // without a system channel we have InactiveChainRegistry == nil
&disabled.Provider{},
cryptoProvider,
Expand All @@ -73,7 +76,7 @@ func TestNewEtcdRaftConsenterNoSystemChannel(t *testing.T) {
require.Equal(t, []byte{1, 2, 3}, consenter.Cert)
// Assert that all dependencies for the consenter were populated
require.NotNil(t, consenter.Communication)
require.NotNil(t, consenter.Chains)
require.NotNil(t, consenter.ChainManager)
require.NotNil(t, consenter.ChainSelector)
require.NotNil(t, consenter.Dispatcher)
require.NotNil(t, consenter.Logger)
Expand Down
Loading

0 comments on commit 331d950

Please sign in to comment.