diff --git a/gossip/discovery/discovery.go b/gossip/discovery/discovery.go index 99c5a50314e..09bd56e3751 100644 --- a/gossip/discovery/discovery.go +++ b/gossip/discovery/discovery.go @@ -73,6 +73,11 @@ type CommService interface { IdentitySwitch() <-chan common.PKIidType } +// AnchorPeerTracker is an interface that is passed to discovery to check if an endpoint is an anchor peer +type AnchorPeerTracker interface { + IsAnchorPeer(endpoint string) bool +} + // NetworkMember is a peer's representation type NetworkMember struct { Endpoint string diff --git a/gossip/discovery/discovery_impl.go b/gossip/discovery/discovery_impl.go index 8c0304a9a64..1fddfa02719 100644 --- a/gossip/discovery/discovery_impl.go +++ b/gossip/discovery/discovery_impl.go @@ -27,15 +27,8 @@ const DefAliveTimeInterval = 5 * time.Second const DefAliveExpirationTimeout = 5 * DefAliveTimeInterval const DefAliveExpirationCheckInterval = DefAliveExpirationTimeout / 10 const DefReconnectInterval = DefAliveExpirationTimeout -const msgExpirationFactor = 20 - -var maxConnectionAttempts = 120 - -// SetMaxConnAttempts sets the maximum number of connection -// attempts the peer would perform when invoking Connect() -func SetMaxConnAttempts(attempts int) { - maxConnectionAttempts = attempts -} +const DefMsgExpirationFactor = 20 +const DefMaxConnectionAttempts = 120 type timestamp struct { incTime time.Time @@ -74,8 +67,11 @@ type gossipDiscoveryImpl struct { aliveExpirationTimeout time.Duration aliveExpirationCheckInterval time.Duration reconnectInterval time.Duration + msgExpirationFactor int + maxConnectionAttempts int - bootstrapPeers []string + bootstrapPeers []string + anchorPeerTracker AnchorPeerTracker } type DiscoveryConfig struct { @@ -83,12 +79,14 @@ type DiscoveryConfig struct { AliveExpirationTimeout time.Duration AliveExpirationCheckInterval time.Duration ReconnectInterval time.Duration + MaxConnectionAttempts int + MsgExpirationFactor int BootstrapPeers []string } // NewDiscoveryService returns a new discovery service with the comm module passed and the crypto service passed func NewDiscoveryService(self NetworkMember, comm CommService, crypt CryptoService, disPol DisclosurePolicy, - config DiscoveryConfig) Discovery { + config DiscoveryConfig, anchorPeerTracker AnchorPeerTracker) Discovery { d := &gossipDiscoveryImpl{ self: self, incTime: uint64(time.Now().UnixNano()), @@ -110,8 +108,11 @@ func NewDiscoveryService(self NetworkMember, comm CommService, crypt CryptoServi aliveExpirationTimeout: config.AliveExpirationTimeout, aliveExpirationCheckInterval: config.AliveExpirationCheckInterval, reconnectInterval: config.ReconnectInterval, + maxConnectionAttempts: config.MaxConnectionAttempts, + msgExpirationFactor: config.MsgExpirationFactor, - bootstrapPeers: config.BootstrapPeers, + bootstrapPeers: config.BootstrapPeers, + anchorPeerTracker: anchorPeerTracker, } d.validateSelfConfig() @@ -147,7 +148,7 @@ func (d *gossipDiscoveryImpl) Connect(member NetworkMember, id identifier) { d.logger.Debug("Entering", member) defer d.logger.Debug("Exiting") go func() { - for i := 0; i < maxConnectionAttempts && !d.toDie(); i++ { + for i := 0; i < d.maxConnectionAttempts && !d.toDie(); i++ { id, err := id() if err != nil { if d.toDie() { @@ -214,7 +215,7 @@ func (d *gossipDiscoveryImpl) validateSelfConfig() { func (d *gossipDiscoveryImpl) sendUntilAcked(peer *NetworkMember, message *protoext.SignedGossipMessage) { nonce := message.Nonce - for i := 0; i < maxConnectionAttempts && !d.toDie(); i++ { + for i := 0; i < d.maxConnectionAttempts && !d.toDie(); i++ { sub := d.pubsub.Subscribe(fmt.Sprintf("%d", nonce), time.Second*5) d.comm.SendToPeer(peer, message) if _, timeoutErr := sub.Listen(); timeoutErr == nil { @@ -1040,7 +1041,7 @@ type aliveMsgStore struct { func newAliveMsgStore(d *gossipDiscoveryImpl) *aliveMsgStore { policy := protoext.NewGossipMessageComparator(0) trigger := func(m interface{}) {} - aliveMsgTTL := d.aliveExpirationTimeout * msgExpirationFactor + aliveMsgTTL := d.aliveExpirationTimeout * time.Duration(d.msgExpirationFactor) externalLock := func() { d.lock.Lock() } externalUnlock := func() { d.lock.Unlock() } callback := func(m interface{}) { @@ -1052,8 +1053,10 @@ func newAliveMsgStore(d *gossipDiscoveryImpl) *aliveMsgStore { id := membership.PkiId endpoint := membership.Endpoint internalEndpoint := protoext.InternalEndpoint(msg.SecretEnvelope) - if util.Contains(endpoint, d.bootstrapPeers) || util.Contains(internalEndpoint, d.bootstrapPeers) { - // Never remove a bootstrap peer + if util.Contains(endpoint, d.bootstrapPeers) || util.Contains(internalEndpoint, d.bootstrapPeers) || + d.anchorPeerTracker.IsAnchorPeer(endpoint) || d.anchorPeerTracker.IsAnchorPeer(internalEndpoint) { + // Never remove a bootstrap peer or an anchor peer + d.logger.Debugf("Do not remove bootstrap or anchor peer endpoint %s from membership", endpoint) return } d.logger.Infof("Removing member: Endpoint: %s, InternalEndpoint: %s, PKIID: %x", endpoint, internalEndpoint, id) diff --git a/gossip/discovery/discovery_test.go b/gossip/discovery/discovery_test.go index e35c39ec724..908adb37910 100644 --- a/gossip/discovery/discovery_test.go +++ b/gossip/discovery/discovery_test.go @@ -44,11 +44,13 @@ var defaultTestConfig = DiscoveryConfig{ AliveExpirationTimeout: 10 * aliveTimeInterval, AliveExpirationCheckInterval: aliveTimeInterval, ReconnectInterval: 10 * aliveTimeInterval, + MaxConnectionAttempts: DefMaxConnectionAttempts, + MsgExpirationFactor: DefMsgExpirationFactor, } func init() { util.SetupTestLogging() - maxConnectionAttempts = 10000 + defaultTestConfig.MaxConnectionAttempts = 10000 } type dummyReceivedMessage struct { @@ -76,6 +78,15 @@ func (*dummyReceivedMessage) Ack(err error) { panic("implement me") } +// mockAnchorPeerTracker implements AnchorPeerTracker interface +type mockAnchorPeerTracker struct { + apEndpoints []string +} + +func (m *mockAnchorPeerTracker) IsAnchorPeer(endpoint string) bool { + return util.Contains(endpoint, m.apEndpoints) +} + type dummyCommModule struct { validatedMessages chan *protoext.SignedGossipMessage msgsReceived uint32 @@ -378,6 +389,12 @@ func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []st } func createDiscoveryInstanceThatGossipsWithInterceptors(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy, f func(*protoext.SignedGossipMessage), config DiscoveryConfig) *gossipInstance { + mockTracker := &mockAnchorPeerTracker{} + return createDiscoveryInstanceWithAnchorPeerTracker(port, id, bootstrapPeers, shouldGossip, pol, f, config, mockTracker) +} + +func createDiscoveryInstanceWithAnchorPeerTracker(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy, + f func(*protoext.SignedGossipMessage), config DiscoveryConfig, anchorPeerTracker AnchorPeerTracker) *gossipInstance { comm := &dummyCommModule{ conns: make(map[string]*grpc.ClientConn), streams: make(map[string]proto.Gossip_GossipStreamClient), @@ -409,7 +426,8 @@ func createDiscoveryInstanceThatGossipsWithInterceptors(port int, id string, boo s := grpc.NewServer() config.BootstrapPeers = bootstrapPeers - discSvc := NewDiscoveryService(self, comm, comm, pol, config) + + discSvc := NewDiscoveryService(self, comm, comm, pol, config, anchorPeerTracker) for _, bootPeer := range bootstrapPeers { bp := bootPeer discSvc.Connect(NetworkMember{Endpoint: bp, InternalEndpoint: bootPeer}, func() (*PeerIdentification, error) { @@ -1246,7 +1264,7 @@ func TestMsgStoreExpiration(t *testing.T) { return true } - waitUntilTimeoutOrFail(t, checkMessages, defaultTestConfig.AliveExpirationTimeout*(msgExpirationFactor+5)) + waitUntilTimeoutOrFail(t, checkMessages, defaultTestConfig.AliveExpirationTimeout*(DefMsgExpirationFactor+5)) assertMembership(t, instances[:len(instances)-2], nodeNum-3) @@ -1265,12 +1283,14 @@ func TestExpirationNoSecretEnvelope(t *testing.T) { return nil })) + mockTracker := &mockAnchorPeerTracker{} msgStore := newAliveMsgStore(&gossipDiscoveryImpl{ aliveExpirationTimeout: time.Millisecond, lock: &sync.RWMutex{}, aliveMembership: util.NewMembershipStore(), deadMembership: util.NewMembershipStore(), logger: logger, + anchorPeerTracker: mockTracker, }) msg := &proto.GossipMessage{ @@ -1436,7 +1456,7 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) { } // Sleep until expire - time.Sleep(defaultTestConfig.AliveExpirationTimeout * (msgExpirationFactor + 5)) + time.Sleep(defaultTestConfig.AliveExpirationTimeout * (DefMsgExpirationFactor + 5)) // Checking Alive expired for i := 0; i < peersNum; i++ { @@ -1672,7 +1692,7 @@ func TestPeerIsolation(t *testing.T) { // Sleep the same amount of time as it takes to remove a message from the aliveMsgStore (aliveMsgTTL) // Add a second as buffer - time.Sleep(config.AliveExpirationTimeout*msgExpirationFactor + time.Second) + time.Sleep(config.AliveExpirationTimeout*DefMsgExpirationFactor + time.Second) // Start again the first 2 peers and wait for all the peers to get full membership. // Especially, we want to test that peer2 won't be isolated @@ -1684,6 +1704,110 @@ func TestPeerIsolation(t *testing.T) { assertMembership(t, instances, peersNum-1) } +func TestMembershipAfterExpiration(t *testing.T) { + // Scenario: + // Start 3 peers (peer0, peer1, peer2). Set peer0 as the anchor peer. + // Stop peer0 and peer1 for a while, start them again and test if peer2 still gets full membership + + config := defaultTestConfig + // Use a smaller AliveExpirationTimeout than the default to reduce the running time of the test. + config.AliveExpirationTimeout = 2 * config.AliveTimeInterval + config.ReconnectInterval = config.AliveExpirationTimeout + config.MsgExpirationFactor = 5 + + peersNum := 3 + ports := []int{9120, 9121, 9122} + anchorPeer := "localhost:9120" + bootPeers := []string{} + instances := []*gossipInstance{} + var inst *gossipInstance + mockTracker := &mockAnchorPeerTracker{[]string{anchorPeer}} + + // use a custom logger to verify messages from expiration callback + expectedMsgs := []string{ + "Do not remove bootstrap or anchor peer endpoint localhost:9120 from membership", + "Removing member: Endpoint: localhost:9121", + } + numMsgsFound := 0 + l, err := zap.NewDevelopment() + assert.NoError(t, err) + expired := make(chan struct{}) + logger := flogging.NewFabricLogger(l, zap.Hooks(func(entry zapcore.Entry) error { + // do nothing if we already found all the expectedMsgs + if numMsgsFound == len(expectedMsgs) { + return nil + } + for _, msg := range expectedMsgs { + if strings.Contains(entry.Message, msg) { + numMsgsFound++ + if numMsgsFound == len(expectedMsgs) { + expired <- struct{}{} + } + break + } + } + return nil + })) + + // Start all peers, connect to the anchor peer and verify full membership + for i := 0; i < peersNum; i++ { + id := fmt.Sprintf("d%d", i) + inst = createDiscoveryInstanceWithAnchorPeerTracker(ports[i], id, bootPeers, true, noopPolicy, func(_ *protoext.SignedGossipMessage) {}, config, mockTracker) + instances = append(instances, inst) + } + instances[peersNum-1].Discovery.(*gossipDiscoveryImpl).logger = logger + for i := 1; i < peersNum; i++ { + connect(instances[i], anchorPeer) + } + assertMembership(t, instances, peersNum-1) + + // Stop peer0 and peer1 so that peer2 would stay alone + stopInstances(t, instances[0:peersNum-1]) + + // waitTime is the same amount of time as it takes to remove a message from the aliveMsgStore (aliveMsgTTL) + // Add a second as buffer + waitTime := config.AliveExpirationTimeout*time.Duration(config.MsgExpirationFactor) + time.Second + select { + case <-expired: + case <-time.After(waitTime): + t.Fatalf("timed out") + } + // peer2's deadMembership should contain the anchor peer + deadMemeberShip := instances[peersNum-1].discoveryImpl().deadMembership + assert.Equal(t, 1, deadMemeberShip.Size()) + assertMembership(t, instances[peersNum-1:], 0) + + // Start again peer0 and peer1 and wait for all the peers to get full membership. + // Especially, we want to test that peer2 won't be isolated + for i := 0; i < peersNum-1; i++ { + id := fmt.Sprintf("d%d", i) + inst = createDiscoveryInstanceWithAnchorPeerTracker(ports[i], id, bootPeers, true, noopPolicy, func(_ *protoext.SignedGossipMessage) {}, config, mockTracker) + instances[i] = inst + } + connect(instances[1], anchorPeer) + assertMembership(t, instances, peersNum-1) +} + +func connect(inst *gossipInstance, endpoint string) { + inst.comm.lock.Lock() + inst.comm.mock = &mock.Mock{} + inst.comm.mock.On("SendToPeer", mock.Anything, mock.Anything).Run(func(arguments mock.Arguments) { + inst := inst + msg := arguments.Get(1).(*protoext.SignedGossipMessage) + if req := msg.GetMemReq(); req != nil { + inst.comm.lock.Lock() + inst.comm.mock = nil + inst.comm.lock.Unlock() + } + }) + inst.comm.mock.On("Ping", mock.Anything) + inst.comm.lock.Unlock() + netMember2Connect2 := NetworkMember{Endpoint: endpoint, PKIid: []byte(endpoint)} + inst.Connect(netMember2Connect2, func() (identification *PeerIdentification, err error) { + return &PeerIdentification{SelfOrg: true, ID: nil}, nil + }) +} + func waitUntilOrFail(t *testing.T, pred func() bool) { waitUntilTimeoutOrFail(t, pred, timeout) } diff --git a/gossip/gossip/config.go b/gossip/gossip/config.go index 0e6c1961676..c1c970c6b5d 100644 --- a/gossip/gossip/config.go +++ b/gossip/gossip/config.go @@ -93,6 +93,10 @@ type Config struct { AliveExpirationCheckInterval time.Duration // ReconnectInterval is the Reconnect interval. ReconnectInterval time.Duration + // MsgExpirationFactor is the expiration factor for alive message TTL + MsgExpirationFactor int + // MaxConnectionAttempts is the max number of attempts to connect to a peer (wait for alive ack) + MaxConnectionAttempts int } // GlobalConfig builds a Config from the given endpoint, certificate and bootstrap peers. @@ -142,6 +146,8 @@ func (c *Config) loadConfig(endpoint string, certs *common.TLSCertificates, boot c.AliveExpirationTimeout = util.GetDurationOrDefault("peer.gossip.aliveExpirationTimeout", 5*c.AliveTimeInterval) c.AliveExpirationCheckInterval = c.AliveExpirationTimeout / 10 c.ReconnectInterval = util.GetDurationOrDefault("peer.gossip.reconnectInterval", c.AliveExpirationTimeout) + c.MaxConnectionAttempts = util.GetIntOrDefault("peer.gossip.maxConnectionAttempts", discovery.DefMaxConnectionAttempts) + c.MsgExpirationFactor = util.GetIntOrDefault("peer.gossip.msgExpirationFactor", discovery.DefMsgExpirationFactor) return nil } diff --git a/gossip/gossip/config_test.go b/gossip/gossip/config_test.go index 12d044f6a2f..dccd5778f8a 100644 --- a/gossip/gossip/config_test.go +++ b/gossip/gossip/config_test.go @@ -55,6 +55,8 @@ func TestGlobalConfig(t *testing.T) { viper.Set("peer.gossip.aliveTimeInterval", "20s") viper.Set("peer.gossip.aliveExpirationTimeout", "21s") viper.Set("peer.gossip.reconnectInterval", "22s") + viper.Set("peer.gossip.maxConnectionAttempts", "100") + viper.Set("peer.gossip.msgExpirationFactor", "10") coreConfig, err := gossip.GlobalConfig(endpoint, nil, bootstrap...) assert.NoError(t, err) @@ -95,6 +97,8 @@ func TestGlobalConfig(t *testing.T) { AliveExpirationTimeout: 21 * time.Second, AliveExpirationCheckInterval: 21 * time.Second / 10, // AliveExpirationTimeout / 10 ReconnectInterval: 22 * time.Second, + MaxConnectionAttempts: 100, + MsgExpirationFactor: 10, } assert.Equal(t, expectedConfig, coreConfig) @@ -148,6 +152,8 @@ func TestGlobalConfigDefaults(t *testing.T) { AliveExpirationTimeout: 5 * discovery.DefAliveTimeInterval, AliveExpirationCheckInterval: 5 * discovery.DefAliveTimeInterval / 10, ReconnectInterval: 5 * discovery.DefAliveTimeInterval, + MaxConnectionAttempts: 120, + MsgExpirationFactor: 20, } assert.Equal(t, expectedConfig, coreConfig) diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index 12ea1041ea7..c75e3f4418d 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -71,7 +71,8 @@ type Node struct { // New creates a gossip instance attached to a gRPC server func New(conf *Config, s *grpc.Server, sa api.SecurityAdvisor, mcs api.MessageCryptoService, selfIdentity api.PeerIdentityType, - secureDialOpts api.PeerSecureDialOpts, gossipMetrics *metrics.GossipMetrics) *Node { + secureDialOpts api.PeerSecureDialOpts, gossipMetrics *metrics.GossipMetrics, + anchorPeerTracker discovery.AnchorPeerTracker) *Node { var err error lgr := util.GetLogger(util.GossipLogger, conf.ID) @@ -126,10 +127,12 @@ func New(conf *Config, s *grpc.Server, sa api.SecurityAdvisor, AliveExpirationTimeout: conf.AliveExpirationTimeout, AliveExpirationCheckInterval: conf.AliveExpirationCheckInterval, ReconnectInterval: conf.ReconnectInterval, + MaxConnectionAttempts: conf.MaxConnectionAttempts, + MsgExpirationFactor: conf.MsgExpirationFactor, BootstrapPeers: conf.BootstrapPeers, } g.disc = discovery.NewDiscoveryService(g.selfNetworkMember(), g.discAdapter, g.disSecAdap, g.disclosurePolicy, - discoveryConfig) + discoveryConfig, anchorPeerTracker) g.logger.Infof("Creating gossip service with self membership of %s", g.selfNetworkMember()) g.certPuller = g.createCertStorePuller() diff --git a/gossip/gossip/gossip_test.go b/gossip/gossip/gossip_test.go index 3a9dcd143e3..c81e95f39ca 100644 --- a/gossip/gossip/gossip_test.go +++ b/gossip/gossip/gossip_test.go @@ -41,7 +41,6 @@ var timeout = time.Second * time.Duration(180) func TestMain(m *testing.M) { util.SetupTestLogging() rand.Seed(int64(time.Now().Second())) - discovery.SetMaxConnAttempts(5) factory.InitFactories(nil) os.Exit(m.Run()) } @@ -52,6 +51,8 @@ var discoveryConfig = discovery.DiscoveryConfig{ AliveExpirationTimeout: 10 * aliveTimeInterval, AliveExpirationCheckInterval: aliveTimeInterval, ReconnectInterval: aliveTimeInterval, + MaxConnectionAttempts: 5, + MsgExpirationFactor: discovery.DefMsgExpirationFactor, } var orgInChannelA = api.OrgIdentityType("ORG1") @@ -244,10 +245,12 @@ func newGossipInstanceWithGrpcMcsMetrics(id int, port int, gRPCServer *corecomm. AliveExpirationTimeout: discoveryConfig.AliveExpirationTimeout, AliveExpirationCheckInterval: discoveryConfig.AliveExpirationCheckInterval, ReconnectInterval: discoveryConfig.ReconnectInterval, + MaxConnectionAttempts: discoveryConfig.MaxConnectionAttempts, + MsgExpirationFactor: discoveryConfig.MsgExpirationFactor, } selfID := api.PeerIdentityType(conf.InternalEndpoint) g := New(conf, gRPCServer.Server(), &orgCryptoService{}, mcs, selfID, - secureDialOpts, metrics) + secureDialOpts, metrics, nil) go func() { gRPCServer.Start() }() @@ -301,10 +304,12 @@ func newGossipInstanceWithGRPCWithOnlyPull(id int, port int, gRPCServer *corecom AliveExpirationTimeout: discoveryConfig.AliveExpirationTimeout, AliveExpirationCheckInterval: discoveryConfig.AliveExpirationCheckInterval, ReconnectInterval: discoveryConfig.ReconnectInterval, + MaxConnectionAttempts: discoveryConfig.MaxConnectionAttempts, + MsgExpirationFactor: discoveryConfig.MsgExpirationFactor, } selfID := api.PeerIdentityType(conf.InternalEndpoint) g := New(conf, gRPCServer.Server(), &orgCryptoService{}, mcs, selfID, - secureDialOpts, metrics) + secureDialOpts, metrics, nil) go func() { gRPCServer.Start() }() diff --git a/gossip/gossip/orgs_test.go b/gossip/gossip/orgs_test.go index 879b2e922ca..dc6c275c716 100644 --- a/gossip/gossip/orgs_test.go +++ b/gossip/gossip/orgs_test.go @@ -132,10 +132,12 @@ func newGossipInstanceWithGRPCWithExternalEndpoint(id int, port int, gRPCServer AliveExpirationTimeout: discoveryConfig.AliveExpirationTimeout, AliveExpirationCheckInterval: discoveryConfig.AliveExpirationCheckInterval, ReconnectInterval: discoveryConfig.ReconnectInterval, + MaxConnectionAttempts: discoveryConfig.MaxConnectionAttempts, + MsgExpirationFactor: discoveryConfig.MsgExpirationFactor, } selfID := api.PeerIdentityType(conf.InternalEndpoint) g := New(conf, gRPCServer.Server(), mcs, mcs, selfID, - secureDialOpts, metrics.NewGossipMetrics(&disabled.Provider{})) + secureDialOpts, metrics.NewGossipMetrics(&disabled.Provider{}), nil) go func() { gRPCServer.Start() }() diff --git a/gossip/service/gossip_service.go b/gossip/service/gossip_service.go index 8734f415f75..110ac4b00b3 100644 --- a/gossip/service/gossip_service.go +++ b/gossip/service/gossip_service.go @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0 package service import ( + "fmt" "sync" gproto "github.com/hyperledger/fabric-protos-go/gossip" @@ -159,20 +160,22 @@ func (p privateHandler) close() { p.reconciler.Stop() } +// GossipService handles the interaction between gossip service and peer type GossipService struct { gossipSvc - privateHandlers map[string]privateHandler - chains map[string]state.GossipStateProvider - leaderElection map[string]election.LeaderElectionService - deliveryService map[string]deliverservice.DeliverService - deliveryFactory DeliveryServiceFactory - lock sync.RWMutex - mcs api.MessageCryptoService - peerIdentity []byte - secAdv api.SecurityAdvisor - metrics *gossipmetrics.GossipMetrics - serviceConfig *ServiceConfig - privdataConfig *gossipprivdata.PrivdataConfig + privateHandlers map[string]privateHandler + chains map[string]state.GossipStateProvider + leaderElection map[string]election.LeaderElectionService + deliveryService map[string]deliverservice.DeliverService + deliveryFactory DeliveryServiceFactory + lock sync.RWMutex + mcs api.MessageCryptoService + peerIdentity []byte + secAdv api.SecurityAdvisor + metrics *gossipmetrics.GossipMetrics + serviceConfig *ServiceConfig + privdataConfig *gossipprivdata.PrivdataConfig + anchorPeerTracker *anchorPeerTracker } // This is an implementation of api.JoinChannelMessage. @@ -199,6 +202,33 @@ func (jcm *joinChannelMessage) AnchorPeersOf(org api.OrgIdentityType) []api.Anch return jcm.members2AnchorPeers[string(org)] } +// anchorPeerTracker maintains anchor peer endpoints for all the channels. +type anchorPeerTracker struct { + // allEndpoints contains anchor peer endpoints for all the channels, + // its key is channel name, value is map of anchor peer endpoints + allEndpoints map[string]map[string]struct{} + mutex sync.RWMutex +} + +// update overwrites the anchor peer endpoints for the channel +func (t *anchorPeerTracker) update(channelName string, endpoints map[string]struct{}) { + t.mutex.Lock() + defer t.mutex.Unlock() + t.allEndpoints[channelName] = endpoints +} + +// IsAnchorPeer checks if an endpoint is an anchor peer in any channel +func (t *anchorPeerTracker) IsAnchorPeer(endpoint string) bool { + t.mutex.RLock() + defer t.mutex.RUnlock() + for _, endpointsForChannel := range t.allEndpoints { + if _, ok := endpointsForChannel[endpoint]; ok { + return true + } + } + return false +} + var logger = util.GetLogger(util.ServiceLogger, "") // New creates the gossip service. @@ -224,6 +254,7 @@ func New( logger.Infof("Initialize gossip with endpoint %s", endpoint) + anchorPeerTracker := &anchorPeerTracker{allEndpoints: map[string]map[string]struct{}{}} gossipComponent := gossip.New( gossipConfig, s, @@ -232,6 +263,7 @@ func New( serializedIdentity, secureDialOpts, gossipMetrics, + anchorPeerTracker, ) return &GossipService{ @@ -247,11 +279,12 @@ func New( deliverGRPCClient: deliverGRPCClient, deliverServiceConfig: deliverServiceConfig, }, - peerIdentity: serializedIdentity, - secAdv: secAdv, - metrics: gossipMetrics, - serviceConfig: serviceConfig, - privdataConfig: privdataConfig, + peerIdentity: serializedIdentity, + secAdv: secAdv, + metrics: gossipMetrics, + serviceConfig: serviceConfig, + privdataConfig: privdataConfig, + anchorPeerTracker: anchorPeerTracker, }, nil } @@ -410,6 +443,7 @@ func (g *GossipService) updateAnchors(config Config) { return } jcm := &joinChannelMessage{seqNum: config.Sequence(), members2AnchorPeers: map[string][]api.AnchorPeer{}} + anchorPeerEndpoints := map[string]struct{}{} for _, appOrg := range config.Organizations() { logger.Debug(appOrg.MSPID(), "anchor peers:", appOrg.AnchorPeers()) jcm.members2AnchorPeers[appOrg.MSPID()] = []api.AnchorPeer{} @@ -419,8 +453,10 @@ func (g *GossipService) updateAnchors(config Config) { Port: int(ap.Port), } jcm.members2AnchorPeers[appOrg.MSPID()] = append(jcm.members2AnchorPeers[appOrg.MSPID()], anchorPeer) + anchorPeerEndpoints[fmt.Sprintf("%s:%d", ap.Host, ap.Port)] = struct{}{} } } + g.anchorPeerTracker.update(config.ChannelID(), anchorPeerEndpoints) // Initialize new state provider for given committer logger.Debug("Creating state provider for channelID", config.ChannelID()) diff --git a/gossip/service/gossip_service_test.go b/gossip/service/gossip_service_test.go index c49357c7149..485dc38f607 100644 --- a/gossip/service/gossip_service_test.go +++ b/gossip/service/gossip_service_test.go @@ -740,6 +740,8 @@ func newGossipInstance(serviceConfig *ServiceConfig, port int, id int, gRPCServe AliveExpirationTimeout: discovery.DefAliveExpirationTimeout, AliveExpirationCheckInterval: discovery.DefAliveExpirationCheckInterval, ReconnectInterval: time.Duration(1) * time.Second, + MaxConnectionAttempts: discovery.DefMaxConnectionAttempts, + MsgExpirationFactor: discovery.DefMsgExpirationFactor, } selfID := api.PeerIdentityType(conf.InternalEndpoint) cryptoService := &naiveCryptoService{} @@ -752,6 +754,7 @@ func newGossipInstance(serviceConfig *ServiceConfig, port int, id int, gRPCServe selfID, secureDialOpts, metrics, + nil, ) go gRPCServer.Start() @@ -918,7 +921,7 @@ func TestChannelConfig(t *testing.T) { assert.NoError(t, err) mockSignerSerializer := &mocks.SignerSerializer{} - mockSignerSerializer.SerializeReturns(api.PeerIdentityType("peer-identity"), nil) + mockSignerSerializer.SerializeReturns(api.PeerIdentityType(string(orgInChannelA)), nil) secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager(cryptoProvider)) gossipConfig, err := gossip.GlobalConfig(endpoint, nil) assert.NoError(t, err) @@ -962,11 +965,15 @@ func TestChannelConfig(t *testing.T) { orgs: map[string]channelconfig.ApplicationOrg{ string(orgInChannelA): &appGrp{ mspID: string(orgInChannelA), - anchorPeers: []*peer.AnchorPeer{}, + anchorPeers: []*peer.AnchorPeer{{Host: "localhost", Port: 2001}}, }, }, } gService.JoinChan(jcm, gossipcommon.ChannelID("A")) + // use mock secAdv so that gService.secAdv.OrgByPeerIdentity can return the matched identity + gService.secAdv = &secAdvMock{} gService.updateAnchors(mc) assert.True(t, gService.amIinChannel(string(orgInChannelA), mc)) + assert.True(t, gService.anchorPeerTracker.IsAnchorPeer("localhost:2001")) + assert.False(t, gService.anchorPeerTracker.IsAnchorPeer("localhost:5000")) } diff --git a/gossip/service/join_test.go b/gossip/service/join_test.go index 50c8ad00da0..59db08b333a 100644 --- a/gossip/service/join_test.go +++ b/gossip/service/join_test.go @@ -168,7 +168,8 @@ func TestJoinChannelConfig(t *testing.T) { g1SvcMock.On("JoinChan", mock.Anything, mock.Anything).Run(func(_ mock.Arguments) { failChan <- struct{}{} }) - g1 := &GossipService{secAdv: &secAdvMock{}, peerIdentity: api.PeerIdentityType("OrgMSP0"), gossipSvc: g1SvcMock} + anchorPeerTracker := &anchorPeerTracker{allEndpoints: map[string]map[string]struct{}{}} + g1 := &GossipService{secAdv: &secAdvMock{}, peerIdentity: api.PeerIdentityType("OrgMSP0"), gossipSvc: g1SvcMock, anchorPeerTracker: anchorPeerTracker} g1.updateAnchors(&configMock{ orgs2AppOrgs: map[string]channelconfig.ApplicationOrg{ "Org0": &appOrgMock{id: "Org0"}, @@ -185,7 +186,7 @@ func TestJoinChannelConfig(t *testing.T) { g2SvcMock.On("JoinChan", mock.Anything, mock.Anything).Run(func(_ mock.Arguments) { succChan <- struct{}{} }) - g2 := &GossipService{secAdv: &secAdvMock{}, peerIdentity: api.PeerIdentityType("Org0"), gossipSvc: g2SvcMock} + g2 := &GossipService{secAdv: &secAdvMock{}, peerIdentity: api.PeerIdentityType("Org0"), gossipSvc: g2SvcMock, anchorPeerTracker: anchorPeerTracker} g2.updateAnchors(&configMock{ orgs2AppOrgs: map[string]channelconfig.ApplicationOrg{ "Org0": &appOrgMock{id: "Org0"}, @@ -217,7 +218,8 @@ func TestJoinChannelNoAnchorPeers(t *testing.T) { assert.Equal(t, "A", string(channel)) }) - g := &GossipService{secAdv: &secAdvMock{}, peerIdentity: api.PeerIdentityType("Org0"), gossipSvc: gMock} + anchorPeerTracker := &anchorPeerTracker{allEndpoints: map[string]map[string]struct{}{}} + g := &GossipService{secAdv: &secAdvMock{}, peerIdentity: api.PeerIdentityType("Org0"), gossipSvc: gMock, anchorPeerTracker: anchorPeerTracker} appOrg0 := &appOrgMock{id: "Org0"} appOrg1 := &appOrgMock{id: "Org1"} diff --git a/gossip/state/state_test.go b/gossip/state/state_test.go index e91dfff8405..c5c20b97746 100644 --- a/gossip/state/state_test.go +++ b/gossip/state/state_test.go @@ -387,11 +387,13 @@ func newPeerNodeWithGossipWithValidatorWithMetrics(logger gutil.Logger, id int, AliveExpirationTimeout: discovery.DefAliveExpirationTimeout, AliveExpirationCheckInterval: discovery.DefAliveExpirationCheckInterval, ReconnectInterval: discovery.DefReconnectInterval, + MaxConnectionAttempts: discovery.DefMaxConnectionAttempts, + MsgExpirationFactor: discovery.DefMsgExpirationFactor, } selfID := api.PeerIdentityType(config.InternalEndpoint) mcs := &cryptoServiceMock{acceptor: noopPeerIdentityAcceptor} - g = gossip.New(config, gRPCServer.Server(), &orgCryptoService{}, mcs, selfID, secureDialOpts, gossipMetrics) + g = gossip.New(config, gRPCServer.Server(), &orgCryptoService{}, mcs, selfID, secureDialOpts, gossipMetrics, nil) } g.JoinChan(&joinChanMsg{}, common.ChannelID("testchannelid")) diff --git a/integration/gossip/gossip_test.go b/integration/gossip/gossip_test.go index 2d600401482..8302483ee4e 100644 --- a/integration/gossip/gossip_test.go +++ b/integration/gossip/gossip_test.go @@ -11,6 +11,7 @@ import ( "io/ioutil" "os" "syscall" + "time" docker "github.com/fsouza/go-dockerclient" "github.com/hyperledger/fabric/integration/nwo" @@ -23,12 +24,13 @@ import ( "github.com/tedsuo/ifrit/ginkgomon" ) -var _ = Describe("Gossip State Transfer", func() { +var _ = Describe("Gossip State Transfer and Membership", func() { var ( - testDir string - network *nwo.Network - nwprocs *networkProcesses - chaincode nwo.Chaincode + testDir string + network *nwo.Network + nwprocs *networkProcesses + chaincode nwo.Chaincode + channelName string ) BeforeEach(func() { @@ -39,31 +41,10 @@ var _ = Describe("Gossip State Transfer", func() { dockerClient, err := docker.NewClientFromEnv() Expect(err).NotTo(HaveOccurred()) + channelName = "testchannel" network = nwo.New(nwo.FullSolo(), testDir, dockerClient, StartPort(), components) network.GenerateConfigTree() - // modify peer config - // Org1: leader election - // Org2: no leader election - // peer0: follower - // peer1: leader - for _, peer := range network.Peers { - if peer.Organization == "Org1" { - if peer.Name == "peer1" { - core := network.ReadPeerConfig(peer) - core.Peer.Gossip.Bootstrap = fmt.Sprintf("127.0.0.1:%d", network.ReservePort()) - network.WritePeerConfig(peer, core) - } - } - if peer.Organization == "Org2" { - core := network.ReadPeerConfig(peer) - core.Peer.Gossip.UseLeaderElection = false - core.Peer.Gossip.OrgLeader = peer.Name == "peer1" - network.WritePeerConfig(peer, core) - } - } - - network.Bootstrap() nwprocs = &networkProcesses{ network: network, peerRunners: map[string]*ginkgomon.Runner{}, @@ -90,6 +71,28 @@ var _ = Describe("Gossip State Transfer", func() { }) It("syncs blocks from the peer when no orderer is available", func() { + // modify peer config + // Org1: leader election + // Org2: no leader election + // peer0: follower + // peer1: leader + for _, peer := range network.Peers { + if peer.Organization == "Org1" { + if peer.Name == "peer1" { + core := network.ReadPeerConfig(peer) + core.Peer.Gossip.Bootstrap = fmt.Sprintf("127.0.0.1:%d", network.ReservePort()) + network.WritePeerConfig(peer, core) + } + } + if peer.Organization == "Org2" { + core := network.ReadPeerConfig(peer) + core.Peer.Gossip.UseLeaderElection = false + core.Peer.Gossip.OrgLeader = peer.Name == "peer1" + network.WritePeerConfig(peer, core) + } + } + + network.Bootstrap() orderer := network.Orderer("orderer") nwprocs.ordererRunner = network.OrdererRunner(orderer) nwprocs.ordererProcess = ifrit.Invoke(nwprocs.ordererRunner) @@ -101,7 +104,6 @@ var _ = Describe("Gossip State Transfer", func() { By("bringing up all four peers") startPeers(nwprocs, false, peer0Org1, peer1Org1, peer0Org2, peer1Org2) - channelName := "testchannel" network.CreateChannel(channelName, orderer, peer0Org1) By("joining all peers to channel") network.JoinChannel(channelName, orderer, peer0Org1, peer1Org1, peer0Org2, peer1Org2) @@ -140,6 +142,88 @@ var _ = Describe("Gossip State Transfer", func() { // that receives blocks from orderer and will therefore serve as the provider of blocks to all other peers. sendTransactionsAndSyncUpPeers(nwprocs, orderer, basePeerForTransactions, channelName, peer0Org1, peer1Org2) }) + + When("gossip connection is lost and restored", func() { + var ( + orderer *nwo.Orderer + peerEndpoints map[string]string = map[string]string{} + ) + + BeforeEach(func() { + // modify peer config + for _, peer := range network.Peers { + core := network.ReadPeerConfig(peer) + core.Peer.Gossip.AliveTimeInterval = 1 * time.Second + core.Peer.Gossip.AliveExpirationTimeout = 2 * core.Peer.Gossip.AliveTimeInterval + core.Peer.Gossip.ReconnectInterval = 2 * time.Second + core.Peer.Gossip.MsgExpirationFactor = 2 + core.Peer.Gossip.MaxConnectionAttempts = 10 + network.WritePeerConfig(peer, core) + peerEndpoints[peer.ID()] = core.Peer.Address + } + + network.Bootstrap() + orderer = network.Orderer("orderer") + nwprocs.ordererRunner = network.OrdererRunner(orderer) + nwprocs.ordererProcess = ifrit.Invoke(nwprocs.ordererRunner) + Eventually(nwprocs.ordererProcess.Ready(), network.EventuallyTimeout).Should(BeClosed()) + }) + + It("updates membership when peers in the same org are stopped and restarted", func() { + peer0Org1 := network.Peer("Org1", "peer0") + peer1Org1 := network.Peer("Org1", "peer1") + + By("bringing up all peers") + startPeers(nwprocs, false, peer0Org1, peer1Org1) + + By("creating and joining a channel") + network.CreateChannel(channelName, orderer, peer0Org1) + network.JoinChannel(channelName, orderer, peer0Org1, peer1Org1) + network.UpdateChannelAnchors(orderer, channelName) + + By("verifying membership on anchor peer peer0Org1") + Eventually(nwo.DiscoverPeers(network, peer0Org1, "User1", "testchannel"), network.EventuallyTimeout).Should(ConsistOf( + network.DiscoveredPeer(peer0Org1, "_lifecycle"), + network.DiscoveredPeer(peer1Org1, "_lifecycle"), + )) + + By("verifying peer membership when an anchor peer in the same org is stopped and restarted") + expectedMsgFromExpirationCallback := fmt.Sprintf("Do not remove bootstrap or anchor peer endpoint %s from membership", peerEndpoints[peer0Org1.ID()]) + assertPeerMembershipUpdate(network, peer1Org1, []*nwo.Peer{peer0Org1}, nwprocs, expectedMsgFromExpirationCallback) + + By("verifying peer membership when a non-anchor peer in the same org is stopped and restarted") + expectedMsgFromExpirationCallback = fmt.Sprintf("Removing member: Endpoint: %s", peerEndpoints[peer1Org1.ID()]) + assertPeerMembershipUpdate(network, peer0Org1, []*nwo.Peer{peer1Org1}, nwprocs, expectedMsgFromExpirationCallback) + }) + + It("updates peer membership when peers in another org are stopped and restarted", func() { + peer0Org1, peer1Org1 := network.Peer("Org1", "peer0"), network.Peer("Org1", "peer1") + peer0Org2, peer1Org2 := network.Peer("Org2", "peer0"), network.Peer("Org2", "peer1") + + By("bringing up all peers") + startPeers(nwprocs, false, peer0Org1, peer1Org1, peer0Org2, peer1Org2) + + By("creating and joining a channel") + network.CreateChannel(channelName, orderer, peer0Org1) + network.JoinChannel(channelName, orderer, peer0Org1, peer1Org1, peer0Org2, peer1Org2) + network.UpdateChannelAnchors(orderer, channelName) + + By("verifying membership on peer1Org1") + Eventually(nwo.DiscoverPeers(network, peer1Org1, "User1", "testchannel"), network.EventuallyTimeout).Should(ConsistOf( + network.DiscoveredPeer(peer0Org1, "_lifecycle"), + network.DiscoveredPeer(peer1Org1, "_lifecycle"), + network.DiscoveredPeer(peer0Org2, "_lifecycle"), + network.DiscoveredPeer(peer1Org2, "_lifecycle"), + )) + + By("stopping anchor peer peer0Org1 to have only one peer in org1") + stopPeers(nwprocs, peer0Org1) + + By("verifying peer membership update when peers in another org are stopped and restarted") + expectedMsgFromExpirationCallback := fmt.Sprintf("Do not remove bootstrap or anchor peer endpoint %s from membership", peerEndpoints[peer0Org2.ID()]) + assertPeerMembershipUpdate(network, peer1Org1, []*nwo.Peer{peer0Org2, peer1Org2}, nwprocs, expectedMsgFromExpirationCallback) + }) + }) }) func runTransactions(n *nwo.Network, orderer *nwo.Orderer, peer *nwo.Peer, chaincodeName string, channelID string) { @@ -183,7 +267,7 @@ func (n *networkProcesses) terminateAll() { } func startPeers(n *networkProcesses, forceStateTransfer bool, peersToStart ...*nwo.Peer) { - env := []string{"FABRIC_LOGGING_SPEC=info:gossip.state=debug"} + env := []string{"FABRIC_LOGGING_SPEC=info:gossip.state=debug:gossip.discovery=debug"} // Setting CORE_PEER_GOSSIP_STATE_CHECKINTERVAL to 200ms (from default of 10s) will ensure that state transfer happens quickly, // before blocks are gossipped through normal mechanisms @@ -241,3 +325,32 @@ func sendTransactionsAndSyncUpPeers(n *networkProcesses, orderer *nwo.Orderer, b n.ordererProcess = ifrit.Invoke(n.ordererRunner) Eventually(n.ordererProcess.Ready(), n.network.EventuallyTimeout).Should(BeClosed()) } + +// assertPeerMembershipUpdate stops and restart peersToRestart and verify peer membership +func assertPeerMembershipUpdate(network *nwo.Network, peer *nwo.Peer, peersToRestart []*nwo.Peer, nwprocs *networkProcesses, expectedMsgFromExpirationCallback string) { + stopPeers(nwprocs, peersToRestart...) + + // timeout is the same amount of time as it takes to remove a message from the aliveMsgStore, and add a second as buffer + core := network.ReadPeerConfig(peer) + timeout := core.Peer.Gossip.AliveExpirationTimeout*time.Duration(core.Peer.Gossip.MsgExpirationFactor) + time.Second + By("verifying peer membership after all other peers are stopped") + Eventually(nwo.DiscoverPeers(network, peer, "User1", "testchannel"), timeout, 100*time.Millisecond).Should(ConsistOf( + network.DiscoveredPeer(peer, "_lifecycle"), + )) + + By("verifying expected log message from expiration callback") + runner := nwprocs.peerRunners[peer.ID()] + Eventually(runner.Err(), network.EventuallyTimeout).Should(gbytes.Say(expectedMsgFromExpirationCallback)) + + By("restarting peers") + startPeers(nwprocs, false, peersToRestart...) + + By("verifying peer membership, expected to discover restarted peers") + expectedPeers := make([]nwo.DiscoveredPeer, len(peersToRestart)+1) + expectedPeers[0] = network.DiscoveredPeer(peer, "_lifecycle") + for i, p := range peersToRestart { + expectedPeers[i+1] = network.DiscoveredPeer(p, "_lifecycle") + } + timeout = 3 * core.Peer.Gossip.ReconnectInterval + Eventually(nwo.DiscoverPeers(network, peer, "User1", "testchannel"), timeout, 100*time.Millisecond).Should(ConsistOf(expectedPeers)) +} diff --git a/integration/nwo/fabricconfig/core.go b/integration/nwo/fabricconfig/core.go index 77e653c13c2..fda3dfcb462 100644 --- a/integration/nwo/fabricconfig/core.go +++ b/integration/nwo/fabricconfig/core.go @@ -93,6 +93,8 @@ type Gossip struct { AliveTimeInterval time.Duration `yaml:"aliveTimeInterval,omitempty"` AliveExpirationTimeout time.Duration `yaml:"aliveExpirationTimeout,omitempty"` ReconnectInterval time.Duration `yaml:"reconnectInterval,omitempty"` + MsgExpirationFactor int `yaml:"msgExpirationFactor,omitempty"` + MaxConnectionAttempts int `yaml:"maxConnectionAttempts,omitempty"` ExternalEndpoint string `yaml:"externalEndpoint,omitempty"` Election *GossipElection `yaml:"election,omitempty"` PvtData *GossipPvtData `yaml:"pvtData,omitempty"` diff --git a/sampleconfig/core.yaml b/sampleconfig/core.yaml index 9fc4bd42ee7..c4a6cb115c8 100644 --- a/sampleconfig/core.yaml +++ b/sampleconfig/core.yaml @@ -160,6 +160,10 @@ peer: aliveExpirationTimeout: 25s # Reconnect interval(unit: second) reconnectInterval: 25s + # Max number of attempts to connect to a peer + maxConnectionAttempts: 120 + # Message expiration factor for alive messages + msgExpirationFactor: 20 # This is an endpoint that is published to peers outside of the organization. # If this isn't set, the peer will not be known to other organizations. externalEndpoint: