Skip to content

Commit

Permalink
[FAB-5752] Gossip identity expiration I
Browse files Browse the repository at this point in the history
This commit:
1) Moves the identity mapper into the gossip package from the service
   package, since it's not needed in the service package.
2) Extends the constructor of the identity mapper to have a callback
   that is fired whenever an identity is deleted from the identity mapper.

Change-Id: I1c9eaa47c97351518848843a9c4f2e1835f82249
Signed-off-by: yacovm <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Aug 30, 2017
1 parent 297d393 commit 0a03e39
Show file tree
Hide file tree
Showing 13 changed files with 56 additions and 63 deletions.
12 changes: 8 additions & 4 deletions gossip/comm/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func acceptAll(msg interface{}) bool {
return true
}

var noopPurgeIdentity = func(_ common.PKIidType, _ api.PeerIdentityType) {

}

var (
naiveSec = &naiveSecProvider{}
hmacKey = []byte{0, 0, 0}
Expand Down Expand Up @@ -97,7 +101,7 @@ func (*naiveSecProvider) VerifyByChannel(_ common.ChainID, _ api.PeerIdentityTyp
func newCommInstance(port int, sec api.MessageCryptoService) (Comm, error) {
endpoint := fmt.Sprintf("localhost:%d", port)
id := []byte(endpoint)
inst, err := NewCommInstanceWithServer(port, identity.NewIdentityMapper(sec, id), id, nil)
inst, err := NewCommInstanceWithServer(port, identity.NewIdentityMapper(sec, id, noopPurgeIdentity), id, nil)
return inst, err
}

Expand Down Expand Up @@ -215,7 +219,7 @@ func TestHandshake(t *testing.T) {
go s.Serve(ll)

id := []byte("localhost:9611")
idMapper := identity.NewIdentityMapper(naiveSec, id)
idMapper := identity.NewIdentityMapper(naiveSec, id, noopPurgeIdentity)
inst, err := NewCommInstance(s, nil, idMapper, api.PeerIdentityType("localhost:9611"), func() []grpc.DialOption {
return []grpc.DialOption{grpc.WithInsecure()}
})
Expand Down Expand Up @@ -347,7 +351,7 @@ func TestProdConstructor(t *testing.T) {
defer srv.Stop()
defer lsnr.Close()
id := []byte("localhost:20000")
comm1, _ := NewCommInstance(srv, &peerIdentity, identity.NewIdentityMapper(naiveSec, id), id, dialOpts)
comm1, _ := NewCommInstance(srv, &peerIdentity, identity.NewIdentityMapper(naiveSec, id, noopPurgeIdentity), id, dialOpts)
comm1.(*commImpl).selfCertHash = certHash
go srv.Serve(lsnr)

Expand All @@ -356,7 +360,7 @@ func TestProdConstructor(t *testing.T) {
defer srv.Stop()
defer lsnr.Close()
id = []byte("localhost:30000")
comm2, _ := NewCommInstance(srv, &peerIdentity, identity.NewIdentityMapper(naiveSec, id), id, dialOpts)
comm2, _ := NewCommInstance(srv, &peerIdentity, identity.NewIdentityMapper(naiveSec, id, noopPurgeIdentity), id, dialOpts)
comm2.(*commImpl).selfCertHash = certHash
go srv.Serve(lsnr)
defer comm1.Stop()
Expand Down
2 changes: 1 addition & 1 deletion gossip/gossip/certstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ func createObjects(updateFactory func(uint64) proto.ReceivedMessage, msgCons pro
selfIdentity := api.PeerIdentityType("SELF")
certStore = newCertStore(&pullerMock{
Mediator: pullMediator,
}, identity.NewIdentityMapper(cs, selfIdentity), selfIdentity, cs)
}, identity.NewIdentityMapper(cs, selfIdentity, func(_ common.PKIidType, _ api.PeerIdentityType) {}), selfIdentity, cs)

wg := sync.WaitGroup{}
wg.Add(1)
Expand Down
39 changes: 21 additions & 18 deletions gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,37 +66,24 @@ type gossipServiceImpl struct {
disSecAdap *discoverySecurityAdapter
mcs api.MessageCryptoService
stateInfoMsgStore msgstore.MessageStore
certPuller pull.Mediator
}

// NewGossipService creates a gossip instance attached to a gRPC server
func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvisor,
mcs api.MessageCryptoService, idMapper identity.Mapper, selfIdentity api.PeerIdentityType,
mcs api.MessageCryptoService, selfIdentity api.PeerIdentityType,
secureDialOpts api.PeerSecureDialOpts) Gossip {

var c comm.Comm
var err error

lgr := util.GetLogger(util.LoggingGossipModule, conf.ID)
if s == nil {
c, err = createCommWithServer(conf.BindPort, idMapper, selfIdentity, secureDialOpts)
} else {
c, err = createCommWithoutServer(s, conf.TLSServerCert, idMapper, selfIdentity, secureDialOpts)
}

if err != nil {
lgr.Errorf("Failed instntiating communication layer: %+v", errors.WithStack(err))
return nil
}

g := &gossipServiceImpl{
selfOrg: secAdvisor.OrgByPeerIdentity(selfIdentity),
secAdvisor: secAdvisor,
selfIdentity: selfIdentity,
presumedDead: make(chan common.PKIidType, presumedDeadChanSize),
idMapper: idMapper,
disc: nil,
mcs: mcs,
comm: c,
conf: conf,
ChannelDeMultiplexer: comm.NewChannelDemultiplexer(),
logger: lgr,
Expand All @@ -107,6 +94,21 @@ func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvis
}
g.stateInfoMsgStore = g.newStateInfoMsgStore()

g.idMapper = identity.NewIdentityMapper(mcs, selfIdentity, func(pkiID common.PKIidType, identity api.PeerIdentityType) {
g.certPuller.Remove(string(pkiID))
})

if s == nil {
g.comm, err = createCommWithServer(conf.BindPort, g.idMapper, selfIdentity, secureDialOpts)
} else {
g.comm, err = createCommWithoutServer(s, conf.TLSServerCert, g.idMapper, selfIdentity, secureDialOpts)
}

if err != nil {
lgr.Error("Failed instntiating communication layer:", err)
return nil
}

g.chanState = newChannelState(g)
g.emitter = newBatchingEmitter(conf.PropagateIterations,
conf.MaxPropagationBurstSize, conf.MaxPropagationBurstLatency,
Expand All @@ -117,7 +119,8 @@ func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvis
g.disc = discovery.NewDiscoveryService(g.selfNetworkMember(), g.discAdapter, g.disSecAdap, g.disclosurePolicy)
g.logger.Info("Creating gossip service with self membership of", g.selfNetworkMember())

g.certStore = newCertStore(g.createCertStorePuller(), idMapper, selfIdentity, mcs)
g.certPuller = g.createCertStorePuller()
g.certStore = newCertStore(g.certPuller, g.idMapper, selfIdentity, mcs)

if g.conf.ExternalEndpoint == "" {
g.logger.Warning("External endpoint is empty, peer will not be accessible outside of its organization")
Expand Down Expand Up @@ -168,8 +171,8 @@ func createCommWithoutServer(s *grpc.Server, cert *tls.Certificate, idStore iden

// NewGossipServiceWithServer creates a new gossip instance with a gRPC server
func NewGossipServiceWithServer(conf *Config, secAdvisor api.SecurityAdvisor, mcs api.MessageCryptoService,
mapper identity.Mapper, identity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts) Gossip {
return NewGossipService(conf, nil, secAdvisor, mcs, mapper, identity, secureDialOpts)
identity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts) Gossip {
return NewGossipService(conf, nil, secAdvisor, mcs, identity, secureDialOpts)
}

func createCommWithServer(port int, idStore identity.Mapper, identity api.PeerIdentityType,
Expand Down
8 changes: 2 additions & 6 deletions gossip/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
"github.com/hyperledger/fabric/gossip/gossip/algo"
"github.com/hyperledger/fabric/gossip/identity"
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -227,8 +226,7 @@ func newGossipInstanceWithCustomMCS(portPrefix int, id int, maxMsgCount int, mcs
RequestStateInfoInterval: time.Duration(1) * time.Second,
}
selfId := api.PeerIdentityType(conf.InternalEndpoint)
idMapper := identity.NewIdentityMapper(mcs, selfId)
g := NewGossipServiceWithServer(conf, &orgCryptoService{}, mcs, idMapper,
g := NewGossipServiceWithServer(conf, &orgCryptoService{}, mcs,
selfId, nil)

return g
Expand Down Expand Up @@ -260,9 +258,7 @@ func newGossipInstanceWithOnlyPull(portPrefix int, id int, maxMsgCount int, boot

cryptoService := &naiveCryptoService{}
selfId := api.PeerIdentityType(conf.InternalEndpoint)
idMapper := identity.NewIdentityMapper(cryptoService, selfId)

g := NewGossipServiceWithServer(conf, &orgCryptoService{}, cryptoService, idMapper,
g := NewGossipServiceWithServer(conf, &orgCryptoService{}, cryptoService,
selfId, nil)
return g
}
Expand Down
4 changes: 1 addition & 3 deletions gossip/gossip/orgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
"github.com/hyperledger/fabric/gossip/identity"
"github.com/hyperledger/fabric/gossip/util"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -110,8 +109,7 @@ func newGossipInstanceWithExternalEndpoint(portPrefix int, id int, mcs *configur
RequestStateInfoInterval: time.Duration(1) * time.Second,
}
selfId := api.PeerIdentityType(conf.InternalEndpoint)
idMapper := identity.NewIdentityMapper(mcs, selfId)
g := NewGossipServiceWithServer(conf, mcs, mcs, idMapper, selfId,
g := NewGossipServiceWithServer(conf, mcs, mcs, selfId,
nil)

return g
Expand Down
7 changes: 6 additions & 1 deletion gossip/identity/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,22 @@ type Mapper interface {
ListInvalidIdentities(isSuspected api.PeerSuspector) []common.PKIidType
}

type purgeTrigger func(pkiID common.PKIidType, identity api.PeerIdentityType)

// identityMapperImpl is a struct that implements Mapper
type identityMapperImpl struct {
onPurge purgeTrigger
mcs api.MessageCryptoService
pkiID2Cert map[string]*storedIdentity
sync.RWMutex
selfPKIID string
}

// NewIdentityMapper method, all we need is a reference to a MessageCryptoService
func NewIdentityMapper(mcs api.MessageCryptoService, selfIdentity api.PeerIdentityType) Mapper {
func NewIdentityMapper(mcs api.MessageCryptoService, selfIdentity api.PeerIdentityType, onPurge purgeTrigger) Mapper {
selfPKIID := mcs.GetPKIidOfCert(selfIdentity)
idMapper := &identityMapperImpl{
onPurge: onPurge,
mcs: mcs,
pkiID2Cert: make(map[string]*storedIdentity),
selfPKIID: string(selfPKIID),
Expand Down Expand Up @@ -140,6 +144,7 @@ func (is *identityMapperImpl) ListInvalidIdentities(isSuspected api.PeerSuspecto
is.Lock()
defer is.Unlock()
for _, pkiID := range revokedIds {
is.onPurge(pkiID, is.pkiID2Cert[string(pkiID)].peerIdentity)
delete(is.pkiID2Cert, string(pkiID))
}
return revokedIds
Expand Down
10 changes: 6 additions & 4 deletions gossip/identity/identity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type naiveCryptoService struct {
revokedIdentities map[string]struct{}
}

var noopPurgeTrigger = func(_ common.PKIidType, _ api.PeerIdentityType) {}

func init() {
util.SetupTestLogging()
}
Expand Down Expand Up @@ -75,7 +77,7 @@ func (*naiveCryptoService) Verify(peerIdentity api.PeerIdentityType, signature,
}

func TestPut(t *testing.T) {
idStore := NewIdentityMapper(msgCryptoService, dummyID)
idStore := NewIdentityMapper(msgCryptoService, dummyID, noopPurgeTrigger)
identity := []byte("yacovm")
identity2 := []byte("not-yacovm")
pkiID := msgCryptoService.GetPKIidOfCert(api.PeerIdentityType(identity))
Expand All @@ -88,7 +90,7 @@ func TestPut(t *testing.T) {
}

func TestGet(t *testing.T) {
idStore := NewIdentityMapper(msgCryptoService, dummyID)
idStore := NewIdentityMapper(msgCryptoService, dummyID, noopPurgeTrigger)
identity := []byte("yacovm")
identity2 := []byte("not-yacovm")
pkiID := msgCryptoService.GetPKIidOfCert(api.PeerIdentityType(identity))
Expand All @@ -103,7 +105,7 @@ func TestGet(t *testing.T) {
}

func TestVerify(t *testing.T) {
idStore := NewIdentityMapper(msgCryptoService, dummyID)
idStore := NewIdentityMapper(msgCryptoService, dummyID, noopPurgeTrigger)
identity := []byte("yacovm")
identity2 := []byte("not-yacovm")
pkiID := msgCryptoService.GetPKIidOfCert(api.PeerIdentityType(identity))
Expand All @@ -116,7 +118,7 @@ func TestVerify(t *testing.T) {
}

func TestListInvalidIdentities(t *testing.T) {
idStore := NewIdentityMapper(msgCryptoService, dummyID)
idStore := NewIdentityMapper(msgCryptoService, dummyID, noopPurgeTrigger)
identity := []byte("yacovm")
// Test for a revoked identity
pkiID := msgCryptoService.GetPKIidOfCert(api.PeerIdentityType(identity))
Expand Down
5 changes: 2 additions & 3 deletions gossip/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/hyperledger/fabric/core/config"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/gossip"
"github.com/hyperledger/fabric/gossip/identity"
"github.com/hyperledger/fabric/gossip/util"
"github.com/pkg/errors"
"github.com/spf13/viper"
Expand Down Expand Up @@ -68,7 +67,7 @@ func newConfig(selfEndpoint string, externalEndpoint string, bootPeers ...string

// NewGossipComponent creates a gossip component that attaches itself to the given gRPC server
func NewGossipComponent(peerIdentity []byte, endpoint string, s *grpc.Server,
secAdv api.SecurityAdvisor, cryptSvc api.MessageCryptoService, idMapper identity.Mapper,
secAdv api.SecurityAdvisor, cryptSvc api.MessageCryptoService,
secureDialOpts api.PeerSecureDialOpts, bootPeers ...string) (gossip.Gossip, error) {

externalEndpoint := viper.GetString("peer.gossip.externalEndpoint")
Expand All @@ -77,7 +76,7 @@ func NewGossipComponent(peerIdentity []byte, endpoint string, s *grpc.Server,
if err != nil {
return nil, errors.WithStack(err)
}
gossipInstance := gossip.NewGossipService(conf, s, secAdv, cryptSvc, idMapper,
gossipInstance := gossip.NewGossipService(conf, s, secAdv, cryptSvc,
peerIdentity, secureDialOpts)

return gossipInstance, nil
Expand Down
12 changes: 4 additions & 8 deletions gossip/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/hyperledger/fabric/core/config"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/identity"
"github.com/hyperledger/fabric/gossip/util"
"github.com/hyperledger/fabric/msp/mgmt"
"github.com/hyperledger/fabric/msp/mgmt/testtools"
Expand Down Expand Up @@ -52,15 +51,13 @@ func TestNewGossipCryptoService(t *testing.T) {
endpoint3 := "localhost:5613"
msptesttools.LoadMSPSetupForTesting()
peerIdentity, _ := mgmt.GetLocalSigningIdentityOrPanic().Serialize()
idMapper := identity.NewIdentityMapper(cryptSvc, peerIdentity)

g1, err := NewGossipComponent(peerIdentity, endpoint1, s1, secAdv, cryptSvc, idMapper,
g1, err := NewGossipComponent(peerIdentity, endpoint1, s1, secAdv, cryptSvc,
defaultSecureDialOpts)
assert.NoError(t, err)
g2, err := NewGossipComponent(peerIdentity, endpoint2, s2, secAdv, cryptSvc, idMapper,
g2, err := NewGossipComponent(peerIdentity, endpoint2, s2, secAdv, cryptSvc,
defaultSecureDialOpts, endpoint1)
assert.NoError(t, err)
g3, err := NewGossipComponent(peerIdentity, endpoint3, s3, secAdv, cryptSvc, idMapper,
g3, err := NewGossipComponent(peerIdentity, endpoint3, s3, secAdv, cryptSvc,
defaultSecureDialOpts, endpoint1)
assert.NoError(t, err)
defer g1.Stop()
Expand All @@ -75,11 +72,10 @@ func TestBadInitialization(t *testing.T) {
msptesttools.LoadMSPSetupForTesting()
peerIdentity, _ := mgmt.GetLocalSigningIdentityOrPanic().Serialize()
s1 := grpc.NewServer()
idMapper := identity.NewIdentityMapper(cryptSvc, peerIdentity)
_, err := newConfig("anEndpointWithoutAPort", "anEndpointWithoutAPort")

viper.Set("peer.tls.enabled", true)
_, err = NewGossipComponent(peerIdentity, "localhost:5000", s1, secAdv, cryptSvc, idMapper,
_, err = NewGossipComponent(peerIdentity, "localhost:5000", s1, secAdv, cryptSvc,
defaultSecureDialOpts)
assert.Error(t, err)
}
Expand Down
8 changes: 2 additions & 6 deletions gossip/service/gossip_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
gossipCommon "github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/election"
"github.com/hyperledger/fabric/gossip/gossip"
"github.com/hyperledger/fabric/gossip/identity"
"github.com/hyperledger/fabric/gossip/integration"
privdata2 "github.com/hyperledger/fabric/gossip/privdata"
"github.com/hyperledger/fabric/gossip/state"
Expand Down Expand Up @@ -81,7 +80,6 @@ type gossipServiceImpl struct {
deliveryService deliverclient.DeliverService
deliveryFactory DeliveryServiceFactory
lock sync.RWMutex
idMapper identity.Mapper
mcs api.MessageCryptoService
peerIdentity []byte
secAdv api.SecurityAdvisor
Expand Down Expand Up @@ -140,17 +138,15 @@ func InitGossipServiceCustomDeliveryFactory(peerIdentity []byte, endpoint string

logger.Info("Initialize gossip with endpoint", endpoint, "and bootstrap set", bootPeers)

idMapper := identity.NewIdentityMapper(mcs, peerIdentity)
gossip, err = integration.NewGossipComponent(peerIdentity, endpoint, s, secAdv,
mcs, idMapper, secureDialOpts, bootPeers...)
mcs, secureDialOpts, bootPeers...)
gossipServiceInstance = &gossipServiceImpl{
mcs: mcs,
gossipSvc: gossip,
coordinators: make(map[string]privdata2.Coordinator),
chains: make(map[string]state.GossipStateProvider),
leaderElection: make(map[string]election.LeaderElectionService),
deliveryFactory: factory,
idMapper: idMapper,
peerIdentity: peerIdentity,
secAdv: secAdv,
}
Expand Down Expand Up @@ -287,7 +283,7 @@ func (g *gossipServiceImpl) Stop() {
}

func (g *gossipServiceImpl) newLeaderElectionComponent(chainID string, callback func(bool)) election.LeaderElectionService {
PKIid := g.idMapper.GetPKIidOfCert(g.peerIdentity)
PKIid := g.mcs.GetPKIidOfCert(g.peerIdentity)
adapter := election.NewAdapter(g, PKIid, gossipCommon.ChainID(chainID))
return election.NewLeaderElectionService(adapter, string(PKIid), callback)
}
Expand Down
Loading

0 comments on commit 0a03e39

Please sign in to comment.