Skip to content

Commit

Permalink
[FAB-6379] Prioritize pulling from endorsers
Browse files Browse the repository at this point in the history
This change set makes the pull mechanism of private data to try
select endorsers of a certain private RWSet over others.

Change-Id: If5f2e0a0031a208fee06c749216f708b81342847
Signed-off-by: yacovm <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Oct 13, 2017
1 parent ff714cd commit a21d89d
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 20 deletions.
53 changes: 41 additions & 12 deletions gossip/privdata/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (p *puller) waitForMembership() []discovery.NetworkMember {

func (p *puller) fetch(dig2src dig2sources) ([]*proto.PvtDataElement, error) {
// computeFilters returns a map from a digest to a routing filter
dig2Filter, err := p.computeFilters(dig2src.keys())
dig2Filter, err := p.computeFilters(dig2src)
if err != nil {
return nil, errors.WithStack(err)
}
Expand All @@ -216,6 +216,7 @@ func (p *puller) fetch(dig2src dig2sources) ([]*proto.PvtDataElement, error) {
logger.Warning("Do not know any peer in the channel(", p.channel, ") that matches the policies , aborting")
return nil, errors.New("Empty membership")
}
members = randomizeMemberList(members)
var res []*proto.PvtDataElement
// Distribute requests to peers, and obtain subscriptions for all their messages
// matchDigestToPeer returns a map from a peer to the digests which we would ask it for
Expand Down Expand Up @@ -308,10 +309,14 @@ func (p *puller) assignDigestsToPeers(members []discovery.NetworkMember, dig2Fil
}
res := make(map[remotePeer][]proto.PvtDataDigest)
// Create a mapping between peer and digests to ask for
members = randomizeMemberList(members)
for dig, filt := range dig2Filter {
// Find the first peer that matches the filter
selectedPeer := filter.First(members, filt)
for dig, collectionFilter := range dig2Filter {
// Find a peer that is an endorser
selectedPeer := filter.First(members, collectionFilter.endorser)
if selectedPeer == nil {
logger.Debug("No endorser found for", dig)
// Find some peer that is in the collection
selectedPeer = filter.First(members, collectionFilter.anyPeer)
}
if selectedPeer == nil {
logger.Debug("No peer matches txID", dig.TxId, "collection", dig.Collection)
continue
Expand All @@ -332,12 +337,18 @@ func (p *puller) assignDigestsToPeers(members []discovery.NetworkMember, dig2Fil
return res, noneSelectedPeers
}

type digestToFilterMapping map[proto.PvtDataDigest]filter.RoutingFilter
type collectionRoutingFilter struct {
anyPeer filter.RoutingFilter
endorser filter.RoutingFilter
}

type digestToFilterMapping map[proto.PvtDataDigest]collectionRoutingFilter

func (dig2f digestToFilterMapping) flattenFilterValues() []filter.RoutingFilter {
var filters []filter.RoutingFilter
for _, f := range dig2f {
filters = append(filters, f)
filters = append(filters, f.endorser)
filters = append(filters, f.anyPeer)
}
return filters
}
Expand All @@ -355,9 +366,9 @@ func (dig2Filter digestToFilterMapping) String() string {
return buffer.String()
}

func (p *puller) computeFilters(digests []*proto.PvtDataDigest) (digestToFilterMapping, error) {
filters := make(map[proto.PvtDataDigest]filter.RoutingFilter)
for _, digest := range digests {
func (p *puller) computeFilters(dig2src dig2sources) (digestToFilterMapping, error) {
filters := make(map[proto.PvtDataDigest]collectionRoutingFilter)
for digest, sources := range dig2src {
collection := p.cs.RetrieveCollectionAccessPolicy(fcommon.CollectionCriteria{
Channel: p.channel,
TxId: digest.TxId,
Expand All @@ -371,17 +382,35 @@ func (p *puller) computeFilters(digests []*proto.PvtDataDigest) (digestToFilterM
if f == nil {
return nil, errors.Errorf("Failed obtaining collection filter for channel %s, txID %s, collection %s", p.channel, digest.TxId, digest.Collection)
}
rf, err := p.PeerFilter(common.ChainID(p.channel), func(peerSignature api.PeerSignature, _ bool) bool {
anyPeerInCollection, err := p.PeerFilter(common.ChainID(p.channel), func(peerSignature api.PeerSignature, _ bool) bool {
return f(fcommon.SignedData{
Signature: peerSignature.Signature,
Identity: peerSignature.PeerIdentity,
Data: peerSignature.Message,
})
})

if err != nil {
return nil, errors.WithStack(err)
}
sources := sources
endorserPeer, err := p.PeerFilter(common.ChainID(p.channel), func(peerSignature api.PeerSignature, _ bool) bool {
for _, endorsement := range sources {
if bytes.Equal(endorsement.Endorser, []byte(peerSignature.PeerIdentity)) {
return true
}
}
return false
})

if err != nil {
return nil, errors.WithStack(err)
}
filters[*digest] = rf

filters[*digest] = collectionRoutingFilter{
anyPeer: anyPeerInCollection,
endorser: endorserPeer,
}
}
return filters, nil
}
Expand Down
54 changes: 54 additions & 0 deletions gossip/privdata/pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,3 +435,57 @@ func TestPullerRetries(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, transientStore, fetched)
}

func TestPullerPreferEndorsers(t *testing.T) {
t.Parallel()
// Scenario: p1 pulls from p2, p3, p4, p5
// and the only endorser for col1 is p3, so it should be selected
// at the top priority for col1.
// for col2, only p2 should have the data, but its not an endorser of the data.
gn := &gossipNetwork{}

policyStore := newCollectionStore().withPolicy("col1").thatMapsTo("p1", "p2", "p3", "p4", "p5").withPolicy("col2").thatMapsTo("p1", "p2")
p1 := gn.newPuller("p1", policyStore, "p2", "p3", "p4", "p5")

p3TransientStore := newPRWSet()
p2TransientStore := newPRWSet()

p2 := gn.newPuller("p2", policyStore)
p3 := gn.newPuller("p3", policyStore)
gn.newPuller("p4", policyStore)
gn.newPuller("p5", policyStore)

dig1 := &proto.PvtDataDigest{
TxId: "txID1",
Collection: "col1",
Namespace: "ns1",
}

dig2 := &proto.PvtDataDigest{
TxId: "txID1",
Collection: "col2",
Namespace: "ns1",
}

// We only define an action for dig2 on p2, and the test would fail with panic if any other peer is asked for
// a private RWSet on dig2
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig2).Return(p2TransientStore)

// We only define an action for dig1 on p3, and the test would fail with panic if any other peer is asked for
// a private RWSet on dig1
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig1).Return(p3TransientStore)

dasf := &digestsAndSourceFactory{}
d2s := dasf.mapDigest(dig1).toSources("p3").mapDigest(dig2).toSources().create()
fetchedMessages, err := p1.fetch(d2s)
assert.NoError(t, err)
rws1 := util.PrivateRWSet(fetchedMessages[0].Payload[0])
rws2 := util.PrivateRWSet(fetchedMessages[0].Payload[1])
rws3 := util.PrivateRWSet(fetchedMessages[1].Payload[0])
rws4 := util.PrivateRWSet(fetchedMessages[1].Payload[1])
fetched := []util.PrivateRWSet{rws1, rws2, rws3, rws4}
assert.Contains(t, fetched, p3TransientStore[0])
assert.Contains(t, fetched, p3TransientStore[1])
assert.Contains(t, fetched, p2TransientStore[0])
assert.Contains(t, fetched, p2TransientStore[1])
}
11 changes: 3 additions & 8 deletions gossip/privdata/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,19 +250,14 @@ func (f *digestsAndSourceFactory) mapDigest(dig *gossip2.PvtDataDigest) *digests
return f
}

func (f *digestsAndSourceFactory) toSources(orgs ...string) *digestsAndSourceFactory {
func (f *digestsAndSourceFactory) toSources(peers ...string) *digestsAndSourceFactory {
if f.d2s == nil {
f.d2s = make(dig2sources)
}
var endorsements []*peer.Endorsement
for i, org := range orgs {
sId := &msp.SerializedIdentity{
Mspid: org,
IdBytes: []byte(fmt.Sprintf("p%d.%s", i, org)),
}
b, _ := proto.Marshal(sId)
for _, p := range peers {
endorsements = append(endorsements, &peer.Endorsement{
Endorser: b,
Endorser: []byte(p),
})
}
f.d2s[f.lastDig] = endorsements
Expand Down

0 comments on commit a21d89d

Please sign in to comment.