Skip to content

Commit

Permalink
Merge "[FAB-3114] Gossip identity expiration"
Browse files Browse the repository at this point in the history
  • Loading branch information
denyeart authored and Gerrit Code Review committed Apr 26, 2017
2 parents 5f91834 + 55d96b2 commit f0c3bfc
Show file tree
Hide file tree
Showing 9 changed files with 333 additions and 79 deletions.
8 changes: 6 additions & 2 deletions gossip/gossip/certstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func newCertStore(puller pull.Mediator, idMapper identity.Mapper, selfIdentity a
}

puller.Add(certStore.createIdentityMessage())
puller.RegisterMsgHook(pull.ResponseMsgType, func(_ []string, msgs []*proto.SignedGossipMessage, _ proto.ReceivedMessage) {
puller.RegisterMsgHook(pull.RequestMsgType, func(_ []string, msgs []*proto.SignedGossipMessage, _ proto.ReceivedMessage) {
for _, msg := range msgs {
pkiID := common.PKIidType(msg.GetPeerIdentity().PkiId)
cert := api.PeerIdentityType(msg.GetPeerIdentity().Cert)
Expand Down Expand Up @@ -144,7 +144,11 @@ func (cs *certStore) createIdentityMessage() *proto.SignedGossipMessage {
}

func (cs *certStore) listRevokedPeers(isSuspected api.PeerSuspector) []common.PKIidType {
return cs.idMapper.ListRevokedPeers(isSuspected)
revokedPeers := cs.idMapper.ListInvalidIdentities(isSuspected)
for _, pkiID := range revokedPeers {
cs.pull.Remove(string(pkiID))
}
return revokedPeers
}

func (cs *certStore) stop() {
Expand Down
200 changes: 161 additions & 39 deletions gossip/gossip/certstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
"github.com/hyperledger/fabric/gossip/gossip/algo"
"github.com/hyperledger/fabric/gossip/gossip/pull"
Expand All @@ -41,6 +42,12 @@ func init() {
algo.SetResponseWaitTime(shortenedWaitTime)
}

var (
cs = &naiveCryptoService{
revokedPkiIDS: make(map[string]struct{}),
}
)

type pullerMock struct {
mock.Mock
pull.Mediator
Expand Down Expand Up @@ -90,78 +97,135 @@ func TestCertStoreBadSignature(t *testing.T) {
badSignature := func(nonce uint64) proto.ReceivedMessage {
return createUpdateMessage(nonce, createBadlySignedUpdateMessage())
}

testCertificateUpdate(t, badSignature, false)
pm, cs, _ := createObjects(badSignature, nil)
defer pm.Stop()
testCertificateUpdate(t, false, cs)
}

func TestCertStoreMismatchedIdentity(t *testing.T) {
mismatchedIdentity := func(nonce uint64) proto.ReceivedMessage {
return createUpdateMessage(nonce, createMismatchedUpdateMessage())
}

testCertificateUpdate(t, mismatchedIdentity, false)
pm, cs, _ := createObjects(mismatchedIdentity, nil)
defer pm.Stop()
testCertificateUpdate(t, false, cs)
}

func TestCertStoreShouldSucceed(t *testing.T) {
totallyFineIdentity := func(nonce uint64) proto.ReceivedMessage {
return createUpdateMessage(nonce, createValidUpdateMessage())
}

testCertificateUpdate(t, totallyFineIdentity, true)
pm, cs, _ := createObjects(totallyFineIdentity, nil)
defer pm.Stop()
testCertificateUpdate(t, true, cs)
}

func testCertificateUpdate(t *testing.T, updateFactory func(uint64) proto.ReceivedMessage, shouldSucceed bool) {
config := pull.Config{
MsgType: proto.PullMsgType_IDENTITY_MSG,
PeerCountToSelect: 1,
PullInterval: time.Millisecond * 500,
Tag: proto.GossipMessage_EMPTY,
Channel: nil,
ID: "id1",
}
sender := &senderMock{}
memberSvc := &membershipSvcMock{}
memberSvc.On("GetMembership").Return([]discovery.NetworkMember{{PKIid: []byte("bla bla"), Endpoint: "localhost:5611"}})
adapter := pull.PullAdapter{
Sndr: sender,
MemSvc: memberSvc,
IdExtractor: func(msg *proto.SignedGossipMessage) string {
return string(msg.GetPeerIdentity().PkiId)
},
MsgCons: func(msg *proto.SignedGossipMessage) {
func TestCertExpiration(t *testing.T) {
identityExpCheckInterval := identityExpirationCheckInterval
defer func() {
identityExpirationCheckInterval = identityExpCheckInterval
cs.revokedPkiIDS = map[string]struct{}{}
}()

},
identityExpirationCheckInterval = time.Second

totallyFineIdentity := func(nonce uint64) proto.ReceivedMessage {
return createUpdateMessage(nonce, createValidUpdateMessage())
}
pullMediator := pull.NewPullMediator(config, adapter)
certStore := newCertStore(&pullerMock{
Mediator: pullMediator,
}, identity.NewIdentityMapper(&naiveCryptoService{}), api.PeerIdentityType("SELF"), &naiveCryptoService{})

defer pullMediator.Stop()
askedForIdentity := make(chan struct{}, 1)

pm, cStore, sender := createObjects(totallyFineIdentity, func(message *proto.SignedGossipMessage) {
askedForIdentity <- struct{}{}
})
defer pm.Stop()
testCertificateUpdate(t, true, cStore)
// Should have asked for an identity for the first time
assert.Len(t, askedForIdentity, 1)
// Drain channel
<-askedForIdentity
// Now it's 0
assert.Len(t, askedForIdentity, 0)

wg := sync.WaitGroup{}
wg.Add(1)
sentHello := false
sentDataReq := false
l := sync.Mutex{}
sender.On("Send", mock.Anything, mock.Anything).Run(func(arg mock.Arguments) {
senderMock := mock.Mock{}
senderMock.On("Send", mock.Anything, mock.Anything).Run(func(arg mock.Arguments) {
msg := arg.Get(0).(*proto.SignedGossipMessage)
l.Lock()
defer l.Unlock()

if hello := msg.GetHello(); hello != nil && !sentHello {
sentHello = true
go certStore.handleMessage(createDigest(hello.Nonce))
dig := &proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Content: &proto.GossipMessage_DataDig{
DataDig: &proto.DataDigest{
Nonce: hello.Nonce,
MsgType: proto.PullMsgType_IDENTITY_MSG,
Digests: []string{"B"},
},
},
}
go cStore.handleMessage(&sentMsg{msg: dig.NoopSign()})
}

if dataReq := msg.GetDataReq(); dataReq != nil && !sentDataReq {
sentDataReq = true
certStore.handleMessage(updateFactory(dataReq.Nonce))
wg.Done()
if dataReq := msg.GetDataReq(); dataReq != nil {
askedForIdentity <- struct{}{}
}
})
sender.Mock = senderMock
testCertificateUpdate(t, true, cStore)
// Shouldn't have asked, because already got identity
select {
case <-time.After(time.Second * 3):
case <-askedForIdentity:
assert.Fail(t, "Shouldn't have asked for an identity, becase we already have it")
}
assert.Len(t, askedForIdentity, 0)
// Revoke the identity
cs.revoke(common.PKIidType("B"))
cStore.listRevokedPeers(func(id api.PeerIdentityType) bool {
return string(id) == "B"
})
sentHello = false
l = sync.Mutex{}
senderMock = mock.Mock{}
senderMock.On("Send", mock.Anything, mock.Anything).Run(func(arg mock.Arguments) {
msg := arg.Get(0).(*proto.SignedGossipMessage)
l.Lock()
defer l.Unlock()

if hello := msg.GetHello(); hello != nil && !sentHello {
sentHello = true
dig := &proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Content: &proto.GossipMessage_DataDig{
DataDig: &proto.DataDigest{
Nonce: hello.Nonce,
MsgType: proto.PullMsgType_IDENTITY_MSG,
Digests: []string{"B"},
},
},
}
go cStore.handleMessage(&sentMsg{msg: dig.NoopSign()})
}

if dataReq := msg.GetDataReq(); dataReq != nil {
askedForIdentity <- struct{}{}
}
})
wg.Wait()

select {
case <-time.After(time.Second * 5):
assert.Fail(t, "Didn't ask for identity, but should have. Looks like identity hasn't expired")
case <-askedForIdentity:
}
}

func testCertificateUpdate(t *testing.T, shouldSucceed bool, certStore *certStore) {
hello := &sentMsg{
msg: (&proto.GossipMessage{
Channel: []byte(""),
Expand Down Expand Up @@ -302,3 +366,61 @@ func createDigest(nonce uint64) proto.ReceivedMessage {
}
return &sentMsg{msg: digest.NoopSign()}
}

func createObjects(updateFactory func(uint64) proto.ReceivedMessage, msgCons proto.MsgConsumer) (pull.Mediator, *certStore, *senderMock) {
if msgCons == nil {
msgCons = func(_ *proto.SignedGossipMessage) {}
}
config := pull.Config{
MsgType: proto.PullMsgType_IDENTITY_MSG,
PeerCountToSelect: 1,
PullInterval: time.Millisecond * 500,
Tag: proto.GossipMessage_EMPTY,
Channel: nil,
ID: "id1",
}
sender := &senderMock{}
memberSvc := &membershipSvcMock{}
memberSvc.On("GetMembership").Return([]discovery.NetworkMember{{PKIid: []byte("bla bla"), Endpoint: "localhost:5611"}})

var certStore *certStore
adapter := pull.PullAdapter{
Sndr: sender,
MsgCons: func(msg *proto.SignedGossipMessage) {
certStore.idMapper.Put(msg.GetPeerIdentity().PkiId, msg.GetPeerIdentity().Cert)
msgCons(msg)
},
IdExtractor: func(msg *proto.SignedGossipMessage) string {
return string(msg.GetPeerIdentity().PkiId)
},
MemSvc: memberSvc,
}
pullMediator := pull.NewPullMediator(config, adapter)
certStore = newCertStore(&pullerMock{
Mediator: pullMediator,
}, identity.NewIdentityMapper(cs), api.PeerIdentityType("SELF"), cs)

wg := sync.WaitGroup{}
wg.Add(1)
sentHello := false
sentDataReq := false
l := sync.Mutex{}
sender.On("Send", mock.Anything, mock.Anything).Run(func(arg mock.Arguments) {
msg := arg.Get(0).(*proto.SignedGossipMessage)
l.Lock()
defer l.Unlock()

if hello := msg.GetHello(); hello != nil && !sentHello {
sentHello = true
go certStore.handleMessage(createDigest(hello.Nonce))
}

if dataReq := msg.GetDataReq(); dataReq != nil && !sentDataReq {
sentDataReq = true
certStore.handleMessage(updateFactory(dataReq.Nonce))
wg.Done()
}
})
wg.Wait()
return pullMediator, certStore, sender
}
7 changes: 5 additions & 2 deletions gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,13 @@ func NewGossipChannel(pkiID common.PKIidType, org api.OrgIdentityType, mcs api.M

gc.blocksPuller = gc.createBlockPuller()

seqNumFromMsg := func(m interface{}) string {
return fmt.Sprintf("%d", m.(*proto.SignedGossipMessage).GetDataMsg().Payload.SeqNum)
}
gc.blockMsgStore = msgstore.NewMessageStoreExpirable(comparator, func(m interface{}) {
gc.blocksPuller.Remove(m.(*proto.SignedGossipMessage))
gc.blocksPuller.Remove(seqNumFromMsg(m))
}, gc.GetConf().BlockExpirationInterval, nil, nil, func(m interface{}) {
gc.blocksPuller.Remove(m.(*proto.SignedGossipMessage))
gc.blocksPuller.Remove(seqNumFromMsg(m))
})

gc.stateInfoMsgStore = newStateInfoCache(gc.GetConf().StateInfoExpirationInterval)
Expand Down
33 changes: 33 additions & 0 deletions gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ const (
acceptChanSize = 100
)

var (
identityExpirationCheckInterval = time.Hour * 24
identityInactivityCheckInterval = time.Minute * 10
)

type channelRoutingFilterFactory func(channel.GossipChannel) filter.RoutingFilter

type gossipServiceImpl struct {
Expand Down Expand Up @@ -128,6 +133,7 @@ func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvis
}

go g.start()
go g.periodicalIdentityValidationAndExpiration()

return g
}
Expand Down Expand Up @@ -187,6 +193,33 @@ func (g *gossipServiceImpl) SuspectPeers(isSuspected api.PeerSuspector) {
}
}

func (g *gossipServiceImpl) periodicalIdentityValidationAndExpiration() {
// We check once every identityExpirationCheckInterval for identities that have been expired
go g.periodicalIdentityValidation(func(identity api.PeerIdentityType) bool {
// We need to validate every identity to check if it has been expired
return true
}, identityExpirationCheckInterval)

// We check once every identityInactivityCheckInterval for identities that have not been used for a long time
go g.periodicalIdentityValidation(func(identity api.PeerIdentityType) bool {
// We don't validate any identity, because we just want to know whether
// it has not been used for a long time
return false
}, identityInactivityCheckInterval)
}

func (g *gossipServiceImpl) periodicalIdentityValidation(suspectFunc api.PeerSuspector, interval time.Duration) {
for {
select {
case s := <-g.toDieChan:
g.toDieChan <- s
return
case <-time.After(interval):
g.SuspectPeers(suspectFunc)
}
}
}

func (g *gossipServiceImpl) learnAnchorPeers(orgOfAnchorPeers api.OrgIdentityType, anchorPeers []api.AnchorPeer) {
for _, ap := range anchorPeers {
if ap.Host == "" {
Expand Down
12 changes: 5 additions & 7 deletions gossip/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func init() {
discovery.SetReconnectInterval(aliveTimeInterval)
testWG.Add(7)
factory.InitFactories(nil)
identityExpirationCheckInterval = time.Second
}

var orgInChannelA = api.OrgIdentityType("ORG1")
Expand Down Expand Up @@ -984,11 +985,11 @@ func TestDisseminateAll2All(t *testing.T) {
testWG.Done()
}

func TestRevocation(t *testing.T) {
func TestIdentityExpiration(t *testing.T) {
t.Parallel()
// Scenario: spawn 4 peers and revoke one of them.
// The rest of the peers should not be able to communicate with
// the revoked peer at all.
// Scenario: spawn 4 peers and make the MessageCryptoService revoke one of them.
// Eventually, the rest of the peers should not be able to communicate with
// the revoked peer at all because its identity would seem to them as expired

portPrefix := 7000
g1 := newGossipInstance(portPrefix, 0, 100)
Expand Down Expand Up @@ -1016,9 +1017,6 @@ func TestRevocation(t *testing.T) {
continue
}
p.(*gossipServiceImpl).mcs.(*naiveCryptoService).revoke(revokedPkiID)
p.SuspectPeers(func(_ api.PeerIdentityType) bool {
return true
})
}
// Ensure that no one talks to the peer that is revoked
ensureRevokedPeerIsIgnored := func() bool {
Expand Down
Loading

0 comments on commit f0c3bfc

Please sign in to comment.