diff --git a/orderer/common/multichannel/registrar.go b/orderer/common/multichannel/registrar.go index 37cacb8c7ef..bee0c1f5053 100644 --- a/orderer/common/multichannel/registrar.go +++ b/orderer/common/multichannel/registrar.go @@ -303,7 +303,10 @@ func (r *Registrar) CreateChain(chainName string) { if chain != nil { logger.Infof("A chain of type %T for channel %s already exists. "+ "Halting it.", chain.Chain, chainName) + r.lock.Lock() chain.Halt() + delete(r.chains, chainName) + r.lock.Unlock() } r.newChain(configTx(lf)) } @@ -312,10 +315,26 @@ func (r *Registrar) newChain(configtx *cb.Envelope) { r.lock.Lock() defer r.lock.Unlock() + channelName, err := channelNameFromConfigTx(configtx) + if err != nil { + logger.Warnf("Failed extracting channel name: %v", err) + return + } + + // fixes https://github.com/hyperledger/fabric/issues/2931 + if existingChain, exists := r.chains[channelName]; exists { + if raftChain, isRaftChain := existingChain.Chain.(RaftChain); isRaftChain && raftChain.IsRaft() { + logger.Infof("Channel %s already created, skipping its creation", channelName) + return + } + } + ledgerResources := r.newLedgerResources(configtx) // If we have no blocks, we need to create the genesis block ourselves. if ledgerResources.Height() == 0 { - ledgerResources.Append(blockledger.CreateNextBlock(ledgerResources, []*cb.Envelope{configtx})) + if err := ledgerResources.Append(blockledger.CreateNextBlock(ledgerResources, []*cb.Envelope{configtx})); err != nil { + logger.Panicf("Error appending genesis block to ledger: %s", err) + } } // Copy the map to allow concurrent reads from broadcast/deliver while the new chainSupport is @@ -352,3 +371,25 @@ func (r *Registrar) NewChannelConfig(envConfigUpdate *cb.Envelope) (channelconfi func (r *Registrar) CreateBundle(channelID string, config *cb.Config) (channelconfig.Resources, error) { return channelconfig.NewBundle(channelID, config) } + +type RaftChain interface { + IsRaft() bool +} + +func channelNameFromConfigTx(configtx *cb.Envelope) (string, error) { + payload, err := utils.UnmarshalPayload(configtx.Payload) + if err != nil { + return "", errors.WithMessage(err, "error umarshaling envelope to payload") + } + + if payload.Header == nil { + return "", errors.New("missing channel header") + } + + chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader) + if err != nil { + return "", errors.WithMessage(err, "error unmarshalling channel header") + } + + return chdr.ChannelId, nil +} diff --git a/orderer/common/multichannel/registrar_test.go b/orderer/common/multichannel/registrar_test.go index 1321866df92..5ff0a2c2c65 100644 --- a/orderer/common/multichannel/registrar_test.go +++ b/orderer/common/multichannel/registrar_test.go @@ -7,6 +7,8 @@ SPDX-License-Identifier: Apache-2.0 package multichannel import ( + "io/ioutil" + "os" "testing" "github.com/golang/protobuf/proto" @@ -295,6 +297,33 @@ func TestCreateChain(t *testing.T) { rcs := newChainSupport(manager, chainSupport.ledgerResources, consenters, mockCrypto(), blockcutter.NewMetrics(&disabled.Provider{})) assert.Equal(t, expectedLastConfigSeq, rcs.lastConfigSeq, "On restart, incorrect lastConfigSeq") }) + + t.Run("chain of type etcdraft.Chain is already created", func(t *testing.T) { + tmpdir, err := ioutil.TempDir("", "registrar_test-") + assert.NoError(t, err) + defer os.RemoveAll(tmpdir) + + ledgerFactory, _ := newRAMLedgerAndFactory(10, genesisconfig.TestChainID, genesisBlockSys) + mockConsenters := map[string]consensus.Consenter{confSys.Orderer.OrdererType: &mockConsenter{}, "etcdraft": &mockConsenter{}} + + registrar := NewRegistrar(conf, ledgerFactory, mockCrypto(), &disabled.Provider{}) + registrar.Initialize(mockConsenters) + + testChainSupport := &ChainSupport{Chain: &mockRaftChain{}} + registrar.chains["test"] = testChainSupport + + orglessChannelConf := configtxgentest.Load(genesisconfig.SampleSingleMSPChannelProfile) + orglessChannelConf.Application.Organizations = nil + envConfigUpdate, err := encoder.MakeChannelCreationTransaction("test", mockCrypto(), orglessChannelConf) + assert.NoError(t, err, "Constructing chain creation tx") + + registrar.newChain(envConfigUpdate) + + testChainSupport2 := registrar.GetChain("test") + assert.NotNil(t, testChainSupport2) + + assert.True(t, testChainSupport == testChainSupport2) + }) } func testLastConfigBlockNumber(t *testing.T, block *cb.Block, expectedBlockNumber uint64) { @@ -390,3 +419,32 @@ func TestBroadcastChannelSupportRejection(t *testing.T) { assert.Error(t, err, "Messages of type HeaderType_CONFIG should return an error.") }) } + +type mockRaftChain struct { +} + +func (c *mockRaftChain) Order(env *cb.Envelope, configSeq uint64) error { + return nil +} + +func (c *mockRaftChain) Configure(config *cb.Envelope, configSeq uint64) error { + return nil +} + +func (c *mockRaftChain) WaitReady() error { + return nil +} + +func (c *mockRaftChain) Errored() <-chan struct{} { + return nil +} + +func (c *mockRaftChain) Start() { +} + +func (c *mockRaftChain) Halt() { +} + +func (c *mockRaftChain) IsRaft() bool { + return true +} diff --git a/orderer/consensus/etcdraft/chain.go b/orderer/consensus/etcdraft/chain.go index 6fbe5324904..1b78e45f1c1 100644 --- a/orderer/consensus/etcdraft/chain.go +++ b/orderer/consensus/etcdraft/chain.go @@ -1362,3 +1362,7 @@ func (c *Chain) triggerCatchup(sn *raftpb.Snapshot) { case <-c.doneC: } } + +func (c *Chain) IsRaft() bool { + return true +}