Skip to content

Commit

Permalink
Blockchain plugin can track multiple listeners
Browse files Browse the repository at this point in the history
Each listener will ignore events outside its namespace.

Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
  • Loading branch information
awrichar committed Jun 9, 2022
1 parent 15ddcd5 commit de9ad86
Show file tree
Hide file tree
Showing 15 changed files with 186 additions and 101 deletions.
43 changes: 40 additions & 3 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Ethereum struct {
prefixShort string
prefixLong string
capabilities *blockchain.Capabilities
callbacks blockchain.Callbacks
callbacks callbacks
client *resty.Client
fftmClient *resty.Client
streams *streamManager
Expand All @@ -76,6 +76,43 @@ type Ethereum struct {
contractConfSize int
}

type callbacks struct {
listeners []blockchain.Callbacks
}

func (cb *callbacks) BlockchainOpUpdate(plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) {
for _, cb := range cb.listeners {
cb.BlockchainOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput)
}
}

func (cb *callbacks) BatchPinComplete(batch *blockchain.BatchPin, signingKey *core.VerifierRef) error {
for _, cb := range cb.listeners {
if err := cb.BatchPinComplete(batch, signingKey); err != nil {
return err
}
}
return nil
}

func (cb *callbacks) BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error {
for _, cb := range cb.listeners {
if err := cb.BlockchainNetworkAction(action, event, signingKey); err != nil {
return err
}
}
return nil
}

func (cb *callbacks) BlockchainEvent(event *blockchain.EventWithSubscription) error {
for _, cb := range cb.listeners {
if err := cb.BlockchainEvent(event); err != nil {
return err
}
}
return nil
}

type eventStreamWebsocket struct {
Topic string `json:"topic"`
}
Expand Down Expand Up @@ -213,8 +250,8 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, metrics metr
return nil
}

func (e *Ethereum) RegisterListener(callbacks blockchain.Callbacks) {
e.callbacks = callbacks
func (e *Ethereum) RegisterListener(listener blockchain.Callbacks) {
e.callbacks.listeners = append(e.callbacks.listeners, listener)
}

func (e *Ethereum) Start() (err error) {
Expand Down
66 changes: 24 additions & 42 deletions internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func resetConf(e *Ethereum) {

func newTestEthereum() (*Ethereum, func()) {
ctx, cancel := context.WithCancel(context.Background())
em := &blockchainmocks.Callbacks{}
wsm := &wsmocks.WSClient{}
mm := &metricsmocks.Manager{}
mm.On("IsMetricsEnabled").Return(true)
Expand All @@ -87,7 +86,6 @@ func newTestEthereum() (*Ethereum, func()) {
topic: "topic1",
prefixShort: defaultPrefixShort,
prefixLong: defaultPrefixLong,
callbacks: em,
wsconn: wsm,
metrics: mm,
}
Expand Down Expand Up @@ -1095,7 +1093,7 @@ func TestHandleMessageBatchPinOK(t *testing.T) {

em := &blockchainmocks.Callbacks{}
e := &Ethereum{
callbacks: em,
callbacks: callbacks{listeners: []blockchain.Callbacks{em}},
}
e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5"
e.fireflyContract.networkVersion = 1
Expand Down Expand Up @@ -1179,10 +1177,7 @@ func TestHandleMessageBatchPinMissingAuthor(t *testing.T) {
}
]`)

em := &blockchainmocks.Callbacks{}
e := &Ethereum{
callbacks: em,
}
e := &Ethereum{}
e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5"

var events []interface{}
Expand All @@ -1191,8 +1186,6 @@ func TestHandleMessageBatchPinMissingAuthor(t *testing.T) {
err = e.handleMessageBatch(context.Background(), events)
assert.NoError(t, err)

em.AssertExpectations(t)

}

func TestHandleMessageEmptyPayloadRef(t *testing.T) {
Expand Down Expand Up @@ -1223,7 +1216,7 @@ func TestHandleMessageEmptyPayloadRef(t *testing.T) {

em := &blockchainmocks.Callbacks{}
e := &Ethereum{
callbacks: em,
callbacks: callbacks{listeners: []blockchain.Callbacks{em}},
}
e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5"
e.fireflyContract.networkVersion = 1
Expand Down Expand Up @@ -1285,7 +1278,7 @@ func TestHandleMessageBatchPinExit(t *testing.T) {

em := &blockchainmocks.Callbacks{}
e := &Ethereum{
callbacks: em,
callbacks: callbacks{listeners: []blockchain.Callbacks{em}},
}
e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5"
e.fireflyContract.networkVersion = 1
Expand All @@ -1298,11 +1291,11 @@ func TestHandleMessageBatchPinExit(t *testing.T) {
err = e.handleMessageBatch(context.Background(), events)
assert.EqualError(t, err, "pop")

em.AssertExpectations(t)
}

func TestHandleMessageBatchPinEmpty(t *testing.T) {
em := &blockchainmocks.Callbacks{}
e := &Ethereum{callbacks: em}
e := &Ethereum{}
e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5"

var events []interface{}
Expand All @@ -1316,12 +1309,10 @@ func TestHandleMessageBatchPinEmpty(t *testing.T) {
assert.NoError(t, err)
err = e.handleMessageBatch(context.Background(), events)
assert.NoError(t, err)
assert.Equal(t, 0, len(em.Calls))
}

func TestHandleMessageBatchMissingData(t *testing.T) {
em := &blockchainmocks.Callbacks{}
e := &Ethereum{callbacks: em}
e := &Ethereum{}
e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5"

var events []interface{}
Expand All @@ -1336,12 +1327,10 @@ func TestHandleMessageBatchMissingData(t *testing.T) {
assert.NoError(t, err)
err = e.handleMessageBatch(context.Background(), events)
assert.NoError(t, err)
assert.Equal(t, 0, len(em.Calls))
}

func TestHandleMessageBatchPinBadTransactionID(t *testing.T) {
em := &blockchainmocks.Callbacks{}
e := &Ethereum{callbacks: em}
e := &Ethereum{}
e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5"
data := fftypes.JSONAnyPtr(`[{
"address": "0x1C197604587F046FD40684A8f21f4609FB811A7b",
Expand All @@ -1368,12 +1357,10 @@ func TestHandleMessageBatchPinBadTransactionID(t *testing.T) {
assert.NoError(t, err)
err = e.handleMessageBatch(context.Background(), events)
assert.NoError(t, err)
assert.Equal(t, 0, len(em.Calls))
}

func TestHandleMessageBatchPinBadIDentity(t *testing.T) {
em := &blockchainmocks.Callbacks{}
e := &Ethereum{callbacks: em}
e := &Ethereum{}
e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5"
data := fftypes.JSONAnyPtr(`[{
"address": "0x1C197604587F046FD40684A8f21f4609FB811A7b",
Expand All @@ -1400,12 +1387,10 @@ func TestHandleMessageBatchPinBadIDentity(t *testing.T) {
assert.NoError(t, err)
err = e.handleMessageBatch(context.Background(), events)
assert.NoError(t, err)
assert.Equal(t, 0, len(em.Calls))
}

func TestHandleMessageBatchPinBadBatchHash(t *testing.T) {
em := &blockchainmocks.Callbacks{}
e := &Ethereum{callbacks: em}
e := &Ethereum{}
e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5"
data := fftypes.JSONAnyPtr(`[{
"address": "0x1C197604587F046FD40684A8f21f4609FB811A7b",
Expand All @@ -1432,12 +1417,10 @@ func TestHandleMessageBatchPinBadBatchHash(t *testing.T) {
assert.NoError(t, err)
err = e.handleMessageBatch(context.Background(), events)
assert.NoError(t, err)
assert.Equal(t, 0, len(em.Calls))
}

func TestHandleMessageBatchPinBadPin(t *testing.T) {
em := &blockchainmocks.Callbacks{}
e := &Ethereum{callbacks: em}
e := &Ethereum{}
e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5"
data := fftypes.JSONAnyPtr(`[{
"address": "0x1C197604587F046FD40684A8f21f4609FB811A7b",
Expand All @@ -1464,15 +1447,12 @@ func TestHandleMessageBatchPinBadPin(t *testing.T) {
assert.NoError(t, err)
err = e.handleMessageBatch(context.Background(), events)
assert.NoError(t, err)
assert.Equal(t, 0, len(em.Calls))
}

func TestHandleMessageBatchBadJSON(t *testing.T) {
em := &blockchainmocks.Callbacks{}
e := &Ethereum{callbacks: em}
e := &Ethereum{}
err := e.handleMessageBatch(context.Background(), []interface{}{10, 20})
assert.NoError(t, err)
assert.Equal(t, 0, len(em.Calls))
}

func TestEventLoopContextCancelled(t *testing.T) {
Expand Down Expand Up @@ -1523,7 +1503,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
e := &Ethereum{
ctx: context.Background(),
topic: "topic1",
callbacks: em,
callbacks: callbacks{listeners: []blockchain.Callbacks{em}},
wsconn: wsm,
}

Expand Down Expand Up @@ -1564,6 +1544,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
assert.NoError(t, err)
e.handleReceipt(context.Background(), reply)

em.AssertExpectations(t)
}

func TestHandleBadPayloadsAndThenReceiptFailure(t *testing.T) {
Expand Down Expand Up @@ -1591,7 +1572,8 @@ func TestHandleBadPayloadsAndThenReceiptFailure(t *testing.T) {
"requestPayload": "{\"from\":\"0x91d2b4381a4cd5c7c0f27565a7d4b829844c8635\",\"gas\":0,\"gasPrice\":0,\"headers\":{\"id\":\"6fb94fff-81d3-4094-567d-e031b1871694\",\"type\":\"SendTransaction\"},\"method\":{\"inputs\":[{\"internalType\":\"bytes32\",\"name\":\"txnId\",\"type\":\"bytes32\"},{\"internalType\":\"bytes32\",\"name\":\"batchId\",\"type\":\"bytes32\"},{\"internalType\":\"bytes32\",\"name\":\"payloadRef\",\"type\":\"bytes32\"}],\"name\":\"broadcastBatch\",\"outputs\":[],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},\"params\":[\"12345\",\"!\",\"!\"],\"to\":\"0xd3266a857285fb75eb7df37353b4a15c8bb828f5\",\"value\":0}"
}`)

em := e.callbacks.(*blockchainmocks.Callbacks)
em := &blockchainmocks.Callbacks{}
e.callbacks.listeners = []blockchain.Callbacks{em}
txsu := em.On("BlockchainOpUpdate",
e,
"ns1:"+operationID.String(),
Expand All @@ -1609,16 +1591,16 @@ func TestHandleBadPayloadsAndThenReceiptFailure(t *testing.T) {
r <- []byte(`"not an object"`) // ignored wrong type
r <- data.Bytes()
<-done

em.AssertExpectations(t)
}

func TestHandleMsgBatchBadData(t *testing.T) {
em := &blockchainmocks.Callbacks{}
wsm := &wsmocks.WSClient{}
e := &Ethereum{
ctx: context.Background(),
topic: "topic1",
callbacks: em,
wsconn: wsm,
ctx: context.Background(),
topic: "topic1",
wsconn: wsm,
}

var reply fftypes.JSONObject
Expand Down Expand Up @@ -1836,7 +1818,7 @@ func TestHandleMessageContractEvent(t *testing.T) {

em := &blockchainmocks.Callbacks{}
e := &Ethereum{
callbacks: em,
callbacks: callbacks{listeners: []blockchain.Callbacks{em}},
}
e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5"

Expand Down Expand Up @@ -1898,7 +1880,7 @@ func TestHandleMessageContractEventError(t *testing.T) {

em := &blockchainmocks.Callbacks{}
e := &Ethereum{
callbacks: em,
callbacks: callbacks{listeners: []blockchain.Callbacks{em}},
}
e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5"

Expand Down Expand Up @@ -2980,7 +2962,7 @@ func TestHandleNetworkAction(t *testing.T) {

em := &blockchainmocks.Callbacks{}
e := &Ethereum{
callbacks: em,
callbacks: callbacks{listeners: []blockchain.Callbacks{em}},
}
e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5"

Expand Down
43 changes: 40 additions & 3 deletions internal/blockchain/fabric/fabric.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type Fabric struct {
prefixShort string
prefixLong string
capabilities *blockchain.Capabilities
callbacks blockchain.Callbacks
callbacks callbacks
client *resty.Client
streams *streamManager
streamID string
Expand All @@ -71,6 +71,43 @@ type Fabric struct {
contractConfSize int
}

type callbacks struct {
listeners []blockchain.Callbacks
}

func (cb *callbacks) BlockchainOpUpdate(plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) {
for _, cb := range cb.listeners {
cb.BlockchainOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput)
}
}

func (cb *callbacks) BatchPinComplete(batch *blockchain.BatchPin, signingKey *core.VerifierRef) error {
for _, cb := range cb.listeners {
if err := cb.BatchPinComplete(batch, signingKey); err != nil {
return err
}
}
return nil
}

func (cb *callbacks) BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error {
for _, cb := range cb.listeners {
if err := cb.BlockchainNetworkAction(action, event, signingKey); err != nil {
return err
}
}
return nil
}

func (cb *callbacks) BlockchainEvent(event *blockchain.EventWithSubscription) error {
for _, cb := range cb.listeners {
if err := cb.BlockchainEvent(event); err != nil {
return err
}
}
return nil
}

type eventStreamWebsocket struct {
Topic string `json:"topic"`
}
Expand Down Expand Up @@ -285,8 +322,8 @@ func (f *Fabric) TerminateContract(ctx context.Context, contracts *core.FireFlyC
return f.ConfigureContract(ctx, contracts)
}

func (f *Fabric) RegisterListener(callbacks blockchain.Callbacks) {
f.callbacks = callbacks
func (f *Fabric) RegisterListener(listener blockchain.Callbacks) {
f.callbacks.listeners = append(f.callbacks.listeners, listener)
}

func (f *Fabric) Start() (err error) {
Expand Down
Loading

0 comments on commit de9ad86

Please sign in to comment.