diff --git a/gossip/discovery/discovery_impl.go b/gossip/discovery/discovery_impl.go index 6900210dc7f..c184f752a79 100644 --- a/gossip/discovery/discovery_impl.go +++ b/gossip/discovery/discovery_impl.go @@ -75,6 +75,8 @@ type gossipDiscoveryImpl struct { aliveExpirationTimeout time.Duration aliveExpirationCheckInterval time.Duration reconnectInterval time.Duration + + bootstrapPeers []string } type DiscoveryConfig struct { @@ -82,6 +84,7 @@ type DiscoveryConfig struct { AliveExpirationTimeout time.Duration AliveExpirationCheckInterval time.Duration ReconnectInterval time.Duration + BootstrapPeers []string } // NewDiscoveryService returns a new discovery service with the comm module passed and the crypto service passed @@ -109,6 +112,8 @@ func NewDiscoveryService(self NetworkMember, comm CommService, crypt CryptoServi aliveExpirationTimeout: config.AliveExpirationTimeout, aliveExpirationCheckInterval: config.AliveExpirationCheckInterval, reconnectInterval: config.ReconnectInterval, + + bootstrapPeers: config.BootstrapPeers, } d.validateSelfConfig() @@ -1021,7 +1026,15 @@ func newAliveMsgStore(d *gossipDiscoveryImpl) *aliveMsgStore { if !msg.IsAliveMsg() { return } - id := msg.GetAliveMsg().Membership.PkiId + membership := msg.GetAliveMsg().Membership + id := membership.PkiId + endpoint := membership.Endpoint + internalEndpoint := msg.SecretEnvelope.InternalEndpoint() + if util.Contains(endpoint, d.bootstrapPeers) || util.Contains(internalEndpoint, d.bootstrapPeers) { + // Never remove a bootstrap peer + return + } + d.logger.Infof("Removing member: Endpoint: %s, InternalEndpoint: %s, PKIID: %x", endpoint, internalEndpoint, id) d.aliveMembership.Remove(id) d.deadMembership.Remove(id) delete(d.id2Member, string(id)) diff --git a/gossip/discovery/discovery_test.go b/gossip/discovery/discovery_test.go index d7ee548f8e4..8f12cc693a4 100644 --- a/gossip/discovery/discovery_test.go +++ b/gossip/discovery/discovery_test.go @@ -29,12 +29,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" ) var timeout = time.Second * time.Duration(15) var aliveTimeInterval = time.Duration(time.Millisecond * 300) -var config = DiscoveryConfig{ +var defaultTestConfig = DiscoveryConfig{ AliveTimeInterval: aliveTimeInterval, AliveExpirationTimeout: 10 * aliveTimeInterval, AliveExpirationCheckInterval: aliveTimeInterval, @@ -84,6 +85,7 @@ type dummyCommModule struct { incMsgs chan proto.ReceivedMessage lastSeqs map[string]uint64 shouldGossip bool + disableComm bool mock *mock.Mock } @@ -134,7 +136,7 @@ func (comm *dummyCommModule) SignMessage(am *proto.GossipMessage, internalEndpoi } func (comm *dummyCommModule) Gossip(msg *proto.SignedGossipMessage) { - if !comm.shouldGossip { + if !comm.shouldGossip || comm.disableComm { return } comm.lock.Lock() @@ -145,7 +147,7 @@ func (comm *dummyCommModule) Gossip(msg *proto.SignedGossipMessage) { } func (comm *dummyCommModule) Forward(msg proto.ReceivedMessage) { - if !comm.shouldGossip { + if !comm.shouldGossip || comm.disableComm { return } comm.lock.Lock() @@ -156,6 +158,9 @@ func (comm *dummyCommModule) Forward(msg proto.ReceivedMessage) { } func (comm *dummyCommModule) SendToPeer(peer *NetworkMember, msg *proto.SignedGossipMessage) { + if comm.disableComm { + return + } comm.lock.RLock() _, exists := comm.streams[peer.Endpoint] mock := comm.mock @@ -179,6 +184,9 @@ func (comm *dummyCommModule) SendToPeer(peer *NetworkMember, msg *proto.SignedGo } func (comm *dummyCommModule) Ping(peer *NetworkMember) bool { + if comm.disableComm { + return false + } comm.lock.Lock() defer comm.lock.Unlock() @@ -187,7 +195,8 @@ func (comm *dummyCommModule) Ping(peer *NetworkMember) bool { } _, alreadyExists := comm.streams[peer.Endpoint] - if !alreadyExists { + conn := comm.conns[peer.Endpoint] + if !alreadyExists || conn.GetState() == connectivity.Shutdown { newConn, err := grpc.Dial(peer.Endpoint, grpc.WithInsecure()) if err != nil { return false @@ -199,7 +208,6 @@ func (comm *dummyCommModule) Ping(peer *NetworkMember) bool { } return false } - conn := comm.conns[peer.Endpoint] if _, err := proto.NewGossipClient(conn).Ping(context.Background(), &proto.Empty{}); err != nil { return false } @@ -341,22 +349,26 @@ var noopPolicy = func(remotePeer *NetworkMember) (Sieve, EnvelopeFilter) { } func createDiscoveryInstance(port int, id string, bootstrapPeers []string) *gossipInstance { - return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, true, noopPolicy) + return createDiscoveryInstanceCustomConfig(port, id, bootstrapPeers, defaultTestConfig) +} + +func createDiscoveryInstanceCustomConfig(port int, id string, bootstrapPeers []string, config DiscoveryConfig) *gossipInstance { + return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, true, noopPolicy, config) } func createDiscoveryInstanceWithNoGossip(port int, id string, bootstrapPeers []string) *gossipInstance { - return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, false, noopPolicy) + return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, false, noopPolicy, defaultTestConfig) } func createDiscoveryInstanceWithNoGossipWithDisclosurePolicy(port int, id string, bootstrapPeers []string, pol DisclosurePolicy) *gossipInstance { - return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, false, pol) + return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, false, pol, defaultTestConfig) } -func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy) *gossipInstance { - return createDiscoveryInstanceThatGossipsWithInterceptors(port, id, bootstrapPeers, shouldGossip, pol, func(_ *proto.SignedGossipMessage) {}) +func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy, config DiscoveryConfig) *gossipInstance { + return createDiscoveryInstanceThatGossipsWithInterceptors(port, id, bootstrapPeers, shouldGossip, pol, func(_ *proto.SignedGossipMessage) {}, config) } -func createDiscoveryInstanceThatGossipsWithInterceptors(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy, f func(*proto.SignedGossipMessage)) *gossipInstance { +func createDiscoveryInstanceThatGossipsWithInterceptors(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy, f func(*proto.SignedGossipMessage), config DiscoveryConfig) *gossipInstance { comm := &dummyCommModule{ conns: make(map[string]*grpc.ClientConn), streams: make(map[string]proto.Gossip_GossipStreamClient), @@ -367,6 +379,7 @@ func createDiscoveryInstanceThatGossipsWithInterceptors(port int, id string, boo lock: &sync.RWMutex{}, lastSeqs: make(map[string]uint64), shouldGossip: shouldGossip, + disableComm: false, } endpoint := fmt.Sprintf("localhost:%d", port) @@ -384,6 +397,7 @@ func createDiscoveryInstanceThatGossipsWithInterceptors(port int, id string, boo } s := grpc.NewServer() + config.BootstrapPeers = bootstrapPeers discSvc := NewDiscoveryService(self, comm, comm, pol, config) for _, bootPeer := range bootstrapPeers { bp := bootPeer @@ -581,7 +595,7 @@ func TestValidation(t *testing.T) { // p1 sends a (membership) request to p3, and receives a (membership) response back. // p2 sends a (membership) request to p1. // Therefore, p1 receives both a membership request and a response. - p1 := createDiscoveryInstanceThatGossipsWithInterceptors(4675, "p1", []string{bootPeer(4677)}, true, noopPolicy, interceptor) + p1 := createDiscoveryInstanceThatGossipsWithInterceptors(4675, "p1", []string{bootPeer(4677)}, true, noopPolicy, interceptor, defaultTestConfig) p2 := createDiscoveryInstance(4676, "p2", []string{bootPeer(4675)}) p3 := createDiscoveryInstance(4677, "p3", nil) instances := []*gossipInstance{p1, p2, p3} @@ -775,12 +789,12 @@ func TestInitiateSync(t *testing.T) { if atomic.LoadInt32(&toDie) == int32(1) { return } - time.Sleep(config.AliveExpirationTimeout / 3) + time.Sleep(defaultTestConfig.AliveExpirationTimeout / 3) inst.InitiateSync(9) } }() } - time.Sleep(config.AliveExpirationTimeout * 4) + time.Sleep(defaultTestConfig.AliveExpirationTimeout * 4) assertMembership(t, instances, nodeNum-1) atomic.StoreInt32(&toDie, int32(1)) stopInstances(t, instances) @@ -1041,7 +1055,7 @@ func createDisjointPeerGroupsWithNoGossip(bootPeerMap map[int]int) ([]*gossipIns bootPeers := []string{bootPeer(bootPeerMap[port])} pol := discPolForPeer(port) inst := createDiscoveryInstanceWithNoGossipWithDisclosurePolicy(8610+group*5+i, id, bootPeers, pol) - inst.initiateSync(config.AliveExpirationTimeout/3, 10) + inst.initiateSync(defaultTestConfig.AliveExpirationTimeout/3, 10) if group == 0 { instances1 = append(instances1, inst) } else { @@ -1150,7 +1164,7 @@ func TestMsgStoreExpiration(t *testing.T) { return true } - waitUntilTimeoutOrFail(t, checkMessages, config.AliveExpirationTimeout*(msgExpirationFactor+5)) + waitUntilTimeoutOrFail(t, checkMessages, defaultTestConfig.AliveExpirationTimeout*(msgExpirationFactor+5)) assertMembership(t, instances[:len(instances)-2], nodeNum-3) @@ -1182,6 +1196,7 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) { for i := 0; i < peersNum; i++ { id := fmt.Sprintf("d%d", i) inst := createDiscoveryInstanceWithNoGossip(22610+i, id, bootPeers) + inst.comm.disableComm = true instances = append(instances, inst) } @@ -1302,11 +1317,11 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) { } // Sleep until expire - time.Sleep(config.AliveExpirationTimeout * (msgExpirationFactor + 5)) + time.Sleep(defaultTestConfig.AliveExpirationTimeout * (msgExpirationFactor + 5)) // Checking Alive expired for i := 0; i < peersNum; i++ { - checkAliveMsgNotExist(instances, newAliveMsgs, i, "[Step3 - expiration in msg store]") + checkAliveMsgNotExist(instances, newAliveMsgs, i, "[Step 3 - expiration in msg store]") } // Processing old MembershipRequest @@ -1327,7 +1342,7 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) { // MembershipRequest processing didn't change anything for i := 0; i < peersNum; i++ { - checkAliveMsgNotExist(instances, aliveMsgs, i, "[Step4 - memReq processing after expiration]") + checkAliveMsgNotExist(instances, aliveMsgs, i, "[Step 4 - memReq processing after expiration]") } // Processing old (later) Alive messages @@ -1344,8 +1359,8 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) { // Alive msg processing didn't change anything for i := 0; i < peersNum; i++ { - checkAliveMsgNotExist(instances, aliveMsgs, i, "[Step5.1 - after lost old aliveMsg process]") - checkAliveMsgNotExist(instances, newAliveMsgs, i, "[Step5.2 - after lost new aliveMsg process]") + checkAliveMsgNotExist(instances, aliveMsgs, i, "[Step 5.1 - after lost old aliveMsg process]") + checkAliveMsgNotExist(instances, newAliveMsgs, i, "[Step 5.2 - after lost new aliveMsg process]") } // Handling old MembershipResponse messages @@ -1370,7 +1385,7 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) { // MembershipResponse msg processing didn't change anything for i := 0; i < peersNum; i++ { - checkAliveMsgNotExist(instances, aliveMsgs, i, "[Step6 - after lost MembershipResp process]") + checkAliveMsgNotExist(instances, aliveMsgs, i, "[Step 6 - after lost MembershipResp process]") } for i := 0; i < peersNum; i++ { @@ -1437,11 +1452,11 @@ func TestMemRespDisclosurePol(t *testing.T) { return m.Envelope } } - d1 := createDiscoveryInstanceThatGossips(7878, "d1", []string{}, true, pol) + d1 := createDiscoveryInstanceThatGossips(7878, "d1", []string{}, true, pol, defaultTestConfig) defer d1.Stop() - d2 := createDiscoveryInstanceThatGossips(7879, "d2", []string{"localhost:7878"}, true, noopPolicy) + d2 := createDiscoveryInstanceThatGossips(7879, "d2", []string{"localhost:7878"}, true, noopPolicy, defaultTestConfig) defer d2.Stop() - d3 := createDiscoveryInstanceThatGossips(7880, "d3", []string{"localhost:7878"}, true, noopPolicy) + d3 := createDiscoveryInstanceThatGossips(7880, "d3", []string{"localhost:7878"}, true, noopPolicy, defaultTestConfig) defer d3.Stop() // Both d1 and d3 know each other, and also about d2 assertMembership(t, []*gossipInstance{d1, d3}, 2) @@ -1513,6 +1528,48 @@ func TestMembersIntersect(t *testing.T) { assert.Equal(t, Members{{PKIid: common.PKIidType("p1"), Endpoint: "p1"}}, members1.Intersect(members2)) } +func TestPeerIsolation(t *testing.T) { + t.Parallel() + + // Scenario: + // Start 3 peers (peer0, peer1, peer2). Set peer1 as the bootstrap peer for all. + // Stop peer0 and peer1 for a while, start them again and test if peer2 still gets full membership + + config := defaultTestConfig + // Use a smaller AliveExpirationTimeout than the default to reduce the running time of the test. + config.AliveExpirationTimeout = 2 * config.AliveTimeInterval + + peersNum := 3 + bootPeers := []string{bootPeer(7121)} + instances := []*gossipInstance{} + var inst *gossipInstance + + // Start all peers and wait for full membership + for i := 0; i < peersNum; i++ { + id := fmt.Sprintf("d%d", i) + inst = createDiscoveryInstanceCustomConfig(7120+i, id, bootPeers, config) + instances = append(instances, inst) + } + assertMembership(t, instances, peersNum-1) + + // Stop the first 2 peers so the third peer would stay alone + stopInstances(t, instances[:peersNum-1]) + assertMembership(t, instances[peersNum-1:], 0) + + // Sleep the same amount of time as it takes to remove a message from the aliveMsgStore (aliveMsgTTL) + // Add a second as buffer + time.Sleep(config.AliveExpirationTimeout*msgExpirationFactor + time.Second) + + // Start again the first 2 peers and wait for all the peers to get full membership. + // Especially, we want to test that peer2 won't be isolated + for i := 0; i < peersNum-1; i++ { + id := fmt.Sprintf("d%d", i) + inst = createDiscoveryInstanceCustomConfig(7120+i, id, bootPeers, config) + instances[i] = inst + } + assertMembership(t, instances, peersNum-1) +} + func waitUntilOrFail(t *testing.T, pred func() bool) { waitUntilTimeoutOrFail(t, pred, timeout) } diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index 25da402df1f..8c4e35ac015 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -125,6 +125,7 @@ func NewGossipService(conf *Config, s *grpc.Server, sa api.SecurityAdvisor, AliveExpirationTimeout: conf.AliveExpirationTimeout, AliveExpirationCheckInterval: conf.AliveExpirationCheckInterval, ReconnectInterval: conf.ReconnectInterval, + BootstrapPeers: conf.BootstrapPeers, } g.disc = discovery.NewDiscoveryService(g.selfNetworkMember(), g.discAdapter, g.disSecAdap, g.disclosurePolicy, discoveryConfig)