Skip to content

Commit

Permalink
Merge "[FAB-15840] Fix peer isolation after a long disconnect" into r…
Browse files Browse the repository at this point in the history
…elease-1.4
  • Loading branch information
denyeart authored and Gerrit Code Review committed Jul 16, 2019
2 parents 34b6107 + af4a0ae commit 8049c6f
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 26 deletions.
15 changes: 14 additions & 1 deletion gossip/discovery/discovery_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,16 @@ type gossipDiscoveryImpl struct {
aliveExpirationTimeout time.Duration
aliveExpirationCheckInterval time.Duration
reconnectInterval time.Duration

bootstrapPeers []string
}

type DiscoveryConfig struct {
AliveTimeInterval time.Duration
AliveExpirationTimeout time.Duration
AliveExpirationCheckInterval time.Duration
ReconnectInterval time.Duration
BootstrapPeers []string
}

// NewDiscoveryService returns a new discovery service with the comm module passed and the crypto service passed
Expand Down Expand Up @@ -109,6 +112,8 @@ func NewDiscoveryService(self NetworkMember, comm CommService, crypt CryptoServi
aliveExpirationTimeout: config.AliveExpirationTimeout,
aliveExpirationCheckInterval: config.AliveExpirationCheckInterval,
reconnectInterval: config.ReconnectInterval,

bootstrapPeers: config.BootstrapPeers,
}

d.validateSelfConfig()
Expand Down Expand Up @@ -1021,7 +1026,15 @@ func newAliveMsgStore(d *gossipDiscoveryImpl) *aliveMsgStore {
if !msg.IsAliveMsg() {
return
}
id := msg.GetAliveMsg().Membership.PkiId
membership := msg.GetAliveMsg().Membership
id := membership.PkiId
endpoint := membership.Endpoint
internalEndpoint := msg.SecretEnvelope.InternalEndpoint()
if util.Contains(endpoint, d.bootstrapPeers) || util.Contains(internalEndpoint, d.bootstrapPeers) {
// Never remove a bootstrap peer
return
}
d.logger.Infof("Removing member: Endpoint: %s, InternalEndpoint: %s, PKIID: %x", endpoint, internalEndpoint, id)
d.aliveMembership.Remove(id)
d.deadMembership.Remove(id)
delete(d.id2Member, string(id))
Expand Down
107 changes: 82 additions & 25 deletions gossip/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)

var timeout = time.Second * time.Duration(15)

var aliveTimeInterval = time.Duration(time.Millisecond * 300)
var config = DiscoveryConfig{
var defaultTestConfig = DiscoveryConfig{
AliveTimeInterval: aliveTimeInterval,
AliveExpirationTimeout: 10 * aliveTimeInterval,
AliveExpirationCheckInterval: aliveTimeInterval,
Expand Down Expand Up @@ -84,6 +85,7 @@ type dummyCommModule struct {
incMsgs chan proto.ReceivedMessage
lastSeqs map[string]uint64
shouldGossip bool
disableComm bool
mock *mock.Mock
}

Expand Down Expand Up @@ -134,7 +136,7 @@ func (comm *dummyCommModule) SignMessage(am *proto.GossipMessage, internalEndpoi
}

func (comm *dummyCommModule) Gossip(msg *proto.SignedGossipMessage) {
if !comm.shouldGossip {
if !comm.shouldGossip || comm.disableComm {
return
}
comm.lock.Lock()
Expand All @@ -145,7 +147,7 @@ func (comm *dummyCommModule) Gossip(msg *proto.SignedGossipMessage) {
}

func (comm *dummyCommModule) Forward(msg proto.ReceivedMessage) {
if !comm.shouldGossip {
if !comm.shouldGossip || comm.disableComm {
return
}
comm.lock.Lock()
Expand All @@ -156,6 +158,9 @@ func (comm *dummyCommModule) Forward(msg proto.ReceivedMessage) {
}

func (comm *dummyCommModule) SendToPeer(peer *NetworkMember, msg *proto.SignedGossipMessage) {
if comm.disableComm {
return
}
comm.lock.RLock()
_, exists := comm.streams[peer.Endpoint]
mock := comm.mock
Expand All @@ -179,6 +184,9 @@ func (comm *dummyCommModule) SendToPeer(peer *NetworkMember, msg *proto.SignedGo
}

func (comm *dummyCommModule) Ping(peer *NetworkMember) bool {
if comm.disableComm {
return false
}
comm.lock.Lock()
defer comm.lock.Unlock()

Expand All @@ -187,7 +195,8 @@ func (comm *dummyCommModule) Ping(peer *NetworkMember) bool {
}

_, alreadyExists := comm.streams[peer.Endpoint]
if !alreadyExists {
conn := comm.conns[peer.Endpoint]
if !alreadyExists || conn.GetState() == connectivity.Shutdown {
newConn, err := grpc.Dial(peer.Endpoint, grpc.WithInsecure())
if err != nil {
return false
Expand All @@ -199,7 +208,6 @@ func (comm *dummyCommModule) Ping(peer *NetworkMember) bool {
}
return false
}
conn := comm.conns[peer.Endpoint]
if _, err := proto.NewGossipClient(conn).Ping(context.Background(), &proto.Empty{}); err != nil {
return false
}
Expand Down Expand Up @@ -341,22 +349,26 @@ var noopPolicy = func(remotePeer *NetworkMember) (Sieve, EnvelopeFilter) {
}

func createDiscoveryInstance(port int, id string, bootstrapPeers []string) *gossipInstance {
return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, true, noopPolicy)
return createDiscoveryInstanceCustomConfig(port, id, bootstrapPeers, defaultTestConfig)
}

func createDiscoveryInstanceCustomConfig(port int, id string, bootstrapPeers []string, config DiscoveryConfig) *gossipInstance {
return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, true, noopPolicy, config)
}

func createDiscoveryInstanceWithNoGossip(port int, id string, bootstrapPeers []string) *gossipInstance {
return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, false, noopPolicy)
return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, false, noopPolicy, defaultTestConfig)
}

func createDiscoveryInstanceWithNoGossipWithDisclosurePolicy(port int, id string, bootstrapPeers []string, pol DisclosurePolicy) *gossipInstance {
return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, false, pol)
return createDiscoveryInstanceThatGossips(port, id, bootstrapPeers, false, pol, defaultTestConfig)
}

func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy) *gossipInstance {
return createDiscoveryInstanceThatGossipsWithInterceptors(port, id, bootstrapPeers, shouldGossip, pol, func(_ *proto.SignedGossipMessage) {})
func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy, config DiscoveryConfig) *gossipInstance {
return createDiscoveryInstanceThatGossipsWithInterceptors(port, id, bootstrapPeers, shouldGossip, pol, func(_ *proto.SignedGossipMessage) {}, config)
}

func createDiscoveryInstanceThatGossipsWithInterceptors(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy, f func(*proto.SignedGossipMessage)) *gossipInstance {
func createDiscoveryInstanceThatGossipsWithInterceptors(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy, f func(*proto.SignedGossipMessage), config DiscoveryConfig) *gossipInstance {
comm := &dummyCommModule{
conns: make(map[string]*grpc.ClientConn),
streams: make(map[string]proto.Gossip_GossipStreamClient),
Expand All @@ -367,6 +379,7 @@ func createDiscoveryInstanceThatGossipsWithInterceptors(port int, id string, boo
lock: &sync.RWMutex{},
lastSeqs: make(map[string]uint64),
shouldGossip: shouldGossip,
disableComm: false,
}

endpoint := fmt.Sprintf("localhost:%d", port)
Expand All @@ -384,6 +397,7 @@ func createDiscoveryInstanceThatGossipsWithInterceptors(port int, id string, boo
}
s := grpc.NewServer()

config.BootstrapPeers = bootstrapPeers
discSvc := NewDiscoveryService(self, comm, comm, pol, config)
for _, bootPeer := range bootstrapPeers {
bp := bootPeer
Expand Down Expand Up @@ -581,7 +595,7 @@ func TestValidation(t *testing.T) {
// p1 sends a (membership) request to p3, and receives a (membership) response back.
// p2 sends a (membership) request to p1.
// Therefore, p1 receives both a membership request and a response.
p1 := createDiscoveryInstanceThatGossipsWithInterceptors(4675, "p1", []string{bootPeer(4677)}, true, noopPolicy, interceptor)
p1 := createDiscoveryInstanceThatGossipsWithInterceptors(4675, "p1", []string{bootPeer(4677)}, true, noopPolicy, interceptor, defaultTestConfig)
p2 := createDiscoveryInstance(4676, "p2", []string{bootPeer(4675)})
p3 := createDiscoveryInstance(4677, "p3", nil)
instances := []*gossipInstance{p1, p2, p3}
Expand Down Expand Up @@ -775,12 +789,12 @@ func TestInitiateSync(t *testing.T) {
if atomic.LoadInt32(&toDie) == int32(1) {
return
}
time.Sleep(config.AliveExpirationTimeout / 3)
time.Sleep(defaultTestConfig.AliveExpirationTimeout / 3)
inst.InitiateSync(9)
}
}()
}
time.Sleep(config.AliveExpirationTimeout * 4)
time.Sleep(defaultTestConfig.AliveExpirationTimeout * 4)
assertMembership(t, instances, nodeNum-1)
atomic.StoreInt32(&toDie, int32(1))
stopInstances(t, instances)
Expand Down Expand Up @@ -1041,7 +1055,7 @@ func createDisjointPeerGroupsWithNoGossip(bootPeerMap map[int]int) ([]*gossipIns
bootPeers := []string{bootPeer(bootPeerMap[port])}
pol := discPolForPeer(port)
inst := createDiscoveryInstanceWithNoGossipWithDisclosurePolicy(8610+group*5+i, id, bootPeers, pol)
inst.initiateSync(config.AliveExpirationTimeout/3, 10)
inst.initiateSync(defaultTestConfig.AliveExpirationTimeout/3, 10)
if group == 0 {
instances1 = append(instances1, inst)
} else {
Expand Down Expand Up @@ -1150,7 +1164,7 @@ func TestMsgStoreExpiration(t *testing.T) {
return true
}

waitUntilTimeoutOrFail(t, checkMessages, config.AliveExpirationTimeout*(msgExpirationFactor+5))
waitUntilTimeoutOrFail(t, checkMessages, defaultTestConfig.AliveExpirationTimeout*(msgExpirationFactor+5))

assertMembership(t, instances[:len(instances)-2], nodeNum-3)

Expand Down Expand Up @@ -1182,6 +1196,7 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) {
for i := 0; i < peersNum; i++ {
id := fmt.Sprintf("d%d", i)
inst := createDiscoveryInstanceWithNoGossip(22610+i, id, bootPeers)
inst.comm.disableComm = true
instances = append(instances, inst)
}

Expand Down Expand Up @@ -1302,11 +1317,11 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) {
}

// Sleep until expire
time.Sleep(config.AliveExpirationTimeout * (msgExpirationFactor + 5))
time.Sleep(defaultTestConfig.AliveExpirationTimeout * (msgExpirationFactor + 5))

// Checking Alive expired
for i := 0; i < peersNum; i++ {
checkAliveMsgNotExist(instances, newAliveMsgs, i, "[Step3 - expiration in msg store]")
checkAliveMsgNotExist(instances, newAliveMsgs, i, "[Step 3 - expiration in msg store]")
}

// Processing old MembershipRequest
Expand All @@ -1327,7 +1342,7 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) {

// MembershipRequest processing didn't change anything
for i := 0; i < peersNum; i++ {
checkAliveMsgNotExist(instances, aliveMsgs, i, "[Step4 - memReq processing after expiration]")
checkAliveMsgNotExist(instances, aliveMsgs, i, "[Step 4 - memReq processing after expiration]")
}

// Processing old (later) Alive messages
Expand All @@ -1344,8 +1359,8 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) {

// Alive msg processing didn't change anything
for i := 0; i < peersNum; i++ {
checkAliveMsgNotExist(instances, aliveMsgs, i, "[Step5.1 - after lost old aliveMsg process]")
checkAliveMsgNotExist(instances, newAliveMsgs, i, "[Step5.2 - after lost new aliveMsg process]")
checkAliveMsgNotExist(instances, aliveMsgs, i, "[Step 5.1 - after lost old aliveMsg process]")
checkAliveMsgNotExist(instances, newAliveMsgs, i, "[Step 5.2 - after lost new aliveMsg process]")
}

// Handling old MembershipResponse messages
Expand All @@ -1370,7 +1385,7 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) {

// MembershipResponse msg processing didn't change anything
for i := 0; i < peersNum; i++ {
checkAliveMsgNotExist(instances, aliveMsgs, i, "[Step6 - after lost MembershipResp process]")
checkAliveMsgNotExist(instances, aliveMsgs, i, "[Step 6 - after lost MembershipResp process]")
}

for i := 0; i < peersNum; i++ {
Expand Down Expand Up @@ -1437,11 +1452,11 @@ func TestMemRespDisclosurePol(t *testing.T) {
return m.Envelope
}
}
d1 := createDiscoveryInstanceThatGossips(7878, "d1", []string{}, true, pol)
d1 := createDiscoveryInstanceThatGossips(7878, "d1", []string{}, true, pol, defaultTestConfig)
defer d1.Stop()
d2 := createDiscoveryInstanceThatGossips(7879, "d2", []string{"localhost:7878"}, true, noopPolicy)
d2 := createDiscoveryInstanceThatGossips(7879, "d2", []string{"localhost:7878"}, true, noopPolicy, defaultTestConfig)
defer d2.Stop()
d3 := createDiscoveryInstanceThatGossips(7880, "d3", []string{"localhost:7878"}, true, noopPolicy)
d3 := createDiscoveryInstanceThatGossips(7880, "d3", []string{"localhost:7878"}, true, noopPolicy, defaultTestConfig)
defer d3.Stop()
// Both d1 and d3 know each other, and also about d2
assertMembership(t, []*gossipInstance{d1, d3}, 2)
Expand Down Expand Up @@ -1513,6 +1528,48 @@ func TestMembersIntersect(t *testing.T) {
assert.Equal(t, Members{{PKIid: common.PKIidType("p1"), Endpoint: "p1"}}, members1.Intersect(members2))
}

func TestPeerIsolation(t *testing.T) {
t.Parallel()

// Scenario:
// Start 3 peers (peer0, peer1, peer2). Set peer1 as the bootstrap peer for all.
// Stop peer0 and peer1 for a while, start them again and test if peer2 still gets full membership

config := defaultTestConfig
// Use a smaller AliveExpirationTimeout than the default to reduce the running time of the test.
config.AliveExpirationTimeout = 2 * config.AliveTimeInterval

peersNum := 3
bootPeers := []string{bootPeer(7121)}
instances := []*gossipInstance{}
var inst *gossipInstance

// Start all peers and wait for full membership
for i := 0; i < peersNum; i++ {
id := fmt.Sprintf("d%d", i)
inst = createDiscoveryInstanceCustomConfig(7120+i, id, bootPeers, config)
instances = append(instances, inst)
}
assertMembership(t, instances, peersNum-1)

// Stop the first 2 peers so the third peer would stay alone
stopInstances(t, instances[:peersNum-1])
assertMembership(t, instances[peersNum-1:], 0)

// Sleep the same amount of time as it takes to remove a message from the aliveMsgStore (aliveMsgTTL)
// Add a second as buffer
time.Sleep(config.AliveExpirationTimeout*msgExpirationFactor + time.Second)

// Start again the first 2 peers and wait for all the peers to get full membership.
// Especially, we want to test that peer2 won't be isolated
for i := 0; i < peersNum-1; i++ {
id := fmt.Sprintf("d%d", i)
inst = createDiscoveryInstanceCustomConfig(7120+i, id, bootPeers, config)
instances[i] = inst
}
assertMembership(t, instances, peersNum-1)
}

func waitUntilOrFail(t *testing.T, pred func() bool) {
waitUntilTimeoutOrFail(t, pred, timeout)
}
Expand Down
1 change: 1 addition & 0 deletions gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func NewGossipService(conf *Config, s *grpc.Server, sa api.SecurityAdvisor,
AliveExpirationTimeout: conf.AliveExpirationTimeout,
AliveExpirationCheckInterval: conf.AliveExpirationCheckInterval,
ReconnectInterval: conf.ReconnectInterval,
BootstrapPeers: conf.BootstrapPeers,
}
g.disc = discovery.NewDiscoveryService(g.selfNetworkMember(), g.discAdapter, g.disSecAdap, g.disclosurePolicy,
discoveryConfig)
Expand Down

0 comments on commit 8049c6f

Please sign in to comment.