diff --git a/gossip/discovery/discovery.go b/gossip/discovery/discovery.go index 7f4faff4982..9e977e6c86c 100644 --- a/gossip/discovery/discovery.go +++ b/gossip/discovery/discovery.go @@ -71,6 +71,11 @@ type CommService interface { IdentitySwitch() <-chan common.PKIidType } +// AnchorPeerTracker is an interface that is passed to discovery to check if an endpoint is an anchor peer +type AnchorPeerTracker interface { + IsAnchorPeer(endpoint string) bool +} + // NetworkMember is a peer's representation type NetworkMember struct { Endpoint string diff --git a/gossip/discovery/discovery_impl.go b/gossip/discovery/discovery_impl.go index 3e4bb37da55..4a457945ac6 100644 --- a/gossip/discovery/discovery_impl.go +++ b/gossip/discovery/discovery_impl.go @@ -27,15 +27,8 @@ const DefAliveTimeInterval = 5 * time.Second const DefAliveExpirationTimeout = 5 * DefAliveTimeInterval const DefAliveExpirationCheckInterval = DefAliveExpirationTimeout / 10 const DefReconnectInterval = DefAliveExpirationTimeout -const msgExpirationFactor = 20 - -var maxConnectionAttempts = 120 - -// SetMaxConnAttempts sets the maximum number of connection -// attempts the peer would perform when invoking Connect() -func SetMaxConnAttempts(attempts int) { - maxConnectionAttempts = attempts -} +const DefMsgExpirationFactor = 20 +const DefMaxConnectionAttempts = 120 type timestamp struct { incTime time.Time @@ -75,8 +68,11 @@ type gossipDiscoveryImpl struct { aliveExpirationTimeout time.Duration aliveExpirationCheckInterval time.Duration reconnectInterval time.Duration + msgExpirationFactor int + maxConnectionAttempts int - bootstrapPeers []string + bootstrapPeers []string + anchorPeerTracker AnchorPeerTracker } type DiscoveryConfig struct { @@ -84,12 +80,14 @@ type DiscoveryConfig struct { AliveExpirationTimeout time.Duration AliveExpirationCheckInterval time.Duration ReconnectInterval time.Duration + MaxConnectionAttempts int + MsgExpirationFactor int BootstrapPeers []string } // NewDiscoveryService returns a new discovery service with the comm module passed and the crypto service passed func NewDiscoveryService(self NetworkMember, comm CommService, crypt CryptoService, disPol DisclosurePolicy, - config DiscoveryConfig) Discovery { + config DiscoveryConfig, anchorPeerTracker AnchorPeerTracker) Discovery { d := &gossipDiscoveryImpl{ self: self, incTime: uint64(time.Now().UnixNano()), @@ -112,8 +110,11 @@ func NewDiscoveryService(self NetworkMember, comm CommService, crypt CryptoServi aliveExpirationTimeout: config.AliveExpirationTimeout, aliveExpirationCheckInterval: config.AliveExpirationCheckInterval, reconnectInterval: config.ReconnectInterval, + maxConnectionAttempts: config.MaxConnectionAttempts, + msgExpirationFactor: config.MsgExpirationFactor, - bootstrapPeers: config.BootstrapPeers, + bootstrapPeers: config.BootstrapPeers, + anchorPeerTracker: anchorPeerTracker, } d.validateSelfConfig() @@ -149,7 +150,7 @@ func (d *gossipDiscoveryImpl) Connect(member NetworkMember, id identifier) { d.logger.Debug("Entering", member) defer d.logger.Debug("Exiting") go func() { - for i := 0; i < maxConnectionAttempts && !d.toDie(); i++ { + for i := 0; i < d.maxConnectionAttempts && !d.toDie(); i++ { id, err := id() if err != nil { if d.toDie() { @@ -216,7 +217,7 @@ func (d *gossipDiscoveryImpl) validateSelfConfig() { func (d *gossipDiscoveryImpl) sendUntilAcked(peer *NetworkMember, message *proto.SignedGossipMessage) { nonce := message.Nonce - for i := 0; i < maxConnectionAttempts && !d.toDie(); i++ { + for i := 0; i < d.maxConnectionAttempts && !d.toDie(); i++ { sub := d.pubsub.Subscribe(fmt.Sprintf("%d", nonce), time.Second*5) d.comm.SendToPeer(peer, message) if _, timeoutErr := sub.Listen(); timeoutErr == nil { @@ -1032,7 +1033,7 @@ type aliveMsgStore struct { func newAliveMsgStore(d *gossipDiscoveryImpl) *aliveMsgStore { policy := proto.NewGossipMessageComparator(0) trigger := func(m interface{}) {} - aliveMsgTTL := d.aliveExpirationTimeout * msgExpirationFactor + aliveMsgTTL := d.aliveExpirationTimeout * time.Duration(d.msgExpirationFactor) externalLock := func() { d.lock.Lock() } externalUnlock := func() { d.lock.Unlock() } callback := func(m interface{}) { @@ -1044,8 +1045,10 @@ func newAliveMsgStore(d *gossipDiscoveryImpl) *aliveMsgStore { id := membership.PkiId endpoint := membership.Endpoint internalEndpoint := msg.SecretEnvelope.InternalEndpoint() - if util.Contains(endpoint, d.bootstrapPeers) || util.Contains(internalEndpoint, d.bootstrapPeers) { - // Never remove a bootstrap peer + if util.Contains(endpoint, d.bootstrapPeers) || util.Contains(internalEndpoint, d.bootstrapPeers) || + d.anchorPeerTracker.IsAnchorPeer(endpoint) || d.anchorPeerTracker.IsAnchorPeer(internalEndpoint) { + // Never remove a bootstrap peer or an anchor peer + d.logger.Debugf("Do not remove bootstrap or anchor peer endpoint %s from membership", endpoint) return } d.logger.Infof("Removing member: Endpoint: %s, InternalEndpoint: %s, PKIID: %x", endpoint, internalEndpoint, id) diff --git a/gossip/discovery/discovery_test.go b/gossip/discovery/discovery_test.go index ca3dd5a1da5..d2f2d6bdf11 100644 --- a/gossip/discovery/discovery_test.go +++ b/gossip/discovery/discovery_test.go @@ -43,11 +43,13 @@ var defaultTestConfig = DiscoveryConfig{ AliveExpirationTimeout: 10 * aliveTimeInterval, AliveExpirationCheckInterval: aliveTimeInterval, ReconnectInterval: 10 * aliveTimeInterval, + MaxConnectionAttempts: DefMaxConnectionAttempts, + MsgExpirationFactor: DefMsgExpirationFactor, } func init() { util.SetupTestLogging() - maxConnectionAttempts = 10000 + defaultTestConfig.MaxConnectionAttempts = 10000 } type dummyReceivedMessage struct { @@ -75,6 +77,15 @@ func (*dummyReceivedMessage) Ack(err error) { panic("implement me") } +// mockAnchorPeerTracker implements AnchorPeerTracker interface +type mockAnchorPeerTracker struct { + apEndpoints []string +} + +func (m *mockAnchorPeerTracker) IsAnchorPeer(endpoint string) bool { + return util.Contains(endpoint, m.apEndpoints) +} + type dummyCommModule struct { validatedMessages chan *proto.SignedGossipMessage msgsReceived uint32 @@ -377,6 +388,12 @@ func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []st } func createDiscoveryInstanceThatGossipsWithInterceptors(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy, f func(*proto.SignedGossipMessage), config DiscoveryConfig) *gossipInstance { + mockTracker := &mockAnchorPeerTracker{} + return createDiscoveryInstanceWithAnchorPeerTracker(port, id, bootstrapPeers, shouldGossip, pol, f, config, mockTracker) +} + +func createDiscoveryInstanceWithAnchorPeerTracker(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy, + f func(*proto.SignedGossipMessage), config DiscoveryConfig, anchorPeerTracker AnchorPeerTracker) *gossipInstance { comm := &dummyCommModule{ conns: make(map[string]*grpc.ClientConn), streams: make(map[string]proto.Gossip_GossipStreamClient), @@ -407,7 +424,8 @@ func createDiscoveryInstanceThatGossipsWithInterceptors(port int, id string, boo s := grpc.NewServer() config.BootstrapPeers = bootstrapPeers - discSvc := NewDiscoveryService(self, comm, comm, pol, config) + + discSvc := NewDiscoveryService(self, comm, comm, pol, config, anchorPeerTracker) for _, bootPeer := range bootstrapPeers { bp := bootPeer discSvc.Connect(NetworkMember{Endpoint: bp, InternalEndpoint: bootPeer}, func() (*PeerIdentification, error) { @@ -1127,6 +1145,7 @@ func TestExpirationNoSecretEnvelope(t *testing.T) { aliveMembership: util.NewMembershipStore(), deadMembership: util.NewMembershipStore(), logger: logger, + anchorPeerTracker: &mockAnchorPeerTracker{[]string{}}, }) msg := &proto.GossipMessage{ @@ -1281,7 +1300,7 @@ func TestMsgStoreExpiration(t *testing.T) { return true } - waitUntilTimeoutOrFail(t, checkMessages, defaultTestConfig.AliveExpirationTimeout*(msgExpirationFactor+5)) + waitUntilTimeoutOrFail(t, checkMessages, defaultTestConfig.AliveExpirationTimeout*(DefMsgExpirationFactor+5)) assertMembership(t, instances[:len(instances)-2], nodeNum-3) @@ -1434,7 +1453,7 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) { } // Sleep until expire - time.Sleep(defaultTestConfig.AliveExpirationTimeout * (msgExpirationFactor + 5)) + time.Sleep(defaultTestConfig.AliveExpirationTimeout * (DefMsgExpirationFactor + 5)) // Checking Alive expired for i := 0; i < peersNum; i++ { @@ -1676,7 +1695,7 @@ func TestPeerIsolation(t *testing.T) { // Sleep the same amount of time as it takes to remove a message from the aliveMsgStore (aliveMsgTTL) // Add a second as buffer - time.Sleep(config.AliveExpirationTimeout*msgExpirationFactor + time.Second) + time.Sleep(config.AliveExpirationTimeout*DefMsgExpirationFactor + time.Second) // Start again the first 2 peers and wait for all the peers to get full membership. // Especially, we want to test that peer2 won't be isolated @@ -1688,6 +1707,110 @@ func TestPeerIsolation(t *testing.T) { assertMembership(t, instances, peersNum-1) } +func TestMembershipAfterExpiration(t *testing.T) { + // Scenario: + // Start 3 peers (peer0, peer1, peer2). Set peer0 as the anchor peer. + // Stop peer0 and peer1 for a while, start them again and test if peer2 still gets full membership + + config := defaultTestConfig + // Use a smaller AliveExpirationTimeout than the default to reduce the running time of the test. + config.AliveExpirationTimeout = 2 * config.AliveTimeInterval + config.ReconnectInterval = config.AliveExpirationTimeout + config.MsgExpirationFactor = 5 + + peersNum := 3 + ports := []int{9120, 9121, 9122} + anchorPeer := "localhost:9120" + bootPeers := []string{} + instances := []*gossipInstance{} + var inst *gossipInstance + mockTracker := &mockAnchorPeerTracker{[]string{anchorPeer}} + + // use a custom logger to verify messages from expiration callback + expectedMsgs := []string{ + "Do not remove bootstrap or anchor peer endpoint localhost:9120 from membership", + "Removing member: Endpoint: localhost:9121", + } + numMsgsFound := 0 + l, err := zap.NewDevelopment() + assert.NoError(t, err) + expired := make(chan struct{}) + logger := flogging.NewFabricLogger(l, zap.Hooks(func(entry zapcore.Entry) error { + // do nothing if we already found all the expectedMsgs + if numMsgsFound == len(expectedMsgs) { + return nil + } + for _, msg := range expectedMsgs { + if strings.Contains(entry.Message, msg) { + numMsgsFound++ + if numMsgsFound == len(expectedMsgs) { + expired <- struct{}{} + } + break + } + } + return nil + })) + + // Start all peers, connect to the anchor peer and verify full membership + for i := 0; i < peersNum; i++ { + id := fmt.Sprintf("d%d", i) + inst = createDiscoveryInstanceWithAnchorPeerTracker(ports[i], id, bootPeers, true, noopPolicy, func(_ *proto.SignedGossipMessage) {}, config, mockTracker) + instances = append(instances, inst) + } + instances[peersNum-1].Discovery.(*gossipDiscoveryImpl).logger = logger + for i := 1; i < peersNum; i++ { + connect(instances[i], anchorPeer) + } + assertMembership(t, instances, peersNum-1) + + // Stop peer0 and peer1 so that peer2 would stay alone + stopInstances(t, instances[0:peersNum-1]) + + // waitTime is the same amount of time as it takes to remove a message from the aliveMsgStore (aliveMsgTTL) + // Add a second as buffer + waitTime := config.AliveExpirationTimeout*time.Duration(config.MsgExpirationFactor) + time.Second + select { + case <-expired: + case <-time.After(waitTime): + t.Fatalf("timed out") + } + // peer2's deadMembership should contain the anchor peer + deadMemeberShip := instances[peersNum-1].discoveryImpl().deadMembership + assert.Equal(t, 1, deadMemeberShip.Size()) + assertMembership(t, instances[peersNum-1:], 0) + + // Start again peer0 and peer1 and wait for all the peers to get full membership. + // Especially, we want to test that peer2 won't be isolated + for i := 0; i < peersNum-1; i++ { + id := fmt.Sprintf("d%d", i) + inst = createDiscoveryInstanceWithAnchorPeerTracker(ports[i], id, bootPeers, true, noopPolicy, func(_ *proto.SignedGossipMessage) {}, config, mockTracker) + instances[i] = inst + } + connect(instances[1], anchorPeer) + assertMembership(t, instances, peersNum-1) +} + +func connect(inst *gossipInstance, endpoint string) { + inst.comm.lock.Lock() + inst.comm.mock = &mock.Mock{} + inst.comm.mock.On("SendToPeer", mock.Anything, mock.Anything).Run(func(arguments mock.Arguments) { + inst := inst + msg := arguments.Get(1).(*proto.SignedGossipMessage) + if req := msg.GetMemReq(); req != nil { + inst.comm.lock.Lock() + inst.comm.mock = nil + inst.comm.lock.Unlock() + } + }) + inst.comm.mock.On("Ping", mock.Anything) + inst.comm.lock.Unlock() + netMember2Connect2 := NetworkMember{Endpoint: endpoint, PKIid: []byte(endpoint)} + inst.Connect(netMember2Connect2, func() (identification *PeerIdentification, err error) { + return &PeerIdentification{SelfOrg: true, ID: nil}, nil + }) +} + func waitUntilOrFail(t *testing.T, pred func() bool) { waitUntilTimeoutOrFail(t, pred, timeout) } diff --git a/gossip/gossip/gossip.go b/gossip/gossip/gossip.go index 4ccc0c4b486..40fb8ce9431 100644 --- a/gossip/gossip/gossip.go +++ b/gossip/gossip/gossip.go @@ -153,5 +153,6 @@ type Config struct { AliveExpirationTimeout time.Duration // Alive expiration timeout AliveExpirationCheckInterval time.Duration // Alive expiration check interval ReconnectInterval time.Duration // Reconnect interval - + MsgExpirationFactor int // MsgExpirationFactor is the expiration factor for alive message TTL + MaxConnectionAttempts int // MaxConnectionAttempts is the max number of attempts to connect to a peer (wait for alive ack) } diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index 2f6c5dac535..2492c97f8dd 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -70,7 +70,8 @@ type gossipServiceImpl struct { // NewGossipService creates a gossip instance attached to a gRPC server func NewGossipService(conf *Config, s *grpc.Server, sa api.SecurityAdvisor, mcs api.MessageCryptoService, selfIdentity api.PeerIdentityType, - secureDialOpts api.PeerSecureDialOpts, gossipMetrics *metrics.GossipMetrics) Gossip { + secureDialOpts api.PeerSecureDialOpts, gossipMetrics *metrics.GossipMetrics, + anchorPeerTracker discovery.AnchorPeerTracker) Gossip { var err error lgr := util.GetLogger(util.GossipLogger, conf.ID) @@ -125,10 +126,12 @@ func NewGossipService(conf *Config, s *grpc.Server, sa api.SecurityAdvisor, AliveExpirationTimeout: conf.AliveExpirationTimeout, AliveExpirationCheckInterval: conf.AliveExpirationCheckInterval, ReconnectInterval: conf.ReconnectInterval, + MaxConnectionAttempts: conf.MaxConnectionAttempts, + MsgExpirationFactor: conf.MsgExpirationFactor, BootstrapPeers: conf.BootstrapPeers, } g.disc = discovery.NewDiscoveryService(g.selfNetworkMember(), g.discAdapter, g.disSecAdap, g.disclosurePolicy, - discoveryConfig) + discoveryConfig, anchorPeerTracker) g.logger.Infof("Creating gossip service with self membership of %s", g.selfNetworkMember()) g.certPuller = g.createCertStorePuller() diff --git a/gossip/gossip/gossip_test.go b/gossip/gossip/gossip_test.go index f2a62293933..04e189ea8dc 100644 --- a/gossip/gossip/gossip_test.go +++ b/gossip/gossip/gossip_test.go @@ -62,7 +62,6 @@ var tests = []func(t *testing.T){ func init() { util.SetupTestLogging() rand.Seed(int64(time.Now().Second())) - discovery.SetMaxConnAttempts(5) for range tests { testWG.Add(1) } @@ -75,6 +74,8 @@ var discoveryConfig = discovery.DiscoveryConfig{ AliveExpirationTimeout: 10 * aliveTimeInterval, AliveExpirationCheckInterval: aliveTimeInterval, ReconnectInterval: aliveTimeInterval, + MaxConnectionAttempts: 5, + MsgExpirationFactor: discovery.DefMsgExpirationFactor, } var expirationTimes map[string]time.Time = map[string]time.Time{} @@ -263,10 +264,12 @@ func newGossipInstanceWithGrpcMcsMetrics(id int, port int, gRPCServer *corecomm. AliveExpirationTimeout: discoveryConfig.AliveExpirationTimeout, AliveExpirationCheckInterval: discoveryConfig.AliveExpirationCheckInterval, ReconnectInterval: discoveryConfig.ReconnectInterval, + MaxConnectionAttempts: discoveryConfig.MaxConnectionAttempts, + MsgExpirationFactor: discoveryConfig.MsgExpirationFactor, } selfID := api.PeerIdentityType(conf.InternalEndpoint) g := NewGossipService(conf, gRPCServer.Server(), &orgCryptoService{}, mcs, selfID, - secureDialOpts, metrics) + secureDialOpts, metrics, nil) go func() { gRPCServer.Start() }() @@ -313,10 +316,12 @@ func newGossipInstanceWithGRPCWithOnlyPull(id int, port int, gRPCServer *corecom AliveExpirationTimeout: discoveryConfig.AliveExpirationTimeout, AliveExpirationCheckInterval: discoveryConfig.AliveExpirationCheckInterval, ReconnectInterval: discoveryConfig.ReconnectInterval, + MaxConnectionAttempts: discoveryConfig.MaxConnectionAttempts, + MsgExpirationFactor: discoveryConfig.MsgExpirationFactor, } selfID := api.PeerIdentityType(conf.InternalEndpoint) g := NewGossipService(conf, gRPCServer.Server(), &orgCryptoService{}, mcs, selfID, - secureDialOpts, metrics) + secureDialOpts, metrics, nil) go func() { gRPCServer.Start() }() diff --git a/gossip/gossip/orgs_test.go b/gossip/gossip/orgs_test.go index c48a85c5df1..0252e15a2a2 100644 --- a/gossip/gossip/orgs_test.go +++ b/gossip/gossip/orgs_test.go @@ -125,10 +125,12 @@ func newGossipInstanceWithGRPCWithExternalEndpoint(id int, port int, gRPCServer AliveExpirationTimeout: discoveryConfig.AliveExpirationTimeout, AliveExpirationCheckInterval: discoveryConfig.AliveExpirationCheckInterval, ReconnectInterval: discoveryConfig.ReconnectInterval, + MaxConnectionAttempts: discoveryConfig.MaxConnectionAttempts, + MsgExpirationFactor: discoveryConfig.MsgExpirationFactor, } selfID := api.PeerIdentityType(conf.InternalEndpoint) g := NewGossipService(conf, gRPCServer.Server(), mcs, mcs, selfID, - secureDialOpts, metrics.NewGossipMetrics(&disabled.Provider{})) + secureDialOpts, metrics.NewGossipMetrics(&disabled.Provider{}), nil) go func() { gRPCServer.Start() }() diff --git a/gossip/integration/integration.go b/gossip/integration/integration.go index cec08767f76..9ac301fb912 100644 --- a/gossip/integration/integration.go +++ b/gossip/integration/integration.go @@ -67,6 +67,8 @@ func newConfig(selfEndpoint string, externalEndpoint string, certs *common.TLSCe SendBuffSize: util.GetIntOrDefault("peer.gossip.sendBuffSize", comm.DefSendBuffSize), MsgExpirationTimeout: util.GetDurationOrDefault("peer.gossip.election.leaderAliveThreshold", election.DefLeaderAliveThreshold) * 10, AliveTimeInterval: util.GetDurationOrDefault("peer.gossip.aliveTimeInterval", discovery.DefAliveTimeInterval), + MaxConnectionAttempts: util.GetIntOrDefault("peer.gossip.maxConnectionAttempts", discovery.DefMaxConnectionAttempts), + MsgExpirationFactor: util.GetIntOrDefault("peer.gossip.msgExpirationFactor", discovery.DefMsgExpirationFactor), } conf.AliveExpirationTimeout = util.GetDurationOrDefault("peer.gossip.aliveExpirationTimeout", 5*conf.AliveTimeInterval) @@ -80,7 +82,7 @@ func newConfig(selfEndpoint string, externalEndpoint string, certs *common.TLSCe func NewGossipComponent(peerIdentity []byte, endpoint string, s *grpc.Server, secAdv api.SecurityAdvisor, cryptSvc api.MessageCryptoService, secureDialOpts api.PeerSecureDialOpts, certs *common.TLSCertificates, gossipMetrics *metrics.GossipMetrics, - bootPeers ...string) (gossip.Gossip, error) { + anchorPeerTracker discovery.AnchorPeerTracker, bootPeers ...string) (gossip.Gossip, error) { externalEndpoint := viper.GetString("peer.gossip.externalEndpoint") @@ -89,7 +91,7 @@ func NewGossipComponent(peerIdentity []byte, endpoint string, s *grpc.Server, return nil, errors.WithStack(err) } gossipInstance := gossip.NewGossipService(conf, s, secAdv, cryptSvc, - peerIdentity, secureDialOpts, gossipMetrics) + peerIdentity, secureDialOpts, gossipMetrics, anchorPeerTracker) return gossipInstance, nil } diff --git a/gossip/integration/integration_test.go b/gossip/integration/integration_test.go index 0847916c60e..456695e9960 100644 --- a/gossip/integration/integration_test.go +++ b/gossip/integration/integration_test.go @@ -20,7 +20,7 @@ import ( "github.com/hyperledger/fabric/gossip/metrics" "github.com/hyperledger/fabric/gossip/util" "github.com/hyperledger/fabric/msp/mgmt" - "github.com/hyperledger/fabric/msp/mgmt/testtools" + msptesttools "github.com/hyperledger/fabric/msp/mgmt/testtools" "github.com/spf13/viper" "github.com/stretchr/testify/assert" "google.golang.org/grpc" @@ -56,13 +56,13 @@ func TestNewGossipCryptoService(t *testing.T) { peerIdentity, _ := mgmt.GetLocalSigningIdentityOrPanic().Serialize() gossipMetrics := metrics.NewGossipMetrics(&disabled.Provider{}) g1, err := NewGossipComponent(peerIdentity, endpoint1, s1, secAdv, cryptSvc, - defaultSecureDialOpts, nil, gossipMetrics) + defaultSecureDialOpts, nil, gossipMetrics, nil) assert.NoError(t, err) g2, err := NewGossipComponent(peerIdentity, endpoint2, s2, secAdv, cryptSvc, - defaultSecureDialOpts, nil, gossipMetrics, endpoint1) + defaultSecureDialOpts, nil, gossipMetrics, nil, endpoint1) assert.NoError(t, err) g3, err := NewGossipComponent(peerIdentity, endpoint3, s3, secAdv, cryptSvc, - defaultSecureDialOpts, nil, gossipMetrics, endpoint1) + defaultSecureDialOpts, nil, gossipMetrics, nil, endpoint1) assert.NoError(t, err) defer g1.Stop() defer g2.Stop() diff --git a/gossip/service/gossip_service.go b/gossip/service/gossip_service.go index eba46bd38df..63381a5d378 100644 --- a/gossip/service/gossip_service.go +++ b/gossip/service/gossip_service.go @@ -104,16 +104,17 @@ func (p privateHandler) close() { type gossipServiceImpl struct { gossipSvc - privateHandlers map[string]privateHandler - chains map[string]state.GossipStateProvider - leaderElection map[string]election.LeaderElectionService - deliveryService map[string]deliverclient.DeliverService - deliveryFactory DeliveryServiceFactory - lock sync.RWMutex - mcs api.MessageCryptoService - peerIdentity []byte - secAdv api.SecurityAdvisor - metrics *gossipMetrics.GossipMetrics + privateHandlers map[string]privateHandler + chains map[string]state.GossipStateProvider + leaderElection map[string]election.LeaderElectionService + deliveryService map[string]deliverclient.DeliverService + deliveryFactory DeliveryServiceFactory + lock sync.RWMutex + mcs api.MessageCryptoService + peerIdentity []byte + secAdv api.SecurityAdvisor + metrics *gossipMetrics.GossipMetrics + anchorPeerTracker *anchorPeerTracker } // This is an implementation of api.JoinChannelMessage. @@ -142,6 +143,33 @@ func (jcm *joinChannelMessage) AnchorPeersOf(org api.OrgIdentityType) []api.Anch return jcm.members2AnchorPeers[string(org)] } +// anchorPeerTracker maintains anchor peer endpoints for all the channels. +type anchorPeerTracker struct { + // allEndpoints contains anchor peer endpoints for all the channels, + // its key is channel name, value is map of anchor peer endpoints + allEndpoints map[string]map[string]struct{} + mutex sync.RWMutex +} + +// update overwrites the anchor peer endpoints for the channel +func (t *anchorPeerTracker) update(channelName string, endpoints map[string]struct{}) { + t.mutex.Lock() + defer t.mutex.Unlock() + t.allEndpoints[channelName] = endpoints +} + +// IsAnchorPeer checks if an endpoint is an anchor peer in any channel +func (t *anchorPeerTracker) IsAnchorPeer(endpoint string) bool { + t.mutex.RLock() + defer t.mutex.RUnlock() + for _, endpointsForChannel := range t.allEndpoints { + if _, ok := endpointsForChannel[endpoint]; ok { + return true + } + } + return false +} + var logger = util.GetLogger(util.ServiceLogger, "") // InitGossipService initialize gossip service @@ -200,19 +228,22 @@ func InitGossipServiceCustomDeliveryFactory( gossipMetrics := gossipMetrics.NewGossipMetrics(metricsProvider) + anchorPeerTracker := &anchorPeerTracker{allEndpoints: map[string]map[string]struct{}{}} + gossip, err = integration.NewGossipComponent(peerIdentity, endpoint, s, secAdv, - mcs, secureDialOpts, certs, gossipMetrics, bootPeers...) + mcs, secureDialOpts, certs, gossipMetrics, anchorPeerTracker, bootPeers...) gossipServiceInstance = &gossipServiceImpl{ - mcs: mcs, - gossipSvc: gossip, - privateHandlers: make(map[string]privateHandler), - chains: make(map[string]state.GossipStateProvider), - leaderElection: make(map[string]election.LeaderElectionService), - deliveryService: make(map[string]deliverclient.DeliverService), - deliveryFactory: factory, - peerIdentity: peerIdentity, - secAdv: secAdv, - metrics: gossipMetrics, + mcs: mcs, + gossipSvc: gossip, + privateHandlers: make(map[string]privateHandler), + chains: make(map[string]state.GossipStateProvider), + leaderElection: make(map[string]election.LeaderElectionService), + deliveryService: make(map[string]deliverclient.DeliverService), + deliveryFactory: factory, + peerIdentity: peerIdentity, + secAdv: secAdv, + metrics: gossipMetrics, + anchorPeerTracker: anchorPeerTracker, } }) return errors.WithStack(err) @@ -391,6 +422,7 @@ func (g *gossipServiceImpl) updateAnchors(config Config) { return } jcm := &joinChannelMessage{seqNum: config.Sequence(), members2AnchorPeers: map[string][]api.AnchorPeer{}} + anchorPeerEndpoints := map[string]struct{}{} for _, appOrg := range config.ApplicationOrgs() { logger.Debug(appOrg.MSPID(), "anchor peers:", appOrg.AnchorPeers()) jcm.members2AnchorPeers[appOrg.MSPID()] = []api.AnchorPeer{} @@ -400,8 +432,10 @@ func (g *gossipServiceImpl) updateAnchors(config Config) { Port: int(ap.Port), } jcm.members2AnchorPeers[appOrg.MSPID()] = append(jcm.members2AnchorPeers[appOrg.MSPID()], anchorPeer) + anchorPeerEndpoints[fmt.Sprintf("%s:%d", ap.Host, ap.Port)] = struct{}{} } } + g.anchorPeerTracker.update(config.ChainID(), anchorPeerEndpoints) // Initialize new state provider for given committer logger.Debug("Creating state provider for chainID", config.ChainID()) diff --git a/gossip/service/gossip_service_test.go b/gossip/service/gossip_service_test.go index 52f0c4a0971..7a525b4503d 100644 --- a/gossip/service/gossip_service_test.go +++ b/gossip/service/gossip_service_test.go @@ -713,12 +713,14 @@ func newGossipInstance(port int, id int, gRPCServer *comm.GRPCServer, certs *gos AliveExpirationTimeout: discovery.DefAliveExpirationTimeout, AliveExpirationCheckInterval: discovery.DefAliveExpirationCheckInterval, ReconnectInterval: discovery.DefReconnectInterval, + MaxConnectionAttempts: discovery.DefMaxConnectionAttempts, + MsgExpirationFactor: discovery.DefMsgExpirationFactor, } selfID := api.PeerIdentityType(conf.InternalEndpoint) cryptoService := &naiveCryptoService{} metrics := gossipMetrics.NewGossipMetrics(&disabled.Provider{}) gossip := gossip.NewGossipService(conf, gRPCServer.Server(), &orgCryptoService{}, cryptoService, selfID, - secureDialOpts, metrics) + secureDialOpts, metrics, nil) go func() { gRPCServer.Start() }() @@ -859,8 +861,15 @@ func TestChannelConfig(t *testing.T) { grpcServer := grpc.NewServer() endpoint, socket := getAvailablePort(t) + // Because gossipServiceInstance is initialized only once, we have to use the same identity + // if it is already initialized + orgIdentity := orgInChannelA + if gossipServiceInstance != nil { + orgIdentity = gossipServiceInstance.peerIdentity + } + secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager()) - error := InitGossipService(api.PeerIdentityType("IDENTITY"), &disabled.Provider{}, endpoint, grpcServer, nil, + error := InitGossipService(orgIdentity, &disabled.Provider{}, endpoint, grpcServer, nil, &naiveCryptoService{}, secAdv, nil, false) assert.NoError(t, error) gService := GetGossipService().(*gossipServiceImpl) @@ -878,13 +887,18 @@ func TestChannelConfig(t *testing.T) { mc := &mockConfig{ sequence: 1, appOrgs: map[string]channelconfig.ApplicationOrg{ - string(orgInChannelA): &appGrp{ - mspID: string(orgInChannelA), - anchorPeers: []*peer.AnchorPeer{}, + string(orgIdentity): &appGrp{ + mspID: string(orgIdentity), + anchorPeers: []*peer.AnchorPeer{{Host: "localhost", Port: 2001}}, }, }, } + gService.JoinChan(jcm, gossipCommon.ChainID("A")) + // use mock secAdv so that gService.secAdv.OrgByPeerIdentity can return the matched identity + gService.secAdv = &secAdvMock{} gService.updateAnchors(mc) - assert.True(t, gService.amIinChannel(string(orgInChannelA), mc)) + assert.True(t, gService.amIinChannel(string(orgIdentity), mc)) + assert.True(t, gService.anchorPeerTracker.IsAnchorPeer("localhost:2001")) + assert.False(t, gService.anchorPeerTracker.IsAnchorPeer("localhost:5000")) } diff --git a/gossip/service/join_test.go b/gossip/service/join_test.go index 3fdcaec0d46..88772d7fbbe 100644 --- a/gossip/service/join_test.go +++ b/gossip/service/join_test.go @@ -170,7 +170,8 @@ func TestJoinChannelConfig(t *testing.T) { g1SvcMock.On("JoinChan", mock.Anything, mock.Anything).Run(func(_ mock.Arguments) { failChan <- struct{}{} }) - g1 := &gossipServiceImpl{secAdv: &secAdvMock{}, peerIdentity: api.PeerIdentityType("OrgMSP0"), gossipSvc: g1SvcMock} + anchorPeerTracker := &anchorPeerTracker{allEndpoints: map[string]map[string]struct{}{}} + g1 := &gossipServiceImpl{secAdv: &secAdvMock{}, peerIdentity: api.PeerIdentityType("OrgMSP0"), gossipSvc: g1SvcMock, anchorPeerTracker: anchorPeerTracker} g1.updateAnchors(&configMock{ orgs2AppOrgs: map[string]channelconfig.ApplicationOrg{ "Org0": &appOrgMock{id: "Org0"}, @@ -187,7 +188,7 @@ func TestJoinChannelConfig(t *testing.T) { g2SvcMock.On("JoinChan", mock.Anything, mock.Anything).Run(func(_ mock.Arguments) { succChan <- struct{}{} }) - g2 := &gossipServiceImpl{secAdv: &secAdvMock{}, peerIdentity: api.PeerIdentityType("Org0"), gossipSvc: g2SvcMock} + g2 := &gossipServiceImpl{secAdv: &secAdvMock{}, peerIdentity: api.PeerIdentityType("Org0"), gossipSvc: g2SvcMock, anchorPeerTracker: anchorPeerTracker} g2.updateAnchors(&configMock{ orgs2AppOrgs: map[string]channelconfig.ApplicationOrg{ "Org0": &appOrgMock{id: "Org0"}, @@ -219,8 +220,8 @@ func TestJoinChannelNoAnchorPeers(t *testing.T) { assert.Equal(t, "A", string(channel)) }) - g := &gossipServiceImpl{secAdv: &secAdvMock{}, peerIdentity: api.PeerIdentityType("Org0"), gossipSvc: gMock} - + anchorPeerTracker := &anchorPeerTracker{allEndpoints: map[string]map[string]struct{}{}} + g := &gossipServiceImpl{secAdv: &secAdvMock{}, peerIdentity: api.PeerIdentityType("Org0"), gossipSvc: gMock, anchorPeerTracker: anchorPeerTracker} appOrg0 := &appOrgMock{id: "Org0"} appOrg1 := &appOrgMock{id: "Org1"} diff --git a/gossip/state/state_test.go b/gossip/state/state_test.go index f7268df64f5..f17d2e082c7 100644 --- a/gossip/state/state_test.go +++ b/gossip/state/state_test.go @@ -399,12 +399,13 @@ func newPeerNodeWithGossipWithValidatorWithMetrics(id int, committer committer.C AliveExpirationTimeout: discovery.DefAliveExpirationTimeout, AliveExpirationCheckInterval: discovery.DefAliveExpirationCheckInterval, ReconnectInterval: discovery.DefReconnectInterval, + MaxConnectionAttempts: discovery.DefMaxConnectionAttempts, + MsgExpirationFactor: discovery.DefMsgExpirationFactor, } selfID := api.PeerIdentityType(config.InternalEndpoint) mcs := &cryptoServiceMock{acceptor: noopPeerIdentityAcceptor} - g = gossip.NewGossipService(config, gRPCServer.Server(), &orgCryptoService{}, mcs, selfID, secureDialOpts, gossipMetrics) - + g = gossip.NewGossipService(config, gRPCServer.Server(), &orgCryptoService{}, mcs, selfID, secureDialOpts, gossipMetrics, nil) } g.JoinChan(&joinChanMsg{}, common.ChainID(util.GetTestChainID())) diff --git a/integration/gossip/gossip_suite_test.go b/integration/gossip/gossip_suite_test.go new file mode 100644 index 00000000000..a753d1fb8c1 --- /dev/null +++ b/integration/gossip/gossip_suite_test.go @@ -0,0 +1,41 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package gossip + +import ( + "encoding/json" + "testing" + + "github.com/hyperledger/fabric/integration/nwo" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestGossip(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Gossip Communication Suite") +} + +var components *nwo.Components + +var _ = SynchronizedBeforeSuite(func() []byte { + components = &nwo.Components{} + components.Build() + + payload, err := json.Marshal(components) + Expect(err).NotTo(HaveOccurred()) + + return payload +}, func(payload []byte) { + err := json.Unmarshal(payload, &components) + Expect(err).NotTo(HaveOccurred()) +}) + +var _ = SynchronizedAfterSuite(func() { +}, func() { + components.Cleanup() +}) diff --git a/integration/gossip/gossip_test.go b/integration/gossip/gossip_test.go new file mode 100644 index 00000000000..7a9d4dccb4a --- /dev/null +++ b/integration/gossip/gossip_test.go @@ -0,0 +1,221 @@ +/* + * Copyright IBM Corp. All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package gossip + +import ( + "fmt" + "io/ioutil" + "os" + "syscall" + "time" + + docker "github.com/fsouza/go-dockerclient" + "github.com/hyperledger/fabric/integration/nwo" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/onsi/gomega/gbytes" + "github.com/tedsuo/ifrit" + "github.com/tedsuo/ifrit/ginkgomon" +) + +var _ = Describe("Gossip Membership", func() { + var ( + testDir string + network *nwo.Network + nwprocs *networkProcesses + orderer *nwo.Orderer + peerEndpoints map[string]string = map[string]string{} + channelName string + ) + + BeforeEach(func() { + var err error + testDir, err = ioutil.TempDir("", "gossip-membership") + Expect(err).NotTo(HaveOccurred()) + + dockerClient, err := docker.NewClientFromEnv() + Expect(err).NotTo(HaveOccurred()) + + channelName = "testchannel" + network = nwo.New(nwo.BasicSolo(), testDir, dockerClient, 25000+1000*GinkgoParallelNode(), components) + network.GenerateConfigTree() + + // modify peer config + for _, peer := range network.Peers { + core := network.ReadPeerConfig(peer) + core.Peer.Gossip.AliveTimeInterval = 1 * time.Second + core.Peer.Gossip.AliveExpirationTimeout = 2 * core.Peer.Gossip.AliveTimeInterval + core.Peer.Gossip.ReconnectInterval = 2 * time.Second + core.Peer.Gossip.MsgExpirationFactor = 2 + core.Peer.Gossip.MaxConnectionAttempts = 10 + network.WritePeerConfig(peer, core) + peerEndpoints[peer.ID()] = core.Peer.Address + } + + network.Bootstrap() + orderer = network.Orderer("orderer") + nwprocs = &networkProcesses{ + network: network, + peerRunners: map[string]*ginkgomon.Runner{}, + peerProcesses: map[string]ifrit.Process{}, + ordererRunner: network.OrdererRunner(orderer), + } + nwprocs.ordererProcess = ifrit.Invoke(nwprocs.ordererRunner) + Eventually(nwprocs.ordererProcess.Ready(), network.EventuallyTimeout).Should(BeClosed()) + }) + + AfterEach(func() { + if nwprocs != nil { + nwprocs.terminateAll() + } + if network != nil { + network.Cleanup() + } + os.RemoveAll(testDir) + }) + + It("updates membership when peers in the same org are stopped and restarted", func() { + peer0Org1 := network.Peer("Org1", "peer0") + peer1Org1 := network.Peer("Org1", "peer1") + + By("bringing up all peers") + startPeers(nwprocs, false, peer0Org1, peer1Org1) + + By("creating and joining a channel") + network.CreateChannel(channelName, orderer, peer0Org1) + network.JoinChannel(channelName, orderer, peer0Org1, peer1Org1) + network.UpdateChannelAnchors(orderer, channelName) + + By("verifying non-anchor peer (peer1Org1) discovers all the peers before testing membership change on it") + Eventually(nwo.DiscoverPeers(network, peer1Org1, "User1", "testchannel"), network.EventuallyTimeout).Should(ConsistOf( + network.DiscoveredPeer(peer0Org1), + network.DiscoveredPeer(peer1Org1), + )) + + By("verifying membership change on non-anchor peer (peer1Org1) when an anchor peer in the same org is stopped and restarted") + expectedMsgFromExpirationCallback := fmt.Sprintf("Do not remove bootstrap or anchor peer endpoint %s from membership", peerEndpoints[peer0Org1.ID()]) + assertPeerMembershipUpdate(network, peer1Org1, []*nwo.Peer{peer0Org1}, nwprocs, expectedMsgFromExpirationCallback) + + By("verifying anchor peer (peer0Org1) discovers all the peers before testing membership change on it") + Eventually(nwo.DiscoverPeers(network, peer0Org1, "User1", "testchannel"), network.EventuallyTimeout).Should(ConsistOf( + network.DiscoveredPeer(peer0Org1), + network.DiscoveredPeer(peer1Org1), + )) + + By("verifying membership change on anchor peer (peer0Org1) when a non-anchor peer in the same org is stopped and restarted") + expectedMsgFromExpirationCallback = fmt.Sprintf("Removing member: Endpoint: %s", peerEndpoints[peer1Org1.ID()]) + assertPeerMembershipUpdate(network, peer0Org1, []*nwo.Peer{peer1Org1}, nwprocs, expectedMsgFromExpirationCallback) + }) + + It("updates peer membership when peers in another org are stopped and restarted", func() { + peer0Org1, peer1Org1 := network.Peer("Org1", "peer0"), network.Peer("Org1", "peer1") + peer0Org2, peer1Org2 := network.Peer("Org2", "peer0"), network.Peer("Org2", "peer1") + + By("bringing up all peers") + startPeers(nwprocs, false, peer0Org1, peer1Org1, peer0Org2, peer1Org2) + + By("creating and joining a channel") + network.CreateChannel(channelName, orderer, peer0Org1) + network.JoinChannel(channelName, orderer, peer0Org1, peer1Org1, peer0Org2, peer1Org2) + network.UpdateChannelAnchors(orderer, channelName) + + By("verifying membership on peer1Org1 before testing membership change on it") + Eventually(nwo.DiscoverPeers(network, peer1Org1, "User1", "testchannel"), network.EventuallyTimeout).Should(ConsistOf( + network.DiscoveredPeer(peer0Org1), + network.DiscoveredPeer(peer1Org1), + network.DiscoveredPeer(peer0Org2), + network.DiscoveredPeer(peer1Org2), + )) + + By("stopping anchor peer peer0Org1 to have only one peer in org1") + stopPeers(nwprocs, peer0Org1) + + By("verifying peer membership update when peers in another org are stopped and restarted") + expectedMsgFromExpirationCallback := fmt.Sprintf("Do not remove bootstrap or anchor peer endpoint %s from membership", peerEndpoints[peer0Org2.ID()]) + assertPeerMembershipUpdate(network, peer1Org1, []*nwo.Peer{peer0Org2, peer1Org2}, nwprocs, expectedMsgFromExpirationCallback) + }) +}) + +// networkProcesses holds references to the network, its runners, and processes. +type networkProcesses struct { + network *nwo.Network + + ordererRunner *ginkgomon.Runner + ordererProcess ifrit.Process + + peerRunners map[string]*ginkgomon.Runner + peerProcesses map[string]ifrit.Process +} + +func (n *networkProcesses) terminateAll() { + if n.ordererProcess != nil { + n.ordererProcess.Signal(syscall.SIGTERM) + Eventually(n.ordererProcess.Wait(), n.network.EventuallyTimeout).Should(Receive()) + } + for _, process := range n.peerProcesses { + process.Signal(syscall.SIGTERM) + Eventually(process.Wait(), n.network.EventuallyTimeout).Should(Receive()) + } +} + +func startPeers(n *networkProcesses, forceStateTransfer bool, peersToStart ...*nwo.Peer) { + env := []string{"FABRIC_LOGGING_SPEC=info:gossip.state=debug:gossip.discovery=debug"} + + // Setting CORE_PEER_GOSSIP_STATE_CHECKINTERVAL to 200ms (from default of 10s) will ensure that state transfer happens quickly, + // before blocks are gossipped through normal mechanisms + if forceStateTransfer { + env = append(env, "CORE_PEER_GOSSIP_STATE_CHECKINTERVAL=200ms") + } + + for _, peer := range peersToStart { + runner := n.network.PeerRunner(peer, env...) + process := ifrit.Invoke(runner) + Eventually(process.Ready(), n.network.EventuallyTimeout).Should(BeClosed()) + + n.peerProcesses[peer.ID()] = process + n.peerRunners[peer.ID()] = runner + } +} + +func stopPeers(n *networkProcesses, peersToStop ...*nwo.Peer) { + for _, peer := range peersToStop { + id := peer.ID() + proc := n.peerProcesses[id] + proc.Signal(syscall.SIGTERM) + Eventually(proc.Wait(), n.network.EventuallyTimeout).Should(Receive()) + delete(n.peerProcesses, id) + } +} + +// assertPeerMembershipUpdate stops and restart peersToRestart and verify peer membership +func assertPeerMembershipUpdate(network *nwo.Network, peer *nwo.Peer, peersToRestart []*nwo.Peer, nwprocs *networkProcesses, expectedMsgFromExpirationCallback string) { + stopPeers(nwprocs, peersToRestart...) + + // timeout is the same amount of time as it takes to remove a message from the aliveMsgStore, and add a second as buffer + core := network.ReadPeerConfig(peer) + timeout := core.Peer.Gossip.AliveExpirationTimeout*time.Duration(core.Peer.Gossip.MsgExpirationFactor) + time.Second + By("verifying peer membership after all other peers are stopped") + Eventually(nwo.DiscoverPeers(network, peer, "User1", "testchannel"), timeout, 100*time.Millisecond).Should(ConsistOf( + network.DiscoveredPeer(peer), + )) + + By("verifying expected log message from expiration callback") + runner := nwprocs.peerRunners[peer.ID()] + Eventually(runner.Err(), network.EventuallyTimeout).Should(gbytes.Say(expectedMsgFromExpirationCallback)) + + By("restarting peers") + startPeers(nwprocs, false, peersToRestart...) + + By("verifying peer membership, expected to discover restarted peers") + expectedPeers := make([]nwo.DiscoveredPeer, len(peersToRestart)+1) + expectedPeers[0] = network.DiscoveredPeer(peer) + for i, p := range peersToRestart { + expectedPeers[i+1] = network.DiscoveredPeer(p) + } + timeout = 3 * core.Peer.Gossip.ReconnectInterval + Eventually(nwo.DiscoverPeers(network, peer, "User1", "testchannel"), timeout, 100*time.Millisecond).Should(ConsistOf(expectedPeers)) +} diff --git a/integration/nwo/core_template.go b/integration/nwo/core_template.go index 498c07c5a7b..df36d812f3e 100644 --- a/integration/nwo/core_template.go +++ b/integration/nwo/core_template.go @@ -28,9 +28,10 @@ peer: timeout: 20s gossip: bootstrap: 127.0.0.1:{{ .PeerPort Peer "Listen" }} + endpoint: 127.0.0.1:{{ .PeerPort Peer "Listen" }} + externalEndpoint: 127.0.0.1:{{ .PeerPort Peer "Listen" }} useLeaderElection: true orgLeader: false - endpoint: maxBlockCountToStore: 100 maxPropagationBurstLatency: 10ms maxPropagationBurstSize: 10 @@ -52,7 +53,6 @@ peer: aliveTimeInterval: 5s aliveExpirationTimeout: 25s reconnectInterval: 25s - externalEndpoint: 127.0.0.1:{{ .PeerPort Peer "Listen" }} election: startupGracePeriod: 15s membershipSampleInterval: 1s diff --git a/integration/nwo/fabricconfig/core.go b/integration/nwo/fabricconfig/core.go index ad5a49f90d4..771bee135a4 100644 --- a/integration/nwo/fabricconfig/core.go +++ b/integration/nwo/fabricconfig/core.go @@ -91,6 +91,8 @@ type Gossip struct { AliveTimeInterval time.Duration `yaml:"aliveTimeInterval,omitempty"` AliveExpirationTimeout time.Duration `yaml:"aliveExpirationTimeout,omitempty"` ReconnectInterval time.Duration `yaml:"reconnectInterval,omitempty"` + MsgExpirationFactor int `yaml:"msgExpirationFactor,omitempty"` + MaxConnectionAttempts int `yaml:"maxConnectionAttempts,omitempty"` ExternalEndpoint string `yaml:"externalEndpoint,omitempty"` Election *GossipElection `yaml:"election,omitempty"` PvtData *GossipPvtData `yaml:"pvtData,omitempty"` diff --git a/integration/nwo/network.go b/integration/nwo/network.go index 2d6d572b037..7baada8887a 100644 --- a/integration/nwo/network.go +++ b/integration/nwo/network.go @@ -95,7 +95,7 @@ type OrdererCapabilities struct { } // Peer defines a peer instance, it's owning organization, and the list of -// channels that the peer shoudl be joined to. +// channels that the peer should be joined to. type Peer struct { Name string `yaml:"name,omitempty"` Organization string `yaml:"organization,omitempty"` @@ -1025,11 +1025,12 @@ func (n *Network) OrdererGroupRunner() ifrit.Runner { // PeerRunner returns an ifrit.Runner for the specified peer. The runner can be // used to start and manage a peer process. -func (n *Network) PeerRunner(p *Peer) *ginkgomon.Runner { +func (n *Network) PeerRunner(p *Peer, env ...string) *ginkgomon.Runner { cmd := n.peerCommand( commands.NodeStart{PeerID: p.ID()}, fmt.Sprintf("FABRIC_CFG_PATH=%s", n.PeerDir(p)), ) + cmd.Env = append(cmd.Env, env...) return ginkgomon.New(ginkgomon.Config{ AnsiColorCode: n.nextColor(), diff --git a/sampleconfig/core.yaml b/sampleconfig/core.yaml index d34543da995..44f79b11b00 100644 --- a/sampleconfig/core.yaml +++ b/sampleconfig/core.yaml @@ -153,6 +153,10 @@ peer: aliveExpirationTimeout: 25s # Reconnect interval(unit: second) reconnectInterval: 25s + # Max number of attempts to connect to a peer + maxConnectionAttempts: 120 + # Message expiration factor for alive messages + msgExpirationFactor: 20 # This is an endpoint that is published to peers outside of the organization. # If this isn't set, the peer will not be known to other organizations. externalEndpoint: