diff --git a/internal/dataexchange/ffdx/dxevent.go b/internal/dataexchange/ffdx/dxevent.go index cf91e3e08d..b125fe7b26 100644 --- a/internal/dataexchange/ffdx/dxevent.go +++ b/internal/dataexchange/ffdx/dxevent.go @@ -17,6 +17,8 @@ package ffdx import ( + "strings" + "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly-common/pkg/i18n" "github.com/hyperledger/firefly-common/pkg/log" @@ -154,8 +156,10 @@ func (h *FFDX) dispatchEvent(msg *wsEvent) { var hash *fftypes.Bytes32 hash, err = fftypes.ParseBytes32(h.ctx, msg.Hash) if err == nil { + pathParts := strings.Split(msg.Path, "/") e.dxType = dataexchange.DXEventTypePrivateBlobReceived e.privateBlobReceived = &dataexchange.PrivateBlobReceived{ + Namespace: pathParts[0], PeerID: msg.Sender, Hash: *hash, Size: msg.Size, diff --git a/internal/dataexchange/ffdx/ffdx.go b/internal/dataexchange/ffdx/ffdx.go index 1447a27933..eed9b14b79 100644 --- a/internal/dataexchange/ffdx/ffdx.go +++ b/internal/dataexchange/ffdx/ffdx.go @@ -39,7 +39,7 @@ import ( type FFDX struct { ctx context.Context capabilities *dataexchange.Capabilities - callbacks dataexchange.Callbacks + callbacks callbacks client *resty.Client wsconn wsclient.WSClient needsInit bool @@ -49,6 +49,16 @@ type FFDX struct { ackChannel chan *ack } +type callbacks struct { + listeners []dataexchange.Callbacks +} + +func (cb *callbacks) DXEvent(event dataexchange.DXEvent) { + for _, cb := range cb.listeners { + cb.DXEvent(event) + } +} + const ( dxHTTPHeaderHash = "dx-hash" dxHTTPHeaderSize = "dx-size" @@ -138,8 +148,8 @@ func (h *FFDX) SetNodes(nodes []fftypes.JSONObject) { h.nodes = nodes } -func (h *FFDX) RegisterListener(callbacks dataexchange.Callbacks) { - h.callbacks = callbacks +func (h *FFDX) RegisterListener(listener dataexchange.Callbacks) { + h.callbacks.listeners = append(h.callbacks.listeners, listener) } func (h *FFDX) Start() error { diff --git a/internal/dataexchange/ffdx/ffdx_test.go b/internal/dataexchange/ffdx/ffdx_test.go index 04281bdbfc..25a37476dd 100644 --- a/internal/dataexchange/ffdx/ffdx_test.go +++ b/internal/dataexchange/ffdx/ffdx_test.go @@ -443,7 +443,7 @@ func TestEvents(t *testing.T) { assert.Equal(t, `{"action":"ack","id":"0"}`, string(msg)) mcb := &dataexchangemocks.Callbacks{} - h.callbacks = mcb + h.RegisterListener(mcb) mcb.On("DXEvent", mock.MatchedBy(func(ev dataexchange.DXEvent) bool { return ev.NamespacedID() == "1" && @@ -557,7 +557,7 @@ func TestEventsWithManifest(t *testing.T) { assert.Equal(t, `{"action":"ack","id":"0"}`, string(msg)) mcb := &dataexchangemocks.Callbacks{} - h.callbacks = mcb + h.RegisterListener(mcb) mcb.On("DXEvent", mock.MatchedBy(func(ev dataexchange.DXEvent) bool { return ev.NamespacedID() == "1" && @@ -585,7 +585,7 @@ func TestEventLoopReceiveClosed(t *testing.T) { wsm := &wsmocks.WSClient{} h := &FFDX{ ctx: context.Background(), - callbacks: dxc, + callbacks: callbacks{listeners: []dataexchange.Callbacks{dxc}}, wsconn: wsm, } r := make(chan []byte) @@ -601,7 +601,7 @@ func TestEventLoopSendClosed(t *testing.T) { ctx, cancelCtx := context.WithCancel(context.Background()) h := &FFDX{ ctx: ctx, - callbacks: dxc, + callbacks: callbacks{listeners: []dataexchange.Callbacks{dxc}}, wsconn: wsm, ackChannel: make(chan *ack, 1), } @@ -622,7 +622,7 @@ func TestEventLoopClosedContext(t *testing.T) { cancel() h := &FFDX{ ctx: ctx, - callbacks: dxc, + callbacks: callbacks{listeners: []dataexchange.Callbacks{dxc}}, wsconn: wsm, } r := make(chan []byte, 1) diff --git a/internal/events/dx_callbacks.go b/internal/events/dx_callbacks.go index 1b91174bcd..a873fa9472 100644 --- a/internal/events/dx_callbacks.go +++ b/internal/events/dx_callbacks.go @@ -185,7 +185,7 @@ func (em *eventManager) messageReceived(dx dataexchange.Plugin, event dataexchan mr := event.MessageReceived() - // De-serializae the transport wrapper + // De-serialize the transport wrapper var wrapper *core.TransportWrapper err := json.Unmarshal(mr.Data, &wrapper) if err != nil { @@ -198,6 +198,10 @@ func (em *eventManager) messageReceived(dx dataexchange.Plugin, event dataexchan event.AckWithManifest("") return } + if wrapper.Batch.Namespace != em.namespace { + log.L(em.ctx).Debugf("Ignoring batch from wrong namespace '%s'", wrapper.Batch.Namespace) + return + } l.Infof("Private batch received from %s peer '%s' (len=%d)", dx.Name(), mr.PeerID, len(mr.Data)) manifestString, err := em.privateBatchReceived(mr.PeerID, wrapper.Batch, wrapper.Group) @@ -218,6 +222,10 @@ func (em *eventManager) privateBlobReceived(dx dataexchange.Plugin, event dataex event.Ack() // Still confirm the event return } + if br.Namespace != em.namespace { + log.L(em.ctx).Debugf("Ignoring blob from wrong namespace '%s'", br.Namespace) + return + } // Dispatch to the blob receiver for efficient batch DB operations em.blobReceiver.blobReceived(em.ctx, &blobNotification{ diff --git a/internal/events/dx_callbacks_test.go b/internal/events/dx_callbacks_test.go index d64c323467..a24840d86a 100644 --- a/internal/events/dx_callbacks_test.go +++ b/internal/events/dx_callbacks_test.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "testing" "github.com/hyperledger/firefly-common/pkg/fftypes" @@ -99,7 +100,9 @@ func newMessageReceived(peerID string, data []byte, expectedManifest string) *da func newPrivateBlobReceivedNoAck(peerID string, hash *fftypes.Bytes32, size int64, payloadRef string) *dataexchangemocks.DXEvent { mde := &dataexchangemocks.DXEvent{} + pathParts := strings.Split(payloadRef, "/") mde.On("PrivateBlobReceived").Return(&dataexchange.PrivateBlobReceived{ + Namespace: pathParts[0], PeerID: peerID, Hash: *hash, Size: size, diff --git a/internal/identity/tbd/tbd.go b/internal/identity/tbd/tbd.go index 507d8aedf0..197d9269c6 100644 --- a/internal/identity/tbd/tbd.go +++ b/internal/identity/tbd/tbd.go @@ -26,7 +26,6 @@ import ( // TBD is a null implementation of the Identity Interface to avoid breaking configuration created with the previous "onchain" plugin type TBD struct { capabilities *identity.Capabilities - callbacks identity.Callbacks } func (tbd *TBD) Name() string { @@ -38,8 +37,7 @@ func (tbd *TBD) Init(ctx context.Context, config config.Section) (err error) { return nil } -func (tbd *TBD) RegisterListener(callbacks identity.Callbacks) { - tbd.callbacks = callbacks +func (tbd *TBD) RegisterListener(listener identity.Callbacks) { } func (tbd *TBD) Start() error { diff --git a/internal/sharedstorage/ipfs/ipfs.go b/internal/sharedstorage/ipfs/ipfs.go index e9d4d12020..a7d878ca48 100644 --- a/internal/sharedstorage/ipfs/ipfs.go +++ b/internal/sharedstorage/ipfs/ipfs.go @@ -35,7 +35,6 @@ import ( type IPFS struct { ctx context.Context capabilities *sharedstorage.Capabilities - callbacks sharedstorage.Callbacks apiClient *resty.Client gwClient *resty.Client } @@ -68,8 +67,7 @@ func (i *IPFS) Init(ctx context.Context, config config.Section) error { return nil } -func (i *IPFS) RegisterListener(callbacks sharedstorage.Callbacks) { - i.callbacks = callbacks +func (i *IPFS) RegisterListener(listener sharedstorage.Callbacks) { } func (i *IPFS) Capabilities() *sharedstorage.Capabilities { diff --git a/pkg/dataexchange/plugin.go b/pkg/dataexchange/plugin.go index bdb6e7ff38..3a6db6e4ff 100644 --- a/pkg/dataexchange/plugin.go +++ b/pkg/dataexchange/plugin.go @@ -67,7 +67,7 @@ type Plugin interface { SetNodes(nodes []fftypes.JSONObject) // RegisterListener registers a listener to receive callbacks - RegisterListener(callbacks Callbacks) + RegisterListener(listener Callbacks) // Data exchange interface must not deliver any events until start is called Start() error @@ -129,6 +129,7 @@ type MessageReceived struct { } type PrivateBlobReceived struct { + Namespace string PeerID string Hash fftypes.Bytes32 Size int64 diff --git a/pkg/identity/plugin.go b/pkg/identity/plugin.go index 6f250e6002..59c9f5248d 100644 --- a/pkg/identity/plugin.go +++ b/pkg/identity/plugin.go @@ -34,7 +34,7 @@ type Plugin interface { Init(ctx context.Context, config config.Section) error // RegisterListener registers a listener to receive callbacks - RegisterListener(callbacks Callbacks) + RegisterListener(listener Callbacks) // Blockchain interface must not deliver any events until start is called Start() error diff --git a/pkg/sharedstorage/plugin.go b/pkg/sharedstorage/plugin.go index 1045353ea5..4c65855125 100644 --- a/pkg/sharedstorage/plugin.go +++ b/pkg/sharedstorage/plugin.go @@ -35,7 +35,7 @@ type Plugin interface { Init(ctx context.Context, config config.Section) error // RegisterListener registers a listener to receive callbacks - RegisterListener(callbacks Callbacks) + RegisterListener(listener Callbacks) // Capabilities returns capabilities - not called until after Init Capabilities() *Capabilities