Skip to content

Commit

Permalink
[Backport] #2936 to release-1.4 (#2964)
Browse files Browse the repository at this point in the history
* Do not create new chain of type etcdraft.Chain if such exists in map of chains. This can happen when in Raft protocol a channel was created, but not marked as done in WAL logs, so at orderer startup it will try to rerun creation tx and panic because the channel already exists.

Signed-off-by: Vladyslav Kopaihorodskyi <vlad.kopaygorodsky@gmail.com>

* fixed test

Signed-off-by: Vladyslav Kopaihorodskyi <vlad.kopaygorodsky@gmail.com>
  • Loading branch information
kopaygorodsky authored Sep 29, 2021
1 parent 8d17c80 commit 89a1307
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 1 deletion.
43 changes: 42 additions & 1 deletion orderer/common/multichannel/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,10 @@ func (r *Registrar) CreateChain(chainName string) {
if chain != nil {
logger.Infof("A chain of type %T for channel %s already exists. "+
"Halting it.", chain.Chain, chainName)
r.lock.Lock()
chain.Halt()
delete(r.chains, chainName)
r.lock.Unlock()
}
r.newChain(configTx(lf))
}
Expand All @@ -312,10 +315,26 @@ func (r *Registrar) newChain(configtx *cb.Envelope) {
r.lock.Lock()
defer r.lock.Unlock()

channelName, err := channelNameFromConfigTx(configtx)
if err != nil {
logger.Warnf("Failed extracting channel name: %v", err)
return
}

// fixes https://github.com/hyperledger/fabric/issues/2931
if existingChain, exists := r.chains[channelName]; exists {
if raftChain, isRaftChain := existingChain.Chain.(RaftChain); isRaftChain && raftChain.IsRaft() {
logger.Infof("Channel %s already created, skipping its creation", channelName)
return
}
}

ledgerResources := r.newLedgerResources(configtx)
// If we have no blocks, we need to create the genesis block ourselves.
if ledgerResources.Height() == 0 {
ledgerResources.Append(blockledger.CreateNextBlock(ledgerResources, []*cb.Envelope{configtx}))
if err := ledgerResources.Append(blockledger.CreateNextBlock(ledgerResources, []*cb.Envelope{configtx})); err != nil {
logger.Panicf("Error appending genesis block to ledger: %s", err)
}
}

// Copy the map to allow concurrent reads from broadcast/deliver while the new chainSupport is
Expand Down Expand Up @@ -352,3 +371,25 @@ func (r *Registrar) NewChannelConfig(envConfigUpdate *cb.Envelope) (channelconfi
func (r *Registrar) CreateBundle(channelID string, config *cb.Config) (channelconfig.Resources, error) {
return channelconfig.NewBundle(channelID, config)
}

type RaftChain interface {
IsRaft() bool
}

func channelNameFromConfigTx(configtx *cb.Envelope) (string, error) {
payload, err := utils.UnmarshalPayload(configtx.Payload)
if err != nil {
return "", errors.WithMessage(err, "error umarshaling envelope to payload")
}

if payload.Header == nil {
return "", errors.New("missing channel header")
}

chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return "", errors.WithMessage(err, "error unmarshalling channel header")
}

return chdr.ChannelId, nil
}
58 changes: 58 additions & 0 deletions orderer/common/multichannel/registrar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ SPDX-License-Identifier: Apache-2.0
package multichannel

import (
"io/ioutil"
"os"
"testing"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -295,6 +297,33 @@ func TestCreateChain(t *testing.T) {
rcs := newChainSupport(manager, chainSupport.ledgerResources, consenters, mockCrypto(), blockcutter.NewMetrics(&disabled.Provider{}))
assert.Equal(t, expectedLastConfigSeq, rcs.lastConfigSeq, "On restart, incorrect lastConfigSeq")
})

t.Run("chain of type etcdraft.Chain is already created", func(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "registrar_test-")
assert.NoError(t, err)
defer os.RemoveAll(tmpdir)

ledgerFactory, _ := newRAMLedgerAndFactory(10, genesisconfig.TestChainID, genesisBlockSys)
mockConsenters := map[string]consensus.Consenter{confSys.Orderer.OrdererType: &mockConsenter{}, "etcdraft": &mockConsenter{}}

registrar := NewRegistrar(conf, ledgerFactory, mockCrypto(), &disabled.Provider{})
registrar.Initialize(mockConsenters)

testChainSupport := &ChainSupport{Chain: &mockRaftChain{}}
registrar.chains["test"] = testChainSupport

orglessChannelConf := configtxgentest.Load(genesisconfig.SampleSingleMSPChannelProfile)
orglessChannelConf.Application.Organizations = nil
envConfigUpdate, err := encoder.MakeChannelCreationTransaction("test", mockCrypto(), orglessChannelConf)
assert.NoError(t, err, "Constructing chain creation tx")

registrar.newChain(envConfigUpdate)

testChainSupport2 := registrar.GetChain("test")
assert.NotNil(t, testChainSupport2)

assert.True(t, testChainSupport == testChainSupport2)
})
}

func testLastConfigBlockNumber(t *testing.T, block *cb.Block, expectedBlockNumber uint64) {
Expand Down Expand Up @@ -390,3 +419,32 @@ func TestBroadcastChannelSupportRejection(t *testing.T) {
assert.Error(t, err, "Messages of type HeaderType_CONFIG should return an error.")
})
}

type mockRaftChain struct {
}

func (c *mockRaftChain) Order(env *cb.Envelope, configSeq uint64) error {
return nil
}

func (c *mockRaftChain) Configure(config *cb.Envelope, configSeq uint64) error {
return nil
}

func (c *mockRaftChain) WaitReady() error {
return nil
}

func (c *mockRaftChain) Errored() <-chan struct{} {
return nil
}

func (c *mockRaftChain) Start() {
}

func (c *mockRaftChain) Halt() {
}

func (c *mockRaftChain) IsRaft() bool {
return true
}
4 changes: 4 additions & 0 deletions orderer/consensus/etcdraft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1362,3 +1362,7 @@ func (c *Chain) triggerCatchup(sn *raftpb.Snapshot) {
case <-c.doneC:
}
}

func (c *Chain) IsRaft() bool {
return true
}

0 comments on commit 89a1307

Please sign in to comment.