Skip to content

Commit

Permalink
[FAB-17539] Always remember anchor peers in membership (#1422)
Browse files Browse the repository at this point in the history
The gossip service removes a peer from its membership when that peer's alive message expires.
However, it should always remember anchor peers and bootstrap peers so that the local peer
can reconnect to them later. Gossip already remembers bootstrap peers. This PR adds code to track
all anchor peers' endpoints and updates the expiration callback function so that it does not delete
anchor peers.

Signed-off-by: Wenjian Qiao <wenjianq@gmail.com>
  • Loading branch information
wenjianqiao committed Aug 27, 2020
1 parent a19c9ec commit d73f193
Show file tree
Hide file tree
Showing 19 changed files with 535 additions and 72 deletions.
5 changes: 5 additions & 0 deletions gossip/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ type CommService interface {
IdentitySwitch() <-chan common.PKIidType
}

// AnchorPeerTracker is an interface that is passed to discovery to check if an endpoint is an anchor peer.
// The alive-message expiration callback queries it with both the endpoint and the
// internal endpoint of the expiring member, and keeps the member if either matches.
type AnchorPeerTracker interface {
// IsAnchorPeer reports whether the given endpoint belongs to an anchor peer.
IsAnchorPeer(endpoint string) bool
}

// NetworkMember is a peer's representation
type NetworkMember struct {
Endpoint string
Expand Down
37 changes: 20 additions & 17 deletions gossip/discovery/discovery_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,8 @@ const DefAliveTimeInterval = 5 * time.Second
const DefAliveExpirationTimeout = 5 * DefAliveTimeInterval
const DefAliveExpirationCheckInterval = DefAliveExpirationTimeout / 10
const DefReconnectInterval = DefAliveExpirationTimeout
const msgExpirationFactor = 20

var maxConnectionAttempts = 120

// SetMaxConnAttempts sets the maximum number of connection
// attempts the peer would perform when invoking Connect().
// It overwrites the package-level maxConnectionAttempts variable, so the
// new value applies to every reader of that variable, not to one instance.
func SetMaxConnAttempts(attempts int) {
maxConnectionAttempts = attempts
}
// DefMsgExpirationFactor is the default factor by which the alive expiration
// timeout is multiplied to obtain the TTL of entries in the alive message store.
const DefMsgExpirationFactor = 20
// DefMaxConnectionAttempts is the default value for the maximum number of
// connection attempts (DiscoveryConfig.MaxConnectionAttempts).
const DefMaxConnectionAttempts = 120

type timestamp struct {
incTime time.Time
Expand Down Expand Up @@ -75,21 +68,26 @@ type gossipDiscoveryImpl struct {
aliveExpirationTimeout time.Duration
aliveExpirationCheckInterval time.Duration
reconnectInterval time.Duration
msgExpirationFactor int
maxConnectionAttempts int

bootstrapPeers []string
bootstrapPeers []string
anchorPeerTracker AnchorPeerTracker
}

// DiscoveryConfig carries the tunables handed to NewDiscoveryService, which
// stores them on the discovery instance it creates.
type DiscoveryConfig struct {
AliveTimeInterval time.Duration
// AliveExpirationTimeout, multiplied by MsgExpirationFactor, gives the TTL
// of entries in the alive message store.
AliveExpirationTimeout time.Duration
AliveExpirationCheckInterval time.Duration
ReconnectInterval time.Duration
// MaxConnectionAttempts bounds the retry loops in Connect and sendUntilAcked.
MaxConnectionAttempts int
// MsgExpirationFactor is the multiplier applied to AliveExpirationTimeout
// when computing the alive message TTL.
MsgExpirationFactor int
// BootstrapPeers lists endpoints that the expiration callback never removes
// from membership.
BootstrapPeers []string
}

// NewDiscoveryService returns a new discovery service with the comm module passed and the crypto service passed
func NewDiscoveryService(self NetworkMember, comm CommService, crypt CryptoService, disPol DisclosurePolicy,
config DiscoveryConfig) Discovery {
config DiscoveryConfig, anchorPeerTracker AnchorPeerTracker) Discovery {
d := &gossipDiscoveryImpl{
self: self,
incTime: uint64(time.Now().UnixNano()),
Expand All @@ -112,8 +110,11 @@ func NewDiscoveryService(self NetworkMember, comm CommService, crypt CryptoServi
aliveExpirationTimeout: config.AliveExpirationTimeout,
aliveExpirationCheckInterval: config.AliveExpirationCheckInterval,
reconnectInterval: config.ReconnectInterval,
maxConnectionAttempts: config.MaxConnectionAttempts,
msgExpirationFactor: config.MsgExpirationFactor,

bootstrapPeers: config.BootstrapPeers,
bootstrapPeers: config.BootstrapPeers,
anchorPeerTracker: anchorPeerTracker,
}

d.validateSelfConfig()
Expand Down Expand Up @@ -149,7 +150,7 @@ func (d *gossipDiscoveryImpl) Connect(member NetworkMember, id identifier) {
d.logger.Debug("Entering", member)
defer d.logger.Debug("Exiting")
go func() {
for i := 0; i < maxConnectionAttempts && !d.toDie(); i++ {
for i := 0; i < d.maxConnectionAttempts && !d.toDie(); i++ {
id, err := id()
if err != nil {
if d.toDie() {
Expand Down Expand Up @@ -216,7 +217,7 @@ func (d *gossipDiscoveryImpl) validateSelfConfig() {

func (d *gossipDiscoveryImpl) sendUntilAcked(peer *NetworkMember, message *proto.SignedGossipMessage) {
nonce := message.Nonce
for i := 0; i < maxConnectionAttempts && !d.toDie(); i++ {
for i := 0; i < d.maxConnectionAttempts && !d.toDie(); i++ {
sub := d.pubsub.Subscribe(fmt.Sprintf("%d", nonce), time.Second*5)
d.comm.SendToPeer(peer, message)
if _, timeoutErr := sub.Listen(); timeoutErr == nil {
Expand Down Expand Up @@ -1032,7 +1033,7 @@ type aliveMsgStore struct {
func newAliveMsgStore(d *gossipDiscoveryImpl) *aliveMsgStore {
policy := proto.NewGossipMessageComparator(0)
trigger := func(m interface{}) {}
aliveMsgTTL := d.aliveExpirationTimeout * msgExpirationFactor
aliveMsgTTL := d.aliveExpirationTimeout * time.Duration(d.msgExpirationFactor)
externalLock := func() { d.lock.Lock() }
externalUnlock := func() { d.lock.Unlock() }
callback := func(m interface{}) {
Expand All @@ -1044,8 +1045,10 @@ func newAliveMsgStore(d *gossipDiscoveryImpl) *aliveMsgStore {
id := membership.PkiId
endpoint := membership.Endpoint
internalEndpoint := msg.SecretEnvelope.InternalEndpoint()
if util.Contains(endpoint, d.bootstrapPeers) || util.Contains(internalEndpoint, d.bootstrapPeers) {
// Never remove a bootstrap peer
if util.Contains(endpoint, d.bootstrapPeers) || util.Contains(internalEndpoint, d.bootstrapPeers) ||
d.anchorPeerTracker.IsAnchorPeer(endpoint) || d.anchorPeerTracker.IsAnchorPeer(internalEndpoint) {
// Never remove a bootstrap peer or an anchor peer
d.logger.Debugf("Do not remove bootstrap or anchor peer endpoint %s from membership", endpoint)
return
}
d.logger.Infof("Removing member: Endpoint: %s, InternalEndpoint: %s, PKIID: %x", endpoint, internalEndpoint, id)
Expand Down
133 changes: 128 additions & 5 deletions gossip/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ var defaultTestConfig = DiscoveryConfig{
AliveExpirationTimeout: 10 * aliveTimeInterval,
AliveExpirationCheckInterval: aliveTimeInterval,
ReconnectInterval: 10 * aliveTimeInterval,
MaxConnectionAttempts: DefMaxConnectionAttempts,
MsgExpirationFactor: DefMsgExpirationFactor,
}

func init() {
util.SetupTestLogging()
maxConnectionAttempts = 10000
defaultTestConfig.MaxConnectionAttempts = 10000
}

type dummyReceivedMessage struct {
Expand Down Expand Up @@ -75,6 +77,15 @@ func (*dummyReceivedMessage) Ack(err error) {
panic("implement me")
}

// mockAnchorPeerTracker implements AnchorPeerTracker interface
// for tests, answering from a fixed list of endpoints.
type mockAnchorPeerTracker struct {
// apEndpoints is the list that IsAnchorPeer checks endpoints against.
apEndpoints []string
}

// IsAnchorPeer returns the result of util.Contains(endpoint, m.apEndpoints);
// presumably a plain membership test, so a tracker with no endpoints never matches.
func (m *mockAnchorPeerTracker) IsAnchorPeer(endpoint string) bool {
return util.Contains(endpoint, m.apEndpoints)
}

type dummyCommModule struct {
validatedMessages chan *proto.SignedGossipMessage
msgsReceived uint32
Expand Down Expand Up @@ -377,6 +388,12 @@ func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []st
}

// createDiscoveryInstanceThatGossipsWithInterceptors builds a gossip instance
// whose anchor peer tracker is an empty mockAnchorPeerTracker; all other
// arguments are forwarded unchanged to createDiscoveryInstanceWithAnchorPeerTracker.
func createDiscoveryInstanceThatGossipsWithInterceptors(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy, f func(*proto.SignedGossipMessage), config DiscoveryConfig) *gossipInstance {
	return createDiscoveryInstanceWithAnchorPeerTracker(
		port, id, bootstrapPeers, shouldGossip, pol, f, config,
		&mockAnchorPeerTracker{},
	)
}

func createDiscoveryInstanceWithAnchorPeerTracker(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy,
f func(*proto.SignedGossipMessage), config DiscoveryConfig, anchorPeerTracker AnchorPeerTracker) *gossipInstance {
comm := &dummyCommModule{
conns: make(map[string]*grpc.ClientConn),
streams: make(map[string]proto.Gossip_GossipStreamClient),
Expand Down Expand Up @@ -407,7 +424,8 @@ func createDiscoveryInstanceThatGossipsWithInterceptors(port int, id string, boo
s := grpc.NewServer()

config.BootstrapPeers = bootstrapPeers
discSvc := NewDiscoveryService(self, comm, comm, pol, config)

discSvc := NewDiscoveryService(self, comm, comm, pol, config, anchorPeerTracker)
for _, bootPeer := range bootstrapPeers {
bp := bootPeer
discSvc.Connect(NetworkMember{Endpoint: bp, InternalEndpoint: bootPeer}, func() (*PeerIdentification, error) {
Expand Down Expand Up @@ -1127,6 +1145,7 @@ func TestExpirationNoSecretEnvelope(t *testing.T) {
aliveMembership: util.NewMembershipStore(),
deadMembership: util.NewMembershipStore(),
logger: logger,
anchorPeerTracker: &mockAnchorPeerTracker{[]string{}},
})

msg := &proto.GossipMessage{
Expand Down Expand Up @@ -1281,7 +1300,7 @@ func TestMsgStoreExpiration(t *testing.T) {
return true
}

waitUntilTimeoutOrFail(t, checkMessages, defaultTestConfig.AliveExpirationTimeout*(msgExpirationFactor+5))
waitUntilTimeoutOrFail(t, checkMessages, defaultTestConfig.AliveExpirationTimeout*(DefMsgExpirationFactor+5))

assertMembership(t, instances[:len(instances)-2], nodeNum-3)

Expand Down Expand Up @@ -1434,7 +1453,7 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) {
}

// Sleep until expire
time.Sleep(defaultTestConfig.AliveExpirationTimeout * (msgExpirationFactor + 5))
time.Sleep(defaultTestConfig.AliveExpirationTimeout * (DefMsgExpirationFactor + 5))

// Checking Alive expired
for i := 0; i < peersNum; i++ {
Expand Down Expand Up @@ -1676,7 +1695,7 @@ func TestPeerIsolation(t *testing.T) {

// Sleep the same amount of time as it takes to remove a message from the aliveMsgStore (aliveMsgTTL)
// Add a second as buffer
time.Sleep(config.AliveExpirationTimeout*msgExpirationFactor + time.Second)
time.Sleep(config.AliveExpirationTimeout*DefMsgExpirationFactor + time.Second)

// Start again the first 2 peers and wait for all the peers to get full membership.
// Especially, we want to test that peer2 won't be isolated
Expand All @@ -1688,6 +1707,110 @@ func TestPeerIsolation(t *testing.T) {
assertMembership(t, instances, peersNum-1)
}

// TestMembershipAfterExpiration checks that an anchor peer is kept (as a dead
// member) when its alive message expires, while a regular peer is removed, and
// that the surviving peer regains full membership once the others restart.
func TestMembershipAfterExpiration(t *testing.T) {
	// Scenario:
	// Start 3 peers (peer0, peer1, peer2). Set peer0 as the anchor peer.
	// Stop peer0 and peer1 for a while, start them again and test if peer2 still gets full membership

	config := defaultTestConfig
	// Use a smaller AliveExpirationTimeout than the default to reduce the running time of the test.
	config.AliveExpirationTimeout = 2 * config.AliveTimeInterval
	config.ReconnectInterval = config.AliveExpirationTimeout
	config.MsgExpirationFactor = 5

	peersNum := 3
	ports := []int{9120, 9121, 9122}
	anchorPeer := "localhost:9120"
	bootPeers := []string{}
	instances := make([]*gossipInstance, 0, peersNum)
	mockTracker := &mockAnchorPeerTracker{[]string{anchorPeer}}

	// use a custom logger to verify messages from expiration callback
	expectedMsgs := []string{
		"Do not remove bootstrap or anchor peer endpoint localhost:9120 from membership",
		"Removing member: Endpoint: localhost:9121",
	}
	l, err := zap.NewDevelopment()
	assert.NoError(t, err)

	// The hook may run on whichever goroutine emits the log entry, so it keeps no
	// state of its own: it only reports the index of the expected message it saw.
	// The send is buffered and non-blocking so that a logging goroutine can never
	// get stuck here, even after the test has stopped receiving.
	matched := make(chan int, 32)
	logger := flogging.NewFabricLogger(l, zap.Hooks(func(entry zapcore.Entry) error {
		for i, msg := range expectedMsgs {
			if strings.Contains(entry.Message, msg) {
				select {
				case matched <- i:
				default:
				}
				return nil
			}
		}
		return nil
	}))

	// Start all peers, connect to the anchor peer and verify full membership
	for i := 0; i < peersNum; i++ {
		id := fmt.Sprintf("d%d", i)
		inst := createDiscoveryInstanceWithAnchorPeerTracker(ports[i], id, bootPeers, true, noopPolicy, func(_ *proto.SignedGossipMessage) {}, config, mockTracker)
		instances = append(instances, inst)
	}
	instances[peersNum-1].Discovery.(*gossipDiscoveryImpl).logger = logger
	for i := 1; i < peersNum; i++ {
		connect(instances[i], anchorPeer)
	}
	assertMembership(t, instances, peersNum-1)

	// Stop peer0 and peer1 so that peer2 would stay alone
	stopInstances(t, instances[0:peersNum-1])

	// waitTime is the same amount of time as it takes to remove a message from the aliveMsgStore (aliveMsgTTL)
	// Add a second as buffer
	waitTime := config.AliveExpirationTimeout*time.Duration(config.MsgExpirationFactor) + time.Second
	deadline := time.After(waitTime)

	// Require every expected message to be seen at least once; a repeated
	// message must not stand in for one that never showed up.
	seen := make([]bool, len(expectedMsgs))
	for pending := len(expectedMsgs); pending > 0; {
		select {
		case i := <-matched:
			if !seen[i] {
				seen[i] = true
				pending--
			}
		case <-deadline:
			t.Fatalf("timed out")
		}
	}

	// peer2's deadMembership should contain the anchor peer
	deadMembership := instances[peersNum-1].discoveryImpl().deadMembership
	assert.Equal(t, 1, deadMembership.Size())
	assertMembership(t, instances[peersNum-1:], 0)

	// Start again peer0 and peer1 and wait for all the peers to get full membership.
	// Especially, we want to test that peer2 won't be isolated
	for i := 0; i < peersNum-1; i++ {
		id := fmt.Sprintf("d%d", i)
		instances[i] = createDiscoveryInstanceWithAnchorPeerTracker(ports[i], id, bootPeers, true, noopPolicy, func(_ *proto.SignedGossipMessage) {}, config, mockTracker)
	}
	connect(instances[1], anchorPeer)
	assertMembership(t, instances, peersNum-1)
}

// connect installs a temporary mock on inst's comm module and then asks inst to
// connect to endpoint. The mock accepts SendToPeer and Ping calls and removes
// itself as soon as a sent message carries a membership request.
func connect(inst *gossipInstance, endpoint string) {
	// Invoked on every mocked SendToPeer call; the second argument is the message.
	dropMockOnMemReq := func(args mock.Arguments) {
		signedMsg := args.Get(1).(*proto.SignedGossipMessage)
		if signedMsg.GetMemReq() == nil {
			return
		}
		inst.comm.lock.Lock()
		inst.comm.mock = nil
		inst.comm.lock.Unlock()
	}

	inst.comm.lock.Lock()
	inst.comm.mock = &mock.Mock{}
	inst.comm.mock.On("SendToPeer", mock.Anything, mock.Anything).Run(dropMockOnMemReq)
	inst.comm.mock.On("Ping", mock.Anything)
	inst.comm.lock.Unlock()

	// The endpoint string doubles as the PKI-ID of the member to connect to.
	target := NetworkMember{Endpoint: endpoint, PKIid: []byte(endpoint)}
	inst.Connect(target, func() (*PeerIdentification, error) {
		return &PeerIdentification{SelfOrg: true, ID: nil}, nil
	})
}

// waitUntilOrFail calls waitUntilTimeoutOrFail with pred and the
// package-level timeout value.
func waitUntilOrFail(t *testing.T, pred func() bool) {
waitUntilTimeoutOrFail(t, pred, timeout)
}
Expand Down
3 changes: 2 additions & 1 deletion gossip/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,5 +153,6 @@ type Config struct {
AliveExpirationTimeout time.Duration // Alive expiration timeout
AliveExpirationCheckInterval time.Duration // Alive expiration check interval
ReconnectInterval time.Duration // Reconnect interval

MsgExpirationFactor int // MsgExpirationFactor is the expiration factor for alive message TTL
MaxConnectionAttempts int // MaxConnectionAttempts is the max number of attempts to connect to a peer (wait for alive ack)
}
7 changes: 5 additions & 2 deletions gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ type gossipServiceImpl struct {
// NewGossipService creates a gossip instance attached to a gRPC server
func NewGossipService(conf *Config, s *grpc.Server, sa api.SecurityAdvisor,
mcs api.MessageCryptoService, selfIdentity api.PeerIdentityType,
secureDialOpts api.PeerSecureDialOpts, gossipMetrics *metrics.GossipMetrics) Gossip {
secureDialOpts api.PeerSecureDialOpts, gossipMetrics *metrics.GossipMetrics,
anchorPeerTracker discovery.AnchorPeerTracker) Gossip {
var err error

lgr := util.GetLogger(util.GossipLogger, conf.ID)
Expand Down Expand Up @@ -125,10 +126,12 @@ func NewGossipService(conf *Config, s *grpc.Server, sa api.SecurityAdvisor,
AliveExpirationTimeout: conf.AliveExpirationTimeout,
AliveExpirationCheckInterval: conf.AliveExpirationCheckInterval,
ReconnectInterval: conf.ReconnectInterval,
MaxConnectionAttempts: conf.MaxConnectionAttempts,
MsgExpirationFactor: conf.MsgExpirationFactor,
BootstrapPeers: conf.BootstrapPeers,
}
g.disc = discovery.NewDiscoveryService(g.selfNetworkMember(), g.discAdapter, g.disSecAdap, g.disclosurePolicy,
discoveryConfig)
discoveryConfig, anchorPeerTracker)
g.logger.Infof("Creating gossip service with self membership of %s", g.selfNetworkMember())

g.certPuller = g.createCertStorePuller()
Expand Down
11 changes: 8 additions & 3 deletions gossip/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ var tests = []func(t *testing.T){
func init() {
util.SetupTestLogging()
rand.Seed(int64(time.Now().Second()))
discovery.SetMaxConnAttempts(5)
for range tests {
testWG.Add(1)
}
Expand All @@ -75,6 +74,8 @@ var discoveryConfig = discovery.DiscoveryConfig{
AliveExpirationTimeout: 10 * aliveTimeInterval,
AliveExpirationCheckInterval: aliveTimeInterval,
ReconnectInterval: aliveTimeInterval,
MaxConnectionAttempts: 5,
MsgExpirationFactor: discovery.DefMsgExpirationFactor,
}

var expirationTimes map[string]time.Time = map[string]time.Time{}
Expand Down Expand Up @@ -263,10 +264,12 @@ func newGossipInstanceWithGrpcMcsMetrics(id int, port int, gRPCServer *corecomm.
AliveExpirationTimeout: discoveryConfig.AliveExpirationTimeout,
AliveExpirationCheckInterval: discoveryConfig.AliveExpirationCheckInterval,
ReconnectInterval: discoveryConfig.ReconnectInterval,
MaxConnectionAttempts: discoveryConfig.MaxConnectionAttempts,
MsgExpirationFactor: discoveryConfig.MsgExpirationFactor,
}
selfID := api.PeerIdentityType(conf.InternalEndpoint)
g := NewGossipService(conf, gRPCServer.Server(), &orgCryptoService{}, mcs, selfID,
secureDialOpts, metrics)
secureDialOpts, metrics, nil)
go func() {
gRPCServer.Start()
}()
Expand Down Expand Up @@ -313,10 +316,12 @@ func newGossipInstanceWithGRPCWithOnlyPull(id int, port int, gRPCServer *corecom
AliveExpirationTimeout: discoveryConfig.AliveExpirationTimeout,
AliveExpirationCheckInterval: discoveryConfig.AliveExpirationCheckInterval,
ReconnectInterval: discoveryConfig.ReconnectInterval,
MaxConnectionAttempts: discoveryConfig.MaxConnectionAttempts,
MsgExpirationFactor: discoveryConfig.MsgExpirationFactor,
}
selfID := api.PeerIdentityType(conf.InternalEndpoint)
g := NewGossipService(conf, gRPCServer.Server(), &orgCryptoService{}, mcs, selfID,
secureDialOpts, metrics)
secureDialOpts, metrics, nil)
go func() {
gRPCServer.Start()
}()
Expand Down
Loading

0 comments on commit d73f193

Please sign in to comment.