Skip to content

Commit

Permalink
[FAB-6100] - Extend collection criteria with ns
Browse files Browse the repository at this point in the history
Since collections will be defined per chaincode bases collection
criteria data structure has to encapsulate in it the notion of the
namespace to enable required granularity. This commit changes proto
messages to capture the chaincode namespace.

Change-Id: I012ad0b1f46838ca553f0a55f26e12c5b4055d72
Signed-off-by: Artem Barger <bartem@il.ibm.com>
  • Loading branch information
C0rWin committed Sep 12, 2017
1 parent f46a2d4 commit 3a8d54c
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 162 deletions.
6 changes: 4 additions & 2 deletions gossip/privdata/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (

type PrivateDataRetriever interface {
// CollectionRWSet returns the bytes of CollectionPvtReadWriteSet for a given txID and collection from the transient store
CollectionRWSet(txID, collection string) []util.PrivateRWSet
CollectionRWSet(txID, collection, namespace string) []util.PrivateRWSet
}

// gossip defines capabilities that the gossip module gives the Coordinator
Expand Down Expand Up @@ -130,6 +130,7 @@ func (p *puller) createResponse(message proto.ReceivedMessage) []*proto.PvtDataE
Channel: p.channel,
Collection: dig.Collection,
TxId: dig.TxId,
Namespace: dig.Namespace,
})
if pol == nil {
logger.Debug("No policy found for channel", p.channel, ", collection", dig.Collection, "txID", dig.TxId, "skipping...")
Expand All @@ -148,7 +149,7 @@ func (p *puller) createResponse(message proto.ReceivedMessage) []*proto.PvtDataE
}
// TODO: dig in the ledger if it's not in the transient store
// Else, it's eligible to receive the private data, so we append it to the returned slice
rwSets := p.CollectionRWSet(dig.TxId, dig.Collection)
rwSets := p.CollectionRWSet(dig.TxId, dig.Collection, dig.Namespace)
logger.Debug("Found", len(rwSets), "for TxID", dig.TxId, ", collection", dig.Collection, "for", message.GetConnectionInfo().Endpoint)
if len(rwSets) == 0 {
continue
Expand Down Expand Up @@ -353,6 +354,7 @@ func (p *puller) computeFilters(req *proto.RemotePvtDataRequest) (digestToFilter
Channel: p.channel,
TxId: digest.TxId,
Collection: digest.Collection,
Namespace: digest.Namespace,
})
if pol == nil {
return nil, errors.Errorf("Failed obtaining policy for channel %s, txID %s, collection %s", p.channel, digest.TxId, digest.Collection)
Expand Down
42 changes: 22 additions & 20 deletions gossip/privdata/pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ type dataRetrieverMock struct {
mock.Mock
}

func (dr *dataRetrieverMock) CollectionRWSet(txID, collection string) []util.PrivateRWSet {
return dr.Called(txID, collection).Get(0).([]util.PrivateRWSet)
func (dr *dataRetrieverMock) CollectionRWSet(txID, collection, namespace string) []util.PrivateRWSet {
return dr.Called(txID, collection, namespace).Get(0).([]util.PrivateRWSet)
}

type receivedMsg struct {
Expand Down Expand Up @@ -225,15 +225,15 @@ func TestPullerFromOnly1Peer(t *testing.T) {
p2TransientStore := newPRWSet()
policyStore = newPolicyStore().withPolicy("col1").thatMapsTo("p1")
p2 := gn.newPuller("p2", policyStore)
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col1").Return(p2TransientStore)
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col1", "ns1").Return(p2TransientStore)

p3 := gn.newPuller("p3", newPolicyStore())
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col1").Run(func(_ mock.Arguments) {
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col1", "ns1").Run(func(_ mock.Arguments) {
t.Fatal("p3 shouldn't have been selected for pull")
})

fetchedMessages, err := p1.fetch(&proto.RemotePvtDataRequest{
Digests: []*proto.PvtDataDigest{{Collection: "col1", TxId: "txID1"}},
Digests: []*proto.PvtDataDigest{{Collection: "col1", TxId: "txID1", Namespace: "ns1"}},
})
rws1 := util.PrivateRWSet(fetchedMessages[0].Payload[0])
rws2 := util.PrivateRWSet(fetchedMessages[0].Payload[1])
Expand All @@ -252,15 +252,15 @@ func TestPullerDataNotAvailable(t *testing.T) {

policyStore = newPolicyStore().withPolicy("col1").thatMapsTo("p1")
p2 := gn.newPuller("p2", policyStore)
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col1").Return([]util.PrivateRWSet{})
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col1", "ns1").Return([]util.PrivateRWSet{})

p3 := gn.newPuller("p3", newPolicyStore())
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col1").Run(func(_ mock.Arguments) {
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col1", "ns1").Run(func(_ mock.Arguments) {
t.Fatal("p3 shouldn't have been selected for pull")
})

fetchedMessages, err := p1.fetch(&proto.RemotePvtDataRequest{
Digests: []*proto.PvtDataDigest{{Collection: "col1", TxId: "txID1"}},
Digests: []*proto.PvtDataDigest{{Collection: "col1", TxId: "txID1", Namespace: "ns1"}},
})
assert.Empty(t, fetchedMessages)
assert.NoError(t, err)
Expand All @@ -273,7 +273,7 @@ func TestPullerNoPeersKnown(t *testing.T) {
policyStore := newPolicyStore().withPolicy("col1").thatMapsTo("p2").withPolicy("col1").thatMapsTo("p3")
p1 := gn.newPuller("p1", policyStore)
fetchedMessages, err := p1.fetch(&proto.RemotePvtDataRequest{
Digests: []*proto.PvtDataDigest{{Collection: "col1", TxId: "txID1"}},
Digests: []*proto.PvtDataDigest{{Collection: "col1", TxId: "txID1", Namespace: "ns1"}},
})
assert.Empty(t, fetchedMessages)
assert.Error(t, err)
Expand All @@ -288,7 +288,7 @@ func TestPullPeerFilterError(t *testing.T) {
p1 := gn.newPuller("p1", policyStore)
gn.peers[0].On("PeerFilter", mock.Anything, mock.Anything).Return(nil, errors.New("Failed obtaining filter"))
fetchedMessages, err := p1.fetch(&proto.RemotePvtDataRequest{
Digests: []*proto.PvtDataDigest{{Collection: "col1", TxId: "txID1"}},
Digests: []*proto.PvtDataDigest{{Collection: "col1", TxId: "txID1", Namespace: "ns1"}},
})
assert.Error(t, err)
assert.Contains(t, err.Error(), "Failed obtaining filter")
Expand All @@ -305,13 +305,13 @@ func TestPullerPeerNotEligible(t *testing.T) {

policyStore = newPolicyStore().withPolicy("col1").thatMapsTo("p2")
p2 := gn.newPuller("p2", policyStore)
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col1").Run(func(_ mock.Arguments) {
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col1", "ns1").Run(func(_ mock.Arguments) {
t.Fatal("p2 shouldn't have approved the pull")
})

policyStore = newPolicyStore().withPolicy("col1").thatMapsTo("p3")
p3 := gn.newPuller("p3", policyStore)
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col1").Run(func(_ mock.Arguments) {
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col1", "ns1").Run(func(_ mock.Arguments) {
t.Fatal("p3 shouldn't have approved the pull")
})

Expand All @@ -334,15 +334,17 @@ func TestPullerDifferentPeersDifferentCollections(t *testing.T) {
p2TransientStore := newPRWSet()
policyStore = newPolicyStore().withPolicy("col2").thatMapsTo("p1")
p2 := gn.newPuller("p2", policyStore)
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col2").Return(p2TransientStore)
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col2", "ns1").Return(p2TransientStore)

p3TransientStore := newPRWSet()
policyStore = newPolicyStore().withPolicy("col3").thatMapsTo("p1")
p3 := gn.newPuller("p3", policyStore)
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col3").Return(p3TransientStore)
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col3", "ns1").Return(p3TransientStore)

fetchedMessages, err := p1.fetch(&proto.RemotePvtDataRequest{
Digests: []*proto.PvtDataDigest{{Collection: "col2", TxId: "txID1"}, {Collection: "col3", TxId: "txID1"}},
Digests: []*proto.PvtDataDigest{
{Collection: "col2", TxId: "txID1", Namespace: "ns1"},
{Collection: "col3", TxId: "txID1", Namespace: "ns1"}},
})
assert.NoError(t, err)
rws1 := util.PrivateRWSet(fetchedMessages[0].Payload[0])
Expand Down Expand Up @@ -373,26 +375,26 @@ func TestPullerRetries(t *testing.T) {
// p2
policyStore = newPolicyStore().withPolicy("col1").thatMapsTo("p2")
p2 := gn.newPuller("p2", policyStore)
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col1").Return(transientStore)
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col1", "ns1").Return(transientStore)

// p3
policyStore = newPolicyStore().withPolicy("col1").thatMapsTo("p1")
p3 := gn.newPuller("p3", policyStore)
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col1").Return(transientStore)
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col1", "ns1").Return(transientStore)

// p4
policyStore = newPolicyStore().withPolicy("col1").thatMapsTo("p4")
p4 := gn.newPuller("p4", policyStore)
p4.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col1").Return(transientStore)
p4.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col1", "ns1").Return(transientStore)

// p5
policyStore = newPolicyStore().withPolicy("col1").thatMapsTo("p5")
p5 := gn.newPuller("p5", policyStore)
p5.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col1").Return(transientStore)
p5.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", "txID1", "col1", "ns1").Return(transientStore)

// Fetch from someone
fetchedMessages, err := p1.fetch(&proto.RemotePvtDataRequest{
Digests: []*proto.PvtDataDigest{{Collection: "col1", TxId: "txID1"}},
Digests: []*proto.PvtDataDigest{{Collection: "col1", TxId: "txID1", Namespace: "ns1"}},
})
assert.NoError(t, err)
rws1 := util.PrivateRWSet(fetchedMessages[0].Payload[0])
Expand Down
Loading

0 comments on commit 3a8d54c

Please sign in to comment.