Skip to content

Commit

Permalink
Test private messaging between two nodes on the same FireFly
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
  • Loading branch information
awrichar committed Aug 3, 2022
1 parent 18ff879 commit 198ac15
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 15 deletions.
9 changes: 7 additions & 2 deletions internal/privatemessaging/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ func TestDispatchedUnpinnedMessageOK(t *testing.T) {
node2 := newTestNode("node2", newTestOrg("remoteorg"))

mim := pm.identity.(*identitymanagermocks.Manager)
mim.On("GetMultipartyRootOrg", pm.ctx).Return(localOrg, nil)
mim.On("GetLocalNode", pm.ctx).Return(node1, nil)

mdx := pm.exchange.(*dataexchangemocks.Plugin)
mdx.On("SendMessage", pm.ctx, mock.Anything, "node2-peer", mock.Anything).Return(nil)
Expand Down Expand Up @@ -700,6 +700,7 @@ func TestSendDataTransferBlobsFail(t *testing.T) {

localOrg := newTestOrg("localorg")
groupID := fftypes.NewRandB32()
node1 := newTestNode("node1", localOrg)
node2 := newTestNode("node2", newTestOrg("remoteorg"))
nodes := []*core.Identity{node2}

Expand All @@ -709,6 +710,7 @@ func TestSendDataTransferBlobsFail(t *testing.T) {
return true
})).Return(nil)
mim.On("GetMultipartyRootOrg", pm.ctx).Return(localOrg, nil)
mim.On("GetLocalNode", pm.ctx).Return(node1, nil)

mdi := pm.database.(*databasemocks.Plugin)
mdi.On("GetBlobMatchingHash", pm.ctx, mock.Anything).Return(nil, fmt.Errorf("pop"))
Expand Down Expand Up @@ -753,11 +755,12 @@ func TestSendDataTransferFail(t *testing.T) {

localOrg := newTestOrg("localorg")
groupID := fftypes.NewRandB32()
node1 := newTestNode("node1", localOrg)
node2 := newTestNode("node2", newTestOrg("remoteorg"))
nodes := []*core.Identity{node2}

mim := pm.identity.(*identitymanagermocks.Manager)
mim.On("GetMultipartyRootOrg", pm.ctx).Return(localOrg, nil)
mim.On("GetLocalNode", pm.ctx).Return(node1, nil)

mom := pm.operations.(*operationmocks.Manager)
mom.On("AddOrReuseOperation", pm.ctx, mock.Anything).Return(nil)
Expand Down Expand Up @@ -802,6 +805,7 @@ func TestSendDataTransferInsertOperationFail(t *testing.T) {

localOrg := newTestOrg("localorg")
groupID := fftypes.NewRandB32()
node1 := newTestNode("node1", localOrg)
node2 := newTestNode("node2", newTestOrg("remoteorg"))
nodes := []*core.Identity{node2}

Expand All @@ -811,6 +815,7 @@ func TestSendDataTransferInsertOperationFail(t *testing.T) {
return true
})).Return(nil)
mim.On("GetMultipartyRootOrg", pm.ctx).Return(localOrg, nil)
mim.On("GetLocalNode", pm.ctx).Return(node1, nil)

mom := pm.operations.(*operationmocks.Manager)
mom.On("AddOrReuseOperation", pm.ctx, mock.Anything).Return(fmt.Errorf("pop"))
Expand Down
6 changes: 3 additions & 3 deletions internal/privatemessaging/privatemessaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,16 +263,16 @@ func (pm *privateMessaging) sendData(ctx context.Context, tw *core.TransportWrap
l := log.L(ctx)
batch := tw.Batch

// Lookup the local org
localOrg, err := pm.identity.GetMultipartyRootOrg(ctx)
// Lookup the local node
localNode, err := pm.identity.GetLocalNode(ctx)
if err != nil {
return err
}

// Write it to the dataexchange for each member
for i, node := range nodes {

if node.Parent.Equals(localOrg.ID) {
if node.ID.Equals(localNode.ID) {
l.Debugf("Skipping send of batch for local node %s for group=%s node=%s (%d/%d)", batch.ID, batch.Group, node.ID, i+1, len(nodes))
continue
}
Expand Down
10 changes: 5 additions & 5 deletions internal/privatemessaging/privatemessaging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestDispatchBatchWithBlobs(t *testing.T) {
mim := pm.identity.(*identitymanagermocks.Manager)
mom := pm.operations.(*operationmocks.Manager)

mim.On("GetMultipartyRootOrg", pm.ctx).Return(localOrg, nil)
mim.On("GetLocalNode", pm.ctx).Return(node1, nil)
mdi.On("GetGroupByHash", pm.ctx, "ns1", groupID).Return(&core.Group{
Hash: fftypes.NewRandB32(),
GroupIdentity: core.GroupIdentity{
Expand Down Expand Up @@ -271,7 +271,7 @@ func TestSendAndSubmitBatchUnregisteredNode(t *testing.T) {
},
}, nil)

mim.On("GetMultipartyRootOrg", pm.ctx).Return(nil, fmt.Errorf("pop"))
mim.On("GetLocalNode", pm.ctx).Return(nil, fmt.Errorf("pop"))

err := pm.dispatchPinnedBatch(pm.ctx, &batch.DispatchState{
Persisted: core.BatchPersisted{
Expand Down Expand Up @@ -321,7 +321,7 @@ func TestSendSubmitInsertOperationFail(t *testing.T) {
node2 := newTestNode("node2", newTestOrg("remoteorg"))

mim := pm.identity.(*identitymanagermocks.Manager)
mim.On("GetMultipartyRootOrg", pm.ctx).Return(localOrg, nil)
mim.On("GetLocalNode", pm.ctx).Return(node1, nil)

mdi := pm.database.(*databasemocks.Plugin)
mim.On("CachedIdentityLookupByID", pm.ctx, node1.ID).Return(node1, nil).Once()
Expand Down Expand Up @@ -367,7 +367,7 @@ func TestSendSubmitBlobTransferFail(t *testing.T) {
blob1 := fftypes.NewRandB32()

mim := pm.identity.(*identitymanagermocks.Manager)
mim.On("GetMultipartyRootOrg", pm.ctx).Return(localOrg, nil)
mim.On("GetLocalNode", pm.ctx).Return(node1, nil)

mdi := pm.database.(*databasemocks.Plugin)
mim.On("CachedIdentityLookupByID", pm.ctx, node1.ID).Return(node1, nil).Once()
Expand Down Expand Up @@ -427,7 +427,7 @@ func TestWriteTransactionSubmitBatchPinFail(t *testing.T) {
blob1 := fftypes.NewRandB32()

mim := pm.identity.(*identitymanagermocks.Manager)
mim.On("GetMultipartyRootOrg", pm.ctx).Return(localOrg, nil)
mim.On("GetLocalNode", pm.ctx).Return(node1, nil)

mdi := pm.database.(*databasemocks.Plugin)
mim.On("CachedIdentityLookupByID", pm.ctx, node1.ID).Return(node1, nil).Once()
Expand Down
13 changes: 8 additions & 5 deletions test/e2e/multiparty/multi_tenancy.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,17 @@ func (suite *NamespaceAliasSuite) TestMultiTenancy() {
assert.Equal(suite.T(), 202, resp.StatusCode())
e2e.WaitForMessageConfirmed(suite.T(), receivedBob, core.MessageTypePrivate)
e2e.WaitForMessageConfirmed(suite.T(), receivedCharlie, core.MessageTypePrivate)

// Note: cannot send Alice -> Bob because they share a parent org.
// TODO: should this restriction be changed?
// Alice -> Bob
resp, err = clientAlice.PrivateMessage("topic", data, toBob, "", core.TransactionTypeBatchPin, false, suite.testState.startTime)
require.NoError(suite.T(), err)
assert.Equal(suite.T(), 202, resp.StatusCode())
e2e.WaitForMessageConfirmed(suite.T(), receivedAlice, core.MessageTypePrivate)
e2e.WaitForMessageConfirmed(suite.T(), receivedBob, core.MessageTypePrivate)

messages := clientAlice.GetMessages(suite.T(), suite.testState.startTime, core.MessageTypePrivate, "topic")
assert.Len(suite.T(), messages, 2)
assert.Len(suite.T(), messages, 3)
messages = clientBob.GetMessages(suite.T(), suite.testState.startTime, core.MessageTypePrivate, "topic")
assert.Len(suite.T(), messages, 1)
assert.Len(suite.T(), messages, 2)
messages = clientCharlie.GetMessages(suite.T(), suite.testState.startTime, core.MessageTypePrivate, "topic")
assert.Len(suite.T(), messages, 3)
}

0 comments on commit 198ac15

Please sign in to comment.