From 30a11e0eda0e29240e186398338a701c4bb6fde9 Mon Sep 17 00:00:00 2001
From: Artem Barger
Date: Sun, 18 Jul 2021 00:07:21 +0300
Subject: [PATCH] [FAB-18521] Replicate block metadata with block while OSN
 catching up

While an OSN catches up by replicating blocks from an up-to-date replica,
the metadata information is omitted, i.e. `` c.support.WriteBlock(block, nil) ``,
where `nil` substitutes for the block's metadata. In this commit, the
consenters' metadata is extracted from the replicated block and written
along with the block.

Signed-off-by: Artem Barger
---
 integration/raft/cft_test.go        | 218 +++++++++++++++++++++++++++-
 orderer/consensus/etcdraft/chain.go |  10 +-
 2 files changed, 222 insertions(+), 6 deletions(-)

diff --git a/integration/raft/cft_test.go b/integration/raft/cft_test.go
index 05a4428da4d..f3f8279e02d 100644
--- a/integration/raft/cft_test.go
+++ b/integration/raft/cft_test.go
@@ -27,6 +27,7 @@ import (
 	conftx "github.com/hyperledger/fabric-config/configtx"
 	"github.com/hyperledger/fabric-protos-go/common"
 	"github.com/hyperledger/fabric-protos-go/msp"
+	"github.com/hyperledger/fabric-protos-go/orderer/etcdraft"
 	"github.com/hyperledger/fabric/cmd/common/signer"
 	"github.com/hyperledger/fabric/common/configtx"
 	"github.com/hyperledger/fabric/common/util"
@@ -233,13 +234,222 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
 			env = CreateBroadcastEnvelope(network, o1, channelID, make([]byte, 1000))
 			resp, err := ordererclient.Broadcast(network, o1, env)
 			Expect(err).NotTo(HaveOccurred())
-			Expect(resp.Status).To(Equal(common.Status_SUCCESS))
+			Eventually(resp.Status, network.EventuallyTimeout).Should(Equal(common.Status_SUCCESS))
 
-			blko1 := FetchBlock(network, o1, 5, channelID)
-			blko2 := FetchBlock(network, o2, 5, channelID)
+			for i := 1; i <= 5; i++ {
+				blko1 := FetchBlock(network, o1, uint64(i), channelID)
+				blko2 := FetchBlock(network, o2, uint64(i), channelID)
 
-			Expect(blko1.Header.DataHash).To(Equal(blko2.Header.DataHash))
+				Expect(blko1.Header.DataHash).To(Equal(blko2.Header.DataHash))
+				metao1, err := protoutil.GetConsenterMetadataFromBlock(blko1)
+				Expect(err).NotTo(HaveOccurred())
+				metao2, err := protoutil.GetConsenterMetadataFromBlock(blko2)
+				Expect(err).NotTo(HaveOccurred())
+
+				bmo1 := &etcdraft.BlockMetadata{}
+				proto.Unmarshal(metao1.Value, bmo1)
+				bmo2 := &etcdraft.BlockMetadata{}
+				proto.Unmarshal(metao2.Value, bmo2)
+
+				Expect(bmo2).To(Equal(bmo1))
+			}
 		})
+
+		It("catches up and replicates consenters metadata", func() {
+			network = nwo.New(nwo.MultiNodeEtcdRaft(), testDir, client, StartPort(), components)
+			orderers := []*nwo.Orderer{network.Orderer("orderer1"), network.Orderer("orderer2"), network.Orderer("orderer3")}
+			peer = network.Peer("Org1", "peer0")
+
+			network.GenerateConfigTree()
+			network.Bootstrap()
+
+			ordererRunners := []*ginkgomon.Runner{}
+			orderersMembers := grouper.Members{}
+			for _, o := range orderers {
+				runner := network.OrdererRunner(o)
+				ordererRunners = append(ordererRunners, runner)
+				orderersMembers = append(orderersMembers, grouper.Member{
+					Name:   o.ID(),
+					Runner: runner,
+				})
+			}
+
+			By("Starting ordering service cluster")
+			ordererGroup := grouper.NewParallel(syscall.SIGTERM, orderersMembers)
+			ordererProc = ifrit.Invoke(ordererGroup)
+			Eventually(ordererProc.Ready(), network.EventuallyTimeout).Should(BeClosed())
+
+			By("Setting up new OSN to be added to the cluster")
+			o4 := &nwo.Orderer{
+				Name:         "orderer4",
+				Organization: "OrdererOrg",
+			}
+			ports := nwo.Ports{}
+			for _, portName := range nwo.OrdererPortNames() {
+				ports[portName] = network.ReservePort()
+			}
+
+			network.PortsByOrdererID[o4.ID()] = ports
+			network.Orderers = append(network.Orderers, o4)
+			network.GenerateOrdererConfig(o4)
+			extendNetwork(network)
+
+			ordererCertificatePath := filepath.Join(network.OrdererLocalTLSDir(o4), "server.crt")
+			ordererCert, err := ioutil.ReadFile(ordererCertificatePath)
+			Expect(err).NotTo(HaveOccurred())
+
+			By("Adding new ordering service node")
+			addConsenter(network, peer, orderers[0], "systemchannel", etcdraft.Consenter{
+				ServerTlsCert: ordererCert,
+				ClientTlsCert: ordererCert,
+				Host:          "127.0.0.1",
+				Port:          uint32(network.OrdererPort(o4, nwo.ClusterPort)),
+			})
+
+			// Get the last config block of the system channel
+			configBlock := nwo.GetConfigBlock(network, peer, orderers[0], "systemchannel")
+			// Plant it in the file system of the orderer, the new node to be onboarded.
+			err = ioutil.WriteFile(filepath.Join(testDir, "systemchannel_block.pb"), protoutil.MarshalOrPanic(configBlock), 0o644)
+			Expect(err).NotTo(HaveOccurred())
+
+			By("Starting new ordering service node")
+			r4 := network.OrdererRunner(o4)
+			orderers = append(orderers, o4)
+			ordererRunners = append(ordererRunners, r4)
+			o4process := ifrit.Invoke(r4)
+			Eventually(o4process.Ready(), network.EventuallyTimeout).Should(BeClosed())
+
+			By("Picking an ordering service node to be evicted")
+			victimIdx := findLeader(ordererRunners) - 1
+			victim := orderers[victimIdx]
+			victimCertBytes, err := ioutil.ReadFile(filepath.Join(network.OrdererLocalTLSDir(victim), "server.crt"))
+			Expect(err).NotTo(HaveOccurred())
+
+			assertBlockReception(map[string]int{
+				"systemchannel": 1,
+			}, orderers, peer, network)
+
+			By("Removing OSN from the channel")
+			remainedOrderers := []*nwo.Orderer{}
+			remainedRunners := []*ginkgomon.Runner{}
+
+			for i, o := range orderers {
+				if i == victimIdx {
+					continue
+				}
+				remainedOrderers = append(remainedOrderers, o)
+				remainedRunners = append(remainedRunners, ordererRunners[i])
+			}
+
+			removeConsenter(network, peer, remainedOrderers[0], "systemchannel", victimCertBytes)
+
+			By("Asserting all remaining nodes got the last block")
+			assertBlockReception(map[string]int{
+				"systemchannel": 2,
+			}, remainedOrderers, peer, network)
+			By("Making sure OSN was evicted and configuration applied")
+			findLeader(remainedRunners)
+
+			By("Restarting all nodes")
+			o4process.Signal(syscall.SIGTERM)
+			Eventually(o4process.Wait(), network.EventuallyTimeout).Should(Receive())
+			ordererProc.Signal(syscall.SIGTERM)
+			Eventually(ordererProc.Wait(), network.EventuallyTimeout).Should(Receive())
+
+			r1 := network.OrdererRunner(remainedOrderers[1])
+			r2 := network.OrdererRunner(remainedOrderers[2])
+			orderersMembers = grouper.Members{
+				{Name: remainedOrderers[1].ID(), Runner: r1},
+				{Name: remainedOrderers[2].ID(), Runner: r2},
+			}
+
+			ordererGroup = grouper.NewParallel(syscall.SIGTERM, orderersMembers)
+			ordererProc = ifrit.Invoke(ordererGroup)
+			Eventually(ordererProc.Ready(), network.EventuallyTimeout).Should(BeClosed())
+			findLeader([]*ginkgomon.Runner{r1, r2})
+
+			By("Submitting several transactions to trigger snapshot")
+			env := CreateBroadcastEnvelope(network, remainedOrderers[1], "systemchannel", make([]byte, 2000))
+			for i := 3; i <= 10; i++ {
+				// Note that MaxMessageCount is 1 by default, so every tx results in a new block
+				resp, err := ordererclient.Broadcast(network, remainedOrderers[1], env)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(resp.Status).To(Equal(common.Status_SUCCESS))
+			}
+
+			assertBlockReception(map[string]int{
+				"systemchannel": 10,
+			}, []*nwo.Orderer{remainedOrderers[1], remainedOrderers[2]}, peer, network)
+
+			By("Cleaning the snapshot folder of the node lagging behind")
+			snapDir := path.Join(network.RootDir, "orderers", remainedOrderers[0].ID(), "etcdraft", "snapshot")
+			snapshots, err := ioutil.ReadDir(snapDir)
+			Expect(err).NotTo(HaveOccurred())
+
+			for _, snap := range snapshots {
+				os.RemoveAll(path.Join(snapDir, snap.Name()))
+			}
+
+			ordererProc.Signal(syscall.SIGTERM)
+			Eventually(ordererProc.Wait(), network.EventuallyTimeout).Should(Receive())
+
+			r0 := network.OrdererRunner(remainedOrderers[0])
+			r1 = network.OrdererRunner(remainedOrderers[1])
+			orderersMembers = grouper.Members{
+				{Name: remainedOrderers[0].ID(), Runner: r0},
+				{Name: remainedOrderers[1].ID(), Runner: r1},
+			}
+
+			ordererGroup = grouper.NewParallel(syscall.SIGTERM, orderersMembers)
+			ordererProc = ifrit.Invoke(ordererGroup)
+			Eventually(ordererProc.Ready(), network.EventuallyTimeout).Should(BeClosed())
+			findLeader([]*ginkgomon.Runner{r0, r1})
+
+			By("Asserting that orderer1 receives and persists snapshot")
+			Eventually(func() int {
+				files, err := ioutil.ReadDir(path.Join(snapDir, "systemchannel"))
+				Expect(err).NotTo(HaveOccurred())
+				return len(files)
+			}, network.EventuallyTimeout).Should(BeNumerically(">", 0))
+
+			assertBlockReception(map[string]int{
+				"systemchannel": 10,
+			}, []*nwo.Orderer{remainedOrderers[0]}, peer, network)
+
+			By("Making sure we can restart and connect orderer1 with orderer4")
+			ordererProc.Signal(syscall.SIGTERM)
+			Eventually(ordererProc.Wait(), network.EventuallyTimeout).Should(Receive())
+
+			r0 = network.OrdererRunner(remainedOrderers[0])
+			r2 = network.OrdererRunner(remainedOrderers[2])
+			orderersMembers = grouper.Members{
+				{Name: remainedOrderers[0].ID(), Runner: r0},
+				{Name: remainedOrderers[2].ID(), Runner: r2},
+			}
+
+			ordererGroup = grouper.NewParallel(syscall.SIGTERM, orderersMembers)
+			ordererProc = ifrit.Invoke(ordererGroup)
+			Eventually(ordererProc.Ready(), network.EventuallyTimeout).Should(BeClosed())
+			findLeader([]*ginkgomon.Runner{r0, r2})
+
+			for i := 1; i <= 10; i++ {
+				blko1 := FetchBlock(network, remainedOrderers[0], uint64(i), "systemchannel")
+				blko2 := FetchBlock(network, remainedOrderers[2], uint64(i), "systemchannel")
+				Expect(blko1.Header.DataHash).To(Equal(blko2.Header.DataHash))
+				metao1, err := protoutil.GetConsenterMetadataFromBlock(blko1)
+				Expect(err).NotTo(HaveOccurred())
+				metao2, err := protoutil.GetConsenterMetadataFromBlock(blko2)
+				Expect(err).NotTo(HaveOccurred())
+
+				bmo1 := &etcdraft.BlockMetadata{}
+				proto.Unmarshal(metao1.Value, bmo1)
+				bmo2 := &etcdraft.BlockMetadata{}
+				proto.Unmarshal(metao2.Value, bmo2)
+
+				Expect(bmo2).To(Equal(bmo1))
+			}
+		})
+
 	})
 
 	When("The leader dies", func() {
diff --git a/orderer/consensus/etcdraft/chain.go b/orderer/consensus/etcdraft/chain.go
index 828ce99ec37..26c7a047264 100644
--- a/orderer/consensus/etcdraft/chain.go
+++ b/orderer/consensus/etcdraft/chain.go
@@ -985,12 +985,18 @@ func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
 }
 
 func (c *Chain) commitBlock(block *common.Block) {
+	// Read the consenters' metadata so it can be written with the replicated block.
+	blockMeta, err := protoutil.GetConsenterMetadataFromBlock(block)
+	if err != nil {
+		c.logger.Panicf("Failed to obtain metadata: %s", err)
+	}
+
 	if !protoutil.IsConfigBlock(block) {
-		c.support.WriteBlock(block, nil)
+		c.support.WriteBlock(block, blockMeta.Value)
 		return
 	}
 
-	c.support.WriteConfigBlock(block, nil)
+	c.support.WriteConfigBlock(block, blockMeta.Value)
 
 	configMembership := c.detectConfChange(block)
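
For reference, the essence of the `chain.go` change can be sketched in isolation. This is a minimal illustration, not the committed code: `blockWriter` and `commitReplicatedBlock` are hypothetical stand-ins for the orderer's `consensus.ConsenterSupport` and `Chain.commitBlock`; `protoutil.GetConsenterMetadataFromBlock` and `protoutil.IsConfigBlock` are the actual Fabric helpers the patch relies on.

```go
// Package catchup sketches the fix: during catch-up, the consenters'
// metadata is taken from the replicated block itself instead of being
// dropped by passing nil to the block writer.
package catchup

import (
	"fmt"

	cb "github.com/hyperledger/fabric-protos-go/common"
	"github.com/hyperledger/fabric/protoutil"
)

// blockWriter is a hypothetical narrow view of the orderer's
// consensus.ConsenterSupport, limited to the two write methods used here.
type blockWriter interface {
	WriteBlock(block *cb.Block, encodedMetadataValue []byte)
	WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte)
}

// commitReplicatedBlock mirrors the patched commitBlock: it extracts the
// consenters' metadata carried inside the replicated block and hands its
// Value to the ledger, where the old code passed nil.
func commitReplicatedBlock(support blockWriter, block *cb.Block) error {
	blockMeta, err := protoutil.GetConsenterMetadataFromBlock(block)
	if err != nil {
		return fmt.Errorf("failed to obtain metadata from replicated block: %w", err)
	}

	if !protoutil.IsConfigBlock(block) {
		support.WriteBlock(block, blockMeta.Value)
		return nil
	}

	support.WriteConfigBlock(block, blockMeta.Value)
	return nil
}
```

Because the metadata written during catch-up now comes from the replicated block itself, a node restored from a snapshot persists the same consenters' metadata as the nodes that originally ordered the blocks, which is exactly what the integration test asserts by comparing the unmarshalled `etcdraft.BlockMetadata` across replicas.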