From 2b1cd0ed09baf1ad5ed60b2f6384c41ce6866cfd Mon Sep 17 00:00:00 2001
From: Yacov Manevich
Date: Tue, 28 Nov 2023 22:04:59 +0100
Subject: [PATCH] Raft to BFT migration

Signed-off-by: Yacov Manevich
---
 integration/raft/migration_test.go           | 198 +++++++++++++++++-
 .../common/msgprocessor/maintenancefilter.go |   4 +-
 orderer/consensus/etcdraft/chain.go          |  20 +-
 orderer/consensus/etcdraft/util.go           |  26 +--
 4 files changed, 225 insertions(+), 23 deletions(-)

diff --git a/integration/raft/migration_test.go b/integration/raft/migration_test.go
index 5573a07724b..c489b38f394 100644
--- a/integration/raft/migration_test.go
+++ b/integration/raft/migration_test.go
@@ -15,6 +15,10 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/SmartBFT-Go/consensus/pkg/types"
+	"github.com/hyperledger/fabric-protos-go/orderer/smartbft"
+	"github.com/hyperledger/fabric/common/policies"
+
 	docker "github.com/fsouza/go-dockerclient"
 	"github.com/golang/protobuf/proto"
 	"github.com/hyperledger/fabric-protos-go/common"
@@ -39,10 +43,12 @@ var _ = Describe("ConsensusTypeMigration", func() {
 		client  *docker.Client
 		network *nwo.Network
 
-		o1Proc, o2Proc, o3Proc ifrit.Process
+		o1Proc, o2Proc, o3Proc, o4Proc ifrit.Process
 
 		o1Runner *ginkgomon.Runner
 		o2Runner *ginkgomon.Runner
+		o3Runner *ginkgomon.Runner
+		o4Runner *ginkgomon.Runner
 	)
 
 	BeforeEach(func() {
@@ -55,7 +61,7 @@ var _ = Describe("ConsensusTypeMigration", func() {
 	})
 
 	AfterEach(func() {
-		for _, oProc := range []ifrit.Process{o1Proc, o2Proc, o3Proc} {
+		for _, oProc := range []ifrit.Process{o1Proc, o2Proc, o3Proc, o4Proc} {
 			if oProc != nil {
 				oProc.Signal(syscall.SIGTERM)
 				Eventually(oProc.Wait(), network.EventuallyTimeout).Should(Receive())
 			}
 		}
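Reviewer context for the fourth orderer wired in above: a BFT cluster needs n >= 3f+1 replicas to tolerate f Byzantine faults, which is presumably why the test below grows the three-node Raft network to four orderers before switching the consensus type. The arithmetic as a one-line Go helper (general BFT background, not code from this patch):

	// maxByzantineFaults returns the number of Byzantine nodes an n-node
	// BFT cluster tolerates: f = (n-1)/3. With n = 4, f = 1; with n = 3, f = 0.
	func maxByzantineFaults(n int) int { return (n - 1) / 3 }
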
@@ -69,6 +75,125 @@ var _ = Describe("ConsensusTypeMigration", func() {
 		_ = os.RemoveAll(testDir)
 	})
 
+	Describe("Raft to BFT migration", func() {
+		It("migrates from Raft to BFT", func() {
+			networkConfig := nwo.MultiNodeEtcdRaft()
+			networkConfig.Orderers = append(networkConfig.Orderers, &nwo.Orderer{Name: "orderer4", Organization: "OrdererOrg"})
+			networkConfig.Profiles[0].Orderers = []string{"orderer1", "orderer2", "orderer3", "orderer4"}
+
+			network = nwo.New(networkConfig, testDir, client, StartPort(), components)
+
+			o1, o2, o3, o4 := network.Orderer("orderer1"), network.Orderer("orderer2"), network.Orderer("orderer3"), network.Orderer("orderer4")
+			network.GenerateConfigTree()
+			network.Bootstrap()
+
+			runOrderers := func() {
+				o1Runner = network.OrdererRunner(o1)
+				o2Runner = network.OrdererRunner(o2)
+				o3Runner = network.OrdererRunner(o3)
+				o4Runner = network.OrdererRunner(o4)
+
+				o1Runner.Command.Env = append(o1Runner.Command.Env, "FABRIC_LOGGING_SPEC=orderer.common.cluster=debug:orderer.consensus.smartbft=debug:policies.ImplicitOrderer=debug")
+				o2Runner.Command.Env = append(o2Runner.Command.Env, "FABRIC_LOGGING_SPEC=orderer.common.cluster=debug:orderer.consensus.smartbft=debug:policies.ImplicitOrderer=debug")
+				o3Runner.Command.Env = append(o3Runner.Command.Env, "FABRIC_LOGGING_SPEC=orderer.common.cluster=debug:orderer.consensus.smartbft=debug:policies.ImplicitOrderer=debug")
+				o4Runner.Command.Env = append(o4Runner.Command.Env, "FABRIC_LOGGING_SPEC=orderer.common.cluster=debug:orderer.consensus.smartbft=debug:policies.ImplicitOrderer=debug")
+
+				o1Proc = ifrit.Invoke(o1Runner)
+				o2Proc = ifrit.Invoke(o2Runner)
+				o3Proc = ifrit.Invoke(o3Runner)
+				o4Proc = ifrit.Invoke(o4Runner)
+
+				Eventually(o1Proc.Ready(), network.EventuallyTimeout).Should(BeClosed())
+				Eventually(o2Proc.Ready(), network.EventuallyTimeout).Should(BeClosed())
+				Eventually(o3Proc.Ready(), network.EventuallyTimeout).Should(BeClosed())
+				Eventually(o4Proc.Ready(), network.EventuallyTimeout).Should(BeClosed())
+			}
+
+			runOrderers()
+
+			channelparticipation.JoinOrderersAppChannelCluster(network, "testchannel", o1, o2, o3, o4)
+			FindLeader([]*ginkgomon.Runner{o1Runner, o2Runner, o3Runner, o4Runner})
+
+			By("performing operation with orderer1")
+			env := CreateBroadcastEnvelope(network, o1, "testchannel", []byte("foo"))
+			resp, err := ordererclient.Broadcast(network, o1, env)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(resp.Status).To(Equal(common.Status_SUCCESS))
+
+			block := FetchBlock(network, o1, 1, "testchannel")
+			Expect(block).NotTo(BeNil())
+
+			By("Change to maintenance mode")
+
+			peer := network.Peer("Org1", "peer0")
+			orderer := network.Orderer("orderer1")
+
+			addBFTInConfigTree(network, peer, orderer, "testchannel")
+
+			By("Config update on standard channel, State=MAINTENANCE, enter maintenance-mode")
+			config, updatedConfig := prepareTransition(network, peer, orderer, "testchannel",
+				"etcdraft", protosorderer.ConsensusType_STATE_NORMAL,
+				"etcdraft", nil, protosorderer.ConsensusType_STATE_MAINTENANCE)
+			nwo.UpdateOrdererConfig(network, orderer, "testchannel", config, updatedConfig, peer, orderer)
+
+			bftMetadata := protoutil.MarshalOrPanic(&smartbft.Options{
+				RequestBatchMaxCount:      types.DefaultConfig.RequestBatchMaxCount,
+				RequestBatchMaxBytes:      types.DefaultConfig.RequestBatchMaxBytes,
+				RequestBatchMaxInterval:   types.DefaultConfig.RequestBatchMaxInterval.String(),
+				IncomingMessageBufferSize: types.DefaultConfig.IncomingMessageBufferSize,
+				RequestPoolSize:           types.DefaultConfig.RequestPoolSize,
+				RequestForwardTimeout:     types.DefaultConfig.RequestForwardTimeout.String(),
+				RequestComplainTimeout:    types.DefaultConfig.RequestComplainTimeout.String(),
+				RequestAutoRemoveTimeout:  types.DefaultConfig.RequestAutoRemoveTimeout.String(),
+				ViewChangeResendInterval:  types.DefaultConfig.ViewChangeResendInterval.String(),
+				ViewChangeTimeout:         types.DefaultConfig.ViewChangeTimeout.String(),
+				LeaderHeartbeatTimeout:    types.DefaultConfig.LeaderHeartbeatTimeout.String(),
+				LeaderHeartbeatCount:      types.DefaultConfig.LeaderHeartbeatCount,
+				CollectTimeout:            types.DefaultConfig.CollectTimeout.String(),
+				SyncOnStart:               types.DefaultConfig.SyncOnStart,
+				SpeedUpViewChange:         types.DefaultConfig.SpeedUpViewChange,
+			})
+			config, updatedConfig = prepareTransition(network, peer, orderer, "testchannel",
+				"etcdraft", protosorderer.ConsensusType_STATE_MAINTENANCE,
+				"BFT", bftMetadata, protosorderer.ConsensusType_STATE_MAINTENANCE)
+			nwo.UpdateOrdererConfig(network, orderer, "testchannel", config, updatedConfig, peer, orderer)
+
+			time.Sleep(time.Second) // TODO: check block was committed in all orderers
+
+			for _, oProc := range []ifrit.Process{o1Proc, o2Proc, o3Proc, o4Proc} {
+				if oProc != nil {
+					oProc.Signal(syscall.SIGTERM)
+					Eventually(oProc.Wait(), network.EventuallyTimeout).Should(Receive())
+				}
+			}
+
+			runOrderers()
+
+			// TODO:
+			By("Waiting for followers to see the leader, again")
+			Eventually(o2Runner.Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Message from 1 channel=testchannel"))
+			Eventually(o3Runner.Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Message from 1 channel=testchannel"))
+			Eventually(o4Runner.Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Message from 1 channel=testchannel"))
+
+			// Exit maintenance mode
+			config, updatedConfig = prepareTransition(network, peer, orderer,
"testchannel", + "BFT", protosorderer.ConsensusType_STATE_MAINTENANCE, + "BFT", bftMetadata, protosorderer.ConsensusType_STATE_NORMAL) + nwo.UpdateOrdererConfig(network, orderer, "testchannel", config, updatedConfig, peer, orderer) + + // Now, run a transaction to ensure BFT works. + env = CreateBroadcastEnvelope(network, o1, "testchannel", []byte("foo")) + resp, err = ordererclient.Broadcast(network, o1, env) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.Status).To(Equal(common.Status_SUCCESS)) + + time.Sleep(time.Second * 5) + + // TODO: check block was successfully committed in all orderers + }) + }) + // These tests execute the migration config updates on an etcdraft based system, but do not restart the orderer // to a "future-type" consensus-type that currently does not exist. However, these tests try to maintain as much as // possible the testing infrastructure for consensus-type migration that existed when "solo" and "kafka" were still @@ -604,3 +729,72 @@ func createDeliverEnvelope(n *nwo.Network, signer *nwo.SigningIdentity, blkNum u return env } + +func addBFTInConfigTree(network *nwo.Network, peer *nwo.Peer, orderer *nwo.Orderer, channel string) { + config := nwo.GetConfig(network, peer, orderer, channel) + + updatedConfig := proto.Clone(config).(*common.Config) + + orderersVal := &common.Orderers{ + ConsenterMapping: computeConsenterMappings(network), + } + + policies.EncodeBFTBlockVerificationPolicy(orderersVal.ConsenterMapping, updatedConfig.ChannelGroup.Groups["Orderer"]) + + updatedConfig.ChannelGroup.Groups["Orderer"].Values["Orderers"] = &common.ConfigValue{ + Value: protoutil.MarshalOrPanic(orderersVal), + ModPolicy: "/Channel/Orderer/Admins", + } + + nwo.UpdateOrdererConfig(network, orderer, channel, config, updatedConfig, peer, orderer) +} + +func computeConsenterMappings(network *nwo.Network) []*common.Consenter { + var consenters []*common.Consenter + + for i, orderer := range network.Orderers { + ecertPath := network.OrdererCert(orderer) + ecert, err := os.ReadFile(ecertPath) + Expect(err).To(Not(HaveOccurred())) + + tlsDir := network.OrdererLocalTLSDir(orderer) + tlsPath := filepath.Join(tlsDir, "server.crt") + tlsCert, err := os.ReadFile(tlsPath) + Expect(err).To(Not(HaveOccurred())) + + consenters = append(consenters, &common.Consenter{ + ServerTlsCert: tlsCert, + ClientTlsCert: tlsCert, + MspId: network.Organization(orderer.Organization).MSPID, + Host: "127.0.0.1", + Port: uint32(network.PortsByOrdererID[orderer.ID()][nwo.ClusterPort]), + Id: uint32(i + 1), + Identity: ecert, + }) + } + + return consenters +} + +func CreateBroadcastEnvelope(n *nwo.Network, entity interface{}, channel string, data []byte) *common.Envelope { + var signer *nwo.SigningIdentity + switch creator := entity.(type) { + case *nwo.Peer: + signer = n.PeerUserSigner(creator, "Admin") + case *nwo.Orderer: + signer = n.OrdererUserSigner(creator, "Admin") + } + Expect(signer).NotTo(BeNil()) + + env, err := protoutil.CreateSignedEnvelope( + common.HeaderType_ENDORSER_TRANSACTION, + channel, + signer, + &common.Envelope{Payload: data}, + 0, + 0, + ) + Expect(err).NotTo(HaveOccurred()) + + return env +} diff --git a/orderer/common/msgprocessor/maintenancefilter.go b/orderer/common/msgprocessor/maintenancefilter.go index fa068dd703f..23f5eb617ab 100644 --- a/orderer/common/msgprocessor/maintenancefilter.go +++ b/orderer/common/msgprocessor/maintenancefilter.go @@ -46,9 +46,7 @@ func NewMaintenanceFilter(support MaintenanceFilterSupport, bccsp bccsp.BCCSP) * bccsp: bccsp, } 
diff --git a/orderer/common/msgprocessor/maintenancefilter.go b/orderer/common/msgprocessor/maintenancefilter.go
index fa068dd703f..23f5eb617ab 100644
--- a/orderer/common/msgprocessor/maintenancefilter.go
+++ b/orderer/common/msgprocessor/maintenancefilter.go
@@ -46,9 +46,7 @@ func NewMaintenanceFilter(support MaintenanceFilterSupport, bccsp bccsp.BCCSP) *
 		bccsp:   bccsp,
 	}
 	mf.permittedTargetConsensusTypes["etcdraft"] = true
-	// Until we have a BFT consensus type, we use this for integration testing of consensus-type migration.
-	// Caution: proposing a config block with this type will cause panic.
-	mf.permittedTargetConsensusTypes["testing-only"] = true
+	mf.permittedTargetConsensusTypes["BFT"] = true
 	return mf
 }
diff --git a/orderer/consensus/etcdraft/chain.go b/orderer/consensus/etcdraft/chain.go
index 371922822e6..5fd1db3c21e 100644
--- a/orderer/consensus/etcdraft/chain.go
+++ b/orderer/consensus/etcdraft/chain.go
@@ -1115,12 +1115,17 @@ func (c *Chain) commitBlock(block *common.Block) {
 func (c *Chain) detectConfChange(block *common.Block) *MembershipChanges {
 	// If config is targeting THIS channel, inspect consenter set and
 	// propose raft ConfChange if it adds/removes node.
-	configMetadata := c.newConfigMetadata(block)
+	configMetadata, consensusType := c.newConfigMetadata(block)
 
 	if configMetadata == nil {
 		return nil
 	}
 
+	if consensusType.Type != "etcdraft" {
+		c.logger.Infof("Detected migration to %s", consensusType.Type)
+		return nil
+	}
+
 	if configMetadata.Options != nil &&
 		configMetadata.Options.SnapshotIntervalSize != 0 &&
 		configMetadata.Options.SnapshotIntervalSize != c.sizeLimit {
@@ -1425,12 +1430,12 @@ func (c *Chain) getInFlightConfChange() *raftpb.ConfChange {
 }
 
 // newMetadata extract config metadata from the configuration block
-func (c *Chain) newConfigMetadata(block *common.Block) *etcdraft.ConfigMetadata {
-	metadata, err := ConsensusMetadataFromConfigBlock(block)
+func (c *Chain) newConfigMetadata(block *common.Block) (*etcdraft.ConfigMetadata, *orderer.ConsensusType) {
+	metadata, consensusType, err := ConsensusMetadataFromConfigBlock(block)
 	if err != nil {
 		c.logger.Panicf("error reading consensus metadata: %s", err)
 	}
-	return metadata
+	return metadata, consensusType
 }
 
 // ValidateConsensusMetadata determines the validity of a
@@ -1446,6 +1451,11 @@ func (c *Chain) ValidateConsensusMetadata(oldOrdererConfig, newOrdererConfig cha
 		return nil
 	}
 
+	if newOrdererConfig.ConsensusType() != "etcdraft" {
+		// This is a migration, so we don't know how to validate this config change.
+		return nil
+	}
+
 	if oldOrdererConfig == nil {
 		c.logger.Panic("Programming Error: ValidateConsensusMetadata called with nil old channel config")
 		return nil
@@ -1572,7 +1582,7 @@ func (c *Chain) checkForEvictionNCertRotation(env *common.Envelope) bool {
 		return false
 	}
 
-	configMeta, err := MetadataFromConfigUpdate(configUpdate)
+	configMeta, _, err := MetadataFromConfigUpdate(configUpdate)
 	if err != nil || configMeta == nil {
 		c.logger.Warnf("could not read config metadata: %s", err)
 		return false
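Summarizing the chain.go changes above: detectConfChange now receives the ConsensusType alongside the raft metadata and bails out when the config block migrates the channel away from etcdraft, since raft membership changes are meaningless for the incoming consensus type. A condensed, self-contained sketch of that decision flow (simplified from the diff, not the actual code):

	import (
		"github.com/hyperledger/fabric-protos-go/orderer"
		"github.com/hyperledger/fabric-protos-go/orderer/etcdraft"
	)

	// shouldDiffConsenters condenses the new decision flow in detectConfChange:
	// it reports whether raft membership changes should be considered at all
	// for a given config block.
	func shouldDiffConsenters(metadata *etcdraft.ConfigMetadata, consensusType *orderer.ConsensusType) bool {
		if metadata == nil {
			return false // no consensus-type update in this block
		}
		if consensusType.Type != "etcdraft" {
			return false // migration away from raft, e.g. to "BFT"
		}
		return true // ordinary etcdraft update: diff the consenter set
	}
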
diff --git a/orderer/consensus/etcdraft/util.go b/orderer/consensus/etcdraft/util.go
index 53ec773313c..f734fc3c1b4 100644
--- a/orderer/consensus/etcdraft/util.go
+++ b/orderer/consensus/etcdraft/util.go
@@ -84,22 +84,22 @@ func MetadataHasDuplication(md *etcdraft.ConfigMetadata) error {
 }
 
 // MetadataFromConfigValue reads and translates configuration updates from config value into raft metadata
-func MetadataFromConfigValue(configValue *common.ConfigValue) (*etcdraft.ConfigMetadata, error) {
+func MetadataFromConfigValue(configValue *common.ConfigValue) (*etcdraft.ConfigMetadata, *orderer.ConsensusType, error) {
 	consensusTypeValue := &orderer.ConsensusType{}
 	if err := proto.Unmarshal(configValue.Value, consensusTypeValue); err != nil {
-		return nil, errors.Wrap(err, "failed to unmarshal consensusType config update")
+		return nil, nil, errors.Wrap(err, "failed to unmarshal consensusType config update")
 	}
 
 	updatedMetadata := &etcdraft.ConfigMetadata{}
 	if err := proto.Unmarshal(consensusTypeValue.Metadata, updatedMetadata); err != nil {
-		return nil, errors.Wrap(err, "failed to unmarshal updated (new) etcdraft metadata configuration")
+		return nil, nil, errors.Wrap(err, "failed to unmarshal updated (new) etcdraft metadata configuration")
 	}
 
-	return updatedMetadata, nil
+	return updatedMetadata, consensusTypeValue, nil
 }
 
 // MetadataFromConfigUpdate extracts consensus metadata from config update
-func MetadataFromConfigUpdate(update *common.ConfigUpdate) (*etcdraft.ConfigMetadata, error) {
+func MetadataFromConfigUpdate(update *common.ConfigUpdate) (*etcdraft.ConfigMetadata, *orderer.ConsensusType, error) {
 	var baseVersion uint64
 	if update.ReadSet != nil && update.ReadSet.Groups != nil {
 		if ordererConfigGroup, ok := update.ReadSet.Groups["Orderer"]; ok {
@@ -115,13 +115,13 @@ func MetadataFromConfigUpdate(update *common.ConfigUpdate) (*etcdraft.ConfigMeta
 				if baseVersion == val.Version {
 					// Only if the version in the write set differs from the read-set
 					// should we consider this to be an update to the consensus type
-					return nil, nil
+					return nil, nil, nil
 				}
 				return MetadataFromConfigValue(val)
 			}
 		}
 	}
-	return nil, nil
+	return nil, nil, nil
 }
 
 // ConfigChannelHeader expects a config block and returns the header type
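One subtlety worth calling out in MetadataFromConfigUpdate: the ConsensusType value only counts as updated when its write-set version differs from its read-set version; otherwise all three return values are nil. A condensed sketch of that rule, with the nil checks of the real code elided (illustrative only):

	import "github.com/hyperledger/fabric-protos-go/common"

	// consensusTypeUpdated condenses the version-bump rule used above.
	func consensusTypeUpdated(update *common.ConfigUpdate) bool {
		var base uint64
		if grp, ok := update.ReadSet.Groups["Orderer"]; ok {
			if val, ok := grp.Values["ConsensusType"]; ok {
				base = val.Version // version the updater read
			}
		}
		if grp, ok := update.WriteSet.Groups["Orderer"]; ok {
			if val, ok := grp.Values["ConsensusType"]; ok {
				return val.Version != base // a version bump marks a real update
			}
		}
		return false
	}
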
@@ -168,28 +168,28 @@ func ConfigEnvelopeFromBlock(block *common.Block) (*common.Envelope, error) {
 }
 
 // ConsensusMetadataFromConfigBlock reads consensus metadata updates from the configuration block
-func ConsensusMetadataFromConfigBlock(block *common.Block) (*etcdraft.ConfigMetadata, error) {
+func ConsensusMetadataFromConfigBlock(block *common.Block) (*etcdraft.ConfigMetadata, *orderer.ConsensusType, error) {
 	if block == nil {
-		return nil, errors.New("nil block")
+		return nil, nil, errors.New("nil block")
 	}
 
 	if !protoutil.IsConfigBlock(block) {
-		return nil, errors.New("not a config block")
+		return nil, nil, errors.New("not a config block")
 	}
 
 	configEnvelope, err := ConfigEnvelopeFromBlock(block)
 	if err != nil {
-		return nil, errors.Wrap(err, "cannot read config update")
+		return nil, nil, errors.Wrap(err, "cannot read config update")
 	}
 
 	payload, err := protoutil.UnmarshalPayload(configEnvelope.Payload)
 	if err != nil {
-		return nil, errors.Wrap(err, "failed to extract payload from config envelope")
+		return nil, nil, errors.Wrap(err, "failed to extract payload from config envelope")
 	}
 
 	// get config update
 	configUpdate, err := configtx.UnmarshalConfigUpdateFromPayload(payload)
 	if err != nil {
-		return nil, errors.Wrap(err, "could not read config update")
+		return nil, nil, errors.Wrap(err, "could not read config update")
 	}
 
 	return MetadataFromConfigUpdate(configUpdate)
 }
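For callers of the widened helpers above, the three return values distinguish "no consensus-type update" from "migration" from "ordinary etcdraft update". An illustrative in-package sketch of how a caller can interpret them (not code from this patch):

	// classifyConfigBlock shows how the triple returned by
	// ConsensusMetadataFromConfigBlock can be interpreted after this change.
	func classifyConfigBlock(block *common.Block) {
		metadata, consensusType, err := ConsensusMetadataFromConfigBlock(block)
		switch {
		case err != nil:
			// nil block, not a config block, or malformed payload
		case metadata == nil && consensusType == nil:
			// the config block does not touch the ConsensusType value
		case consensusType.Type != "etcdraft":
			// consensus-type migration (e.g. to "BFT"); the etcdraft
			// metadata carries no meaning for the new type
		default:
			// ordinary etcdraft options / consenter-set update
		}
	}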