From ebfd084fedb262393ec5e004990d8dedc51e6cc3 Mon Sep 17 00:00:00 2001 From: Yoav Tock Date: Thu, 17 Sep 2020 07:41:08 +0300 Subject: [PATCH] FAB-17911 Ch.Part.API: join system channel (#1884) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit An orderer without channels joins the system channel. It must be a member of the system channel cluster as well. 1. The join-block is saved to the join-block filerepo. 2. If the join-block.number=0, it is appended to the ledger. 3. A degenerate ChainSupport is created, holding an inactive.Chain within, and set into the chains map and the "systemChannel" field. This is done to prevent further invocations to the API from issuing conflicting commands, as they will find a system channel there. 4. Only "List" and "Remove" will be accepted. Transactions sent to the system channel will be rejected. 5. As soon as the Join REST call returns successfully the orderer should be restarted. 6. Upon restart, (boostrapMethod="none") 'Main' will look for a system channel join block, and if it finds one, will treat it as a bootstrap block. 7. Normal boot then resumes, and the orderer will on-board (replicate) the system channel and any channel referred by it, just as if we had  boostrapMethod="file" with the same block as bootstrap block. 8. After replication ends, the system channel join block is removed. 9. The orderer now operates in the "legacy" system channel mode. Signed-off-by: Yoav Tock Change-Id: Ief7511de61eccc876f83157315bca98f9e30397c --- orderer/common/localconfig/config_test.go | 1 + orderer/common/multichannel/chainsupport.go | 17 +++ .../common/multichannel/chainsupport_test.go | 51 ++++++++ orderer/common/multichannel/registrar.go | 59 +++++++-- orderer/common/multichannel/registrar_test.go | 73 ++++++++++++ orderer/common/server/etcdraft_test.go | 112 ++++++++++++++---- orderer/common/server/main.go | 96 +++++++++++++-- orderer/common/server/main_test.go | 97 +++++++++++++++ orderer/common/server/testdata/orderer.yaml | 16 +++ 9 files changed, 481 insertions(+), 41 deletions(-) diff --git a/orderer/common/localconfig/config_test.go b/orderer/common/localconfig/config_test.go index f1cfbc13ad7..87ce5265344 100644 --- a/orderer/common/localconfig/config_test.go +++ b/orderer/common/localconfig/config_test.go @@ -269,4 +269,5 @@ func TestChannelParticipationDefaults(t *testing.T) { cfg, err := cc.load() require.NoError(t, err) require.Equal(t, cfg.ChannelParticipation.Enabled, Defaults.ChannelParticipation.Enabled) + require.Equal(t, cfg.ChannelParticipation.MaxRequestBodySize, Defaults.ChannelParticipation.MaxRequestBodySize) } diff --git a/orderer/common/multichannel/chainsupport.go b/orderer/common/multichannel/chainsupport.go index 5f808cab659..bb708e8de9d 100644 --- a/orderer/common/multichannel/chainsupport.go +++ b/orderer/common/multichannel/chainsupport.go @@ -12,9 +12,11 @@ import ( "github.com/hyperledger/fabric/common/ledger/blockledger" "github.com/hyperledger/fabric/internal/pkg/identity" "github.com/hyperledger/fabric/orderer/common/blockcutter" + "github.com/hyperledger/fabric/orderer/common/localconfig" "github.com/hyperledger/fabric/orderer/common/msgprocessor" "github.com/hyperledger/fabric/orderer/common/types" "github.com/hyperledger/fabric/orderer/consensus" + "github.com/hyperledger/fabric/orderer/consensus/inactive" "github.com/hyperledger/fabric/protoutil" "github.com/pkg/errors" ) @@ -180,3 +182,18 @@ func (cs *ChainSupport) Sequence() uint64 { func (cs *ChainSupport) Append(block *cb.Block) error { return cs.ledgerResources.ReadWriter.Append(block) } + +func newOnBoardingChainSupport( + ledgerResources *ledgerResources, + config localconfig.TopLevel, + bccsp bccsp.BCCSP, +) (*ChainSupport, error) { + cs := &ChainSupport{ledgerResources: ledgerResources} + cs.Processor = msgprocessor.NewStandardChannel(cs, msgprocessor.CreateStandardChannelFilters(cs, config), bccsp) + cs.Chain = &inactive.Chain{Err: errors.New("system channel creation pending: server requires restart")} + cs.StatusReporter = consensus.StaticStatusReporter{ClusterRelation: types.ClusterRelationMember, Status: types.StatusInactive} + + logger.Debugf("[channel: %s] Done creating onboarding channel support resources", cs.ChannelID()) + + return cs, nil +} diff --git a/orderer/common/multichannel/chainsupport_test.go b/orderer/common/multichannel/chainsupport_test.go index 0e4880ed374..e682f2bea2e 100644 --- a/orderer/common/multichannel/chainsupport_test.go +++ b/orderer/common/multichannel/chainsupport_test.go @@ -7,6 +7,9 @@ SPDX-License-Identifier: Apache-2.0 package multichannel import ( + "github.com/hyperledger/fabric/orderer/common/localconfig" + "github.com/hyperledger/fabric/orderer/common/msgprocessor" + "github.com/hyperledger/fabric/orderer/common/types" "testing" "github.com/hyperledger/fabric-protos-go/common" @@ -63,3 +66,51 @@ func TestConsensusMetadataValidation(t *testing.T) { _, err = cs.ProposeConfigUpdate(&common.Envelope{}) require.EqualError(t, err, "consensus metadata update for channel config update is invalid: bananas") } + +func TestNewOnboardingChainSupport(t *testing.T) { + mockResources := &mocks.Resources{} + mockValidator := &mocks.ConfigTXValidator{} + mockValidator.ChannelIDReturns("mychannel") + mockResources.ConfigtxValidatorReturns(mockValidator) + + ms := &mutableResourcesMock{ + Resources: mockResources, + } + cryptoProvider, err := sw.NewDefaultSecurityLevelWithKeystore(sw.NewDummyKeyStore()) + require.NoError(t, err) + + mockRW := &mocks.ReadWriter{} + mockRW.HeightReturns(7) + ledgerRes := &ledgerResources{ + configResources: &configResources{ + mutableResources: ms, + bccsp: cryptoProvider, + }, + ReadWriter: mockRW, + } + + cs, err := newOnBoardingChainSupport(ledgerRes, localconfig.TopLevel{}, cryptoProvider) + require.NoError(t, err) + require.NotNil(t, cs) + + errStr := "system channel creation pending: server requires restart" + require.EqualError(t, cs.Order(nil, 0), errStr) + require.EqualError(t, cs.Configure(nil, 0), errStr) + require.EqualError(t, cs.WaitReady(), errStr) + require.NotPanics(t, cs.Start) + require.NotPanics(t, cs.Halt) + _, open := <-cs.Errored() + require.False(t, open) + + cRel, status := cs.StatusReport() + require.Equal(t, types.ClusterRelationMember, cRel) + require.Equal(t, types.StatusInactive, status) + + require.Equal(t, uint64(7), cs.Height(), "ledger ReadWriter is initialized") + require.Equal(t, "mychannel", cs.ConfigtxValidator().ChannelID(), "ChannelConfig is initialized") + require.Equal(t, msgprocessor.ConfigUpdateMsg, + cs.ClassifyMsg(&common.ChannelHeader{ + Type: int32(common.HeaderType_CONFIG_UPDATE), + ChannelId: "mychannel", + }), "Message processor is initialized") +} diff --git a/orderer/common/multichannel/registrar.go b/orderer/common/multichannel/registrar.go index 555e859e30b..21e3cfec480 100644 --- a/orderer/common/multichannel/registrar.go +++ b/orderer/common/multichannel/registrar.go @@ -106,7 +106,8 @@ func NewRegistrar( } if config.ChannelParticipation.Enabled { - err := r.initializeJoinBlockFileRepo() + var err error + r.joinBlockFileRepo, err = InitJoinBlockFileRepo(&r.config) if err != nil { logger.Panicf("Error initializing joinblock file repo: %s", err) } @@ -115,20 +116,17 @@ func NewRegistrar( return r } -// initialize the channel participation API joinblock file repo. This creates +// InitJoinBlockFileRepo initialize the channel participation API joinblock file repo. This creates // the fileRepoDir on the filesystem if it does not already exist. -func (r *Registrar) initializeJoinBlockFileRepo() error { - fileRepoDir := filepath.Join(r.config.FileLedger.Location, "filerepo") +func InitJoinBlockFileRepo(config *localconfig.TopLevel) (*filerepo.Repo, error) { + fileRepoDir := filepath.Join(config.FileLedger.Location, "filerepo") logger.Infof("Channel Participation API enabled, registrar initializing with file repo %s", fileRepoDir) joinBlockFileRepo, err := filerepo.New(fileRepoDir, "joinblock") if err != nil { - return err + return nil, err } - - r.joinBlockFileRepo = joinBlockFileRepo - - return nil + return joinBlockFileRepo, nil } func (r *Registrar) Initialize(consenters map[string]consensus.Consenter) { @@ -698,6 +696,11 @@ func (r *Registrar) JoinChannel(channelID string, configBlock *cb.Block, isAppCh } }() + if !isAppChannel { + info, err := r.joinSystemChannel(ledgerRes, clusterConsenter, configBlock, channelID) + return info, err + } + isMember, err := clusterConsenter.IsChannelMember(configBlock) if err != nil { return types.ChannelInfo{}, errors.Wrap(err, "failed to determine cluster membership from join-block") @@ -797,6 +800,44 @@ func (r *Registrar) createFollower( return fChain, info, nil } +// Assumes the system channel join-block is saved to the file repo. +func (r *Registrar) joinSystemChannel( + ledgerRes *ledgerResources, + clusterConsenter consensus.ClusterConsenter, + configBlock *cb.Block, + channelID string, +) (types.ChannelInfo, error) { + logger.Infof("Joining system channel '%s', with config block number: %d", channelID, configBlock.Header.Number) + + if configBlock.Header.Number == 0 { + if err := ledgerRes.Append(configBlock); err != nil { + return types.ChannelInfo{}, errors.Wrap(err, "error appending config block to the ledger") + } + } + + // This is a degenerate ChainSupport holding an inactive.Chain, that will respond to a GET request with the info + // returned below. This is an indication to the user/admin that the orderer needs a restart, and prevent + // conflicting channel participation API actions on the orderer. + cs, err := newOnBoardingChainSupport(ledgerRes, r.config, r.bccsp) + if err != nil { + return types.ChannelInfo{}, errors.Wrap(err, "error creating onboarding chain support") + } + r.chains[channelID] = cs + r.systemChannel = cs + r.systemChannelID = channelID + + info := types.ChannelInfo{ + Name: channelID, + URL: "", + Height: ledgerRes.Height(), + } + info.ClusterRelation, info.Status = r.systemChannel.StatusReport() + + logger.Infof("System channel creation pending: server requires restart! ChannelInfo: %v", info) + + return info, nil +} + // RemoveChannel instructs the orderer to remove a channel. func (r *Registrar) RemoveChannel(channelID string) error { r.lock.Lock() diff --git a/orderer/common/multichannel/registrar_test.go b/orderer/common/multichannel/registrar_test.go index 4db20c87f6a..1af8a8a58e7 100644 --- a/orderer/common/multichannel/registrar_test.go +++ b/orderer/common/multichannel/registrar_test.go @@ -1250,6 +1250,79 @@ func TestRegistrar_JoinChannel(t *testing.T) { fChain.Halt() require.False(t, fChain.IsRunning()) }) + + t.Run("Join system channel without on-boarding", func(t *testing.T) { + setup(t) + defer cleanup() + + consenter.IsChannelMemberReturns(true, nil) + registrar := NewRegistrar(config, ledgerFactory, mockCrypto(), &disabled.Provider{}, cryptoProvider, nil) + registrar.Initialize(mockConsenters) + + // Before join the chain, it doesn't exist + require.Nil(t, registrar.GetChain("sys-raft-channel")) + + info, err := registrar.JoinChannel("sys-raft-channel", genesisBlockSysRaft, false) + require.NoError(t, err) + require.Equal(t, types.ChannelInfo{Name: "sys-raft-channel", URL: "", ClusterRelation: "member", Status: "inactive", Height: 0x1}, info) + // After creating the chain, it exists + cs := registrar.GetChain("sys-raft-channel") + require.NotNil(t, cs) + + // join-block exists + joinBlockPath := filepath.Join(tmpdir, "filerepo", "joinblock", "sys-raft-channel.joinblock") + _, err = os.Stat(joinBlockPath) + require.NoError(t, err) + + // ChannelInfo() and ChannelList() are working fine + info, err = registrar.ChannelInfo("sys-raft-channel") + require.NoError(t, err) + require.Equal(t, types.ChannelInfo{Name: "sys-raft-channel", URL: "", ClusterRelation: "member", Status: "inactive", Height: 0x1}, info) + channelList := registrar.ChannelList() + require.Equal(t, 0, len(channelList.Channels)) + require.NotNil(t, channelList.SystemChannel) + require.Equal(t, "sys-raft-channel", channelList.SystemChannel.Name) + ledgerRW, err := ledgerFactory.GetOrCreate("sys-raft-channel") + require.NoError(t, err) + require.Equal(t, uint64(1), ledgerRW.Height(), "block was appended") + }) + + t.Run("Join system channel with on-boarding", func(t *testing.T) { + setup(t) + defer cleanup() + + consenter.IsChannelMemberReturns(true, nil) + registrar := NewRegistrar(config, ledgerFactory, mockCrypto(), &disabled.Provider{}, cryptoProvider, nil) + registrar.Initialize(mockConsenters) + + // Before join the chain, it doesn't exist + require.Nil(t, registrar.GetChain("sys-raft-channel")) + + genesisBlockSysRaft.Header.Number = 7 + info, err := registrar.JoinChannel("sys-raft-channel", genesisBlockSysRaft, false) + require.NoError(t, err) + require.Equal(t, types.ChannelInfo{Name: "sys-raft-channel", URL: "", ClusterRelation: "member", Status: "inactive", Height: 0x0}, info) + // After creating the chain, it exists + cs := registrar.GetChain("sys-raft-channel") + require.NotNil(t, cs) + + // join-block exists + joinBlockPath := filepath.Join(tmpdir, "filerepo", "joinblock", "sys-raft-channel.joinblock") + _, err = os.Stat(joinBlockPath) + require.NoError(t, err) + + // ChannelInfo() and ChannelList() are working fine + info, err = registrar.ChannelInfo("sys-raft-channel") + require.NoError(t, err) + require.Equal(t, types.ChannelInfo{Name: "sys-raft-channel", URL: "", ClusterRelation: "member", Status: "inactive", Height: 0x0}, info) + channelList := registrar.ChannelList() + require.Equal(t, 0, len(channelList.Channels)) + require.NotNil(t, channelList.SystemChannel) + require.Equal(t, "sys-raft-channel", channelList.SystemChannel.Name) + ledgerRW, err := ledgerFactory.GetOrCreate("sys-raft-channel") + require.NoError(t, err) + require.Equal(t, uint64(0), ledgerRW.Height(), "block was not appended") + }) } func TestRegistrar_RemoveChannel(t *testing.T) { diff --git a/orderer/common/server/etcdraft_test.go b/orderer/common/server/etcdraft_test.go index 0a61edd6277..b6e7e4fc2c2 100644 --- a/orderer/common/server/etcdraft_test.go +++ b/orderer/common/server/etcdraft_test.go @@ -43,21 +43,21 @@ func TestSpawnEtcdRaft(t *testing.T) { defer gexec.CleanupBuildArtifacts() - tempDir, err := ioutil.TempDir("", "etcdraft-test") + tempSharedDir, err := ioutil.TempDir("", "etcdraft-test") gt.Expect(err).NotTo(HaveOccurred()) - defer os.RemoveAll(tempDir) + defer os.RemoveAll(tempSharedDir) - copyYamlFiles(gt, "testdata", tempDir) + copyYamlFiles(gt, "testdata", tempSharedDir) - cryptoPath := generateCryptoMaterials(gt, cryptogen, tempDir) + cryptoPath := generateCryptoMaterials(gt, cryptogen, tempSharedDir) t.Run("Bad", func(t *testing.T) { t.Run("Invalid bootstrap block", func(t *testing.T) { - testEtcdRaftOSNFailureInvalidBootstrapBlock(NewGomegaWithT(t), tempDir, orderer, configtxgen, cryptoPath) + testEtcdRaftOSNFailureInvalidBootstrapBlock(NewGomegaWithT(t), tempSharedDir, orderer, configtxgen, cryptoPath) }) t.Run("TLS disabled single listener", func(t *testing.T) { - testEtcdRaftOSNNoTLSSingleListener(NewGomegaWithT(t), tempDir, orderer, configtxgen, cryptoPath) + testEtcdRaftOSNNoTLSSingleListener(NewGomegaWithT(t), tempSharedDir, orderer, configtxgen, cryptoPath) }) }) @@ -65,16 +65,21 @@ func TestSpawnEtcdRaft(t *testing.T) { // tests in this suite actually launch process with success, hence we need to avoid // conflicts in listening port, opening files. t.Run("TLS disabled dual listener", func(t *testing.T) { - testEtcdRaftOSNNoTLSDualListener(NewGomegaWithT(t), tempDir, orderer, configtxgen, cryptoPath) + testEtcdRaftOSNNoTLSDualListener(NewGomegaWithT(t), tempSharedDir, orderer, configtxgen, cryptoPath) }) t.Run("TLS enabled single listener", func(t *testing.T) { - testEtcdRaftOSNSuccess(NewGomegaWithT(t), tempDir, configtxgen, orderer, cryptoPath) + testEtcdRaftOSNSuccess(NewGomegaWithT(t), tempSharedDir, configtxgen, orderer, cryptoPath) }) t.Run("Restart orderer without Genesis Block", func(t *testing.T) { - testEtcdRaftOSNRestart(NewGomegaWithT(t), tempDir, configtxgen, orderer, cryptoPath) + testEtcdRaftOSNRestart(NewGomegaWithT(t), tempSharedDir, configtxgen, orderer, cryptoPath) + }) + + t.Run("Restart orderer after joining system channel", func(t *testing.T) { + testEtcdRaftOSNJoinSysChan(NewGomegaWithT(t), tempSharedDir, configtxgen, orderer, cryptoPath) }) + }) } @@ -122,26 +127,69 @@ func generateCryptoMaterials(gt *GomegaWithT, cryptogen, path string) string { } func testEtcdRaftOSNRestart(gt *GomegaWithT, tempDir, configtxgen, orderer, cryptoPath string) { + genesisBlockPath := generateBootstrapBlock(gt, tempDir, configtxgen, "system", "SampleEtcdRaftSystemChannel") // Launch the OSN - ordererProcess := launchOrderer(gt, orderer, tempDir, genesisBlockPath, cryptoPath, "file") + ordererProcess := launchOrderer(gt, orderer, tempDir, tempDir, genesisBlockPath, cryptoPath, "file", "false") defer func() { gt.Eventually(ordererProcess.Kill(), time.Minute).Should(gexec.Exit()) }() gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("Beginning to serve requests")) gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("becomeLeader")) // Restart orderer with ORDERER_GENERAL_BOOTSTRAPMETHOD = none gt.Eventually(ordererProcess.Kill(), time.Minute).Should(gexec.Exit()) - ordererProcess = launchOrderer(gt, orderer, tempDir, "", cryptoPath, "none") + ordererProcess = launchOrderer(gt, orderer, tempDir, tempDir, "", cryptoPath, "none", "true") gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("Beginning to serve requests")) gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("becomeLeader")) + gt.Eventually(ordererProcess.Kill(), time.Minute).Should(gexec.Exit()) } -func testEtcdRaftOSNSuccess(gt *GomegaWithT, tempDir, configtxgen, orderer, cryptoPath string) { - genesisBlockPath := generateBootstrapBlock(gt, tempDir, configtxgen, "system", "SampleEtcdRaftSystemChannel") +func testEtcdRaftOSNJoinSysChan(gt *GomegaWithT, configPath, configtxgen, orderer, cryptoPath string) { + tempDir, err := ioutil.TempDir("", "etcdraft-test") + gt.Expect(err).NotTo(HaveOccurred()) + defer os.RemoveAll(tempDir) + + // Launch the OSN without channels + ordererProcess := launchOrderer(gt, orderer, tempDir, configPath, "", cryptoPath, "none", "true") + defer func() { gt.Eventually(ordererProcess.Kill(), time.Minute).Should(gexec.Exit()) }() + gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("Channel Participation API enabled, registrar initializing with file repo")) + gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("No join-block was found for the system channel")) + gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("Beginning to serve requests")) + + // emulate a join-block for the system channel written to the join-block filerepo location + genesisBlockPath := generateBootstrapBlock(gt, configPath, configtxgen, "system", "SampleEtcdRaftSystemChannel") + genesisBlockBytes, err := ioutil.ReadFile(genesisBlockPath) + gt.Expect(err).NotTo(HaveOccurred()) + fileRepoDir := filepath.Join(tempDir, "ledger", "filerepo", "joinblock") + joinBlockPath := filepath.Join(fileRepoDir, "system.joinblock") + err = ioutil.WriteFile(joinBlockPath, genesisBlockBytes, 0644) + gt.Expect(err).NotTo(HaveOccurred()) + + gt.Eventually(ordererProcess.Kill(), time.Minute).Should(gexec.Exit()) + + // Restart, should pick up the join-block and bootstrap with it + ordererProcess = launchOrderer(gt, orderer, tempDir, configPath, "", cryptoPath, "none", "true") + gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("Channel Participation API enabled, registrar initializing with file repo")) + gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("Join-block was found for the system channel: system, number: 0")) + gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("Beginning to serve requests")) + gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("becomeLeader")) + // File was removed after on-boarding + _, err = os.Stat(joinBlockPath) + gt.Expect(err).To(HaveOccurred()) + pathErr := err.(*os.PathError) + gt.Expect(pathErr.Err.Error()).To(Equal("no such file or directory")) + gt.Eventually(ordererProcess.Kill(), time.Minute).Should(gexec.Exit()) +} + +func testEtcdRaftOSNSuccess(gt *GomegaWithT, configPath, configtxgen, orderer, cryptoPath string) { + tempDir, err := ioutil.TempDir("", "etcdraft-test") + gt.Expect(err).NotTo(HaveOccurred()) + defer os.RemoveAll(tempDir) + + genesisBlockPath := generateBootstrapBlock(gt, configPath, configtxgen, "system", "SampleEtcdRaftSystemChannel") // Launch the OSN - ordererProcess := launchOrderer(gt, orderer, tempDir, genesisBlockPath, cryptoPath, "file") + ordererProcess := launchOrderer(gt, orderer, tempDir, configPath, genesisBlockPath, cryptoPath, "file", "false") defer func() { gt.Eventually(ordererProcess.Kill(), time.Minute).Should(gexec.Exit()) }() // The following configuration parameters are not specified in the orderer.yaml, so let's ensure // they are really configured autonomously via the localconfig code. @@ -163,9 +211,13 @@ func testEtcdRaftOSNSuccess(gt *GomegaWithT, tempDir, configtxgen, orderer, cryp gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("becomeLeader")) } -func testEtcdRaftOSNFailureInvalidBootstrapBlock(gt *GomegaWithT, tempDir, orderer, configtxgen, cryptoPath string) { +func testEtcdRaftOSNFailureInvalidBootstrapBlock(gt *GomegaWithT, configPath, orderer, configtxgen, cryptoPath string) { + tempDir, err := ioutil.TempDir("", "etcdraft-test") + gt.Expect(err).NotTo(HaveOccurred()) + defer os.RemoveAll(tempDir) + // create an application channel genesis block - genesisBlockPath := generateBootstrapBlock(gt, tempDir, configtxgen, "mychannel", "SampleOrgChannel") + genesisBlockPath := generateBootstrapBlock(gt, configPath, configtxgen, "mychannel", "SampleOrgChannel") genesisBlockBytes, err := ioutil.ReadFile(genesisBlockPath) gt.Expect(err).NotTo(HaveOccurred()) @@ -175,15 +227,19 @@ func testEtcdRaftOSNFailureInvalidBootstrapBlock(gt *GomegaWithT, tempDir, order gt.Expect(err).NotTo(HaveOccurred()) // Launch the OSN - ordererProcess := launchOrderer(gt, orderer, tempDir, genesisBlockPath, cryptoPath, "") + ordererProcess := launchOrderer(gt, orderer, tempDir, configPath, genesisBlockPath, cryptoPath, "", "false") defer func() { gt.Eventually(ordererProcess.Kill(), time.Minute).Should(gexec.Exit()) }() expectedErr := "Failed validating bootstrap block: the block isn't a system channel block because it lacks ConsortiumsConfig" gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say(expectedErr)) } -func testEtcdRaftOSNNoTLSSingleListener(gt *GomegaWithT, tempDir, orderer string, configtxgen, cryptoPath string) { - genesisBlockPath := generateBootstrapBlock(gt, tempDir, configtxgen, "system", "SampleEtcdRaftSystemChannel") +func testEtcdRaftOSNNoTLSSingleListener(gt *GomegaWithT, configPath, orderer string, configtxgen, cryptoPath string) { + tempDir, err := ioutil.TempDir("", "etcdraft-test") + gt.Expect(err).NotTo(HaveOccurred()) + defer os.RemoveAll(tempDir) + + genesisBlockPath := generateBootstrapBlock(gt, configPath, configtxgen, "system", "SampleEtcdRaftSystemChannel") cmd := exec.Command(orderer) cmd.Env = []string{ @@ -192,7 +248,7 @@ func testEtcdRaftOSNNoTLSSingleListener(gt *GomegaWithT, tempDir, orderer string "ORDERER_GENERAL_SYSTEMCHANNEL=system", fmt.Sprintf("ORDERER_FILELEDGER_LOCATION=%s", filepath.Join(tempDir, "ledger")), fmt.Sprintf("ORDERER_GENERAL_BOOTSTRAPFILE=%s", genesisBlockPath), - fmt.Sprintf("FABRIC_CFG_PATH=%s", tempDir), + fmt.Sprintf("FABRIC_CFG_PATH=%s", configPath), } ordererProcess, err := gexec.Start(cmd, nil, nil) gt.Expect(err).NotTo(HaveOccurred()) @@ -202,9 +258,13 @@ func testEtcdRaftOSNNoTLSSingleListener(gt *GomegaWithT, tempDir, orderer string gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say(expectedErr)) } -func testEtcdRaftOSNNoTLSDualListener(gt *GomegaWithT, tempDir, orderer string, configtxgen, cryptoPath string) { +func testEtcdRaftOSNNoTLSDualListener(gt *GomegaWithT, configPath, orderer string, configtxgen, cryptoPath string) { + tempDir, err := ioutil.TempDir("", "etcdraft-test") + gt.Expect(err).NotTo(HaveOccurred()) + defer os.RemoveAll(tempDir) + ordererTLSPath := filepath.Join(cryptoPath, "ordererOrganizations", "example.com", "orderers", "127.0.0.1.example.com", "tls") - genesisBlockPath := generateBootstrapBlock(gt, tempDir, configtxgen, "system", "SampleEtcdRaftSystemChannel") + genesisBlockPath := generateBootstrapBlock(gt, configPath, configtxgen, "system", "SampleEtcdRaftSystemChannel") cmd := exec.Command(orderer) cmd.Env = []string{ @@ -224,7 +284,7 @@ func testEtcdRaftOSNNoTLSDualListener(gt *GomegaWithT, tempDir, orderer string, fmt.Sprintf("ORDERER_GENERAL_CLUSTER_ROOTCAS=[%s]", filepath.Join(ordererTLSPath, "ca.crt")), fmt.Sprintf("ORDERER_CONSENSUS_WALDIR=%s", filepath.Join(tempDir, "wal")), fmt.Sprintf("ORDERER_CONSENSUS_SNAPDIR=%s", filepath.Join(tempDir, "snapshot")), - fmt.Sprintf("FABRIC_CFG_PATH=%s", tempDir), + fmt.Sprintf("FABRIC_CFG_PATH=%s", configPath), "ORDERER_OPERATIONS_LISTENADDRESS=127.0.0.1:0", } ordererProcess, err := gexec.Start(cmd, nil, nil) @@ -235,7 +295,7 @@ func testEtcdRaftOSNNoTLSDualListener(gt *GomegaWithT, tempDir, orderer string, gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("becomeLeader")) } -func launchOrderer(gt *GomegaWithT, orderer, tempDir, genesisBlockPath, cryptoPath, bootstrapMethod string) *gexec.Session { +func launchOrderer(gt *GomegaWithT, orderer, tempDir, configPath, genesisBlockPath, cryptoPath, bootstrapMethod, channelParticipationEnabled string) *gexec.Session { ordererTLSPath := filepath.Join(cryptoPath, "ordererOrganizations", "example.com", "orderers", "127.0.0.1.example.com", "tls") // Launch the orderer process cmd := exec.Command(orderer) @@ -260,7 +320,9 @@ func launchOrderer(gt *GomegaWithT, orderer, tempDir, genesisBlockPath, cryptoPa fmt.Sprintf("ORDERER_GENERAL_TLS_PRIVATEKEY=%s", filepath.Join(ordererTLSPath, "server.key")), fmt.Sprintf("ORDERER_CONSENSUS_WALDIR=%s", filepath.Join(tempDir, "wal")), fmt.Sprintf("ORDERER_CONSENSUS_SNAPDIR=%s", filepath.Join(tempDir, "snapshot")), - fmt.Sprintf("FABRIC_CFG_PATH=%s", tempDir), + fmt.Sprintf("ORDERER_CHANNELPARTICIPATION_ENABLED=%s", channelParticipationEnabled), + fmt.Sprintf("FABRIC_CFG_PATH=%s", configPath), + "FABRIC_LOGGING_SPEC=info", } sess, err := gexec.Start(cmd, nil, nil) gt.Expect(err).NotTo(HaveOccurred()) diff --git a/orderer/common/server/main.go b/orderer/common/server/main.go index 96f729a5f5c..7baf0c1c63a 100644 --- a/orderer/common/server/main.go +++ b/orderer/common/server/main.go @@ -129,7 +129,9 @@ func Main() { } else { logger.Infof("Not bootstrapping the system channel because the bootstrap block number is %d (>0), replication is needed", bootstrapBlock.Header.Number) } - } else if conf.General.BootstrapMethod != "none" { + } else if conf.General.BootstrapMethod == "none" { + bootstrapBlock = initSystemChannelWithJoinBlock(conf, cryptoProvider, lf) + } else { logger.Panicf("Unknown bootstrap method: %s", conf.General.BootstrapMethod) } @@ -181,12 +183,15 @@ func Main() { // If the orderer has a system channel and is of cluster type, it may have to replicate first. if clusterBootBlock != nil && isClusterType { - // Are we bootstrapping with a clusterBootBlock with number >0 ? If yes, perform replication. + // When we are bootstrapping with a clusterBootBlock with number >0, replication will be performed. // Only clusters that are equipped with a recent config block (number i.e. >0) can replicate. // This will replicate all channels if the clusterBootBlock number > system-channel height (i.e. there is a gap in the ledger). repInitiator = onboarding.NewReplicationInitiator(lf, clusterBootBlock, conf, clusterClientConfig.SecOpts, signer, cryptoProvider) - if conf.General.BootstrapMethod == "file" { - repInitiator.ReplicateIfNeeded(clusterBootBlock) + repInitiator.ReplicateIfNeeded(clusterBootBlock) + // With BootstrapMethod == "none", the bootstrapBlock comes from a join-block. If it exists, we need to remove + // the system channel join-block from the filerepo. + if conf.General.BootstrapMethod == "none" && bootstrapBlock != nil { + discardSystemChannelJoinBlock(conf, bootstrapBlock) } } @@ -282,6 +287,80 @@ func Main() { } } +// Searches whether there is a join block for a system channel, and if there is, and it is a genesis block, +// initializes the ledger with it. Returns the join-block if it finds one. +func initSystemChannelWithJoinBlock( + config *localconfig.TopLevel, + cryptoProvider bccsp.BCCSP, + lf blockledger.Factory, +) (bootstrapBlock *cb.Block) { + if !config.ChannelParticipation.Enabled { + return nil + } + + joinBlockFileRepo, err := multichannel.InitJoinBlockFileRepo(config) + if err != nil { + logger.Panicf("Failed initializing join-block file repo: %v", err) + } + + joinBlockFiles, err := joinBlockFileRepo.List() + if err != nil { + logger.Panicf("Failed listing join-block file repo: %v", err) + } + + var systemChannelID string + for _, fileName := range joinBlockFiles { + channelName := joinBlockFileRepo.FileToBaseName(fileName) + blockBytes, err := joinBlockFileRepo.Read(channelName) + if err != nil { + logger.Panicf("Failed reading join-block for channel '%s', error: %v", channelName, err) + } + block, err := protoutil.UnmarshalBlock(blockBytes) + if err != nil { + logger.Panicf("Failed unmarshalling join-block for channel '%s', error: %v", channelName, err) + } + if err = onboarding.ValidateBootstrapBlock(block, cryptoProvider); err == nil { + bootstrapBlock = block + systemChannelID = channelName + break + } + } + + if bootstrapBlock == nil { + logger.Info("No join-block was found for the system channel") + return bootstrapBlock + } + + if bootstrapBlock.Header.Number == 0 { + initializeBootstrapChannel(bootstrapBlock, lf) + } + + logger.Infof("Join-block was found for the system channel: %s, number: %d", systemChannelID, bootstrapBlock.Header.Number) + return bootstrapBlock +} + +func discardSystemChannelJoinBlock( + config *localconfig.TopLevel, + bootstrapBlock *cb.Block, +) { + if !config.ChannelParticipation.Enabled { + return + } + + systemChannelName, err := protoutil.GetChannelIDFromBlock(bootstrapBlock) + if err != nil { + logger.Panicf("Failed to extract system channel name from join-block: %s", err) + } + joinBlockFileRepo, err := multichannel.InitJoinBlockFileRepo(config) + if err != nil { + logger.Panicf("Failed initializing join-block file repo: %v", err) + } + err = joinBlockFileRepo.Remove(systemChannelName) + if err != nil { + logger.Panicf("Failed to remove join-block for system channel: %s", err) + } +} + func reuseListener(conf *localconfig.TopLevel) bool { clusterConf := conf.General.Cluster // If listen address is not configured, and the TLS certificate isn't configured, @@ -343,6 +422,7 @@ func extractSystemChannel(lf blockledger.Factory, bccsp bccsp.BCCSP) *cb.Block { err = onboarding.ValidateBootstrapBlock(channelConfigBlock, bccsp) if err == nil { + logger.Infof("Found system channel config block, number: %d", channelConfigBlock.Header.Number) return channelConfigBlock } } @@ -627,10 +707,12 @@ func initializeBootstrapChannel(genesisBlock *cb.Block, lf blockledger.Factory) if err != nil { logger.Fatal("Failed to create the system channel:", err) } - - if err := gl.Append(genesisBlock); err != nil { - logger.Fatal("Could not write genesis block to ledger:", err) + if gl.Height() == 0 { + if err := gl.Append(genesisBlock); err != nil { + logger.Fatal("Could not write genesis block to ledger:", err) + } } + logger.Infof("Initialized the system channel '%s' from bootstrap block", channelID) } func isClusterType(genesisBlock *cb.Block, bccsp bccsp.BCCSP) bool { diff --git a/orderer/common/server/main_test.go b/orderer/common/server/main_test.go index 99456948dea..7bdfb867247 100644 --- a/orderer/common/server/main_test.go +++ b/orderer/common/server/main_test.go @@ -5,6 +5,8 @@ package server import ( "fmt" + "github.com/hyperledger/fabric/bccsp" + "github.com/hyperledger/fabric/orderer/common/filerepo" "io/ioutil" "net" "net/http" @@ -297,6 +299,101 @@ func TestExtractBootstrapBlock(t *testing.T) { } } +func TestInitSystemChannelWithJoinBlock(t *testing.T) { + configPathCleanup := configtest.SetDevFabricConfigPath(t) + defer configPathCleanup() + genesisFile := produceGenesisFile(t, genesisconfig.SampleSingleMSPSoloProfile, "testchannelid") + defer os.Remove(genesisFile) + + var ( + config *localconfig.TopLevel + cryptoProvider bccsp.BCCSP + ledgerFactory blockledger.Factory + fileRepo *filerepo.Repo + genesisBytes []byte + ) + + setup := func() func() { + fileLedgerLocation, err := ioutil.TempDir("", "main_test-") + require.NoError(t, err) + + config = &localconfig.TopLevel{ + General: localconfig.General{ + BootstrapMethod: "none", + }, + FileLedger: localconfig.FileLedger{ + Location: fileLedgerLocation, + }, + ChannelParticipation: localconfig.ChannelParticipation{Enabled: true}, + } + + cryptoProvider, err = sw.NewDefaultSecurityLevelWithKeystore(sw.NewDummyKeyStore()) + require.NoError(t, err) + + ledgerFactory, err = createLedgerFactory(config, &disabled.Provider{}) + require.NoError(t, err) + + fileRepo, err = multichannel.InitJoinBlockFileRepo(config) + require.NoError(t, err) + require.NotNil(t, fileRepo) + + genesisBytes, err = ioutil.ReadFile(genesisFile) + require.NoError(t, err) + require.NotNil(t, genesisBytes) + + return func() { + os.RemoveAll(fileLedgerLocation) + } + } + + t.Run("No join-block", func(t *testing.T) { + cleanup := setup() + defer cleanup() + + bootstrapBlock := initSystemChannelWithJoinBlock(config, cryptoProvider, ledgerFactory) + require.Nil(t, bootstrapBlock) + ledger, err := ledgerFactory.GetOrCreate("testchannelid") + require.NoError(t, err) + require.Equal(t, uint64(0), ledger.Height()) + }) + + t.Run("With genesis join-block", func(t *testing.T) { + cleanup := setup() + defer cleanup() + + err := fileRepo.Save("testchannelid", genesisBytes) + require.NoError(t, err) + bootstrapBlock := initSystemChannelWithJoinBlock(config, cryptoProvider, ledgerFactory) + require.NotNil(t, bootstrapBlock) + ledger, err := ledgerFactory.GetOrCreate("testchannelid") + require.NoError(t, err) + require.Equal(t, uint64(1), ledger.Height()) + // Again, ledger already exists + bootstrapBlock = initSystemChannelWithJoinBlock(config, cryptoProvider, ledgerFactory) + require.NotNil(t, bootstrapBlock) + ledger, err = ledgerFactory.GetOrCreate("testchannelid") + require.NoError(t, err) + require.Equal(t, uint64(1), ledger.Height()) + }) + + t.Run("With non-genesis join-block", func(t *testing.T) { + cleanup := setup() + defer cleanup() + + block := protoutil.UnmarshalBlockOrPanic(genesisBytes) + block.Header.Number = 7 + configBlockBytes := protoutil.MarshalOrPanic(block) + err := fileRepo.Save("testchannelid", configBlockBytes) + require.NoError(t, err) + bootstrapBlock := initSystemChannelWithJoinBlock(config, cryptoProvider, ledgerFactory) + require.NotNil(t, bootstrapBlock) + ledger, err := ledgerFactory.GetOrCreate("testchannelid") + require.NoError(t, err) + require.Equal(t, uint64(0), ledger.Height()) + }) + +} + func TestExtractSysChanLastConfig(t *testing.T) { tmpdir, err := ioutil.TempDir("", "main_test-") require.NoError(t, err) diff --git a/orderer/common/server/testdata/orderer.yaml b/orderer/common/server/testdata/orderer.yaml index 24ab5204d96..56b0f5c3f70 100644 --- a/orderer/common/server/testdata/orderer.yaml +++ b/orderer/common/server/testdata/orderer.yaml @@ -314,6 +314,22 @@ Metrics: # The prefix is prepended to all emitted statsd metrics Prefix: +################################################################################ +# +# Channel participation API Configuration +# +# - This provides the channel participation API configuration for the orderer. +# - Channel participation uses the same ListenAddress and TLS settings of the +# Operations service. +# +################################################################################ +ChannelParticipation: + # Channel participation API is enabled. + Enabled: false + + # The maximum size of the request body when joining a channel. + MaxRequestBodySize: 1 MB + ################################################################################ # # Consensus Configuration