Skip to content

Commit

Permalink
Static leader should not give up retrieving blocks
Browse files Browse the repository at this point in the history
FAB-17327

Signed-off-by: Will Lahti <wtlahti@us.ibm.com>
  • Loading branch information
wlahti authored and sykesm committed Jan 22, 2020
1 parent f0ea825 commit cae8b55
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 13 deletions.
6 changes: 5 additions & 1 deletion core/deliverservice/deliveryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type deliverClient struct {
// how it verifies messages received from it,
// and how it disseminates the messages to other peers
type Config struct {
IsStaticLeader bool
// ConnFactory returns a function that creates a connection to an endpoint
ConnFactory func(channelID string, endpointOverrides map[string]*comm.OrdererEndpoint) func(endpointCriteria comm.EndpointCriteria) (*grpc.ClientConn, error)
// ABCFactory creates an AtomicBroadcastClient out of a connection
Expand Down Expand Up @@ -292,7 +293,10 @@ func (d *deliverServiceImpl) newClient(chainID string, ledgerInfoProvider blocks
}
backoffPolicy := func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) {
if elapsedTime >= reconnectTotalTimeThreshold {
return 0, false
if !d.conf.IsStaticLeader {
return 0, false
}
logger.Warning("peer is a static leader, ignoring peer.deliveryclient.reconnectTotalTimeThreshold")
}
sleepIncrement := float64(time.Millisecond * 500)
attempt := float64(attemptNum)
Expand Down
15 changes: 9 additions & 6 deletions gossip/service/gossip_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,17 @@ type DeliveryServiceFactory interface {
}

type deliveryFactoryImpl struct {
isStaticLeader bool
}

// Returns an instance of delivery client
func (*deliveryFactoryImpl) Service(g GossipService, ec OrdererAddressConfig, mcs api.MessageCryptoService) (deliverclient.DeliverService, error) {
func (d *deliveryFactoryImpl) Service(g GossipService, ec OrdererAddressConfig, mcs api.MessageCryptoService) (deliverclient.DeliverService, error) {
return deliverclient.NewDeliverService(&deliverclient.Config{
CryptoSvc: mcs,
Gossip: g,
ConnFactory: deliverclient.DefaultConnectionFactory,
ABCFactory: deliverclient.DefaultABCFactory,
IsStaticLeader: d.isStaticLeader,
CryptoSvc: mcs,
Gossip: g,
ConnFactory: deliverclient.DefaultConnectionFactory,
ABCFactory: deliverclient.DefaultABCFactory,
}, deliverclient.ConnectionCriteria{
OrdererEndpointsByOrg: ec.AddressesByOrg,
Organizations: ec.Organizations,
Expand Down Expand Up @@ -150,6 +152,7 @@ func InitGossipService(
mcs api.MessageCryptoService,
secAdv api.SecurityAdvisor,
secureDialOpts api.PeerSecureDialOpts,
orgLeader bool,
bootPeers ...string,
) error {
// TODO: Remove this.
Expand All @@ -162,7 +165,7 @@ func InitGossipService(
endpoint,
s,
certs,
&deliveryFactoryImpl{},
&deliveryFactoryImpl{isStaticLeader: orgLeader},
mcs,
secAdv,
secureDialOpts,
Expand Down
6 changes: 3 additions & 3 deletions gossip/service/gossip_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestInitGossipService(t *testing.T) {
messageCryptoService := peergossip.NewMCS(&mocks.ChannelPolicyManagerGetter{}, localmsp.NewSigner(), mgmt.NewDeserializersManager())
secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager())
err := InitGossipService(identity, &disabled.Provider{}, endpoint, grpcServer, nil,
messageCryptoService, secAdv, nil)
messageCryptoService, secAdv, nil, false)
assert.NoError(t, err)
}()
}
Expand Down Expand Up @@ -832,7 +832,7 @@ func TestInvalidInitialization(t *testing.T) {

secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager())
err := InitGossipService(api.PeerIdentityType("IDENTITY"), &disabled.Provider{}, endpoint, grpcServer, nil,
&naiveCryptoService{}, secAdv, nil)
&naiveCryptoService{}, secAdv, nil, false)
assert.NoError(t, err)
gService := GetGossipService().(*gossipServiceImpl)
defer gService.Stop()
Expand All @@ -859,7 +859,7 @@ func TestChannelConfig(t *testing.T) {

secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager())
error := InitGossipService(api.PeerIdentityType("IDENTITY"), &disabled.Provider{}, endpoint, grpcServer, nil,
&naiveCryptoService{}, secAdv, nil)
&naiveCryptoService{}, secAdv, nil, false)
assert.NoError(t, error)
gService := GetGossipService().(*gossipServiceImpl)
defer gService.Stop()
Expand Down
47 changes: 44 additions & 3 deletions integration/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,48 @@ var _ = Describe("EndToEnd", func() {
})

Describe("basic single node etcdraft network", func() {
var (
peerRunners []*ginkgomon.Runner
processes map[string]ifrit.Process
ordererProcess ifrit.Process
)

BeforeEach(func() {
network = nwo.New(nwo.MultiChannelEtcdRaft(), testDir, client, BasePort(), components)
network.GenerateConfigTree()
for _, peer := range network.Peers {
core := network.ReadPeerConfig(peer)
core.Peer.Gossip.UseLeaderElection = false
core.Peer.Gossip.OrgLeader = true
core.Peer.Deliveryclient.ReconnectTotalTimeThreshold = time.Duration(time.Second)
network.WritePeerConfig(peer, core)
}
network.Bootstrap()

networkRunner := network.NetworkGroupRunner()
process = ifrit.Invoke(networkRunner)
Eventually(process.Ready(), network.EventuallyTimeout).Should(BeClosed())
ordererRunner := network.OrdererGroupRunner()
ordererProcess = ifrit.Invoke(ordererRunner)
Eventually(ordererProcess.Ready(), network.EventuallyTimeout).Should(BeClosed())

peerRunners = make([]*ginkgomon.Runner, len(network.Peers))
processes = map[string]ifrit.Process{}
for i, peer := range network.Peers {
pr := network.PeerRunner(peer)
peerRunners[i] = pr
p := ifrit.Invoke(pr)
processes[peer.ID()] = p
Eventually(p.Ready(), network.EventuallyTimeout).Should(BeClosed())
}
})

AfterEach(func() {
if ordererProcess != nil {
ordererProcess.Signal(syscall.SIGTERM)
Eventually(ordererProcess.Wait(), network.EventuallyTimeout).Should(Receive())
}
for _, p := range processes {
p.Signal(syscall.SIGTERM)
Eventually(p.Wait(), network.EventuallyTimeout).Should(Receive())
}
})

It("creates two channels with two orgs trying to reconfigure and update metadata", func() {
Expand Down Expand Up @@ -223,6 +257,13 @@ var _ = Describe("EndToEnd", func() {
Expect(err).NotTo(HaveOccurred())
Expect(len(files)).To(Equal(numOfSnaps))

By("ensuring that static leaders do not give up on retrieving blocks after the orderer goes down")
ordererProcess.Signal(syscall.SIGTERM)
Eventually(ordererProcess.Wait(), network.EventuallyTimeout).Should(Receive())
for _, peerRunner := range peerRunners {
Eventually(peerRunner.Err(), network.EventuallyTimeout).Should(gbytes.Say("peer is a static leader, ignoring peer.deliveryclient.reconnectTotalTimeThreshold"))
}

})
})

Expand Down
2 changes: 2 additions & 0 deletions peer/node/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,7 @@ func initGossipService(policyMgr policies.ChannelPolicyManagerGetter, metricsPro
)
secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager())
bootstrap := viper.GetStringSlice("peer.gossip.bootstrap")
orgLeader := viper.GetBool("peer.gossip.orgLeader")

return service.InitGossipService(
serializedIdentity,
Expand All @@ -902,6 +903,7 @@ func initGossipService(policyMgr policies.ChannelPolicyManagerGetter, metricsPro
messageCryptoService,
secAdv,
secureDialOpts,
orgLeader,
bootstrap...,
)
}
Expand Down

0 comments on commit cae8b55

Please sign in to comment.