Skip to content

Commit

Permalink
[FAB-6522] Disseminate to a global set of peers
Browse files Browse the repository at this point in the history
This change set makes the private data push logic to disseminate
to a set of peers that doesn't take into account their belonging
to any organization.

Change-Id: Ic1969c50861c5b623042f37a8974b3d93a9ee20b
Signed-off-by: yacovm <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Oct 13, 2017
1 parent a21d89d commit 5465089
Show file tree
Hide file tree
Showing 13 changed files with 65 additions and 148 deletions.
8 changes: 2 additions & 6 deletions core/common/privdata/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,9 @@ type CollectionAccessPolicy interface {
// AccessFilter returns a member filter function for a collection
AccessFilter() Filter

// RequiredExternalPeerCount returns the minimum number of external peers
// RequiredPeerCount returns the minimum number of peers
// required to send private data to
RequiredExternalPeerCount() int

// RequiredExternalPeerCount returns the minimum number of internal peers
// required to send private data to
RequiredInternalPeerCount() int
RequiredPeerCount() int

// MemberOrgs returns the collection's members as MSP IDs. This serves as
// a human-readable way of quickly identifying who is part of a collection.
Expand Down
8 changes: 2 additions & 6 deletions core/common/privdata/nopcollection.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,8 @@ func (nc *NopCollection) MemberOrgs() []string {
return nil
}

func (nc *NopCollection) RequiredExternalPeerCount() int {
return viper.GetInt("peer.gossip.pvtData.minExternalPeers")
}

func (nc *NopCollection) RequiredInternalPeerCount() int {
return viper.GetInt("peer.gossip.pvtData.minInternalPeers")
func (nc *NopCollection) RequiredPeerCount() int {
return viper.GetInt("peer.gossip.pvtData.minPeers")
}

func (nc *NopCollection) AccessFilter() Filter {
Expand Down
24 changes: 8 additions & 16 deletions core/common/privdata/simplecollection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ import (
// SimpleCollection implements a collection with static properties
// and a public member set
type SimpleCollection struct {
name string
accessPolicy policies.Policy
memberOrgs []string
requiredExternalPeerCount int
requiredInternalPeerCount int
name string
accessPolicy policies.Policy
memberOrgs []string
requiredPeerCount int
}

// CollectionID returns the collection's ID
Expand All @@ -38,16 +37,10 @@ func (sc *SimpleCollection) MemberOrgs() []string {
return sc.memberOrgs
}

// RequiredExternalPeerCount returns the minimum number of external peers
// RequiredPeerCount returns the minimum number of peers
// required to send private data to
func (sc *SimpleCollection) RequiredExternalPeerCount() int {
return sc.requiredExternalPeerCount
}

// RequiredInternalPeerCount returns the minimum number of internal peers
// required to send private data to
func (sc *SimpleCollection) RequiredInternalPeerCount() int {
return sc.requiredInternalPeerCount
func (sc *SimpleCollection) RequiredPeerCount() int {
return sc.requiredPeerCount
}

// AccessFilter returns the member filter function that evaluates signed data
Expand Down Expand Up @@ -120,8 +113,7 @@ func (sc *SimpleCollection) Setup(collectionConfig *common.StaticCollectionConfi
}

// set required peer counts
sc.requiredInternalPeerCount = int(collectionConfig.GetRequiredInternalPeerCount())
sc.requiredExternalPeerCount = int(collectionConfig.GetRequiredExternalPeerCount())
sc.requiredPeerCount = int(collectionConfig.GetRequiredExternalPeerCount())

return nil
}
3 changes: 1 addition & 2 deletions core/common/privdata/simplecollection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ func TestSetupGoodConfigCollection(t *testing.T) {
assert.True(t, members[1] == "signer1")

// check required peer count
assert.True(t, sc.RequiredInternalPeerCount() == 1)
assert.True(t, sc.RequiredExternalPeerCount() == 1)
assert.True(t, sc.RequiredPeerCount() == 1)
}

func TestSimpleCollectionFilter(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions gossip/api/subchannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import "github.com/hyperledger/fabric/gossip/common"
// or which peers are eligible of receiving a certain message
type RoutingFilter func(peerIdentity PeerIdentityType) bool

// CollectionCriteria describes a way of selecting peers from a sub-channel
// given their signatures and whether they are from our organization
type SubChannelSelectionCriteria func(signature PeerSignature, isFromOurOrg bool) bool
// SubChannelSelectionCriteria describes a way of selecting peers from a sub-channel
// given their signatures
type SubChannelSelectionCriteria func(signature PeerSignature) bool

// RoutingFilterFactory defines an object that given a CollectionCriteria and a channel,
// it can ascertain which peers should be aware of the data related to the
Expand Down
4 changes: 1 addition & 3 deletions gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,13 +413,11 @@ func (gc *gossipChannel) PeerFilter(messagePredicate api.SubChannelSelectionCrit
return false
}

sameOrg := bytes.Equal(gc.selfOrg, gc.GetOrgOfPeer(member.PKIid))

return messagePredicate(api.PeerSignature{
Message: msg.Payload,
Signature: msg.Signature,
PeerIdentity: peerIdentity,
}, sameOrg)
})
}
}

Expand Down
15 changes: 5 additions & 10 deletions gossip/gossip/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1550,28 +1550,23 @@ func TestGossipChannelEligibility(t *testing.T) {
assert.False(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDinOrg3}))

// Ensure peers from the channel are returned
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature, sameOrg bool) bool {
assert.True(t, sameOrg)
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
return true
})(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature, sameOrg bool) bool {
assert.False(t, sameOrg)
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
return true
})(discovery.NetworkMember{PKIid: pkiIDinOrg2}))
// But not peers which aren't in the channel
assert.False(t, gc.PeerFilter(func(signature api.PeerSignature, sameOrg bool) bool {
assert.False(t, sameOrg)
assert.False(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
return true
})(discovery.NetworkMember{PKIid: pkiIDinOrg3}))

// Ensure the given predicate is considered
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature, sameOrg bool) bool {
assert.False(t, sameOrg)
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
return bytes.Equal(signature.PeerIdentity, []byte("pkiIDinOrg2"))
})(discovery.NetworkMember{PKIid: pkiIDinOrg2}))

assert.False(t, gc.PeerFilter(func(signature api.PeerSignature, sameOrg bool) bool {
assert.True(t, sameOrg)
assert.False(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
return bytes.Equal(signature.PeerIdentity, []byte("pkiIDinOrg2"))
})(discovery.NetworkMember{PKIid: pkiIDInOrg1}))

Expand Down
8 changes: 2 additions & 6 deletions gossip/privdata/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,8 @@ func (cap *collectionAccessPolicy) MemberOrgs() []string {
return []string{"org0", "org1"}
}

func (cap *collectionAccessPolicy) RequiredInternalPeerCount() int {
return viper.GetInt("peer.gossip.pvtData.minInternalPeers")
}

func (cap *collectionAccessPolicy) RequiredExternalPeerCount() int {
return viper.GetInt("peer.gossip.pvtData.minExternalPeers")
func (cap *collectionAccessPolicy) RequiredPeerCount() int {
return viper.GetInt("peer.gossip.pvtData.minPeers")
}

func (cap *collectionAccessPolicy) AccessFilter() privdata.Filter {
Expand Down
61 changes: 24 additions & 37 deletions gossip/privdata/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,48 +111,35 @@ func (d *distributorImpl) computeDisseminationPlan(txID string, privData *rwset.

func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAccessPolicy, colFilter privdata.Filter, pvtDataMsg *proto.SignedGossipMessage) ([]*dissemination, error) {
var disseminationPlan []*dissemination
for _, expectedToBeInOurOrg := range []bool{true, false} {
var expectedToBeInOurOrg bool = expectedToBeInOurOrg
minAck := colAP.RequiredExternalPeerCount()
if expectedToBeInOurOrg {
minAck = colAP.RequiredInternalPeerCount()
}

routingFilter, err := d.gossipAdapter.PeerFilter(gossipCommon.ChainID(d.chainID), func(signature api.PeerSignature, isOurOrg bool) bool {
if isOurOrg != expectedToBeInOurOrg {
return false
}
return colFilter(common.SignedData{
Data: signature.Message,
Signature: signature.Signature,
Identity: []byte(signature.PeerIdentity),
})
minAck := colAP.RequiredPeerCount()
routingFilter, err := d.gossipAdapter.PeerFilter(gossipCommon.ChainID(d.chainID), func(signature api.PeerSignature) bool {
return colFilter(common.SignedData{
Data: signature.Message,
Signature: signature.Signature,
Identity: []byte(signature.PeerIdentity),
})
})

if err != nil {
logger.Error("Failed to retrieve peer routing filter for channel", d.chainID, ":", err)
return nil, err
}
if err != nil {
logger.Error("Failed to retrieve peer routing filter for channel", d.chainID, ":", err)
return nil, err
}

maxPeers := viper.GetInt("peer.gossip.pvtData.maxExternalPeers")
if expectedToBeInOurOrg {
maxPeers = viper.GetInt("peer.gossip.pvtData.maxInternalPeers")
}
maxPeers := viper.GetInt("peer.gossip.pvtData.maxPeers")

sc := gossip2.SendCriteria{
Timeout: time.Second,
Channel: gossipCommon.ChainID(d.chainID),
MaxPeers: maxPeers,
MinAck: minAck,
IsEligible: func(member discovery.NetworkMember) bool {
return routingFilter(member)
},
}
disseminationPlan = append(disseminationPlan, &dissemination{
criteria: sc,
msg: pvtDataMsg,
})
sc := gossip2.SendCriteria{
Timeout: time.Second,
Channel: gossipCommon.ChainID(d.chainID),
MaxPeers: maxPeers,
MinAck: minAck,
IsEligible: func(member discovery.NetworkMember) bool {
return routingFilter(member)
},
}
disseminationPlan = append(disseminationPlan, &dissemination{
criteria: sc,
msg: pvtDataMsg,
})
return disseminationPlan, nil
}

Expand Down
47 changes: 9 additions & 38 deletions gossip/privdata/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,13 @@ func (g *gossipMock) PeerFilter(channel gcommon.ChainID, messagePredicate api.Su
return nil, g.err
}
return func(member discovery.NetworkMember) bool {
var fromOurOrg bool
if string(member.PKIid) == "ORG1" {
fromOurOrg = true
}
return messagePredicate(g.PeerSignature, fromOurOrg)
return messagePredicate(g.PeerSignature)
}, nil
}

func TestDistributor(t *testing.T) {
viper.Set("peer.gossip.pvtData.minInternalPeers", 1)
viper.Set("peer.gossip.pvtData.maxInternalPeers", 2)
viper.Set("peer.gossip.pvtData.minExternalPeers", 3)
viper.Set("peer.gossip.pvtData.maxExternalPeers", 4)
viper.Set("peer.gossip.pvtData.minPeers", 1)
viper.Set("peer.gossip.pvtData.maxPeers", 2)
g := &gossipMock{
Mock: mock.Mock{},
PeerSignature: api.PeerSignature{
Expand Down Expand Up @@ -96,7 +90,6 @@ func TestDistributor(t *testing.T) {
Collection: "c2",
})
pvtData := pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2").addRWSet().addNSRWSet("ns2", "c1", "c2").create()
// Each private RWSet is sent to inside our org, and outside of the org, totalling in 8 sends.
err := d.Distribute("tx1", pvtData[0].WriteSet, cs)
assert.NoError(t, err)
err = d.Distribute("tx2", pvtData[1].WriteSet, cs)
Expand All @@ -108,38 +101,16 @@ func TestDistributor(t *testing.T) {
// The mock collection store returns policies that for ns1 and c1, or ns2 and c2 return true regardless
// of the network member, and for any other collection and namespace combination - return false

// If this is a dissemination to a foreign org, ensure MaxPeers is maxExternalPeers which is 4
// and MinAck is minExternalPeers which is is 3
foreignOrg := sc.IsEligible(discovery.NetworkMember{PKIid: gcommon.PKIidType("ORG2")})
if foreignOrg {
assert.Equal(t, 4, sc.MaxPeers)
assert.Equal(t, 3, sc.MinAck)
}

// If this is a dissemination within the org
intraOrg := sc.IsEligible(discovery.NetworkMember{PKIid: gcommon.PKIidType("ORG1")})
if intraOrg {
// Ensure MaxPeers is maxInternalPeers which is 2
//and MinAck is minInternalPeers which is 1
assert.Equal(t, 2, sc.MaxPeers)
assert.Equal(t, 1, sc.MinAck)
}

// If this private payload isn't allowed to be disseminated to any org,
// ensure this is because the private data is either ns1 and c2 or ns2 and c1.
// Otherwise, this private data is allowed to be disseminated to someone,
// so it has to be ns1 and c1 or ns2 and c2
if !foreignOrg && !intraOrg {
assert.True(t, (pp.Namespace == "ns1" && pp.CollectionName == "c2") || (pp.Namespace == "ns2" && pp.CollectionName == "c1"))
} else {
assert.True(t, (pp.Namespace == "ns1" && pp.CollectionName == "c1") || (pp.Namespace == "ns2" && pp.CollectionName == "c2"))
}
// Ensure MaxPeers is maxInternalPeers which is 2
//and MinAck is minInternalPeers which is 1
assert.Equal(t, 2, sc.MaxPeers)
assert.Equal(t, 1, sc.MinAck)
}
i := 0
for dis := range sendings {
assertACL(dis.PrivatePayload, dis.SendCriteria)
i++
if i == 8 {
if i == 4 {
break
}
}
Expand All @@ -157,5 +128,5 @@ func TestDistributor(t *testing.T) {
g.err = nil
err = d.Distribute("tx1", pvtData[0].WriteSet, cs)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Failed disseminating 4 out of 4 private RWSets")
assert.Contains(t, err.Error(), "Failed disseminating 2 out of 2 private RWSets")
}
4 changes: 2 additions & 2 deletions gossip/privdata/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func (p *puller) computeFilters(dig2src dig2sources) (digestToFilterMapping, err
if f == nil {
return nil, errors.Errorf("Failed obtaining collection filter for channel %s, txID %s, collection %s", p.channel, digest.TxId, digest.Collection)
}
anyPeerInCollection, err := p.PeerFilter(common.ChainID(p.channel), func(peerSignature api.PeerSignature, _ bool) bool {
anyPeerInCollection, err := p.PeerFilter(common.ChainID(p.channel), func(peerSignature api.PeerSignature) bool {
return f(fcommon.SignedData{
Signature: peerSignature.Signature,
Identity: peerSignature.PeerIdentity,
Expand All @@ -394,7 +394,7 @@ func (p *puller) computeFilters(dig2src dig2sources) (digestToFilterMapping, err
return nil, errors.WithStack(err)
}
sources := sources
endorserPeer, err := p.PeerFilter(common.ChainID(p.channel), func(peerSignature api.PeerSignature, _ bool) bool {
endorserPeer, err := p.PeerFilter(common.ChainID(p.channel), func(peerSignature api.PeerSignature) bool {
for _, endorsement := range sources {
if bytes.Equal(endorsement.Endorser, []byte(peerSignature.PeerIdentity)) {
return true
Expand Down
8 changes: 2 additions & 6 deletions gossip/privdata/pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,7 @@ func (mc *mockCollectionAccess) AccessFilter() privdata.Filter {
return policy2Filter[mc]
}

func (mc *mockCollectionAccess) RequiredExternalPeerCount() int {
return 0
}

func (mc *mockCollectionAccess) RequiredInternalPeerCount() int {
func (mc *mockCollectionAccess) RequiredPeerCount() int {
return 0
}

Expand Down Expand Up @@ -164,7 +160,7 @@ func (g *mockGossip) PeerFilter(channel common.ChainID, messagePredicate api.Sub
return func(member discovery.NetworkMember) bool {
return messagePredicate(api.PeerSignature{
PeerIdentity: api.PeerIdentityType(member.PKIid),
}, true)
})
}, nil
}

Expand Down
17 changes: 4 additions & 13 deletions sampleconfig/core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -186,23 +186,14 @@ peer:
# pullRetryThreshold determines the maximum duration of time private data corresponding for a given block
# would be attempted to be pulled from peers until the block would be committed without the private data
pullRetryThreshold: 60s
# minInternalPeers defines the minimum number of peers from the peer's organization
# minPeers defines the minimum number of peers
# that an endorsement containing private data would target for dissemination.
# The endorsement would fail if an insufficient number of acknowledgements from
# peers of the peer's organization wasn't obtained.
minInternalPeers: 1
# maxInternalPeers defines the maximum number of peers from the peer's organization
minPeers: 1
# maxPeers defines the maximum number of peers
# that an endorsement containing private data would target for dissemination
maxInternalPeers: 1

# minInternalPeers defines the minimum number of peers foreign to the peer's organization
# that an endorsement containing private data would target for dissemination
# The endorsement would fail if an insufficient number of acknowledgements from
# peers foreign to the peer's organization wasn't obtained.
minExternalPeers: 1
# maxInternalPeers defines the maximum number of peers foreign to the peer's organization
# that an endorsement containing private data would target for dissemination
maxExternalPeers: 1
maxPeers: 1

# EventHub related configuration
events:
Expand Down

0 comments on commit 5465089

Please sign in to comment.