Skip to content

Commit

Permalink
Data Exchange plugin can track multiple listeners
Browse files Browse the repository at this point in the history
Each listener will ignore events outside its namespace.

Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
  • Loading branch information
awrichar committed Jun 9, 2022
1 parent de9ad86 commit 25c0176
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 18 deletions.
4 changes: 4 additions & 0 deletions internal/dataexchange/ffdx/dxevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package ffdx

import (
"strings"

"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-common/pkg/log"
Expand Down Expand Up @@ -154,8 +156,10 @@ func (h *FFDX) dispatchEvent(msg *wsEvent) {
var hash *fftypes.Bytes32
hash, err = fftypes.ParseBytes32(h.ctx, msg.Hash)
if err == nil {
pathParts := strings.Split(msg.Path, "/")
e.dxType = dataexchange.DXEventTypePrivateBlobReceived
e.privateBlobReceived = &dataexchange.PrivateBlobReceived{
Namespace: pathParts[0],
PeerID: msg.Sender,
Hash: *hash,
Size: msg.Size,
Expand Down
16 changes: 13 additions & 3 deletions internal/dataexchange/ffdx/ffdx.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
type FFDX struct {
ctx context.Context
capabilities *dataexchange.Capabilities
callbacks dataexchange.Callbacks
callbacks callbacks
client *resty.Client
wsconn wsclient.WSClient
needsInit bool
Expand All @@ -49,6 +49,16 @@ type FFDX struct {
ackChannel chan *ack
}

type callbacks struct {
listeners []dataexchange.Callbacks
}

func (cb *callbacks) DXEvent(event dataexchange.DXEvent) {
for _, cb := range cb.listeners {
cb.DXEvent(event)
}
}

const (
dxHTTPHeaderHash = "dx-hash"
dxHTTPHeaderSize = "dx-size"
Expand Down Expand Up @@ -138,8 +148,8 @@ func (h *FFDX) SetNodes(nodes []fftypes.JSONObject) {
h.nodes = nodes
}

func (h *FFDX) RegisterListener(callbacks dataexchange.Callbacks) {
h.callbacks = callbacks
func (h *FFDX) RegisterListener(listener dataexchange.Callbacks) {
h.callbacks.listeners = append(h.callbacks.listeners, listener)
}

func (h *FFDX) Start() error {
Expand Down
10 changes: 5 additions & 5 deletions internal/dataexchange/ffdx/ffdx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func TestEvents(t *testing.T) {
assert.Equal(t, `{"action":"ack","id":"0"}`, string(msg))

mcb := &dataexchangemocks.Callbacks{}
h.callbacks = mcb
h.RegisterListener(mcb)

mcb.On("DXEvent", mock.MatchedBy(func(ev dataexchange.DXEvent) bool {
return ev.NamespacedID() == "1" &&
Expand Down Expand Up @@ -557,7 +557,7 @@ func TestEventsWithManifest(t *testing.T) {
assert.Equal(t, `{"action":"ack","id":"0"}`, string(msg))

mcb := &dataexchangemocks.Callbacks{}
h.callbacks = mcb
h.RegisterListener(mcb)

mcb.On("DXEvent", mock.MatchedBy(func(ev dataexchange.DXEvent) bool {
return ev.NamespacedID() == "1" &&
Expand Down Expand Up @@ -585,7 +585,7 @@ func TestEventLoopReceiveClosed(t *testing.T) {
wsm := &wsmocks.WSClient{}
h := &FFDX{
ctx: context.Background(),
callbacks: dxc,
callbacks: callbacks{listeners: []dataexchange.Callbacks{dxc}},
wsconn: wsm,
}
r := make(chan []byte)
Expand All @@ -601,7 +601,7 @@ func TestEventLoopSendClosed(t *testing.T) {
ctx, cancelCtx := context.WithCancel(context.Background())
h := &FFDX{
ctx: ctx,
callbacks: dxc,
callbacks: callbacks{listeners: []dataexchange.Callbacks{dxc}},
wsconn: wsm,
ackChannel: make(chan *ack, 1),
}
Expand All @@ -622,7 +622,7 @@ func TestEventLoopClosedContext(t *testing.T) {
cancel()
h := &FFDX{
ctx: ctx,
callbacks: dxc,
callbacks: callbacks{listeners: []dataexchange.Callbacks{dxc}},
wsconn: wsm,
}
r := make(chan []byte, 1)
Expand Down
10 changes: 9 additions & 1 deletion internal/events/dx_callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (em *eventManager) messageReceived(dx dataexchange.Plugin, event dataexchan

mr := event.MessageReceived()

// De-serializae the transport wrapper
// De-serialize the transport wrapper
var wrapper *core.TransportWrapper
err := json.Unmarshal(mr.Data, &wrapper)
if err != nil {
Expand All @@ -198,6 +198,10 @@ func (em *eventManager) messageReceived(dx dataexchange.Plugin, event dataexchan
event.AckWithManifest("")
return
}
if wrapper.Batch.Namespace != em.namespace {
log.L(em.ctx).Debugf("Ignoring batch from wrong namespace '%s'", wrapper.Batch.Namespace)
return
}
l.Infof("Private batch received from %s peer '%s' (len=%d)", dx.Name(), mr.PeerID, len(mr.Data))

manifestString, err := em.privateBatchReceived(mr.PeerID, wrapper.Batch, wrapper.Group)
Expand All @@ -218,6 +222,10 @@ func (em *eventManager) privateBlobReceived(dx dataexchange.Plugin, event dataex
event.Ack() // Still confirm the event
return
}
if br.Namespace != em.namespace {
log.L(em.ctx).Debugf("Ignoring blob from wrong namespace '%s'", br.Namespace)
return
}

// Dispatch to the blob receiver for efficient batch DB operations
em.blobReceiver.blobReceived(em.ctx, &blobNotification{
Expand Down
3 changes: 3 additions & 0 deletions internal/events/dx_callbacks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"

"github.com/hyperledger/firefly-common/pkg/fftypes"
Expand Down Expand Up @@ -99,7 +100,9 @@ func newMessageReceived(peerID string, data []byte, expectedManifest string) *da

func newPrivateBlobReceivedNoAck(peerID string, hash *fftypes.Bytes32, size int64, payloadRef string) *dataexchangemocks.DXEvent {
mde := &dataexchangemocks.DXEvent{}
pathParts := strings.Split(payloadRef, "/")
mde.On("PrivateBlobReceived").Return(&dataexchange.PrivateBlobReceived{
Namespace: pathParts[0],
PeerID: peerID,
Hash: *hash,
Size: size,
Expand Down
4 changes: 1 addition & 3 deletions internal/identity/tbd/tbd.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
// TBD is a null implementation of the Identity Interface to avoid breaking configuration created with the previous "onchain" plugin
type TBD struct {
capabilities *identity.Capabilities
callbacks identity.Callbacks
}

func (tbd *TBD) Name() string {
Expand All @@ -38,8 +37,7 @@ func (tbd *TBD) Init(ctx context.Context, config config.Section) (err error) {
return nil
}

func (tbd *TBD) RegisterListener(callbacks identity.Callbacks) {
tbd.callbacks = callbacks
func (tbd *TBD) RegisterListener(listener identity.Callbacks) {
}

func (tbd *TBD) Start() error {
Expand Down
4 changes: 1 addition & 3 deletions internal/sharedstorage/ipfs/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
type IPFS struct {
ctx context.Context
capabilities *sharedstorage.Capabilities
callbacks sharedstorage.Callbacks
apiClient *resty.Client
gwClient *resty.Client
}
Expand Down Expand Up @@ -68,8 +67,7 @@ func (i *IPFS) Init(ctx context.Context, config config.Section) error {
return nil
}

func (i *IPFS) RegisterListener(callbacks sharedstorage.Callbacks) {
i.callbacks = callbacks
func (i *IPFS) RegisterListener(listener sharedstorage.Callbacks) {
}

func (i *IPFS) Capabilities() *sharedstorage.Capabilities {
Expand Down
3 changes: 2 additions & 1 deletion pkg/dataexchange/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type Plugin interface {
SetNodes(nodes []fftypes.JSONObject)

// RegisterListener registers a listener to receive callbacks
RegisterListener(callbacks Callbacks)
RegisterListener(listener Callbacks)

// Data exchange interface must not deliver any events until start is called
Start() error
Expand Down Expand Up @@ -129,6 +129,7 @@ type MessageReceived struct {
}

type PrivateBlobReceived struct {
Namespace string
PeerID string
Hash fftypes.Bytes32
Size int64
Expand Down
2 changes: 1 addition & 1 deletion pkg/identity/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Plugin interface {
Init(ctx context.Context, config config.Section) error

// RegisterListener registers a listener to receive callbacks
RegisterListener(callbacks Callbacks)
RegisterListener(listener Callbacks)

// Blockchain interface must not deliver any events until start is called
Start() error
Expand Down
2 changes: 1 addition & 1 deletion pkg/sharedstorage/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Plugin interface {
Init(ctx context.Context, config config.Section) error

// RegisterListener registers a listener to receive callbacks
RegisterListener(callbacks Callbacks)
RegisterListener(listener Callbacks)

// Capabilities returns capabilities - not called until after Init
Capabilities() *Capabilities
Expand Down

0 comments on commit 25c0176

Please sign in to comment.