Skip to content

Commit

Permalink
removed the request from the pool if the check showed that the propos…
Browse files Browse the repository at this point in the history
…al is bad (#4572)

Signed-off-by: Fedor Partanskiy <pfi79@mail.ru>
  • Loading branch information
pfi79 authored Dec 25, 2023
1 parent 1b21b0b commit 073b570
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 0 deletions.
110 changes: 110 additions & 0 deletions integration/smartbft/smartbft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -1809,6 +1810,115 @@ var _ = Describe("EndToEnd Smart BFT configuration test", func() {
By("invoking the chaincode")
invokeQuery(network, peer, network.Orderers[1], channel, 90)
})

It("send an update transaction to each orderer and deleting failed requests", func() {
channel := "testchannel1"
By("Create network")
networkConfig := nwo.MultiNodeSmartBFT()
networkConfig.Channels = nil
network = nwo.New(networkConfig, testDir, client, StartPort(), components)
network.GenerateConfigTree()
network.Bootstrap()
network.EventuallyTimeout *= 2

By("Start orderers")
var ordererRunners []*ginkgomon.Runner
for _, orderer := range network.Orderers {
runner := network.OrdererRunner(orderer,
"FABRIC_LOGGING_SPEC=orderer.common.cluster=debug:orderer.consensus.smartbft=debug:policies.ImplicitOrderer=debug",
"ORDERER_GENERAL_BACKOFF_MAXDELAY=20s")
ordererRunners = append(ordererRunners, runner)
proc := ifrit.Invoke(runner)
ordererProcesses = append(ordererProcesses, proc)
Eventually(proc.Ready(), network.EventuallyTimeout).Should(BeClosed())
}

By("Start peer")
peerRunner := network.PeerGroupRunner()
peerProcesses = ifrit.Invoke(peerRunner)
Eventually(peerProcesses.Ready(), network.EventuallyTimeout).Should(BeClosed())
peer := network.Peer("Org1", "peer0")

By("Join network to channel")
joinChannel(network, channel)

By("Waiting for followers to see the leader")
Eventually(ordererRunners[1].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Message from 1"))
Eventually(ordererRunners[2].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Message from 1"))
Eventually(ordererRunners[3].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Message from 1"))

By("Joining peers to channel")
orderer1 := network.Orderers[0]
network.JoinChannel(channel, orderer1, network.PeersWithChannel(channel)...)

By("Deploying chaincode")
deployChaincode(network, channel, testDir)

By("querying the chaincode")
sess, err := network.PeerUserSession(peer, "User1", commands.ChaincodeQuery{
ChannelID: channel,
Name: "mycc",
Ctor: `{"Args":["query","a"]}`,
})
Expect(err).NotTo(HaveOccurred())
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit(0))
Expect(sess).To(gbytes.Say("100"))

By("invoking the chaincode")
invokeQuery(network, peer, network.Orderers[1], channel, 90)

By("Changing the channel config, e.g. maximum number of bytes in a 1 MB batch")
newAbsoluteMaxBytes := 1_000_000
config := nwo.GetConfig(network, peer, orderer1, channel)
updatedConfig := proto.Clone(config).(*common.Config)
batchSizeConfigValue := updatedConfig.ChannelGroup.Groups["Orderer"].Values["BatchSize"]
batchSizeValue := &ordererProtos.BatchSize{}
Expect(proto.Unmarshal(batchSizeConfigValue.Value, batchSizeValue)).To(Succeed())
batchSizeValue.AbsoluteMaxBytes = uint32(newAbsoluteMaxBytes)
updatedConfig.ChannelGroup.Groups["Orderer"].Values["BatchSize"] = &common.ConfigValue{
ModPolicy: "Admins",
Value: protoutil.MarshalOrPanic(batchSizeValue),
}

tempDir, err := os.MkdirTemp(network.RootDir, "updateConfig")
Expect(err).NotTo(HaveOccurred())
updateFile := filepath.Join(tempDir, "update.pb")
defer os.RemoveAll(tempDir)

nwo.ComputeUpdateOrdererConfig(updateFile, network, channel, config, updatedConfig, peer, orderer1)

fileData, err := os.ReadFile(updateFile)
Expect(err).NotTo(HaveOccurred())

ctxEnv, err := protoutil.UnmarshalEnvelope(fileData)
Expect(err).NotTo(HaveOccurred())

By("Begin broadcast")
var wg sync.WaitGroup
for i := range network.Orderers {
wg.Add(1)
go func(ccid int) {
defer wg.Done()
resp, err := ordererclient.Broadcast(network, network.Orderers[ccid], ctxEnv)
Expect(err).NotTo(HaveOccurred())
Expect(resp.Status).To(Equal(common.Status_SUCCESS))
}(i)
}
wg.Wait()
By("End broadcast")

// move the logger cursor to the beginning of the broadcast
Eventually(ordererRunners[0].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("verifyConfigUpdateMsg"))
Eventually(ordererRunners[1].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("verifyConfigUpdateMsg"))
Eventually(ordererRunners[2].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("verifyConfigUpdateMsg"))
Eventually(ordererRunners[3].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("verifyConfigUpdateMsg"))

// the channel update transaction will have to be deleted from each orderer
Eventually(ordererRunners[0].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("deleteRequest"))
Eventually(ordererRunners[1].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("deleteRequest"))
Eventually(ordererRunners[2].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("deleteRequest"))
Eventually(ordererRunners[3].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("deleteRequest"))
})
})
})

Expand Down
8 changes: 8 additions & 0 deletions orderer/consensus/smartbft/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,13 @@ func (c *BFTChain) pruneCommittedRequests(block *cb.Block) {
wg.Wait()
}

func (c *BFTChain) pruneBadRequests() {
c.consensus.Pool.Prune(func(req []byte) error {
_, err := c.consensus.Verifier.VerifyRequest(req)
return err
})
}

func (c *BFTChain) submit(env *cb.Envelope, configSeq uint64) error {
reqBytes, err := proto.Marshal(env)
if err != nil {
Expand Down Expand Up @@ -566,6 +573,7 @@ func (c *BFTChain) updateRuntimeConfig(block *cb.Block) types.Reconfig {
if protoutil.IsConfigBlock(block) {
c.Comm.Configure(c.Channel, newRTC.RemoteNodes)
c.clusterService.ConfigureNodeCerts(c.Channel, newRTC.consenters)
c.pruneBadRequests()
}

membershipDidNotChange := reflect.DeepEqual(newRTC.Nodes, prevRTC.Nodes)
Expand Down

0 comments on commit 073b570

Please sign in to comment.