Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FAB-17911 Ch.Part.API: join system channel #1884

Merged
merged 1 commit into from
Sep 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions orderer/common/localconfig/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,4 +269,5 @@ func TestChannelParticipationDefaults(t *testing.T) {
cfg, err := cc.load()
require.NoError(t, err)
require.Equal(t, cfg.ChannelParticipation.Enabled, Defaults.ChannelParticipation.Enabled)
require.Equal(t, cfg.ChannelParticipation.MaxRequestBodySize, Defaults.ChannelParticipation.MaxRequestBodySize)
}
17 changes: 17 additions & 0 deletions orderer/common/multichannel/chainsupport.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import (
"github.com/hyperledger/fabric/common/ledger/blockledger"
"github.com/hyperledger/fabric/internal/pkg/identity"
"github.com/hyperledger/fabric/orderer/common/blockcutter"
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
"github.com/hyperledger/fabric/orderer/common/types"
"github.com/hyperledger/fabric/orderer/consensus"
"github.com/hyperledger/fabric/orderer/consensus/inactive"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -180,3 +182,18 @@ func (cs *ChainSupport) Sequence() uint64 {
func (cs *ChainSupport) Append(block *cb.Block) error {
return cs.ledgerResources.ReadWriter.Append(block)
}

func newOnBoardingChainSupport(
ledgerResources *ledgerResources,
config localconfig.TopLevel,
bccsp bccsp.BCCSP,
) (*ChainSupport, error) {
cs := &ChainSupport{ledgerResources: ledgerResources}
cs.Processor = msgprocessor.NewStandardChannel(cs, msgprocessor.CreateStandardChannelFilters(cs, config), bccsp)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have thought this field would be safe to leave uninitialized? What path is the processor for this channel called? If it's in the broadcast/deliver path, perhaps we'd be better off explicitly returning an error of some sort?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is in the broadcast path. The first is "ClasifyHeader" and then "Process*" which will ultimately get to the inactive.Chain which is primed with an informative error message. We'll test that in IT.

cs.Chain = &inactive.Chain{Err: errors.New("system channel creation pending: server requires restart")}
cs.StatusReporter = consensus.StaticStatusReporter{ClusterRelation: types.ClusterRelationMember, Status: types.StatusInactive}

logger.Debugf("[channel: %s] Done creating onboarding channel support resources", cs.ChannelID())

return cs, nil
}
51 changes: 51 additions & 0 deletions orderer/common/multichannel/chainsupport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ SPDX-License-Identifier: Apache-2.0
package multichannel

import (
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
"github.com/hyperledger/fabric/orderer/common/types"
"testing"

"github.com/hyperledger/fabric-protos-go/common"
Expand Down Expand Up @@ -63,3 +66,51 @@ func TestConsensusMetadataValidation(t *testing.T) {
_, err = cs.ProposeConfigUpdate(&common.Envelope{})
require.EqualError(t, err, "consensus metadata update for channel config update is invalid: bananas")
}

func TestNewOnboardingChainSupport(t *testing.T) {
mockResources := &mocks.Resources{}
mockValidator := &mocks.ConfigTXValidator{}
mockValidator.ChannelIDReturns("mychannel")
mockResources.ConfigtxValidatorReturns(mockValidator)

ms := &mutableResourcesMock{
Resources: mockResources,
}
cryptoProvider, err := sw.NewDefaultSecurityLevelWithKeystore(sw.NewDummyKeyStore())
require.NoError(t, err)

mockRW := &mocks.ReadWriter{}
mockRW.HeightReturns(7)
ledgerRes := &ledgerResources{
configResources: &configResources{
mutableResources: ms,
bccsp: cryptoProvider,
},
ReadWriter: mockRW,
}

cs, err := newOnBoardingChainSupport(ledgerRes, localconfig.TopLevel{}, cryptoProvider)
require.NoError(t, err)
require.NotNil(t, cs)

errStr := "system channel creation pending: server requires restart"
require.EqualError(t, cs.Order(nil, 0), errStr)
require.EqualError(t, cs.Configure(nil, 0), errStr)
require.EqualError(t, cs.WaitReady(), errStr)
require.NotPanics(t, cs.Start)
require.NotPanics(t, cs.Halt)
_, open := <-cs.Errored()
require.False(t, open)

cRel, status := cs.StatusReport()
require.Equal(t, types.ClusterRelationMember, cRel)
require.Equal(t, types.StatusInactive, status)

require.Equal(t, uint64(7), cs.Height(), "ledger ReadWriter is initialized")
require.Equal(t, "mychannel", cs.ConfigtxValidator().ChannelID(), "ChannelConfig is initialized")
require.Equal(t, msgprocessor.ConfigUpdateMsg,
cs.ClassifyMsg(&common.ChannelHeader{
Type: int32(common.HeaderType_CONFIG_UPDATE),
ChannelId: "mychannel",
}), "Message processor is initialized")
}
59 changes: 50 additions & 9 deletions orderer/common/multichannel/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ func NewRegistrar(
}

if config.ChannelParticipation.Enabled {
err := r.initializeJoinBlockFileRepo()
var err error
r.joinBlockFileRepo, err = InitJoinBlockFileRepo(&r.config)
if err != nil {
logger.Panicf("Error initializing joinblock file repo: %s", err)
}
Expand All @@ -115,20 +116,17 @@ func NewRegistrar(
return r
}

// initialize the channel participation API joinblock file repo. This creates
// InitJoinBlockFileRepo initialize the channel participation API joinblock file repo. This creates
// the fileRepoDir on the filesystem if it does not already exist.
func (r *Registrar) initializeJoinBlockFileRepo() error {
fileRepoDir := filepath.Join(r.config.FileLedger.Location, "filerepo")
func InitJoinBlockFileRepo(config *localconfig.TopLevel) (*filerepo.Repo, error) {
fileRepoDir := filepath.Join(config.FileLedger.Location, "filerepo")
logger.Infof("Channel Participation API enabled, registrar initializing with file repo %s", fileRepoDir)

joinBlockFileRepo, err := filerepo.New(fileRepoDir, "joinblock")
if err != nil {
return err
return nil, err
}

r.joinBlockFileRepo = joinBlockFileRepo

return nil
return joinBlockFileRepo, nil
}

func (r *Registrar) Initialize(consenters map[string]consensus.Consenter) {
Expand Down Expand Up @@ -698,6 +696,11 @@ func (r *Registrar) JoinChannel(channelID string, configBlock *cb.Block, isAppCh
}
}()

if !isAppChannel {
info, err := r.joinSystemChannel(ledgerRes, clusterConsenter, configBlock, channelID)
return info, err
}

isMember, err := clusterConsenter.IsChannelMember(configBlock)
if err != nil {
return types.ChannelInfo{}, errors.Wrap(err, "failed to determine cluster membership from join-block")
Expand Down Expand Up @@ -797,6 +800,44 @@ func (r *Registrar) createFollower(
return fChain, info, nil
}

// Assumes the system channel join-block is saved to the file repo.
func (r *Registrar) joinSystemChannel(
ledgerRes *ledgerResources,
clusterConsenter consensus.ClusterConsenter,
configBlock *cb.Block,
channelID string,
) (types.ChannelInfo, error) {
logger.Infof("Joining system channel '%s', with config block number: %d", channelID, configBlock.Header.Number)

if configBlock.Header.Number == 0 {
if err := ledgerRes.Append(configBlock); err != nil {
return types.ChannelInfo{}, errors.Wrap(err, "error appending config block to the ledger")
}
}

// This is a degenerate ChainSupport holding an inactive.Chain, that will respond to a GET request with the info
// returned below. This is an indication to the user/admin that the orderer needs a restart, and prevent
// conflicting channel participation API actions on the orderer.
cs, err := newOnBoardingChainSupport(ledgerRes, r.config, r.bccsp)
if err != nil {
return types.ChannelInfo{}, errors.Wrap(err, "error creating onboarding chain support")
}
r.chains[channelID] = cs
r.systemChannel = cs
r.systemChannelID = channelID

info := types.ChannelInfo{
Name: channelID,
URL: "",
Height: ledgerRes.Height(),
}
info.ClusterRelation, info.Status = r.systemChannel.StatusReport()

logger.Infof("System channel creation pending: server requires restart! ChannelInfo: %v", info)

return info, nil
}

// RemoveChannel instructs the orderer to remove a channel.
func (r *Registrar) RemoveChannel(channelID string) error {
r.lock.Lock()
Expand Down
73 changes: 73 additions & 0 deletions orderer/common/multichannel/registrar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,79 @@ func TestRegistrar_JoinChannel(t *testing.T) {
fChain.Halt()
require.False(t, fChain.IsRunning())
})

t.Run("Join system channel without on-boarding", func(t *testing.T) {
setup(t)
defer cleanup()

consenter.IsChannelMemberReturns(true, nil)
registrar := NewRegistrar(config, ledgerFactory, mockCrypto(), &disabled.Provider{}, cryptoProvider, nil)
registrar.Initialize(mockConsenters)

// Before join the chain, it doesn't exist
require.Nil(t, registrar.GetChain("sys-raft-channel"))

info, err := registrar.JoinChannel("sys-raft-channel", genesisBlockSysRaft, false)
require.NoError(t, err)
require.Equal(t, types.ChannelInfo{Name: "sys-raft-channel", URL: "", ClusterRelation: "member", Status: "inactive", Height: 0x1}, info)
// After creating the chain, it exists
cs := registrar.GetChain("sys-raft-channel")
require.NotNil(t, cs)

// join-block exists
joinBlockPath := filepath.Join(tmpdir, "filerepo", "joinblock", "sys-raft-channel.joinblock")
_, err = os.Stat(joinBlockPath)
require.NoError(t, err)

// ChannelInfo() and ChannelList() are working fine
info, err = registrar.ChannelInfo("sys-raft-channel")
require.NoError(t, err)
require.Equal(t, types.ChannelInfo{Name: "sys-raft-channel", URL: "", ClusterRelation: "member", Status: "inactive", Height: 0x1}, info)
channelList := registrar.ChannelList()
require.Equal(t, 0, len(channelList.Channels))
require.NotNil(t, channelList.SystemChannel)
require.Equal(t, "sys-raft-channel", channelList.SystemChannel.Name)
ledgerRW, err := ledgerFactory.GetOrCreate("sys-raft-channel")
require.NoError(t, err)
require.Equal(t, uint64(1), ledgerRW.Height(), "block was appended")
})

t.Run("Join system channel with on-boarding", func(t *testing.T) {
setup(t)
defer cleanup()

consenter.IsChannelMemberReturns(true, nil)
registrar := NewRegistrar(config, ledgerFactory, mockCrypto(), &disabled.Provider{}, cryptoProvider, nil)
registrar.Initialize(mockConsenters)

// Before join the chain, it doesn't exist
require.Nil(t, registrar.GetChain("sys-raft-channel"))

genesisBlockSysRaft.Header.Number = 7
info, err := registrar.JoinChannel("sys-raft-channel", genesisBlockSysRaft, false)
require.NoError(t, err)
require.Equal(t, types.ChannelInfo{Name: "sys-raft-channel", URL: "", ClusterRelation: "member", Status: "inactive", Height: 0x0}, info)
// After creating the chain, it exists
cs := registrar.GetChain("sys-raft-channel")
require.NotNil(t, cs)

// join-block exists
joinBlockPath := filepath.Join(tmpdir, "filerepo", "joinblock", "sys-raft-channel.joinblock")
_, err = os.Stat(joinBlockPath)
require.NoError(t, err)

// ChannelInfo() and ChannelList() are working fine
info, err = registrar.ChannelInfo("sys-raft-channel")
require.NoError(t, err)
require.Equal(t, types.ChannelInfo{Name: "sys-raft-channel", URL: "", ClusterRelation: "member", Status: "inactive", Height: 0x0}, info)
channelList := registrar.ChannelList()
require.Equal(t, 0, len(channelList.Channels))
require.NotNil(t, channelList.SystemChannel)
require.Equal(t, "sys-raft-channel", channelList.SystemChannel.Name)
ledgerRW, err := ledgerFactory.GetOrCreate("sys-raft-channel")
require.NoError(t, err)
require.Equal(t, uint64(0), ledgerRW.Height(), "block was not appended")
})
}

func TestRegistrar_RemoveChannel(t *testing.T) {
Expand Down
Loading