Skip to content

Commit

Permalink
FAB-17911 Ch.Part.API: join system channel (#1884)
Browse files Browse the repository at this point in the history
An orderer without channels joins the system channel.
It must be a member of the system channel cluster as well.

1. The join-block is saved to the join-block filerepo.
2. If the join-block.number=0, it is appended to the ledger.
3. A degenerate ChainSupport is created, holding an inactive.Chain
   within, and set into the chains map and the "systemChannel" field.
   This is done to prevent further invocations to the API from issuing
   conflicting commands, as they will find a system channel there.
4. Only "List" and "Remove" will be accepted. Transactions sent to the
   system channel will be rejected.
5. As soon as the Join REST call returns successfully the orderer should
   be restarted.
6. Upon restart, (boostrapMethod="none") 'Main' will look for a system
   channel join block, and if it finds one, will treat it as a bootstrap block.
7. Normal boot then resumes, and the orderer will on-board (replicate) the
   system channel and any channel referred by it, just as if we had 
   boostrapMethod="file" with the same block as bootstrap block.
8. After replication ends, the system channel join block is removed.
9. The orderer now operates in the "legacy" system channel mode.

Signed-off-by: Yoav Tock <tock@il.ibm.com>
Change-Id: Ief7511de61eccc876f83157315bca98f9e30397c
  • Loading branch information
tock-ibm authored Sep 17, 2020
1 parent 91f0866 commit ebfd084
Show file tree
Hide file tree
Showing 9 changed files with 481 additions and 41 deletions.
1 change: 1 addition & 0 deletions orderer/common/localconfig/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,4 +269,5 @@ func TestChannelParticipationDefaults(t *testing.T) {
cfg, err := cc.load()
require.NoError(t, err)
require.Equal(t, cfg.ChannelParticipation.Enabled, Defaults.ChannelParticipation.Enabled)
require.Equal(t, cfg.ChannelParticipation.MaxRequestBodySize, Defaults.ChannelParticipation.MaxRequestBodySize)
}
17 changes: 17 additions & 0 deletions orderer/common/multichannel/chainsupport.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import (
"github.com/hyperledger/fabric/common/ledger/blockledger"
"github.com/hyperledger/fabric/internal/pkg/identity"
"github.com/hyperledger/fabric/orderer/common/blockcutter"
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
"github.com/hyperledger/fabric/orderer/common/types"
"github.com/hyperledger/fabric/orderer/consensus"
"github.com/hyperledger/fabric/orderer/consensus/inactive"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -180,3 +182,18 @@ func (cs *ChainSupport) Sequence() uint64 {
func (cs *ChainSupport) Append(block *cb.Block) error {
return cs.ledgerResources.ReadWriter.Append(block)
}

func newOnBoardingChainSupport(
ledgerResources *ledgerResources,
config localconfig.TopLevel,
bccsp bccsp.BCCSP,
) (*ChainSupport, error) {
cs := &ChainSupport{ledgerResources: ledgerResources}
cs.Processor = msgprocessor.NewStandardChannel(cs, msgprocessor.CreateStandardChannelFilters(cs, config), bccsp)
cs.Chain = &inactive.Chain{Err: errors.New("system channel creation pending: server requires restart")}
cs.StatusReporter = consensus.StaticStatusReporter{ClusterRelation: types.ClusterRelationMember, Status: types.StatusInactive}

logger.Debugf("[channel: %s] Done creating onboarding channel support resources", cs.ChannelID())

return cs, nil
}
51 changes: 51 additions & 0 deletions orderer/common/multichannel/chainsupport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ SPDX-License-Identifier: Apache-2.0
package multichannel

import (
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
"github.com/hyperledger/fabric/orderer/common/types"
"testing"

"github.com/hyperledger/fabric-protos-go/common"
Expand Down Expand Up @@ -63,3 +66,51 @@ func TestConsensusMetadataValidation(t *testing.T) {
_, err = cs.ProposeConfigUpdate(&common.Envelope{})
require.EqualError(t, err, "consensus metadata update for channel config update is invalid: bananas")
}

func TestNewOnboardingChainSupport(t *testing.T) {
mockResources := &mocks.Resources{}
mockValidator := &mocks.ConfigTXValidator{}
mockValidator.ChannelIDReturns("mychannel")
mockResources.ConfigtxValidatorReturns(mockValidator)

ms := &mutableResourcesMock{
Resources: mockResources,
}
cryptoProvider, err := sw.NewDefaultSecurityLevelWithKeystore(sw.NewDummyKeyStore())
require.NoError(t, err)

mockRW := &mocks.ReadWriter{}
mockRW.HeightReturns(7)
ledgerRes := &ledgerResources{
configResources: &configResources{
mutableResources: ms,
bccsp: cryptoProvider,
},
ReadWriter: mockRW,
}

cs, err := newOnBoardingChainSupport(ledgerRes, localconfig.TopLevel{}, cryptoProvider)
require.NoError(t, err)
require.NotNil(t, cs)

errStr := "system channel creation pending: server requires restart"
require.EqualError(t, cs.Order(nil, 0), errStr)
require.EqualError(t, cs.Configure(nil, 0), errStr)
require.EqualError(t, cs.WaitReady(), errStr)
require.NotPanics(t, cs.Start)
require.NotPanics(t, cs.Halt)
_, open := <-cs.Errored()
require.False(t, open)

cRel, status := cs.StatusReport()
require.Equal(t, types.ClusterRelationMember, cRel)
require.Equal(t, types.StatusInactive, status)

require.Equal(t, uint64(7), cs.Height(), "ledger ReadWriter is initialized")
require.Equal(t, "mychannel", cs.ConfigtxValidator().ChannelID(), "ChannelConfig is initialized")
require.Equal(t, msgprocessor.ConfigUpdateMsg,
cs.ClassifyMsg(&common.ChannelHeader{
Type: int32(common.HeaderType_CONFIG_UPDATE),
ChannelId: "mychannel",
}), "Message processor is initialized")
}
59 changes: 50 additions & 9 deletions orderer/common/multichannel/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ func NewRegistrar(
}

if config.ChannelParticipation.Enabled {
err := r.initializeJoinBlockFileRepo()
var err error
r.joinBlockFileRepo, err = InitJoinBlockFileRepo(&r.config)
if err != nil {
logger.Panicf("Error initializing joinblock file repo: %s", err)
}
Expand All @@ -115,20 +116,17 @@ func NewRegistrar(
return r
}

// initialize the channel participation API joinblock file repo. This creates
// InitJoinBlockFileRepo initialize the channel participation API joinblock file repo. This creates
// the fileRepoDir on the filesystem if it does not already exist.
func (r *Registrar) initializeJoinBlockFileRepo() error {
fileRepoDir := filepath.Join(r.config.FileLedger.Location, "filerepo")
func InitJoinBlockFileRepo(config *localconfig.TopLevel) (*filerepo.Repo, error) {
fileRepoDir := filepath.Join(config.FileLedger.Location, "filerepo")
logger.Infof("Channel Participation API enabled, registrar initializing with file repo %s", fileRepoDir)

joinBlockFileRepo, err := filerepo.New(fileRepoDir, "joinblock")
if err != nil {
return err
return nil, err
}

r.joinBlockFileRepo = joinBlockFileRepo

return nil
return joinBlockFileRepo, nil
}

func (r *Registrar) Initialize(consenters map[string]consensus.Consenter) {
Expand Down Expand Up @@ -698,6 +696,11 @@ func (r *Registrar) JoinChannel(channelID string, configBlock *cb.Block, isAppCh
}
}()

if !isAppChannel {
info, err := r.joinSystemChannel(ledgerRes, clusterConsenter, configBlock, channelID)
return info, err
}

isMember, err := clusterConsenter.IsChannelMember(configBlock)
if err != nil {
return types.ChannelInfo{}, errors.Wrap(err, "failed to determine cluster membership from join-block")
Expand Down Expand Up @@ -797,6 +800,44 @@ func (r *Registrar) createFollower(
return fChain, info, nil
}

// Assumes the system channel join-block is saved to the file repo.
func (r *Registrar) joinSystemChannel(
ledgerRes *ledgerResources,
clusterConsenter consensus.ClusterConsenter,
configBlock *cb.Block,
channelID string,
) (types.ChannelInfo, error) {
logger.Infof("Joining system channel '%s', with config block number: %d", channelID, configBlock.Header.Number)

if configBlock.Header.Number == 0 {
if err := ledgerRes.Append(configBlock); err != nil {
return types.ChannelInfo{}, errors.Wrap(err, "error appending config block to the ledger")
}
}

// This is a degenerate ChainSupport holding an inactive.Chain, that will respond to a GET request with the info
// returned below. This is an indication to the user/admin that the orderer needs a restart, and prevent
// conflicting channel participation API actions on the orderer.
cs, err := newOnBoardingChainSupport(ledgerRes, r.config, r.bccsp)
if err != nil {
return types.ChannelInfo{}, errors.Wrap(err, "error creating onboarding chain support")
}
r.chains[channelID] = cs
r.systemChannel = cs
r.systemChannelID = channelID

info := types.ChannelInfo{
Name: channelID,
URL: "",
Height: ledgerRes.Height(),
}
info.ClusterRelation, info.Status = r.systemChannel.StatusReport()

logger.Infof("System channel creation pending: server requires restart! ChannelInfo: %v", info)

return info, nil
}

// RemoveChannel instructs the orderer to remove a channel.
func (r *Registrar) RemoveChannel(channelID string) error {
r.lock.Lock()
Expand Down
73 changes: 73 additions & 0 deletions orderer/common/multichannel/registrar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,79 @@ func TestRegistrar_JoinChannel(t *testing.T) {
fChain.Halt()
require.False(t, fChain.IsRunning())
})

t.Run("Join system channel without on-boarding", func(t *testing.T) {
setup(t)
defer cleanup()

consenter.IsChannelMemberReturns(true, nil)
registrar := NewRegistrar(config, ledgerFactory, mockCrypto(), &disabled.Provider{}, cryptoProvider, nil)
registrar.Initialize(mockConsenters)

// Before join the chain, it doesn't exist
require.Nil(t, registrar.GetChain("sys-raft-channel"))

info, err := registrar.JoinChannel("sys-raft-channel", genesisBlockSysRaft, false)
require.NoError(t, err)
require.Equal(t, types.ChannelInfo{Name: "sys-raft-channel", URL: "", ClusterRelation: "member", Status: "inactive", Height: 0x1}, info)
// After creating the chain, it exists
cs := registrar.GetChain("sys-raft-channel")
require.NotNil(t, cs)

// join-block exists
joinBlockPath := filepath.Join(tmpdir, "filerepo", "joinblock", "sys-raft-channel.joinblock")
_, err = os.Stat(joinBlockPath)
require.NoError(t, err)

// ChannelInfo() and ChannelList() are working fine
info, err = registrar.ChannelInfo("sys-raft-channel")
require.NoError(t, err)
require.Equal(t, types.ChannelInfo{Name: "sys-raft-channel", URL: "", ClusterRelation: "member", Status: "inactive", Height: 0x1}, info)
channelList := registrar.ChannelList()
require.Equal(t, 0, len(channelList.Channels))
require.NotNil(t, channelList.SystemChannel)
require.Equal(t, "sys-raft-channel", channelList.SystemChannel.Name)
ledgerRW, err := ledgerFactory.GetOrCreate("sys-raft-channel")
require.NoError(t, err)
require.Equal(t, uint64(1), ledgerRW.Height(), "block was appended")
})

t.Run("Join system channel with on-boarding", func(t *testing.T) {
setup(t)
defer cleanup()

consenter.IsChannelMemberReturns(true, nil)
registrar := NewRegistrar(config, ledgerFactory, mockCrypto(), &disabled.Provider{}, cryptoProvider, nil)
registrar.Initialize(mockConsenters)

// Before join the chain, it doesn't exist
require.Nil(t, registrar.GetChain("sys-raft-channel"))

genesisBlockSysRaft.Header.Number = 7
info, err := registrar.JoinChannel("sys-raft-channel", genesisBlockSysRaft, false)
require.NoError(t, err)
require.Equal(t, types.ChannelInfo{Name: "sys-raft-channel", URL: "", ClusterRelation: "member", Status: "inactive", Height: 0x0}, info)
// After creating the chain, it exists
cs := registrar.GetChain("sys-raft-channel")
require.NotNil(t, cs)

// join-block exists
joinBlockPath := filepath.Join(tmpdir, "filerepo", "joinblock", "sys-raft-channel.joinblock")
_, err = os.Stat(joinBlockPath)
require.NoError(t, err)

// ChannelInfo() and ChannelList() are working fine
info, err = registrar.ChannelInfo("sys-raft-channel")
require.NoError(t, err)
require.Equal(t, types.ChannelInfo{Name: "sys-raft-channel", URL: "", ClusterRelation: "member", Status: "inactive", Height: 0x0}, info)
channelList := registrar.ChannelList()
require.Equal(t, 0, len(channelList.Channels))
require.NotNil(t, channelList.SystemChannel)
require.Equal(t, "sys-raft-channel", channelList.SystemChannel.Name)
ledgerRW, err := ledgerFactory.GetOrCreate("sys-raft-channel")
require.NoError(t, err)
require.Equal(t, uint64(0), ledgerRW.Height(), "block was not appended")
})
}

func TestRegistrar_RemoveChannel(t *testing.T) {
Expand Down
Loading

0 comments on commit ebfd084

Please sign in to comment.