Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FAB-17539] Always remember anchor peers in membership (bp #1422) #1815

Merged
merged 1 commit into from
Aug 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions gossip/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ type CommService interface {
IdentitySwitch() <-chan common.PKIidType
}

// AnchorPeerTracker is an interface that is passed to discovery to check if an endpoint is an anchor peer.
// Discovery consults it when an alive message expires: a member whose endpoint or
// internal endpoint is reported as an anchor peer is not removed from membership.
type AnchorPeerTracker interface {
// IsAnchorPeer reports whether the given endpoint is an anchor peer.
IsAnchorPeer(endpoint string) bool
}

// NetworkMember is a peer's representation
type NetworkMember struct {
Endpoint string
Expand Down
37 changes: 20 additions & 17 deletions gossip/discovery/discovery_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,8 @@ const DefAliveTimeInterval = 5 * time.Second
const DefAliveExpirationTimeout = 5 * DefAliveTimeInterval
const DefAliveExpirationCheckInterval = DefAliveExpirationTimeout / 10
const DefReconnectInterval = DefAliveExpirationTimeout
const msgExpirationFactor = 20

var maxConnectionAttempts = 120

// SetMaxConnAttempts sets the maximum number of connection
// attempts the peer would perform when invoking Connect()
func SetMaxConnAttempts(attempts int) {
maxConnectionAttempts = attempts
}
const DefMsgExpirationFactor = 20
const DefMaxConnectionAttempts = 120

type timestamp struct {
incTime time.Time
Expand Down Expand Up @@ -75,21 +68,26 @@ type gossipDiscoveryImpl struct {
aliveExpirationTimeout time.Duration
aliveExpirationCheckInterval time.Duration
reconnectInterval time.Duration
msgExpirationFactor int
maxConnectionAttempts int

bootstrapPeers []string
bootstrapPeers []string
anchorPeerTracker AnchorPeerTracker
}

// DiscoveryConfig carries the settings that NewDiscoveryService copies into the
// discovery service it creates.
type DiscoveryConfig struct {
AliveTimeInterval time.Duration
// AliveExpirationTimeout is multiplied by MsgExpirationFactor to obtain the
// time-to-live of alive messages kept in the alive message store.
AliveExpirationTimeout time.Duration
AliveExpirationCheckInterval time.Duration
ReconnectInterval time.Duration
// MaxConnectionAttempts bounds the retry loops in Connect and sendUntilAcked.
MaxConnectionAttempts int
// MsgExpirationFactor is the expiration factor for alive message TTL.
MsgExpirationFactor int
// BootstrapPeers lists endpoints that are never removed from membership when
// their alive message expires.
BootstrapPeers []string
}

// NewDiscoveryService returns a new discovery service with the comm module passed and the crypto service passed
func NewDiscoveryService(self NetworkMember, comm CommService, crypt CryptoService, disPol DisclosurePolicy,
config DiscoveryConfig) Discovery {
config DiscoveryConfig, anchorPeerTracker AnchorPeerTracker) Discovery {
d := &gossipDiscoveryImpl{
self: self,
incTime: uint64(time.Now().UnixNano()),
Expand All @@ -112,8 +110,11 @@ func NewDiscoveryService(self NetworkMember, comm CommService, crypt CryptoServi
aliveExpirationTimeout: config.AliveExpirationTimeout,
aliveExpirationCheckInterval: config.AliveExpirationCheckInterval,
reconnectInterval: config.ReconnectInterval,
maxConnectionAttempts: config.MaxConnectionAttempts,
msgExpirationFactor: config.MsgExpirationFactor,

bootstrapPeers: config.BootstrapPeers,
bootstrapPeers: config.BootstrapPeers,
anchorPeerTracker: anchorPeerTracker,
}

d.validateSelfConfig()
Expand Down Expand Up @@ -149,7 +150,7 @@ func (d *gossipDiscoveryImpl) Connect(member NetworkMember, id identifier) {
d.logger.Debug("Entering", member)
defer d.logger.Debug("Exiting")
go func() {
for i := 0; i < maxConnectionAttempts && !d.toDie(); i++ {
for i := 0; i < d.maxConnectionAttempts && !d.toDie(); i++ {
id, err := id()
if err != nil {
if d.toDie() {
Expand Down Expand Up @@ -216,7 +217,7 @@ func (d *gossipDiscoveryImpl) validateSelfConfig() {

func (d *gossipDiscoveryImpl) sendUntilAcked(peer *NetworkMember, message *proto.SignedGossipMessage) {
nonce := message.Nonce
for i := 0; i < maxConnectionAttempts && !d.toDie(); i++ {
for i := 0; i < d.maxConnectionAttempts && !d.toDie(); i++ {
sub := d.pubsub.Subscribe(fmt.Sprintf("%d", nonce), time.Second*5)
d.comm.SendToPeer(peer, message)
if _, timeoutErr := sub.Listen(); timeoutErr == nil {
Expand Down Expand Up @@ -1032,7 +1033,7 @@ type aliveMsgStore struct {
func newAliveMsgStore(d *gossipDiscoveryImpl) *aliveMsgStore {
policy := proto.NewGossipMessageComparator(0)
trigger := func(m interface{}) {}
aliveMsgTTL := d.aliveExpirationTimeout * msgExpirationFactor
aliveMsgTTL := d.aliveExpirationTimeout * time.Duration(d.msgExpirationFactor)
externalLock := func() { d.lock.Lock() }
externalUnlock := func() { d.lock.Unlock() }
callback := func(m interface{}) {
Expand All @@ -1044,8 +1045,10 @@ func newAliveMsgStore(d *gossipDiscoveryImpl) *aliveMsgStore {
id := membership.PkiId
endpoint := membership.Endpoint
internalEndpoint := msg.SecretEnvelope.InternalEndpoint()
if util.Contains(endpoint, d.bootstrapPeers) || util.Contains(internalEndpoint, d.bootstrapPeers) {
// Never remove a bootstrap peer
if util.Contains(endpoint, d.bootstrapPeers) || util.Contains(internalEndpoint, d.bootstrapPeers) ||
d.anchorPeerTracker.IsAnchorPeer(endpoint) || d.anchorPeerTracker.IsAnchorPeer(internalEndpoint) {
// Never remove a bootstrap peer or an anchor peer
d.logger.Debugf("Do not remove bootstrap or anchor peer endpoint %s from membership", endpoint)
return
}
d.logger.Infof("Removing member: Endpoint: %s, InternalEndpoint: %s, PKIID: %x", endpoint, internalEndpoint, id)
Expand Down
133 changes: 128 additions & 5 deletions gossip/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ var defaultTestConfig = DiscoveryConfig{
AliveExpirationTimeout: 10 * aliveTimeInterval,
AliveExpirationCheckInterval: aliveTimeInterval,
ReconnectInterval: 10 * aliveTimeInterval,
MaxConnectionAttempts: DefMaxConnectionAttempts,
MsgExpirationFactor: DefMsgExpirationFactor,
}

func init() {
util.SetupTestLogging()
maxConnectionAttempts = 10000
defaultTestConfig.MaxConnectionAttempts = 10000
}

type dummyReceivedMessage struct {
Expand Down Expand Up @@ -75,6 +77,15 @@ func (*dummyReceivedMessage) Ack(err error) {
panic("implement me")
}

// mockAnchorPeerTracker implements AnchorPeerTracker interface for tests.
type mockAnchorPeerTracker struct {
// apEndpoints is the list that IsAnchorPeer searches via util.Contains.
apEndpoints []string
}

// IsAnchorPeer looks endpoint up in the tracker's apEndpoints list using
// util.Contains and returns the result.
func (m *mockAnchorPeerTracker) IsAnchorPeer(endpoint string) bool {
	isAnchor := util.Contains(endpoint, m.apEndpoints)
	return isAnchor
}

type dummyCommModule struct {
validatedMessages chan *proto.SignedGossipMessage
msgsReceived uint32
Expand Down Expand Up @@ -377,6 +388,12 @@ func createDiscoveryInstanceThatGossips(port int, id string, bootstrapPeers []st
}

// createDiscoveryInstanceThatGossipsWithInterceptors creates a gossip instance that
// uses an empty mockAnchorPeerTracker. Every other argument is forwarded unchanged to
// createDiscoveryInstanceWithAnchorPeerTracker.
func createDiscoveryInstanceThatGossipsWithInterceptors(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy, f func(*proto.SignedGossipMessage), config DiscoveryConfig) *gossipInstance {
	return createDiscoveryInstanceWithAnchorPeerTracker(
		port, id, bootstrapPeers, shouldGossip, pol, f, config,
		&mockAnchorPeerTracker{},
	)
}

func createDiscoveryInstanceWithAnchorPeerTracker(port int, id string, bootstrapPeers []string, shouldGossip bool, pol DisclosurePolicy,
f func(*proto.SignedGossipMessage), config DiscoveryConfig, anchorPeerTracker AnchorPeerTracker) *gossipInstance {
comm := &dummyCommModule{
conns: make(map[string]*grpc.ClientConn),
streams: make(map[string]proto.Gossip_GossipStreamClient),
Expand Down Expand Up @@ -407,7 +424,8 @@ func createDiscoveryInstanceThatGossipsWithInterceptors(port int, id string, boo
s := grpc.NewServer()

config.BootstrapPeers = bootstrapPeers
discSvc := NewDiscoveryService(self, comm, comm, pol, config)

discSvc := NewDiscoveryService(self, comm, comm, pol, config, anchorPeerTracker)
for _, bootPeer := range bootstrapPeers {
bp := bootPeer
discSvc.Connect(NetworkMember{Endpoint: bp, InternalEndpoint: bootPeer}, func() (*PeerIdentification, error) {
Expand Down Expand Up @@ -1127,6 +1145,7 @@ func TestExpirationNoSecretEnvelope(t *testing.T) {
aliveMembership: util.NewMembershipStore(),
deadMembership: util.NewMembershipStore(),
logger: logger,
anchorPeerTracker: &mockAnchorPeerTracker{[]string{}},
})

msg := &proto.GossipMessage{
Expand Down Expand Up @@ -1281,7 +1300,7 @@ func TestMsgStoreExpiration(t *testing.T) {
return true
}

waitUntilTimeoutOrFail(t, checkMessages, defaultTestConfig.AliveExpirationTimeout*(msgExpirationFactor+5))
waitUntilTimeoutOrFail(t, checkMessages, defaultTestConfig.AliveExpirationTimeout*(DefMsgExpirationFactor+5))

assertMembership(t, instances[:len(instances)-2], nodeNum-3)

Expand Down Expand Up @@ -1434,7 +1453,7 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) {
}

// Sleep until expire
time.Sleep(defaultTestConfig.AliveExpirationTimeout * (msgExpirationFactor + 5))
time.Sleep(defaultTestConfig.AliveExpirationTimeout * (DefMsgExpirationFactor + 5))

// Checking Alive expired
for i := 0; i < peersNum; i++ {
Expand Down Expand Up @@ -1676,7 +1695,7 @@ func TestPeerIsolation(t *testing.T) {

// Sleep the same amount of time as it takes to remove a message from the aliveMsgStore (aliveMsgTTL)
// Add a second as buffer
time.Sleep(config.AliveExpirationTimeout*msgExpirationFactor + time.Second)
time.Sleep(config.AliveExpirationTimeout*DefMsgExpirationFactor + time.Second)

// Start again the first 2 peers and wait for all the peers to get full membership.
// Especially, we want to test that peer2 won't be isolated
Expand All @@ -1688,6 +1707,110 @@ func TestPeerIsolation(t *testing.T) {
assertMembership(t, instances, peersNum-1)
}

// TestMembershipAfterExpiration checks that an anchor peer is kept (as a dead member)
// after its alive message expires, so that a peer that was left alone can regain full
// membership once the other peers come back.
func TestMembershipAfterExpiration(t *testing.T) {
	// Scenario:
	// Start 3 peers (peer0, peer1, peer2). Set peer0 as the anchor peer.
	// Stop peer0 and peer1 for a while, start them again and test if peer2 still gets full membership

	config := defaultTestConfig
	// Use a smaller AliveExpirationTimeout than the default to reduce the running time of the test.
	config.AliveExpirationTimeout = 2 * config.AliveTimeInterval
	config.ReconnectInterval = config.AliveExpirationTimeout
	config.MsgExpirationFactor = 5

	peersNum := 3
	ports := []int{9120, 9121, 9122}
	anchorPeer := "localhost:9120"
	bootPeers := []string{}
	instances := make([]*gossipInstance, 0, peersNum)
	mockTracker := &mockAnchorPeerTracker{[]string{anchorPeer}}

	// use a custom logger to verify messages from expiration callback
	expectedMsgs := []string{
		"Do not remove bootstrap or anchor peer endpoint localhost:9120 from membership",
		"Removing member: Endpoint: localhost:9121",
	}
	l, err := zap.NewDevelopment()
	assert.NoError(t, err)

	// The hook runs on whichever goroutine emits the log entry, so it keeps no
	// shared state of its own: it only forwards the expected message it recognised.
	// The send is non-blocking so that logging can never stall, even after the test
	// has stopped receiving (e.g. once it has timed out or finished).
	matched := make(chan string, 16*len(expectedMsgs))
	logger := flogging.NewFabricLogger(l, zap.Hooks(func(entry zapcore.Entry) error {
		for _, msg := range expectedMsgs {
			if strings.Contains(entry.Message, msg) {
				select {
				case matched <- msg:
				default:
					// Buffer is full of earlier matches; dropping a repeat is harmless.
				}
				return nil
			}
		}
		return nil
	}))

	// Start all peers, connect to the anchor peer and verify full membership
	for i := 0; i < peersNum; i++ {
		id := fmt.Sprintf("d%d", i)
		inst := createDiscoveryInstanceWithAnchorPeerTracker(ports[i], id, bootPeers, true, noopPolicy, func(_ *proto.SignedGossipMessage) {}, config, mockTracker)
		instances = append(instances, inst)
	}
	instances[peersNum-1].Discovery.(*gossipDiscoveryImpl).logger = logger
	for i := 1; i < peersNum; i++ {
		connect(instances[i], anchorPeer)
	}
	assertMembership(t, instances, peersNum-1)

	// Stop peer0 and peer1 so that peer2 would stay alone
	stopInstances(t, instances[0:peersNum-1])

	// waitTime is the same amount of time as it takes to remove a message from the aliveMsgStore (aliveMsgTTL)
	// Add a second as buffer
	waitTime := config.AliveExpirationTimeout*time.Duration(config.MsgExpirationFactor) + time.Second

	// Require every distinct expected message to be seen at least once; a repeated
	// occurrence of one message must not stand in for the other.
	pending := make(map[string]struct{}, len(expectedMsgs))
	for _, msg := range expectedMsgs {
		pending[msg] = struct{}{}
	}
	deadline := time.After(waitTime)
	for len(pending) > 0 {
		select {
		case msg := <-matched:
			delete(pending, msg)
		case <-deadline:
			t.Fatalf("timed out after %v waiting for expiration log messages, still missing: %v", waitTime, pending)
		}
	}

	// peer2's deadMembership should contain the anchor peer
	deadMembership := instances[peersNum-1].discoveryImpl().deadMembership
	assert.Equal(t, 1, deadMembership.Size())
	assertMembership(t, instances[peersNum-1:], 0)

	// Start again peer0 and peer1 and wait for all the peers to get full membership.
	// Especially, we want to test that peer2 won't be isolated
	for i := 0; i < peersNum-1; i++ {
		id := fmt.Sprintf("d%d", i)
		instances[i] = createDiscoveryInstanceWithAnchorPeerTracker(ports[i], id, bootPeers, true, noopPolicy, func(_ *proto.SignedGossipMessage) {}, config, mockTracker)
	}
	connect(instances[1], anchorPeer)
	assertMembership(t, instances, peersNum-1)
}

// connect makes inst initiate a connection to the peer at endpoint.
//
// Before connecting it installs a mock on inst.comm that accepts SendToPeer and
// Ping calls. The mock removes itself (sets inst.comm.mock back to nil) as soon as
// a message carrying a membership request is passed to SendToPeer.
func connect(inst *gossipInstance, endpoint string) {
	// Invoked by the mock on every SendToPeer call; the second argument is the message.
	onSend := func(args mock.Arguments) {
		signed := args.Get(1).(*proto.SignedGossipMessage)
		if signed.GetMemReq() == nil {
			return
		}
		inst.comm.lock.Lock()
		inst.comm.mock = nil
		inst.comm.lock.Unlock()
	}

	inst.comm.lock.Lock()
	stub := &mock.Mock{}
	inst.comm.mock = stub
	stub.On("SendToPeer", mock.Anything, mock.Anything).Run(onSend)
	stub.On("Ping", mock.Anything)
	// Release the lock before Connect: the onSend callback takes the same lock.
	inst.comm.lock.Unlock()

	target := NetworkMember{Endpoint: endpoint, PKIid: []byte(endpoint)}
	identify := func() (*PeerIdentification, error) {
		return &PeerIdentification{SelfOrg: true, ID: nil}, nil
	}
	inst.Connect(target, identify)
}

// waitUntilOrFail calls waitUntilTimeoutOrFail with pred and the package-level
// timeout value.
func waitUntilOrFail(t *testing.T, pred func() bool) {
waitUntilTimeoutOrFail(t, pred, timeout)
}
Expand Down
3 changes: 2 additions & 1 deletion gossip/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,5 +153,6 @@ type Config struct {
AliveExpirationTimeout time.Duration // Alive expiration timeout
AliveExpirationCheckInterval time.Duration // Alive expiration check interval
ReconnectInterval time.Duration // Reconnect interval

MsgExpirationFactor int // MsgExpirationFactor is the expiration factor for alive message TTL
MaxConnectionAttempts int // MaxConnectionAttempts is the max number of attempts to connect to a peer (wait for alive ack)
}
7 changes: 5 additions & 2 deletions gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ type gossipServiceImpl struct {
// NewGossipService creates a gossip instance attached to a gRPC server
func NewGossipService(conf *Config, s *grpc.Server, sa api.SecurityAdvisor,
mcs api.MessageCryptoService, selfIdentity api.PeerIdentityType,
secureDialOpts api.PeerSecureDialOpts, gossipMetrics *metrics.GossipMetrics) Gossip {
secureDialOpts api.PeerSecureDialOpts, gossipMetrics *metrics.GossipMetrics,
anchorPeerTracker discovery.AnchorPeerTracker) Gossip {
var err error

lgr := util.GetLogger(util.GossipLogger, conf.ID)
Expand Down Expand Up @@ -125,10 +126,12 @@ func NewGossipService(conf *Config, s *grpc.Server, sa api.SecurityAdvisor,
AliveExpirationTimeout: conf.AliveExpirationTimeout,
AliveExpirationCheckInterval: conf.AliveExpirationCheckInterval,
ReconnectInterval: conf.ReconnectInterval,
MaxConnectionAttempts: conf.MaxConnectionAttempts,
MsgExpirationFactor: conf.MsgExpirationFactor,
BootstrapPeers: conf.BootstrapPeers,
}
g.disc = discovery.NewDiscoveryService(g.selfNetworkMember(), g.discAdapter, g.disSecAdap, g.disclosurePolicy,
discoveryConfig)
discoveryConfig, anchorPeerTracker)
g.logger.Infof("Creating gossip service with self membership of %s", g.selfNetworkMember())

g.certPuller = g.createCertStorePuller()
Expand Down
11 changes: 8 additions & 3 deletions gossip/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ var tests = []func(t *testing.T){
func init() {
util.SetupTestLogging()
rand.Seed(int64(time.Now().Second()))
discovery.SetMaxConnAttempts(5)
for range tests {
testWG.Add(1)
}
Expand All @@ -75,6 +74,8 @@ var discoveryConfig = discovery.DiscoveryConfig{
AliveExpirationTimeout: 10 * aliveTimeInterval,
AliveExpirationCheckInterval: aliveTimeInterval,
ReconnectInterval: aliveTimeInterval,
MaxConnectionAttempts: 5,
MsgExpirationFactor: discovery.DefMsgExpirationFactor,
}

var expirationTimes map[string]time.Time = map[string]time.Time{}
Expand Down Expand Up @@ -263,10 +264,12 @@ func newGossipInstanceWithGrpcMcsMetrics(id int, port int, gRPCServer *corecomm.
AliveExpirationTimeout: discoveryConfig.AliveExpirationTimeout,
AliveExpirationCheckInterval: discoveryConfig.AliveExpirationCheckInterval,
ReconnectInterval: discoveryConfig.ReconnectInterval,
MaxConnectionAttempts: discoveryConfig.MaxConnectionAttempts,
MsgExpirationFactor: discoveryConfig.MsgExpirationFactor,
}
selfID := api.PeerIdentityType(conf.InternalEndpoint)
g := NewGossipService(conf, gRPCServer.Server(), &orgCryptoService{}, mcs, selfID,
secureDialOpts, metrics)
secureDialOpts, metrics, nil)
go func() {
gRPCServer.Start()
}()
Expand Down Expand Up @@ -313,10 +316,12 @@ func newGossipInstanceWithGRPCWithOnlyPull(id int, port int, gRPCServer *corecom
AliveExpirationTimeout: discoveryConfig.AliveExpirationTimeout,
AliveExpirationCheckInterval: discoveryConfig.AliveExpirationCheckInterval,
ReconnectInterval: discoveryConfig.ReconnectInterval,
MaxConnectionAttempts: discoveryConfig.MaxConnectionAttempts,
MsgExpirationFactor: discoveryConfig.MsgExpirationFactor,
}
selfID := api.PeerIdentityType(conf.InternalEndpoint)
g := NewGossipService(conf, gRPCServer.Server(), &orgCryptoService{}, mcs, selfID,
secureDialOpts, metrics)
secureDialOpts, metrics, nil)
go func() {
gRPCServer.Start()
}()
Expand Down
Loading