Skip to content

Commit

Permalink
[FAB-4085] Prevent expiration of self identity
Browse files Browse the repository at this point in the history
In gossip we have time-based expiration (purge) of identities that haven't
been used for a long time.
This may cause an expiration of the peer's own certificate if it doesn't
gossip with any peers (i.e if it's alone in the world).

The problem is that when an identity expires from the identity store,
it is also expired from the pull mediator that is used for identities.
This is important because the only way identities are gossiped
transitively is via the pull mechanism.
If a peer's own identity disappears from the pull mediator,
it will never be sent to peers transitively.

Change-Id: Ic2138ebaa60bdf2454d65d3884f141c3736254a0
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed May 24, 2017
1 parent ee77584 commit 08a2515
Show file tree
Hide file tree
Showing 12 changed files with 124 additions and 43 deletions.
3 changes: 0 additions & 3 deletions gossip/comm/comm_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,6 @@ func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity
subscriptions: make([]chan proto.ReceivedMessage, 0),
}
commInst.connStore = newConnStore(commInst, commInst.logger)
if err := commInst.idMapper.Put(idMapper.GetPKIidOfCert(peerIdentity), peerIdentity); err != nil {
commInst.logger.Panic("Failed associating self PKIID to cert:", err)
}

if port > 0 {
commInst.stopWG.Add(1)
Expand Down
9 changes: 6 additions & 3 deletions gossip/comm/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ func (*naiveSecProvider) VerifyByChannel(_ common.ChainID, _ api.PeerIdentityTyp

func newCommInstance(port int, sec api.MessageCryptoService) (Comm, error) {
endpoint := fmt.Sprintf("localhost:%d", port)
inst, err := NewCommInstanceWithServer(port, identity.NewIdentityMapper(sec), []byte(endpoint), nil)
id := []byte(endpoint)
inst, err := NewCommInstanceWithServer(port, identity.NewIdentityMapper(sec, id), id, nil)
return inst, err
}

Expand Down Expand Up @@ -285,15 +286,17 @@ func TestProdConstructor(t *testing.T) {
srv, lsnr, dialOpts, certHash := createGRPCLayer(20000)
defer srv.Stop()
defer lsnr.Close()
comm1, _ := NewCommInstance(srv, &peerIdentity, identity.NewIdentityMapper(naiveSec), []byte("localhost:20000"), dialOpts)
id := []byte("localhost:20000")
comm1, _ := NewCommInstance(srv, &peerIdentity, identity.NewIdentityMapper(naiveSec, id), id, dialOpts)
comm1.(*commImpl).selfCertHash = certHash
go srv.Serve(lsnr)

peerIdentity = GenerateCertificatesOrPanic()
srv, lsnr, dialOpts, certHash = createGRPCLayer(30000)
defer srv.Stop()
defer lsnr.Close()
comm2, _ := NewCommInstance(srv, &peerIdentity, identity.NewIdentityMapper(naiveSec), []byte("localhost:30000"), dialOpts)
id = []byte("localhost:30000")
comm2, _ := NewCommInstance(srv, &peerIdentity, identity.NewIdentityMapper(naiveSec, id), id, dialOpts)
comm2.(*commImpl).selfCertHash = certHash
go srv.Serve(lsnr)
defer comm1.Stop()
Expand Down
4 changes: 0 additions & 4 deletions gossip/gossip/certstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ type certStore struct {
func newCertStore(puller pull.Mediator, idMapper identity.Mapper, selfIdentity api.PeerIdentityType, mcs api.MessageCryptoService) *certStore {
selfPKIID := idMapper.GetPKIidOfCert(selfIdentity)
logger := util.GetLogger(util.LoggingGossipModule, string(selfPKIID))
if err := idMapper.Put(selfPKIID, selfIdentity); err != nil {
logger.Error("Failed associating self PKIID to cert:", err)
panic(fmt.Errorf("Failed associating self PKIID to cert: %v", err))
}

certStore := &certStore{
mcs: mcs,
Expand Down
66 changes: 64 additions & 2 deletions gossip/gossip/certstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestCertStoreShouldSucceed(t *testing.T) {
testCertificateUpdate(t, true, cs)
}

func TestCertExpiration(t *testing.T) {
func TestCertRevocation(t *testing.T) {
identityExpCheckInterval := identityExpirationCheckInterval
defer func() {
identityExpirationCheckInterval = identityExpCheckInterval
Expand Down Expand Up @@ -225,6 +225,67 @@ func TestCertExpiration(t *testing.T) {
}
}

func TestCertExpiration(t *testing.T) {
// Scenario: In this test we make sure that a peer may not expire
// its own identity.
// This is important because the only way identities are gossiped
// transitively is via the pull mechanism.
// If a peer's own identity disappears from the pull mediator,
// it will never be sent to peers transitively.
// The test ensures that self identities don't expire
// in the following manner:
// It starts a peer and then sleeps twice the identity usage threshold,
// in order to make sure that its own identity should be expired.
// Then, it starts another peer, and listens to the messages sent
// between both peers, and looks for a few identity digests of the first peer.
// If such identity digest are detected, it means that the peer
// didn't expire its own identity.

// Backup original usageThreshold value
idUsageThreshold := identity.GetIdentityUsageThreshold()
identity.SetIdentityUsageThreshold(time.Second)
// Restore original usageThreshold value
defer identity.SetIdentityUsageThreshold(idUsageThreshold)

// Backup original identityInactivityCheckInterval value
inactivityCheckInterval := identityInactivityCheckInterval
identityInactivityCheckInterval = time.Second * 1
// Restore original identityInactivityCheckInterval value
defer func() {
identityInactivityCheckInterval = inactivityCheckInterval
}()

g1 := newGossipInstance(4321, 0, 0, 1)
defer g1.Stop()
time.Sleep(identity.GetIdentityUsageThreshold() * 2)
g2 := newGossipInstance(4322, 0, 0)
defer g2.Stop()

identities2Detect := 3
// Make the channel bigger than needed so goroutines won't get stuck
identitiesGotViaPull := make(chan struct{}, identities2Detect+100)
acceptIdentityPullMsgs := func(o interface{}) bool {
m := o.(proto.ReceivedMessage).GetGossipMessage()
if m.IsPullMsg() && m.IsDigestMsg() {
for _, dig := range m.GetDataDig().Digests {
if dig == "localhost:4321" {
identitiesGotViaPull <- struct{}{}
}
}
}
return false
}
g1.Accept(acceptIdentityPullMsgs, true)
for i := 0; i < identities2Detect; i++ {
select {
case <-identitiesGotViaPull:
case <-time.After(time.Second * 15):
assert.Fail(t, "Didn't detect an identity gossiped via pull in a timely manner")
return
}
}
}

func testCertificateUpdate(t *testing.T, shouldSucceed bool, certStore *certStore) {
hello := &sentMsg{
msg: (&proto.GossipMessage{
Expand Down Expand Up @@ -396,9 +457,10 @@ func createObjects(updateFactory func(uint64) proto.ReceivedMessage, msgCons pro
MemSvc: memberSvc,
}
pullMediator := pull.NewPullMediator(config, adapter)
selfIdentity := api.PeerIdentityType("SELF")
certStore = newCertStore(&pullerMock{
Mediator: pullMediator,
}, identity.NewIdentityMapper(cs), api.PeerIdentityType("SELF"), cs)
}, identity.NewIdentityMapper(cs, selfIdentity), selfIdentity, cs)

wg := sync.WaitGroup{}
wg.Add(1)
Expand Down
11 changes: 6 additions & 5 deletions gossip/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,10 @@ func newGossipInstanceWithCustomMCS(portPrefix int, id int, maxMsgCount int, mcs
PublishStateInfoInterval: time.Duration(1) * time.Second,
RequestStateInfoInterval: time.Duration(1) * time.Second,
}

idMapper := identity.NewIdentityMapper(mcs)
selfId := api.PeerIdentityType(conf.InternalEndpoint)
idMapper := identity.NewIdentityMapper(mcs, selfId)
g := NewGossipServiceWithServer(conf, &orgCryptoService{}, mcs, idMapper,
api.PeerIdentityType(conf.InternalEndpoint), nil)
selfId, nil)

return g
}
Expand Down Expand Up @@ -251,10 +251,11 @@ func newGossipInstanceWithOnlyPull(portPrefix int, id int, maxMsgCount int, boot
}

cryptoService := &naiveCryptoService{}
idMapper := identity.NewIdentityMapper(cryptoService)
selfId := api.PeerIdentityType(conf.InternalEndpoint)
idMapper := identity.NewIdentityMapper(cryptoService, selfId)

g := NewGossipServiceWithServer(conf, &orgCryptoService{}, cryptoService, idMapper,
api.PeerIdentityType(conf.InternalEndpoint), nil)
selfId, nil)
return g
}

Expand Down
6 changes: 3 additions & 3 deletions gossip/gossip/orgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ func newGossipInstanceWithExternalEndpoint(portPrefix int, id int, mcs *configur
PublishStateInfoInterval: time.Duration(1) * time.Second,
RequestStateInfoInterval: time.Duration(1) * time.Second,
}

idMapper := identity.NewIdentityMapper(mcs)
g := NewGossipServiceWithServer(conf, mcs, mcs, idMapper, api.PeerIdentityType(conf.InternalEndpoint),
selfId := api.PeerIdentityType(conf.InternalEndpoint)
idMapper := identity.NewIdentityMapper(mcs, selfId)
g := NewGossipServiceWithServer(conf, mcs, mcs, idMapper, selfId,
nil)

return g
Expand Down
33 changes: 27 additions & 6 deletions gossip/identity/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ package identity
import (
"bytes"
"errors"
"fmt"
"sync"
"time"

"sync/atomic"
"time"

"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/common"
Expand All @@ -31,7 +31,7 @@ import (
var (
// identityUsageThreshold sets the maximum time that an identity
// can not be used to verify some signature before it will be deleted
identityUsageThreshold = time.Hour
usageThreshold = time.Hour
)

// Mapper holds mappings between pkiID
Expand Down Expand Up @@ -66,14 +66,21 @@ type identityMapperImpl struct {
mcs api.MessageCryptoService
pkiID2Cert map[string]*storedIdentity
sync.RWMutex
selfPKIID string
}

// NewIdentityMapper method, all we need is a reference to a MessageCryptoService
func NewIdentityMapper(mcs api.MessageCryptoService) Mapper {
return &identityMapperImpl{
func NewIdentityMapper(mcs api.MessageCryptoService, selfIdentity api.PeerIdentityType) Mapper {
selfPKIID := mcs.GetPKIidOfCert(selfIdentity)
idMapper := &identityMapperImpl{
mcs: mcs,
pkiID2Cert: make(map[string]*storedIdentity),
selfPKIID: string(selfPKIID),
}
if err := idMapper.Put(selfPKIID, selfIdentity); err != nil {
panic(fmt.Errorf("Failed putting our own identity into the identity mapper: %v", err))
}
return idMapper
}

// put associates an identity to its given pkiID, and returns an error
Expand Down Expand Up @@ -157,7 +164,7 @@ func (is *identityMapperImpl) validateIdentities(isSuspected api.PeerSuspector)
defer is.RUnlock()
var revokedIds []common.PKIidType
for pkiID, storedIdentity := range is.pkiID2Cert {
if storedIdentity.fetchLastAccessTime().Add(identityUsageThreshold).Before(now) {
if pkiID != is.selfPKIID && storedIdentity.fetchLastAccessTime().Add(usageThreshold).Before(now) {
revokedIds = append(revokedIds, common.PKIidType(pkiID))
continue
}
Expand Down Expand Up @@ -191,3 +198,17 @@ func (si *storedIdentity) fetchIdentity() api.PeerIdentityType {
func (si *storedIdentity) fetchLastAccessTime() time.Time {
return time.Unix(0, atomic.LoadInt64(&si.lastAccessTime))
}

// SetIdentityUsageThreshold sets the usage threshold of identities.
// Identities that are not used at least once during the given time
// are purged
func SetIdentityUsageThreshold(duration time.Duration) {
usageThreshold = duration
}

// GetIdentityUsageThreshold returns the usage threshold of identities.
// Identities that are not used at least once during the usage threshold
// duration are purged.
func GetIdentityUsageThreshold() time.Duration {
return usageThreshold
}
15 changes: 9 additions & 6 deletions gossip/identity/identity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import (
"github.com/stretchr/testify/assert"
)

var msgCryptoService = &naiveCryptoService{revokedIdentities: map[string]struct{}{}}
var (
msgCryptoService = &naiveCryptoService{revokedIdentities: map[string]struct{}{}}
dummyID = api.PeerIdentityType{}
)

type naiveCryptoService struct {
revokedIdentities map[string]struct{}
Expand Down Expand Up @@ -82,7 +85,7 @@ func (*naiveCryptoService) Verify(peerIdentity api.PeerIdentityType, signature,
}

func TestPut(t *testing.T) {
idStore := NewIdentityMapper(msgCryptoService)
idStore := NewIdentityMapper(msgCryptoService, dummyID)
identity := []byte("yacovm")
identity2 := []byte("not-yacovm")
pkiID := msgCryptoService.GetPKIidOfCert(api.PeerIdentityType(identity))
Expand All @@ -95,7 +98,7 @@ func TestPut(t *testing.T) {
}

func TestGet(t *testing.T) {
idStore := NewIdentityMapper(msgCryptoService)
idStore := NewIdentityMapper(msgCryptoService, dummyID)
identity := []byte("yacovm")
identity2 := []byte("not-yacovm")
pkiID := msgCryptoService.GetPKIidOfCert(api.PeerIdentityType(identity))
Expand All @@ -110,7 +113,7 @@ func TestGet(t *testing.T) {
}

func TestVerify(t *testing.T) {
idStore := NewIdentityMapper(msgCryptoService)
idStore := NewIdentityMapper(msgCryptoService, dummyID)
identity := []byte("yacovm")
identity2 := []byte("not-yacovm")
pkiID := msgCryptoService.GetPKIidOfCert(api.PeerIdentityType(identity))
Expand All @@ -123,7 +126,7 @@ func TestVerify(t *testing.T) {
}

func TestListInvalidIdentities(t *testing.T) {
idStore := NewIdentityMapper(msgCryptoService)
idStore := NewIdentityMapper(msgCryptoService, dummyID)
identity := []byte("yacovm")
// Test for a revoked identity
pkiID := msgCryptoService.GetPKIidOfCert(api.PeerIdentityType(identity))
Expand All @@ -150,7 +153,7 @@ func TestListInvalidIdentities(t *testing.T) {
pkiID = msgCryptoService.GetPKIidOfCert(api.PeerIdentityType(identity))
assert.NoError(t, idStore.Put(pkiID, api.PeerIdentityType(identity)))
// set the time-based expiration time limit to something small
identityUsageThreshold = time.Millisecond * 500
usageThreshold = time.Millisecond * 500
idStore.ListInvalidIdentities(func(_ api.PeerIdentityType) bool {
return false
})
Expand Down
4 changes: 2 additions & 2 deletions gossip/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestNewGossipCryptoService(t *testing.T) {
endpoint3 := "localhost:5613"
msptesttools.LoadMSPSetupForTesting()
peerIdentity, _ := mgmt.GetLocalSigningIdentityOrPanic().Serialize()
idMapper := identity.NewIdentityMapper(cryptSvc)
idMapper := identity.NewIdentityMapper(cryptSvc, peerIdentity)

g1 := NewGossipComponent(peerIdentity, endpoint1, s1, secAdv, cryptSvc, idMapper,
defaultSecureDialOpts)
Expand All @@ -82,7 +82,7 @@ func TestBadInitialization(t *testing.T) {
msptesttools.LoadMSPSetupForTesting()
peerIdentity, _ := mgmt.GetLocalSigningIdentityOrPanic().Serialize()
s1 := grpc.NewServer()
idMapper := identity.NewIdentityMapper(cryptSvc)
idMapper := identity.NewIdentityMapper(cryptSvc, peerIdentity)
assert.Panics(t, func() {
newConfig("anEndpointWithoutAPort", "anEndpointWithoutAPort")
})
Expand Down
6 changes: 1 addition & 5 deletions gossip/service/gossip_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,7 @@ func InitGossipServiceCustomDeliveryFactory(peerIdentity []byte, endpoint string

logger.Info("Initialize gossip with endpoint", endpoint, "and bootstrap set", bootPeers)

idMapper := identity.NewIdentityMapper(mcs)
if err := idMapper.Put(mcs.GetPKIidOfCert(peerIdentity), peerIdentity); err != nil {
logger.Panic("Failed associating self PKIID to cert:", err)
}

idMapper := identity.NewIdentityMapper(mcs, peerIdentity)
gossip := integration.NewGossipComponent(peerIdentity, endpoint, s, secAdv,
mcs, idMapper, secureDialOpts, bootPeers...)
gossipServiceInstance = &gossipServiceImpl{
Expand Down
5 changes: 3 additions & 2 deletions gossip/service/gossip_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,11 +612,12 @@ func newGossipInstance(portPrefix int, id int, maxMsgCount int, boot ...int) Gos
PublishStateInfoInterval: time.Duration(1) * time.Second,
RequestStateInfoInterval: time.Duration(1) * time.Second,
}
selfId := api.PeerIdentityType(conf.InternalEndpoint)
cryptoService := &naiveCryptoService{}
idMapper := identity.NewIdentityMapper(cryptoService)
idMapper := identity.NewIdentityMapper(cryptoService, selfId)

gossip := gossip.NewGossipServiceWithServer(conf, &orgCryptoService{}, cryptoService,
idMapper, api.PeerIdentityType(conf.InternalEndpoint), nil)
idMapper, selfId, nil)

gossipService := &gossipServiceImpl{
gossipSvc: gossip,
Expand Down
5 changes: 3 additions & 2 deletions gossip/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,10 @@ func newGossipConfig(id int, boot ...int) *gossip.Config {

// Create gossip instance
func newGossipInstance(config *gossip.Config, mcs api.MessageCryptoService) gossip.Gossip {
idMapper := identity.NewIdentityMapper(mcs)
id := api.PeerIdentityType(config.InternalEndpoint)
idMapper := identity.NewIdentityMapper(mcs, id)
return gossip.NewGossipServiceWithServer(config, &orgCryptoService{}, mcs,
idMapper, []byte(config.InternalEndpoint), nil)
idMapper, id, nil)
}

// Create new instance of KVLedger to be used for testing
Expand Down

0 comments on commit 08a2515

Please sign in to comment.