Skip to content

Commit

Permalink
Tokens plugin can track multiple listeners
Browse files Browse the repository at this point in the history
Each listener will ignore events outside its namespace.

Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
  • Loading branch information
awrichar committed Jun 9, 2022
1 parent 588c8f8 commit dc11215
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 26 deletions.
9 changes: 9 additions & 0 deletions internal/events/token_pool_created.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ func (em *eventManager) TokenPoolCreated(ti tokens.Plugin, pool *tokens.TokenPoo
return err
}
if existingPool != nil {
if existingPool.Namespace != em.namespace {
log.L(em.ctx).Debugf("Ignoring token pool from wrong namespace '%s'", existingPool.Namespace)
return nil
}
if existingPool.State == core.TokenPoolStateConfirmed {
return nil // already confirmed
}
Expand All @@ -163,6 +167,11 @@ func (em *eventManager) TokenPoolCreated(ti tokens.Plugin, pool *tokens.TokenPoo
if announcePool, err = em.shouldAnnounce(ctx, pool); err != nil {
return err
} else if announcePool != nil {
if announcePool.Namespace != em.namespace {
log.L(em.ctx).Debugf("Ignoring token pool from wrong namespace '%s'", announcePool.Namespace)
announcePool = nil
return nil
}
return nil // trigger announce after completion of database transaction
}

Expand Down
6 changes: 3 additions & 3 deletions internal/events/token_pool_created_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func TestTokenPoolCreatedAnnounce(t *testing.T) {
ID: fftypes.NewUUID(),
Input: fftypes.JSONObject{
"id": poolID.String(),
"namespace": "test-ns",
"namespace": "ns1",
"name": "my-pool",
},
},
Expand All @@ -451,8 +451,8 @@ func TestTokenPoolCreatedAnnounce(t *testing.T) {
mdi.On("GetTokenPoolByLocator", em.ctx, "erc1155", "123").Return(nil, nil).Times(2)
mdi.On("GetOperations", em.ctx, mock.Anything).Return(nil, nil, fmt.Errorf("pop")).Once()
mdi.On("GetOperations", em.ctx, mock.Anything).Return(operations, nil, nil).Once()
mbm.On("BroadcastTokenPool", em.ctx, "test-ns", mock.MatchedBy(func(pool *core.TokenPoolAnnouncement) bool {
return pool.Pool.Namespace == "test-ns" && pool.Pool.Name == "my-pool" && *pool.Pool.ID == *poolID
mbm.On("BroadcastTokenPool", em.ctx, "ns1", mock.MatchedBy(func(pool *core.TokenPoolAnnouncement) bool {
return pool.Pool.Namespace == "ns1" && pool.Pool.Name == "my-pool" && *pool.Pool.ID == *poolID
}), false).Return(nil, nil)

err := em.TokenPoolCreated(mti, pool)
Expand Down
4 changes: 4 additions & 0 deletions internal/events/tokens_approved.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func (em *eventManager) persistTokenApproval(ctx context.Context, approval *toke
log.L(ctx).Infof("Token approval received for unknown pool '%s' - ignoring: %s", approval.PoolLocator, approval.Event.ProtocolID)
return false, nil
}
if pool.Namespace != em.namespace {
log.L(em.ctx).Debugf("Ignoring token approval from wrong namespace '%s'", pool.Namespace)
return false, nil
}
approval.Namespace = pool.Namespace
approval.Pool = pool.ID

Expand Down
4 changes: 4 additions & 0 deletions internal/events/tokens_transferred.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func (em *eventManager) persistTokenTransfer(ctx context.Context, transfer *toke
log.L(ctx).Infof("Token transfer received for unknown pool '%s' - ignoring: %s", transfer.PoolLocator, transfer.Event.ProtocolID)
return false, nil
}
if pool.Namespace != em.namespace {
log.L(em.ctx).Debugf("Ignoring token transfer from wrong namespace '%s'", pool.Namespace)
return false, nil
}
transfer.Namespace = pool.Namespace
transfer.Pool = pool.ID

Expand Down
43 changes: 40 additions & 3 deletions internal/tokens/fftokens/fftokens.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,49 @@ import (
type FFTokens struct {
ctx context.Context
capabilities *tokens.Capabilities
callbacks tokens.Callbacks
callbacks callbacks
configuredName string
client *resty.Client
wsconn wsclient.WSClient
}

type callbacks struct {
listeners []tokens.Callbacks
}

func (cb *callbacks) TokenOpUpdate(plugin tokens.Plugin, nsOpID string, txState core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) {
for _, cb := range cb.listeners {
cb.TokenOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput)
}
}

func (cb *callbacks) TokenPoolCreated(plugin tokens.Plugin, pool *tokens.TokenPool) error {
for _, cb := range cb.listeners {
if err := cb.TokenPoolCreated(plugin, pool); err != nil {
return err
}
}
return nil
}

func (cb *callbacks) TokensTransferred(plugin tokens.Plugin, transfer *tokens.TokenTransfer) error {
for _, cb := range cb.listeners {
if err := cb.TokensTransferred(plugin, transfer); err != nil {
return err
}
}
return nil
}

func (cb *callbacks) TokensApproved(plugin tokens.Plugin, approval *tokens.TokenApproval) error {
for _, cb := range cb.listeners {
if err := cb.TokensApproved(plugin, approval); err != nil {
return err
}
}
return nil
}

type wsEvent struct {
Event msgType `json:"event"`
ID string `json:"id"`
Expand Down Expand Up @@ -159,8 +196,8 @@ func (ft *FFTokens) Init(ctx context.Context, name string, config config.Section
return nil
}

func (ft *FFTokens) RegisterListener(callbacks tokens.Callbacks) {
ft.callbacks = callbacks
func (ft *FFTokens) RegisterListener(listener tokens.Callbacks) {
ft.callbacks.listeners = append(ft.callbacks.listeners, listener)
}

func (ft *FFTokens) Start() error {
Expand Down
26 changes: 10 additions & 16 deletions internal/tokens/fftokens/fftokens_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func TestCreateTokenPoolSynchronous(t *testing.T) {
})

mcb := &tokenmocks.Callbacks{}
h.callbacks = mcb
h.RegisterListener(mcb)
mcb.On("TokenPoolCreated", h, mock.MatchedBy(func(p *tokens.TokenPool) bool {
return p.PoolLocator == "F1" && p.Type == core.TokenTypeFungible && *p.TX.ID == *pool.TX.ID
})).Return(nil)
Expand Down Expand Up @@ -414,7 +414,7 @@ func TestActivateTokenPoolSynchronous(t *testing.T) {
})

mcb := &tokenmocks.Callbacks{}
h.callbacks = mcb
h.RegisterListener(mcb)
mcb.On("TokenPoolCreated", h, mock.MatchedBy(func(p *tokens.TokenPool) bool {
return p.PoolLocator == "F1" && p.Type == core.TokenTypeFungible && p.TX.ID == nil && p.Event.ProtocolID == ""
})).Return(nil)
Expand Down Expand Up @@ -460,7 +460,7 @@ func TestActivateTokenPoolSynchronousBadResponse(t *testing.T) {
})

mcb := &tokenmocks.Callbacks{}
h.callbacks = mcb
h.RegisterListener(mcb)
mcb.On("TokenPoolCreated", h, mock.MatchedBy(func(p *tokens.TokenPool) bool {
return p.PoolLocator == "F1" && p.Type == core.TokenTypeFungible && p.TX.ID == nil
})).Return(nil)
Expand Down Expand Up @@ -780,7 +780,7 @@ func TestEvents(t *testing.T) {
assert.Equal(t, `{"data":{"id":"1"},"event":"ack"}`, string(msg))

mcb := &tokenmocks.Callbacks{}
h.callbacks = mcb
h.RegisterListener(mcb)
opID := fftypes.NewUUID()
txID := fftypes.NewUUID()

Expand Down Expand Up @@ -1118,12 +1118,10 @@ func TestEvents(t *testing.T) {
}

func TestEventLoopReceiveClosed(t *testing.T) {
dxc := &tokenmocks.Callbacks{}
wsm := &wsmocks.WSClient{}
h := &FFTokens{
ctx: context.Background(),
callbacks: dxc,
wsconn: wsm,
ctx: context.Background(),
wsconn: wsm,
}
r := make(chan []byte)
close(r)
Expand All @@ -1133,12 +1131,10 @@ func TestEventLoopReceiveClosed(t *testing.T) {
}

func TestEventLoopSendClosed(t *testing.T) {
dxc := &tokenmocks.Callbacks{}
wsm := &wsmocks.WSClient{}
h := &FFTokens{
ctx: context.Background(),
callbacks: dxc,
wsconn: wsm,
ctx: context.Background(),
wsconn: wsm,
}
r := make(chan []byte, 1)
r <- []byte(`{"id":"1"}`) // ignored but acked
Expand All @@ -1149,14 +1145,12 @@ func TestEventLoopSendClosed(t *testing.T) {
}

func TestEventLoopClosedContext(t *testing.T) {
dxc := &tokenmocks.Callbacks{}
wsm := &wsmocks.WSClient{}
ctx, cancel := context.WithCancel(context.Background())
cancel()
h := &FFTokens{
ctx: ctx,
callbacks: dxc,
wsconn: wsm,
ctx: ctx,
wsconn: wsm,
}
r := make(chan []byte, 1)
wsm.On("Close").Return()
Expand Down
6 changes: 3 additions & 3 deletions mocks/tokenmocks/plugin.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/tokens/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Plugin interface {
Init(ctx context.Context, name string, config config.Section) error

// RegisterListener registers a listener to receive callbacks
RegisterListener(callbacks Callbacks)
RegisterListener(listener Callbacks)

// Blockchain interface must not deliver any events until start is called
Start() error
Expand Down

0 comments on commit dc11215

Please sign in to comment.