From d73f193d0dd63f45b0b728b2029ff788b57346e9 Mon Sep 17 00:00:00 2001 From: Wenjian Qiao Date: Wed, 24 Jun 2020 17:30:44 -0400 Subject: [PATCH] [FAB-17539] Always remember anchor peers in membership (#1422) Gossip service removes a peer from its membership if the peer's alive message is expired. However, it should always remember the anchor peers and bootstrap peers in order for the peer to reconnect. Gossip already remembers bootstrap peers. This PR adds code to track all anchor peers' endpoints and updates the expiration callback function to not delete anchor peers. Signed-off-by: Wenjian Qiao --- gossip/discovery/discovery.go | 5 + gossip/discovery/discovery_impl.go | 37 ++-- gossip/discovery/discovery_test.go | 133 +++++++++++++- gossip/gossip/gossip.go | 3 +- gossip/gossip/gossip_impl.go | 7 +- gossip/gossip/gossip_test.go | 11 +- gossip/gossip/orgs_test.go | 4 +- gossip/integration/integration.go | 6 +- gossip/integration/integration_test.go | 8 +- gossip/service/gossip_service.go | 76 +++++--- gossip/service/gossip_service_test.go | 26 ++- gossip/service/join_test.go | 9 +- gossip/state/state_test.go | 5 +- integration/gossip/gossip_suite_test.go | 41 +++++ integration/gossip/gossip_test.go | 221 ++++++++++++++++++++++++ integration/nwo/core_template.go | 4 +- integration/nwo/fabricconfig/core.go | 2 + integration/nwo/network.go | 5 +- sampleconfig/core.yaml | 4 + 19 files changed, 535 insertions(+), 72 deletions(-) create mode 100644 integration/gossip/gossip_suite_test.go create mode 100644 integration/gossip/gossip_test.go diff --git a/gossip/discovery/discovery.go b/gossip/discovery/discovery.go index 7f4faff4982..9e977e6c86c 100644 --- a/gossip/discovery/discovery.go +++ b/gossip/discovery/discovery.go @@ -71,6 +71,11 @@ type CommService interface { IdentitySwitch() <-chan common.PKIidType } +// AnchorPeerTracker is an interface that is passed to discovery to check if an endpoint is an anchor peer +type AnchorPeerTracker interface { + IsAnchorPeer(endpoint string) bool +} + // NetworkMember is a peer's representation type NetworkMember struct { Endpoint string diff --git a/gossip/discovery/discovery_impl.go b/gossip/discovery/discovery_impl.go index 3e4bb37da55..4a457945ac6 100644 --- a/gossip/discovery/discovery_impl.go +++ b/gossip/discovery/discovery_impl.go @@ -27,15 +27,8 @@ const DefAliveTimeInterval = 5 * time.Second const DefAliveExpirationTimeout = 5 * DefAliveTimeInterval const DefAliveExpirationCheckInterval = DefAliveExpirationTimeout / 10 const DefReconnectInterval = DefAliveExpirationTimeout -const msgExpirationFactor = 20 - -var maxConnectionAttempts = 120 - -// SetMaxConnAttempts sets the maximum number of connection -// attempts the peer would perform when invoking Connect() -func SetMaxConnAttempts(attempts int) { - maxConnectionAttempts = attempts -} +const DefMsgExpirationFactor = 20 +const DefMaxConnectionAttempts = 120 type timestamp struct { incTime time.Time @@ -75,8 +68,11 @@ type gossipDiscoveryImpl struct { aliveExpirationTimeout time.Duration aliveExpirationCheckInterval time.Duration reconnectInterval time.Duration + msgExpirationFactor int + maxConnectionAttempts int - bootstrapPeers []string + bootstrapPeers []string + anchorPeerTracker AnchorPeerTracker } type DiscoveryConfig struct { @@ -84,12 +80,14 @@ type DiscoveryConfig struct { AliveExpirationTimeout time.Duration AliveExpirationCheckInterval time.Duration ReconnectInterval time.Duration + MaxConnectionAttempts int + MsgExpirationFactor int BootstrapPeers []string } // NewDiscoveryService returns a new discovery service with the comm module passed and the crypto service passed func NewDiscoveryService(self NetworkMember, comm CommService, crypt CryptoService, disPol DisclosurePolicy, - config DiscoveryConfig) Discovery { + config DiscoveryConfig, anchorPeerTracker AnchorPeerTracker) Discovery { d := &gossipDiscoveryImpl{ self: self, incTime: uint64(time.Now().UnixNano()), @@ -112,8 +110,11 @@ func NewDiscoveryService(self NetworkMember, comm CommService, crypt CryptoServi aliveExpirationTimeout: config.AliveExpirationTimeout, aliveExpirationCheckInterval: config.AliveExpirationCheckInterval, reconnectInterval: config.ReconnectInterval, + maxConnectionAttempts: config.MaxConnectionAttempts, + msgExpirationFactor: config.MsgExpirationFactor, - bootstrapPeers: config.BootstrapPeers, + bootstrapPeers: config.BootstrapPeers, + anchorPeerTracker: anchorPeerTracker, } d.validateSelfConfig() @@ -149,7 +150,7 @@ func (d *gossipDiscoveryImpl) Connect(member NetworkMember, id identifier) { d.logger.Debug("Entering", member) defer d.logger.Debug("Exiting") go func() { - for i := 0; i < maxConnectionAttempts && !d.toDie(); i++ { + for i := 0; i < d.maxConnectionAttempts && !d.toDie(); i++ { id, err := id() if err != nil { if d.toDie() { @@ -216,7 +217,7 @@ func (d *gossipDiscoveryImpl) validateSelfConfig() { func (d *gossipDiscoveryImpl) sendUntilAcked(peer *NetworkMember, message *proto.SignedGossipMessage) { nonce := message.Nonce - for i := 0; i < maxConnectionAttempts && !d.toDie(); i++ { + for i := 0; i < d.maxConnectionAttempts && !d.toDie(); i++ { sub := d.pubsub.Subscribe(fmt.Sprintf("%d", nonce), time.Second*5) d.comm.SendToPeer(peer, message) if _, timeoutErr := sub.Listen(); timeoutErr == nil { @@ -1032,7 +1033,7 @@ type aliveMsgStore struct { func newAliveMsgStore(d *gossipDiscoveryImpl) *aliveMsgStore { policy := proto.NewGossipMessageComparator(0) trigger := func(m interface{}) {} - aliveMsgTTL := d.aliveExpirationTimeout * msgExpirationFactor + aliveMsgTTL := d.aliveExpirationTimeout * time.Duration(d.msgExpirationFactor) externalLock := func() { d.lock.Lock() } externalUnlock := func() { d.lock.Unlock() } callback := func(m interface{}) { @@ -1044,8 +1045,10 @@ func newAliveMsgStore(d *gossipDiscoveryImpl) *aliveMsgStore { id := membership.PkiId endpoint := membership.Endpoint internalEndpoint := msg.SecretEnvelope.InternalEndpoint() - if util.Contains(endpoint, d.bootstrapPeers) || util.Contains(internalEndpoint, d.bootstrapPeers) { - // Never remove a bootstrap peer + if util.Contains(endpoint, d.bootstrapPeers) || util.Contains(internalEndpoint, d.bootstrapPeers) || + d.anchorPeerTracker.IsAnchorPeer(endpoint) || d.anchorPeerTracker.IsAnchorPeer(internalEndpoint) { + // Never remove a bootstrap peer or an anchor peer + d.logger.Debugf("Do not remove bootstrap or anchor peer endpoint %s from membership", endpoint) return } d.logger.Infof("Removing member: Endpoint: %s, InternalEndpoint: %s, PKIID: %x", endpoint, internalEndpoint, id) diff --git a/gossip/discovery/discovery_test.go b/gossip/discovery/discovery_test.go index ca3dd5a1da5..d2f2d6bdf11 100644 --- a/gossip/discovery/discovery_test.go +++ b/gossip/discovery/discovery_test.go @@ -43,11 +43,13 @@ var defaultTestConfig = DiscoveryConfig{ AliveExpirationTimeout: 10 * aliveTimeInterval, AliveExpirationCheckInterval: aliveTimeInterval, ReconnectInterval: 10 * aliveTimeInterval, + MaxConnectionAttempts: DefMaxConnectionAttempts, + MsgExpirationFactor: DefMsgExpirationFactor, } func init() { util.SetupTestLogging() - maxConnectionAttempts = 10000 + defaultTestConfig.MaxConnectionAttempts = 10000 } type dummyReceivedMessage struct { @@ -75,6 +77,15 @@ func (*dummyReceivedMessage) Ack(err error) { panic("implement me") } +// mockAnchorPeerTracker implements AnchorPeerTracker interface +type mockAnchorPeerTracker struct { + apEndpoints []string +} + +func (m *mockAnchorPeerTracker) IsAnchorPeer(endpoint string) bool { + return util.Contains(endpoint, m.apEndpoints) +} + type dummyCommModule struct { validatedMessages chan *proto.SignedGossipMessage msgsReceived uint32 @@ -377,6 +388,12 @@ func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []st } func createDiscoveryInstanceThatGossipsWithInterceptors(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy, f func(*proto.SignedGossipMessage), config DiscoveryConfig) *gossipInstance { + mockTracker := &mockAnchorPeerTracker{} + return createDiscoveryInstanceWithAnchorPeerTracker(port, id, bootstrapPeers, shouldGossip, pol, f, config, mockTracker) +} + +func createDiscoveryInstanceWithAnchorPeerTracker(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy, + f func(*proto.SignedGossipMessage), config DiscoveryConfig, anchorPeerTracker AnchorPeerTracker) *gossipInstance { comm := &dummyCommModule{ conns: make(map[string]*grpc.ClientConn), streams: make(map[string]proto.Gossip_GossipStreamClient), @@ -407,7 +424,8 @@ func createDiscoveryInstanceThatGossipsWithInterceptors(port int, id string, boo s := grpc.NewServer() config.BootstrapPeers = bootstrapPeers - discSvc := NewDiscoveryService(self, comm, comm, pol, config) + + discSvc := NewDiscoveryService(self, comm, comm, pol, config, anchorPeerTracker) for _, bootPeer := range bootstrapPeers { bp := bootPeer discSvc.Connect(NetworkMember{Endpoint: bp, InternalEndpoint: bootPeer}, func() (*PeerIdentification, error) { @@ -1127,6 +1145,7 @@ func TestExpirationNoSecretEnvelope(t *testing.T) { aliveMembership: util.NewMembershipStore(), deadMembership: util.NewMembershipStore(), logger: logger, + anchorPeerTracker: &mockAnchorPeerTracker{[]string{}}, }) msg := &proto.GossipMessage{ @@ -1281,7 +1300,7 @@ func TestMsgStoreExpiration(t *testing.T) { return true } - waitUntilTimeoutOrFail(t, checkMessages, defaultTestConfig.AliveExpirationTimeout*(msgExpirationFactor+5)) + waitUntilTimeoutOrFail(t, checkMessages, defaultTestConfig.AliveExpirationTimeout*(DefMsgExpirationFactor+5)) assertMembership(t, instances[:len(instances)-2], nodeNum-3) @@ -1434,7 +1453,7 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) { } // Sleep until expire - time.Sleep(defaultTestConfig.AliveExpirationTimeout * (msgExpirationFactor + 5)) + time.Sleep(defaultTestConfig.AliveExpirationTimeout * (DefMsgExpirationFactor + 5)) // Checking Alive expired for i := 0; i < peersNum; i++ { @@ -1676,7 +1695,7 @@ func TestPeerIsolation(t *testing.T) { // Sleep the same amount of time as it takes to remove a message from the aliveMsgStore (aliveMsgTTL) // Add a second as buffer - time.Sleep(config.AliveExpirationTimeout*msgExpirationFactor + time.Second) + time.Sleep(config.AliveExpirationTimeout*DefMsgExpirationFactor + time.Second) // Start again the first 2 peers and wait for all the peers to get full membership. // Especially, we want to test that peer2 won't be isolated @@ -1688,6 +1707,110 @@ func TestPeerIsolation(t *testing.T) { assertMembership(t, instances, peersNum-1) } +func TestMembershipAfterExpiration(t *testing.T) { + // Scenario: + // Start 3 peers (peer0, peer1, peer2). Set peer0 as the anchor peer. + // Stop peer0 and peer1 for a while, start them again and test if peer2 still gets full membership + + config := defaultTestConfig + // Use a smaller AliveExpirationTimeout than the default to reduce the running time of the test. + config.AliveExpirationTimeout = 2 * config.AliveTimeInterval + config.ReconnectInterval = config.AliveExpirationTimeout + config.MsgExpirationFactor = 5 + + peersNum := 3 + ports := []int{9120, 9121, 9122} + anchorPeer := "localhost:9120" + bootPeers := []string{} + instances := []*gossipInstance{} + var inst *gossipInstance + mockTracker := &mockAnchorPeerTracker{[]string{anchorPeer}} + + // use a custom logger to verify messages from expiration callback + expectedMsgs := []string{ + "Do not remove bootstrap or anchor peer endpoint localhost:9120 from membership", + "Removing member: Endpoint: localhost:9121", + } + numMsgsFound := 0 + l, err := zap.NewDevelopment() + assert.NoError(t, err) + expired := make(chan struct{}) + logger := flogging.NewFabricLogger(l, zap.Hooks(func(entry zapcore.Entry) error { + // do nothing if we already found all the expectedMsgs + if numMsgsFound == len(expectedMsgs) { + return nil + } + for _, msg := range expectedMsgs { + if strings.Contains(entry.Message, msg) { + numMsgsFound++ + if numMsgsFound == len(expectedMsgs) { + expired <- struct{}{} + } + break + } + } + return nil + })) + + // Start all peers, connect to the anchor peer and verify full membership + for i := 0; i < peersNum; i++ { + id := fmt.Sprintf("d%d", i) + inst = createDiscoveryInstanceWithAnchorPeerTracker(ports[i], id, bootPeers, true, noopPolicy, func(_ *proto.SignedGossipMessage) {}, config, mockTracker) + instances = append(instances, inst) + } + instances[peersNum-1].Discovery.(*gossipDiscoveryImpl).logger = logger + for i := 1; i < peersNum; i++ { + connect(instances[i], anchorPeer) + } + assertMembership(t, instances, peersNum-1) + + // Stop peer0 and peer1 so that peer2 would stay alone + stopInstances(t, instances[0:peersNum-1]) + + // waitTime is the same amount of time as it takes to remove a message from the aliveMsgStore (aliveMsgTTL) + // Add a second as buffer + waitTime := config.AliveExpirationTimeout*time.Duration(config.MsgExpirationFactor) + time.Second + select { + case <-expired: + case <-time.After(waitTime): + t.Fatalf("timed out") + } + // peer2's deadMembership should contain the anchor peer + deadMemeberShip := instances[peersNum-1].discoveryImpl().deadMembership + assert.Equal(t, 1, deadMemeberShip.Size()) + assertMembership(t, instances[peersNum-1:], 0) + + // Start again peer0 and peer1 and wait for all the peers to get full membership. + // Especially, we want to test that peer2 won't be isolated + for i := 0; i < peersNum-1; i++ { + id := fmt.Sprintf("d%d", i) + inst = createDiscoveryInstanceWithAnchorPeerTracker(ports[i], id, bootPeers, true, noopPolicy, func(_ *proto.SignedGossipMessage) {}, config, mockTracker) + instances[i] = inst + } + connect(instances[1], anchorPeer) + assertMembership(t, instances, peersNum-1) +} + +func connect(inst *gossipInstance, endpoint string) { + inst.comm.lock.Lock() + inst.comm.mock = &mock.Mock{} + inst.comm.mock.On("SendToPeer", mock.Anything, mock.Anything).Run(func(arguments mock.Arguments) { + inst := inst + msg := arguments.Get(1).(*proto.SignedGossipMessage) + if req := msg.GetMemReq(); req != nil { + inst.comm.lock.Lock() + inst.comm.mock = nil + inst.comm.lock.Unlock() + } + }) + inst.comm.mock.On("Ping", mock.Anything) + inst.comm.lock.Unlock() + netMember2Connect2 := NetworkMember{Endpoint: endpoint, PKIid: []byte(endpoint)} + inst.Connect(netMember2Connect2, func() (identification *PeerIdentification, err error) { + return &PeerIdentification{SelfOrg: true, ID: nil}, nil + }) +} + func waitUntilOrFail(t *testing.T, pred func() bool) { waitUntilTimeoutOrFail(t, pred, timeout) } diff --git a/gossip/gossip/gossip.go b/gossip/gossip/gossip.go index 4ccc0c4b486..40fb8ce9431 100644 --- a/gossip/gossip/gossip.go +++ b/gossip/gossip/gossip.go @@ -153,5 +153,6 @@ type Config struct { AliveExpirationTimeout time.Duration // Alive expiration timeout AliveExpirationCheckInterval time.Duration // Alive expiration check interval ReconnectInterval time.Duration // Reconnect interval - + MsgExpirationFactor int // MsgExpirationFactor is the expiration factor for alive message TTL + MaxConnectionAttempts int // MaxConnectionAttempts is the max number of attempts to connect to a peer (wait for alive ack) } diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index 2f6c5dac535..2492c97f8dd 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -70,7 +70,8 @@ type gossipServiceImpl struct { // NewGossipService creates a gossip instance attached to a gRPC server func NewGossipService(conf *Config, s *grpc.Server, sa api.SecurityAdvisor, mcs api.MessageCryptoService, selfIdentity api.PeerIdentityType, - secureDialOpts api.PeerSecureDialOpts, gossipMetrics *metrics.GossipMetrics) Gossip { + secureDialOpts api.PeerSecureDialOpts, gossipMetrics *metrics.GossipMetrics, + anchorPeerTracker discovery.AnchorPeerTracker) Gossip { var err error lgr := util.GetLogger(util.GossipLogger, conf.ID) @@ -125,10 +126,12 @@ func NewGossipService(conf *Config, s *grpc.Server, sa api.SecurityAdvisor, AliveExpirationTimeout: conf.AliveExpirationTimeout, AliveExpirationCheckInterval: conf.AliveExpirationCheckInterval, ReconnectInterval: conf.ReconnectInterval, + MaxConnectionAttempts: conf.MaxConnectionAttempts, + MsgExpirationFactor: conf.MsgExpirationFactor, BootstrapPeers: conf.BootstrapPeers, } g.disc = discovery.NewDiscoveryService(g.selfNetworkMember(), g.discAdapter, g.disSecAdap, g.disclosurePolicy, - discoveryConfig) + discoveryConfig, anchorPeerTracker) g.logger.Infof("Creating gossip service with self membership of %s", g.selfNetworkMember()) g.certPuller = g.createCertStorePuller() diff --git a/gossip/gossip/gossip_test.go b/gossip/gossip/gossip_test.go index f2a62293933..04e189ea8dc 100644 --- a/gossip/gossip/gossip_test.go +++ b/gossip/gossip/gossip_test.go @@ -62,7 +62,6 @@ var tests = []func(t *testing.T){ func init() { util.SetupTestLogging() rand.Seed(int64(time.Now().Second())) - discovery.SetMaxConnAttempts(5) for range tests { testWG.Add(1) } @@ -75,6 +74,8 @@ var discoveryConfig = discovery.DiscoveryConfig{ AliveExpirationTimeout: 10 * aliveTimeInterval, AliveExpirationCheckInterval: aliveTimeInterval, ReconnectInterval: aliveTimeInterval, + MaxConnectionAttempts: 5, + MsgExpirationFactor: discovery.DefMsgExpirationFactor, } var expirationTimes map[string]time.Time = map[string]time.Time{} @@ -263,10 +264,12 @@ func newGossipInstanceWithGrpcMcsMetrics(id int, port int, gRPCServer *corecomm. AliveExpirationTimeout: discoveryConfig.AliveExpirationTimeout, AliveExpirationCheckInterval: discoveryConfig.AliveExpirationCheckInterval, ReconnectInterval: discoveryConfig.ReconnectInterval, + MaxConnectionAttempts: discoveryConfig.MaxConnectionAttempts, + MsgExpirationFactor: discoveryConfig.MsgExpirationFactor, } selfID := api.PeerIdentityType(conf.InternalEndpoint) g := NewGossipService(conf, gRPCServer.Server(), &orgCryptoService{}, mcs, selfID, - secureDialOpts, metrics) + secureDialOpts, metrics, nil) go func() { gRPCServer.Start() }() @@ -313,10 +316,12 @@ func newGossipInstanceWithGRPCWithOnlyPull(id int, port int, gRPCServer *corecom AliveExpirationTimeout: discoveryConfig.AliveExpirationTimeout, AliveExpirationCheckInterval: discoveryConfig.AliveExpirationCheckInterval, ReconnectInterval: discoveryConfig.ReconnectInterval, + MaxConnectionAttempts: discoveryConfig.MaxConnectionAttempts, + MsgExpirationFactor: discoveryConfig.MsgExpirationFactor, } selfID := api.PeerIdentityType(conf.InternalEndpoint) g := NewGossipService(conf, gRPCServer.Server(), &orgCryptoService{}, mcs, selfID, - secureDialOpts, metrics) + secureDialOpts, metrics, nil) go func() { gRPCServer.Start() }() diff --git a/gossip/gossip/orgs_test.go b/gossip/gossip/orgs_test.go index c48a85c5df1..0252e15a2a2 100644 --- a/gossip/gossip/orgs_test.go +++ b/gossip/gossip/orgs_test.go @@ -125,10 +125,12 @@ func newGossipInstanceWithGRPCWithExternalEndpoint(id int, port int, gRPCServer AliveExpirationTimeout: discoveryConfig.AliveExpirationTimeout, AliveExpirationCheckInterval: discoveryConfig.AliveExpirationCheckInterval, ReconnectInterval: discoveryConfig.ReconnectInterval, + MaxConnectionAttempts: discoveryConfig.MaxConnectionAttempts, + MsgExpirationFactor: discoveryConfig.MsgExpirationFactor, } selfID := api.PeerIdentityType(conf.InternalEndpoint) g := NewGossipService(conf, gRPCServer.Server(), mcs, mcs, selfID, - secureDialOpts, metrics.NewGossipMetrics(&disabled.Provider{})) + secureDialOpts, metrics.NewGossipMetrics(&disabled.Provider{}), nil) go func() { gRPCServer.Start() }() diff --git a/gossip/integration/integration.go b/gossip/integration/integration.go index cec08767f76..9ac301fb912 100644 --- a/gossip/integration/integration.go +++ b/gossip/integration/integration.go @@ -67,6 +67,8 @@ func newConfig(selfEndpoint string, externalEndpoint string, certs *common.TLSCe SendBuffSize: util.GetIntOrDefault("peer.gossip.sendBuffSize", comm.DefSendBuffSize), MsgExpirationTimeout: util.GetDurationOrDefault("peer.gossip.election.leaderAliveThreshold", election.DefLeaderAliveThreshold) * 10, AliveTimeInterval: util.GetDurationOrDefault("peer.gossip.aliveTimeInterval", discovery.DefAliveTimeInterval), + MaxConnectionAttempts: util.GetIntOrDefault("peer.gossip.maxConnectionAttempts", discovery.DefMaxConnectionAttempts), + MsgExpirationFactor: util.GetIntOrDefault("peer.gossip.msgExpirationFactor", discovery.DefMsgExpirationFactor), } conf.AliveExpirationTimeout = util.GetDurationOrDefault("peer.gossip.aliveExpirationTimeout", 5*conf.AliveTimeInterval) @@ -80,7 +82,7 @@ func newConfig(selfEndpoint string, externalEndpoint string, certs *common.TLSCe func NewGossipComponent(peerIdentity []byte, endpoint string, s *grpc.Server, secAdv api.SecurityAdvisor, cryptSvc api.MessageCryptoService, secureDialOpts api.PeerSecureDialOpts, certs *common.TLSCertificates, gossipMetrics *metrics.GossipMetrics, - bootPeers ...string) (gossip.Gossip, error) { + anchorPeerTracker discovery.AnchorPeerTracker, bootPeers ...string) (gossip.Gossip, error) { externalEndpoint := viper.GetString("peer.gossip.externalEndpoint") @@ -89,7 +91,7 @@ func NewGossipComponent(peerIdentity []byte, endpoint string, s *grpc.Server, return nil, errors.WithStack(err) } gossipInstance := gossip.NewGossipService(conf, s, secAdv, cryptSvc, - peerIdentity, secureDialOpts, gossipMetrics) + peerIdentity, secureDialOpts, gossipMetrics, anchorPeerTracker) return gossipInstance, nil } diff --git a/gossip/integration/integration_test.go b/gossip/integration/integration_test.go index 0847916c60e..456695e9960 100644 --- a/gossip/integration/integration_test.go +++ b/gossip/integration/integration_test.go @@ -20,7 +20,7 @@ import ( "github.com/hyperledger/fabric/gossip/metrics" "github.com/hyperledger/fabric/gossip/util" "github.com/hyperledger/fabric/msp/mgmt" - "github.com/hyperledger/fabric/msp/mgmt/testtools" + msptesttools "github.com/hyperledger/fabric/msp/mgmt/testtools" "github.com/spf13/viper" "github.com/stretchr/testify/assert" "google.golang.org/grpc" @@ -56,13 +56,13 @@ func TestNewGossipCryptoService(t *testing.T) { peerIdentity, _ := mgmt.GetLocalSigningIdentityOrPanic().Serialize() gossipMetrics := metrics.NewGossipMetrics(&disabled.Provider{}) g1, err := NewGossipComponent(peerIdentity, endpoint1, s1, secAdv, cryptSvc, - defaultSecureDialOpts, nil, gossipMetrics) + defaultSecureDialOpts, nil, gossipMetrics, nil) assert.NoError(t, err) g2, err := NewGossipComponent(peerIdentity, endpoint2, s2, secAdv, cryptSvc, - defaultSecureDialOpts, nil, gossipMetrics, endpoint1) + defaultSecureDialOpts, nil, gossipMetrics, nil, endpoint1) assert.NoError(t, err) g3, err := NewGossipComponent(peerIdentity, endpoint3, s3, secAdv, cryptSvc, - defaultSecureDialOpts, nil, gossipMetrics, endpoint1) + defaultSecureDialOpts, nil, gossipMetrics, nil, endpoint1) assert.NoError(t, err) defer g1.Stop() defer g2.Stop() diff --git a/gossip/service/gossip_service.go b/gossip/service/gossip_service.go index eba46bd38df..63381a5d378 100644 --- a/gossip/service/gossip_service.go +++ b/gossip/service/gossip_service.go @@ -104,16 +104,17 @@ func (p privateHandler) close() { type gossipServiceImpl struct { gossipSvc - privateHandlers map[string]privateHandler - chains map[string]state.GossipStateProvider - leaderElection map[string]election.LeaderElectionService - deliveryService map[string]deliverclient.DeliverService - deliveryFactory DeliveryServiceFactory - lock sync.RWMutex - mcs api.MessageCryptoService - peerIdentity []byte - secAdv api.SecurityAdvisor - metrics *gossipMetrics.GossipMetrics + privateHandlers map[string]privateHandler + chains map[string]state.GossipStateProvider + leaderElection map[string]election.LeaderElectionService + deliveryService map[string]deliverclient.DeliverService + deliveryFactory DeliveryServiceFactory + lock sync.RWMutex + mcs api.MessageCryptoService + peerIdentity []byte + secAdv api.SecurityAdvisor + metrics *gossipMetrics.GossipMetrics + anchorPeerTracker *anchorPeerTracker } // This is an implementation of api.JoinChannelMessage. @@ -142,6 +143,33 @@ func (jcm *joinChannelMessage) AnchorPeersOf(org api.OrgIdentityType) []api.Anch return jcm.members2AnchorPeers[string(org)] } +// anchorPeerTracker maintains anchor peer endpoints for all the channels. +type anchorPeerTracker struct { + // allEndpoints contains anchor peer endpoints for all the channels, + // its key is channel name, value is map of anchor peer endpoints + allEndpoints map[string]map[string]struct{} + mutex sync.RWMutex +} + +// update overwrites the anchor peer endpoints for the channel +func (t *anchorPeerTracker) update(channelName string, endpoints map[string]struct{}) { + t.mutex.Lock() + defer t.mutex.Unlock() + t.allEndpoints[channelName] = endpoints +} + +// IsAnchorPeer checks if an endpoint is an anchor peer in any channel +func (t *anchorPeerTracker) IsAnchorPeer(endpoint string) bool { + t.mutex.RLock() + defer t.mutex.RUnlock() + for _, endpointsForChannel := range t.allEndpoints { + if _, ok := endpointsForChannel[endpoint]; ok { + return true + } + } + return false +} + var logger = util.GetLogger(util.ServiceLogger, "") // InitGossipService initialize gossip service @@ -200,19 +228,22 @@ func InitGossipServiceCustomDeliveryFactory( gossipMetrics := gossipMetrics.NewGossipMetrics(metricsProvider) + anchorPeerTracker := &anchorPeerTracker{allEndpoints: map[string]map[string]struct{}{}} + gossip, err = integration.NewGossipComponent(peerIdentity, endpoint, s, secAdv, - mcs, secureDialOpts, certs, gossipMetrics, bootPeers...) + mcs, secureDialOpts, certs, gossipMetrics, anchorPeerTracker, bootPeers...) gossipServiceInstance = &gossipServiceImpl{ - mcs: mcs, - gossipSvc: gossip, - privateHandlers: make(map[string]privateHandler), - chains: make(map[string]state.GossipStateProvider), - leaderElection: make(map[string]election.LeaderElectionService), - deliveryService: make(map[string]deliverclient.DeliverService), - deliveryFactory: factory, - peerIdentity: peerIdentity, - secAdv: secAdv, - metrics: gossipMetrics, + mcs: mcs, + gossipSvc: gossip, + privateHandlers: make(map[string]privateHandler), + chains: make(map[string]state.GossipStateProvider), + leaderElection: make(map[string]election.LeaderElectionService), + deliveryService: make(map[string]deliverclient.DeliverService), + deliveryFactory: factory, + peerIdentity: peerIdentity, + secAdv: secAdv, + metrics: gossipMetrics, + anchorPeerTracker: anchorPeerTracker, } }) return errors.WithStack(err) @@ -391,6 +422,7 @@ func (g *gossipServiceImpl) updateAnchors(config Config) { return } jcm := &joinChannelMessage{seqNum: config.Sequence(), members2AnchorPeers: map[string][]api.AnchorPeer{}} + anchorPeerEndpoints := map[string]struct{}{} for _, appOrg := range config.ApplicationOrgs() { logger.Debug(appOrg.MSPID(), "anchor peers:", appOrg.AnchorPeers()) jcm.members2AnchorPeers[appOrg.MSPID()] = []api.AnchorPeer{} @@ -400,8 +432,10 @@ func (g *gossipServiceImpl) updateAnchors(config Config) { Port: int(ap.Port), } jcm.members2AnchorPeers[appOrg.MSPID()] = append(jcm.members2AnchorPeers[appOrg.MSPID()], anchorPeer) + anchorPeerEndpoints[fmt.Sprintf("%s:%d", ap.Host, ap.Port)] = struct{}{} } } + g.anchorPeerTracker.update(config.ChainID(), anchorPeerEndpoints) // Initialize new state provider for given committer logger.Debug("Creating state provider for chainID", config.ChainID()) diff --git a/gossip/service/gossip_service_test.go b/gossip/service/gossip_service_test.go index 52f0c4a0971..7a525b4503d 100644 --- a/gossip/service/gossip_service_test.go +++ b/gossip/service/gossip_service_test.go @@ -713,12 +713,14 @@ func newGossipInstance(port int, id int, gRPCServer *comm.GRPCServer, certs *gos AliveExpirationTimeout: discovery.DefAliveExpirationTimeout, AliveExpirationCheckInterval: discovery.DefAliveExpirationCheckInterval, ReconnectInterval: discovery.DefReconnectInterval, + MaxConnectionAttempts: discovery.DefMaxConnectionAttempts, + MsgExpirationFactor: discovery.DefMsgExpirationFactor, } selfID := api.PeerIdentityType(conf.InternalEndpoint) cryptoService := &naiveCryptoService{} metrics := gossipMetrics.NewGossipMetrics(&disabled.Provider{}) gossip := gossip.NewGossipService(conf, gRPCServer.Server(), &orgCryptoService{}, cryptoService, selfID, - secureDialOpts, metrics) + secureDialOpts, metrics, nil) go func() { gRPCServer.Start() }() @@ -859,8 +861,15 @@ func TestChannelConfig(t *testing.T) { grpcServer := grpc.NewServer() endpoint, socket := getAvailablePort(t) + // Because gossipServiceInstance is initialized only once, we have to use the same identity + // if it is already initialized + orgIdentity := orgInChannelA + if gossipServiceInstance != nil { + orgIdentity = gossipServiceInstance.peerIdentity + } + secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager()) - error := InitGossipService(api.PeerIdentityType("IDENTITY"), &disabled.Provider{}, endpoint, grpcServer, nil, + error := InitGossipService(orgIdentity, &disabled.Provider{}, endpoint, grpcServer, nil, &naiveCryptoService{}, secAdv, nil, false) assert.NoError(t, error) gService := GetGossipService().(*gossipServiceImpl) @@ -878,13 +887,18 @@ func TestChannelConfig(t *testing.T) { mc := &mockConfig{ sequence: 1, appOrgs: map[string]channelconfig.ApplicationOrg{ - string(orgInChannelA): &appGrp{ - mspID: string(orgInChannelA), - anchorPeers: []*peer.AnchorPeer{}, + string(orgIdentity): &appGrp{ + mspID: string(orgIdentity), + anchorPeers: []*peer.AnchorPeer{{Host: "localhost", Port: 2001}}, }, }, } + gService.JoinChan(jcm, gossipCommon.ChainID("A")) + // use mock secAdv so that gService.secAdv.OrgByPeerIdentity can return the matched identity + gService.secAdv = &secAdvMock{} gService.updateAnchors(mc) - assert.True(t, gService.amIinChannel(string(orgInChannelA), mc)) + assert.True(t, gService.amIinChannel(string(orgIdentity), mc)) + assert.True(t, gService.anchorPeerTracker.IsAnchorPeer("localhost:2001")) + assert.False(t, gService.anchorPeerTracker.IsAnchorPeer("localhost:5000")) } diff --git a/gossip/service/join_test.go b/gossip/service/join_test.go index 3fdcaec0d46..88772d7fbbe 100644 --- a/gossip/service/join_test.go +++ b/gossip/service/join_test.go @@ -170,7 +170,8 @@ func TestJoinChannelConfig(t *testing.T) { g1SvcMock.On("JoinChan", mock.Anything, mock.Anything).Run(func(_ mock.Arguments) { failChan <- struct{}{} }) - g1 := &gossipServiceImpl{secAdv: &secAdvMock{}, peerIdentity: api.PeerIdentityType("OrgMSP0"), gossipSvc: g1SvcMock} + anchorPeerTracker := &anchorPeerTracker{allEndpoints: map[string]map[string]struct{}{}} + g1 := &gossipServiceImpl{secAdv: &secAdvMock{}, peerIdentity: api.PeerIdentityType("OrgMSP0"), gossipSvc: g1SvcMock, anchorPeerTracker: anchorPeerTracker} g1.updateAnchors(&configMock{ orgs2AppOrgs: map[string]channelconfig.ApplicationOrg{ "Org0": &appOrgMock{id: "Org0"}, @@ -187,7 +188,7 @@ func TestJoinChannelConfig(t *testing.T) { g2SvcMock.On("JoinChan", mock.Anything, mock.Anything).Run(func(_ mock.Arguments) { succChan <- struct{}{} }) - g2 := &gossipServiceImpl{secAdv: &secAdvMock{}, peerIdentity: api.PeerIdentityType("Org0"), gossipSvc: g2SvcMock} + g2 := &gossipServiceImpl{secAdv: &secAdvMock{}, peerIdentity: api.PeerIdentityType("Org0"), gossipSvc: g2SvcMock, anchorPeerTracker: anchorPeerTracker} g2.updateAnchors(&configMock{ orgs2AppOrgs: map[string]channelconfig.ApplicationOrg{ "Org0": &appOrgMock{id: "Org0"}, @@ -219,8 +220,8 @@ func TestJoinChannelNoAnchorPeers(t *testing.T) { assert.Equal(t, "A", string(channel)) }) - g := &gossipServiceImpl{secAdv: &secAdvMock{}, peerIdentity: api.PeerIdentityType("Org0"), gossipSvc: gMock} - + anchorPeerTracker := &anchorPeerTracker{allEndpoints: map[string]map[string]struct{}{}} + g := &gossipServiceImpl{secAdv: &secAdvMock{}, peerIdentity: api.PeerIdentityType("Org0"), gossipSvc: gMock, anchorPeerTracker: anchorPeerTracker} appOrg0 := &appOrgMock{id: "Org0"} appOrg1 := &appOrgMock{id: "Org1"} diff --git a/gossip/state/state_test.go b/gossip/state/state_test.go index f7268df64f5..f17d2e082c7 100644 --- a/gossip/state/state_test.go +++ b/gossip/state/state_test.go @@ -399,12 +399,13 @@ func newPeerNodeWithGossipWithValidatorWithMetrics(id int, committer committer.C AliveExpirationTimeout: discovery.DefAliveExpirationTimeout, AliveExpirationCheckInterval: discovery.DefAliveExpirationCheckInterval, ReconnectInterval: discovery.DefReconnectInterval, + MaxConnectionAttempts: discovery.DefMaxConnectionAttempts, + MsgExpirationFactor: discovery.DefMsgExpirationFactor, } selfID := api.PeerIdentityType(config.InternalEndpoint) mcs := &cryptoServiceMock{acceptor: noopPeerIdentityAcceptor} - g = gossip.NewGossipService(config, gRPCServer.Server(), &orgCryptoService{}, mcs, selfID, secureDialOpts, gossipMetrics) - + g = gossip.NewGossipService(config, gRPCServer.Server(), &orgCryptoService{}, mcs, selfID, secureDialOpts, gossipMetrics, nil) } g.JoinChan(&joinChanMsg{}, common.ChainID(util.GetTestChainID())) diff --git a/integration/gossip/gossip_suite_test.go b/integration/gossip/gossip_suite_test.go new file mode 100644 index 00000000000..a753d1fb8c1 --- /dev/null +++ b/integration/gossip/gossip_suite_test.go @@ -0,0 +1,41 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package gossip + +import ( + "encoding/json" + "testing" + + "github.com/hyperledger/fabric/integration/nwo" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestGossip(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Gossip Communication Suite") +} + +var components *nwo.Components + +var _ = SynchronizedBeforeSuite(func() []byte { + components = &nwo.Components{} + components.Build() + + payload, err := json.Marshal(components) + Expect(err).NotTo(HaveOccurred()) + + return payload +}, func(payload []byte) { + err := json.Unmarshal(payload, &components) + Expect(err).NotTo(HaveOccurred()) +}) + +var _ = SynchronizedAfterSuite(func() { +}, func() { + components.Cleanup() +}) diff --git a/integration/gossip/gossip_test.go b/integration/gossip/gossip_test.go new file mode 100644 index 00000000000..7a9d4dccb4a --- /dev/null +++ b/integration/gossip/gossip_test.go @@ -0,0 +1,221 @@ +/* + * Copyright IBM Corp. All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package gossip + +import ( + "fmt" + "io/ioutil" + "os" + "syscall" + "time" + + docker "github.com/fsouza/go-dockerclient" + "github.com/hyperledger/fabric/integration/nwo" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/onsi/gomega/gbytes" + "github.com/tedsuo/ifrit" + "github.com/tedsuo/ifrit/ginkgomon" +) + +var _ = Describe("Gossip Membership", func() { + var ( + testDir string + network *nwo.Network + nwprocs *networkProcesses + orderer *nwo.Orderer + peerEndpoints map[string]string = map[string]string{} + channelName string + ) + + BeforeEach(func() { + var err error + testDir, err = ioutil.TempDir("", "gossip-membership") + Expect(err).NotTo(HaveOccurred()) + + dockerClient, err := docker.NewClientFromEnv() + Expect(err).NotTo(HaveOccurred()) + + channelName = "testchannel" + network = nwo.New(nwo.BasicSolo(), testDir, dockerClient, 25000+1000*GinkgoParallelNode(), components) + network.GenerateConfigTree() + + // modify peer config + for _, peer := range network.Peers { + core := network.ReadPeerConfig(peer) + core.Peer.Gossip.AliveTimeInterval = 1 * time.Second + core.Peer.Gossip.AliveExpirationTimeout = 2 * core.Peer.Gossip.AliveTimeInterval + core.Peer.Gossip.ReconnectInterval = 2 * time.Second + core.Peer.Gossip.MsgExpirationFactor = 2 + core.Peer.Gossip.MaxConnectionAttempts = 10 + network.WritePeerConfig(peer, core) + peerEndpoints[peer.ID()] = core.Peer.Address + } + + network.Bootstrap() + orderer = network.Orderer("orderer") + nwprocs = &networkProcesses{ + network: network, + peerRunners: map[string]*ginkgomon.Runner{}, + peerProcesses: map[string]ifrit.Process{}, + ordererRunner: network.OrdererRunner(orderer), + } + nwprocs.ordererProcess = ifrit.Invoke(nwprocs.ordererRunner) + Eventually(nwprocs.ordererProcess.Ready(), network.EventuallyTimeout).Should(BeClosed()) + }) + + AfterEach(func() { + if nwprocs != nil { + nwprocs.terminateAll() + } + if network != nil { + network.Cleanup() + } + os.RemoveAll(testDir) + }) + + It("updates membership when peers in the same org are stopped and restarted", func() { + peer0Org1 := network.Peer("Org1", "peer0") + peer1Org1 := network.Peer("Org1", "peer1") + + By("bringing up all peers") + startPeers(nwprocs, false, peer0Org1, peer1Org1) + + By("creating and joining a channel") + network.CreateChannel(channelName, orderer, peer0Org1) + network.JoinChannel(channelName, orderer, peer0Org1, peer1Org1) + network.UpdateChannelAnchors(orderer, channelName) + + By("verifying non-anchor peer (peer1Org1) discovers all the peers before testing membership change on it") + Eventually(nwo.DiscoverPeers(network, peer1Org1, "User1", "testchannel"), network.EventuallyTimeout).Should(ConsistOf( + network.DiscoveredPeer(peer0Org1), + network.DiscoveredPeer(peer1Org1), + )) + + By("verifying membership change on non-anchor peer (peer1Org1) when an anchor peer in the same org is stopped and restarted") + expectedMsgFromExpirationCallback := fmt.Sprintf("Do not remove bootstrap or anchor peer endpoint %s from membership", peerEndpoints[peer0Org1.ID()]) + assertPeerMembershipUpdate(network, peer1Org1, []*nwo.Peer{peer0Org1}, nwprocs, expectedMsgFromExpirationCallback) + + By("verifying anchor peer (peer0Org1) discovers all the peers before testing membership change on it") + Eventually(nwo.DiscoverPeers(network, peer0Org1, "User1", "testchannel"), network.EventuallyTimeout).Should(ConsistOf( + network.DiscoveredPeer(peer0Org1), + network.DiscoveredPeer(peer1Org1), + )) + + By("verifying membership change on anchor peer (peer0Org1) when a non-anchor peer in the same org is stopped and restarted") + expectedMsgFromExpirationCallback = fmt.Sprintf("Removing member: Endpoint: %s", peerEndpoints[peer1Org1.ID()]) + assertPeerMembershipUpdate(network, peer0Org1, []*nwo.Peer{peer1Org1}, nwprocs, expectedMsgFromExpirationCallback) + }) + + It("updates peer membership when peers in another org are stopped and restarted", func() { + peer0Org1, peer1Org1 := network.Peer("Org1", "peer0"), network.Peer("Org1", "peer1") + peer0Org2, peer1Org2 := network.Peer("Org2", "peer0"), network.Peer("Org2", "peer1") + + By("bringing up all peers") + startPeers(nwprocs, false, peer0Org1, peer1Org1, peer0Org2, peer1Org2) + + By("creating and joining a channel") + network.CreateChannel(channelName, orderer, peer0Org1) + network.JoinChannel(channelName, orderer, peer0Org1, peer1Org1, peer0Org2, peer1Org2) + network.UpdateChannelAnchors(orderer, channelName) + + By("verifying membership on peer1Org1 before testing membership change on it") + Eventually(nwo.DiscoverPeers(network, peer1Org1, "User1", "testchannel"), network.EventuallyTimeout).Should(ConsistOf( + network.DiscoveredPeer(peer0Org1), + network.DiscoveredPeer(peer1Org1), + network.DiscoveredPeer(peer0Org2), + network.DiscoveredPeer(peer1Org2), + )) + + By("stopping anchor peer peer0Org1 to have only one peer in org1") + stopPeers(nwprocs, peer0Org1) + + By("verifying peer membership update when peers in another org are stopped and restarted") + expectedMsgFromExpirationCallback := fmt.Sprintf("Do not remove bootstrap or anchor peer endpoint %s from membership", peerEndpoints[peer0Org2.ID()]) + assertPeerMembershipUpdate(network, peer1Org1, []*nwo.Peer{peer0Org2, peer1Org2}, nwprocs, expectedMsgFromExpirationCallback) + }) +}) + +// networkProcesses holds references to the network, its runners, and processes. +type networkProcesses struct { + network *nwo.Network + + ordererRunner *ginkgomon.Runner + ordererProcess ifrit.Process + + peerRunners map[string]*ginkgomon.Runner + peerProcesses map[string]ifrit.Process +} + +func (n *networkProcesses) terminateAll() { + if n.ordererProcess != nil { + n.ordererProcess.Signal(syscall.SIGTERM) + Eventually(n.ordererProcess.Wait(), n.network.EventuallyTimeout).Should(Receive()) + } + for _, process := range n.peerProcesses { + process.Signal(syscall.SIGTERM) + Eventually(process.Wait(), n.network.EventuallyTimeout).Should(Receive()) + } +} + +func startPeers(n *networkProcesses, forceStateTransfer bool, peersToStart ...*nwo.Peer) { + env := []string{"FABRIC_LOGGING_SPEC=info:gossip.state=debug:gossip.discovery=debug"} + + // Setting CORE_PEER_GOSSIP_STATE_CHECKINTERVAL to 200ms (from default of 10s) will ensure that state transfer happens quickly, + // before blocks are gossipped through normal mechanisms + if forceStateTransfer { + env = append(env, "CORE_PEER_GOSSIP_STATE_CHECKINTERVAL=200ms") + } + + for _, peer := range peersToStart { + runner := n.network.PeerRunner(peer, env...) + process := ifrit.Invoke(runner) + Eventually(process.Ready(), n.network.EventuallyTimeout).Should(BeClosed()) + + n.peerProcesses[peer.ID()] = process + n.peerRunners[peer.ID()] = runner + } +} + +func stopPeers(n *networkProcesses, peersToStop ...*nwo.Peer) { + for _, peer := range peersToStop { + id := peer.ID() + proc := n.peerProcesses[id] + proc.Signal(syscall.SIGTERM) + Eventually(proc.Wait(), n.network.EventuallyTimeout).Should(Receive()) + delete(n.peerProcesses, id) + } +} + +// assertPeerMembershipUpdate stops and restart peersToRestart and verify peer membership +func assertPeerMembershipUpdate(network *nwo.Network, peer *nwo.Peer, peersToRestart []*nwo.Peer, nwprocs *networkProcesses, expectedMsgFromExpirationCallback string) { + stopPeers(nwprocs, peersToRestart...) + + // timeout is the same amount of time as it takes to remove a message from the aliveMsgStore, and add a second as buffer + core := network.ReadPeerConfig(peer) + timeout := core.Peer.Gossip.AliveExpirationTimeout*time.Duration(core.Peer.Gossip.MsgExpirationFactor) + time.Second + By("verifying peer membership after all other peers are stopped") + Eventually(nwo.DiscoverPeers(network, peer, "User1", "testchannel"), timeout, 100*time.Millisecond).Should(ConsistOf( + network.DiscoveredPeer(peer), + )) + + By("verifying expected log message from expiration callback") + runner := nwprocs.peerRunners[peer.ID()] + Eventually(runner.Err(), network.EventuallyTimeout).Should(gbytes.Say(expectedMsgFromExpirationCallback)) + + By("restarting peers") + startPeers(nwprocs, false, peersToRestart...) + + By("verifying peer membership, expected to discover restarted peers") + expectedPeers := make([]nwo.DiscoveredPeer, len(peersToRestart)+1) + expectedPeers[0] = network.DiscoveredPeer(peer) + for i, p := range peersToRestart { + expectedPeers[i+1] = network.DiscoveredPeer(p) + } + timeout = 3 * core.Peer.Gossip.ReconnectInterval + Eventually(nwo.DiscoverPeers(network, peer, "User1", "testchannel"), timeout, 100*time.Millisecond).Should(ConsistOf(expectedPeers)) +} diff --git a/integration/nwo/core_template.go b/integration/nwo/core_template.go index 498c07c5a7b..df36d812f3e 100644 --- a/integration/nwo/core_template.go +++ b/integration/nwo/core_template.go @@ -28,9 +28,10 @@ peer: timeout: 20s gossip: bootstrap: 127.0.0.1:{{ .PeerPort Peer "Listen" }} + endpoint: 127.0.0.1:{{ .PeerPort Peer "Listen" }} + externalEndpoint: 127.0.0.1:{{ .PeerPort Peer "Listen" }} useLeaderElection: true orgLeader: false - endpoint: maxBlockCountToStore: 100 maxPropagationBurstLatency: 10ms maxPropagationBurstSize: 10 @@ -52,7 +53,6 @@ peer: aliveTimeInterval: 5s aliveExpirationTimeout: 25s reconnectInterval: 25s - externalEndpoint: 127.0.0.1:{{ .PeerPort Peer "Listen" }} election: startupGracePeriod: 15s membershipSampleInterval: 1s diff --git a/integration/nwo/fabricconfig/core.go b/integration/nwo/fabricconfig/core.go index ad5a49f90d4..771bee135a4 100644 --- a/integration/nwo/fabricconfig/core.go +++ b/integration/nwo/fabricconfig/core.go @@ -91,6 +91,8 @@ type Gossip struct { AliveTimeInterval time.Duration `yaml:"aliveTimeInterval,omitempty"` AliveExpirationTimeout time.Duration `yaml:"aliveExpirationTimeout,omitempty"` ReconnectInterval time.Duration `yaml:"reconnectInterval,omitempty"` + MsgExpirationFactor int `yaml:"msgExpirationFactor,omitempty"` + MaxConnectionAttempts int `yaml:"maxConnectionAttempts,omitempty"` ExternalEndpoint string `yaml:"externalEndpoint,omitempty"` Election *GossipElection `yaml:"election,omitempty"` PvtData *GossipPvtData `yaml:"pvtData,omitempty"` diff --git a/integration/nwo/network.go b/integration/nwo/network.go index 2d6d572b037..7baada8887a 100644 --- a/integration/nwo/network.go +++ b/integration/nwo/network.go @@ -95,7 +95,7 @@ type OrdererCapabilities struct { } // Peer defines a peer instance, it's owning organization, and the list of -// channels that the peer shoudl be joined to. +// channels that the peer should be joined to. type Peer struct { Name string `yaml:"name,omitempty"` Organization string `yaml:"organization,omitempty"` @@ -1025,11 +1025,12 @@ func (n *Network) OrdererGroupRunner() ifrit.Runner { // PeerRunner returns an ifrit.Runner for the specified peer. The runner can be // used to start and manage a peer process. -func (n *Network) PeerRunner(p *Peer) *ginkgomon.Runner { +func (n *Network) PeerRunner(p *Peer, env ...string) *ginkgomon.Runner { cmd := n.peerCommand( commands.NodeStart{PeerID: p.ID()}, fmt.Sprintf("FABRIC_CFG_PATH=%s", n.PeerDir(p)), ) + cmd.Env = append(cmd.Env, env...) return ginkgomon.New(ginkgomon.Config{ AnsiColorCode: n.nextColor(), diff --git a/sampleconfig/core.yaml b/sampleconfig/core.yaml index d34543da995..44f79b11b00 100644 --- a/sampleconfig/core.yaml +++ b/sampleconfig/core.yaml @@ -153,6 +153,10 @@ peer: aliveExpirationTimeout: 25s # Reconnect interval(unit: second) reconnectInterval: 25s + # Max number of attempts to connect to a peer + maxConnectionAttempts: 120 + # Message expiration factor for alive messages + msgExpirationFactor: 20 # This is an endpoint that is published to peers outside of the organization. # If this isn't set, the peer will not be known to other organizations. externalEndpoint: