diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index 340ace0d8..2646a66f0 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -55,7 +55,7 @@ type Ethereum struct { prefixShort string prefixLong string capabilities *blockchain.Capabilities - callbacks blockchain.Callbacks + callbacks callbacks client *resty.Client fftmClient *resty.Client streams *streamManager @@ -76,6 +76,43 @@ type Ethereum struct { contractConfSize int } +type callbacks struct { + listeners []blockchain.Callbacks +} + +func (cb *callbacks) BlockchainOpUpdate(plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { + for _, cb := range cb.listeners { + cb.BlockchainOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput) + } +} + +func (cb *callbacks) BatchPinComplete(batch *blockchain.BatchPin, signingKey *core.VerifierRef) error { + for _, cb := range cb.listeners { + if err := cb.BatchPinComplete(batch, signingKey); err != nil { + return err + } + } + return nil +} + +func (cb *callbacks) BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error { + for _, cb := range cb.listeners { + if err := cb.BlockchainNetworkAction(action, event, signingKey); err != nil { + return err + } + } + return nil +} + +func (cb *callbacks) BlockchainEvent(event *blockchain.EventWithSubscription) error { + for _, cb := range cb.listeners { + if err := cb.BlockchainEvent(event); err != nil { + return err + } + } + return nil +} + type eventStreamWebsocket struct { Topic string `json:"topic"` } @@ -213,8 +250,8 @@ func (e *Ethereum) Init(ctx context.Context, config config.Section, metrics metr return nil } -func (e *Ethereum) RegisterListener(callbacks blockchain.Callbacks) { - e.callbacks = callbacks +func (e *Ethereum) RegisterListener(listener blockchain.Callbacks) { + e.callbacks.listeners = append(e.callbacks.listeners, listener) } func (e *Ethereum) Start() (err error) { diff --git a/internal/blockchain/ethereum/ethereum_test.go b/internal/blockchain/ethereum/ethereum_test.go index 0a1a06c60..5196f0bff 100644 --- a/internal/blockchain/ethereum/ethereum_test.go +++ b/internal/blockchain/ethereum/ethereum_test.go @@ -75,7 +75,6 @@ func resetConf(e *Ethereum) { func newTestEthereum() (*Ethereum, func()) { ctx, cancel := context.WithCancel(context.Background()) - em := &blockchainmocks.Callbacks{} wsm := &wsmocks.WSClient{} mm := &metricsmocks.Manager{} mm.On("IsMetricsEnabled").Return(true) @@ -87,7 +86,6 @@ func newTestEthereum() (*Ethereum, func()) { topic: "topic1", prefixShort: defaultPrefixShort, prefixLong: defaultPrefixLong, - callbacks: em, wsconn: wsm, metrics: mm, } @@ -1095,7 +1093,7 @@ func TestHandleMessageBatchPinOK(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: em, + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" e.fireflyContract.networkVersion = 1 @@ -1179,10 +1177,7 @@ func TestHandleMessageBatchPinMissingAuthor(t *testing.T) { } ]`) - em := &blockchainmocks.Callbacks{} - e := &Ethereum{ - callbacks: em, - } + e := &Ethereum{} e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" var events []interface{} @@ -1191,8 +1186,6 @@ func TestHandleMessageBatchPinMissingAuthor(t *testing.T) { err = e.handleMessageBatch(context.Background(), events) assert.NoError(t, err) - em.AssertExpectations(t) - } func TestHandleMessageEmptyPayloadRef(t *testing.T) { @@ -1223,7 +1216,7 @@ func TestHandleMessageEmptyPayloadRef(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: em, + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" e.fireflyContract.networkVersion = 1 @@ -1285,7 +1278,7 @@ func TestHandleMessageBatchPinExit(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: em, + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" e.fireflyContract.networkVersion = 1 @@ -1298,11 +1291,11 @@ func TestHandleMessageBatchPinExit(t *testing.T) { err = e.handleMessageBatch(context.Background(), events) assert.EqualError(t, err, "pop") + em.AssertExpectations(t) } func TestHandleMessageBatchPinEmpty(t *testing.T) { - em := &blockchainmocks.Callbacks{} - e := &Ethereum{callbacks: em} + e := &Ethereum{} e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" var events []interface{} @@ -1316,12 +1309,10 @@ func TestHandleMessageBatchPinEmpty(t *testing.T) { assert.NoError(t, err) err = e.handleMessageBatch(context.Background(), events) assert.NoError(t, err) - assert.Equal(t, 0, len(em.Calls)) } func TestHandleMessageBatchMissingData(t *testing.T) { - em := &blockchainmocks.Callbacks{} - e := &Ethereum{callbacks: em} + e := &Ethereum{} e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" var events []interface{} @@ -1336,12 +1327,10 @@ func TestHandleMessageBatchMissingData(t *testing.T) { assert.NoError(t, err) err = e.handleMessageBatch(context.Background(), events) assert.NoError(t, err) - assert.Equal(t, 0, len(em.Calls)) } func TestHandleMessageBatchPinBadTransactionID(t *testing.T) { - em := &blockchainmocks.Callbacks{} - e := &Ethereum{callbacks: em} + e := &Ethereum{} e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" data := fftypes.JSONAnyPtr(`[{ "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", @@ -1368,12 +1357,10 @@ func TestHandleMessageBatchPinBadTransactionID(t *testing.T) { assert.NoError(t, err) err = e.handleMessageBatch(context.Background(), events) assert.NoError(t, err) - assert.Equal(t, 0, len(em.Calls)) } func TestHandleMessageBatchPinBadIDentity(t *testing.T) { - em := &blockchainmocks.Callbacks{} - e := &Ethereum{callbacks: em} + e := &Ethereum{} e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" data := fftypes.JSONAnyPtr(`[{ "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", @@ -1400,12 +1387,10 @@ func TestHandleMessageBatchPinBadIDentity(t *testing.T) { assert.NoError(t, err) err = e.handleMessageBatch(context.Background(), events) assert.NoError(t, err) - assert.Equal(t, 0, len(em.Calls)) } func TestHandleMessageBatchPinBadBatchHash(t *testing.T) { - em := &blockchainmocks.Callbacks{} - e := &Ethereum{callbacks: em} + e := &Ethereum{} e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" data := fftypes.JSONAnyPtr(`[{ "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", @@ -1432,12 +1417,10 @@ func TestHandleMessageBatchPinBadBatchHash(t *testing.T) { assert.NoError(t, err) err = e.handleMessageBatch(context.Background(), events) assert.NoError(t, err) - assert.Equal(t, 0, len(em.Calls)) } func TestHandleMessageBatchPinBadPin(t *testing.T) { - em := &blockchainmocks.Callbacks{} - e := &Ethereum{callbacks: em} + e := &Ethereum{} e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" data := fftypes.JSONAnyPtr(`[{ "address": "0x1C197604587F046FD40684A8f21f4609FB811A7b", @@ -1464,15 +1447,12 @@ func TestHandleMessageBatchPinBadPin(t *testing.T) { assert.NoError(t, err) err = e.handleMessageBatch(context.Background(), events) assert.NoError(t, err) - assert.Equal(t, 0, len(em.Calls)) } func TestHandleMessageBatchBadJSON(t *testing.T) { - em := &blockchainmocks.Callbacks{} - e := &Ethereum{callbacks: em} + e := &Ethereum{} err := e.handleMessageBatch(context.Background(), []interface{}{10, 20}) assert.NoError(t, err) - assert.Equal(t, 0, len(em.Calls)) } func TestEventLoopContextCancelled(t *testing.T) { @@ -1523,7 +1503,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) { e := &Ethereum{ ctx: context.Background(), topic: "topic1", - callbacks: em, + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, wsconn: wsm, } @@ -1564,6 +1544,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) { assert.NoError(t, err) e.handleReceipt(context.Background(), reply) + em.AssertExpectations(t) } func TestHandleBadPayloadsAndThenReceiptFailure(t *testing.T) { @@ -1591,7 +1572,8 @@ func TestHandleBadPayloadsAndThenReceiptFailure(t *testing.T) { "requestPayload": "{\"from\":\"0x91d2b4381a4cd5c7c0f27565a7d4b829844c8635\",\"gas\":0,\"gasPrice\":0,\"headers\":{\"id\":\"6fb94fff-81d3-4094-567d-e031b1871694\",\"type\":\"SendTransaction\"},\"method\":{\"inputs\":[{\"internalType\":\"bytes32\",\"name\":\"txnId\",\"type\":\"bytes32\"},{\"internalType\":\"bytes32\",\"name\":\"batchId\",\"type\":\"bytes32\"},{\"internalType\":\"bytes32\",\"name\":\"payloadRef\",\"type\":\"bytes32\"}],\"name\":\"broadcastBatch\",\"outputs\":[],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},\"params\":[\"12345\",\"!\",\"!\"],\"to\":\"0xd3266a857285fb75eb7df37353b4a15c8bb828f5\",\"value\":0}" }`) - em := e.callbacks.(*blockchainmocks.Callbacks) + em := &blockchainmocks.Callbacks{} + e.callbacks.listeners = []blockchain.Callbacks{em} txsu := em.On("BlockchainOpUpdate", e, "ns1:"+operationID.String(), @@ -1609,16 +1591,16 @@ func TestHandleBadPayloadsAndThenReceiptFailure(t *testing.T) { r <- []byte(`"not an object"`) // ignored wrong type r <- data.Bytes() <-done + + em.AssertExpectations(t) } func TestHandleMsgBatchBadData(t *testing.T) { - em := &blockchainmocks.Callbacks{} wsm := &wsmocks.WSClient{} e := &Ethereum{ - ctx: context.Background(), - topic: "topic1", - callbacks: em, - wsconn: wsm, + ctx: context.Background(), + topic: "topic1", + wsconn: wsm, } var reply fftypes.JSONObject @@ -1836,7 +1818,7 @@ func TestHandleMessageContractEvent(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: em, + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" @@ -1898,7 +1880,7 @@ func TestHandleMessageContractEventError(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: em, + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" @@ -2980,7 +2962,7 @@ func TestHandleNetworkAction(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Ethereum{ - callbacks: em, + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" diff --git a/internal/blockchain/fabric/fabric.go b/internal/blockchain/fabric/fabric.go index 33547deec..efea17ce1 100644 --- a/internal/blockchain/fabric/fabric.go +++ b/internal/blockchain/fabric/fabric.go @@ -51,7 +51,7 @@ type Fabric struct { prefixShort string prefixLong string capabilities *blockchain.Capabilities - callbacks blockchain.Callbacks + callbacks callbacks client *resty.Client streams *streamManager streamID string @@ -71,6 +71,43 @@ type Fabric struct { contractConfSize int } +type callbacks struct { + listeners []blockchain.Callbacks +} + +func (cb *callbacks) BlockchainOpUpdate(plugin blockchain.Plugin, nsOpID string, txState blockchain.TransactionStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) { + for _, cb := range cb.listeners { + cb.BlockchainOpUpdate(plugin, nsOpID, txState, blockchainTXID, errorMessage, opOutput) + } +} + +func (cb *callbacks) BatchPinComplete(batch *blockchain.BatchPin, signingKey *core.VerifierRef) error { + for _, cb := range cb.listeners { + if err := cb.BatchPinComplete(batch, signingKey); err != nil { + return err + } + } + return nil +} + +func (cb *callbacks) BlockchainNetworkAction(action string, event *blockchain.Event, signingKey *core.VerifierRef) error { + for _, cb := range cb.listeners { + if err := cb.BlockchainNetworkAction(action, event, signingKey); err != nil { + return err + } + } + return nil +} + +func (cb *callbacks) BlockchainEvent(event *blockchain.EventWithSubscription) error { + for _, cb := range cb.listeners { + if err := cb.BlockchainEvent(event); err != nil { + return err + } + } + return nil +} + type eventStreamWebsocket struct { Topic string `json:"topic"` } @@ -285,8 +322,8 @@ func (f *Fabric) TerminateContract(ctx context.Context, contracts *core.FireFlyC return f.ConfigureContract(ctx, contracts) } -func (f *Fabric) RegisterListener(callbacks blockchain.Callbacks) { - f.callbacks = callbacks +func (f *Fabric) RegisterListener(listener blockchain.Callbacks) { + f.callbacks.listeners = append(f.callbacks.listeners, listener) } func (f *Fabric) Start() (err error) { diff --git a/internal/blockchain/fabric/fabric_test.go b/internal/blockchain/fabric/fabric_test.go index 4cb08d01a..110df2080 100644 --- a/internal/blockchain/fabric/fabric_test.go +++ b/internal/blockchain/fabric/fabric_test.go @@ -53,7 +53,6 @@ func resetConf(e *Fabric) { func newTestFabric() (*Fabric, func()) { ctx, cancel := context.WithCancel(context.Background()) - em := &blockchainmocks.Callbacks{} wsm := &wsmocks.WSClient{} e := &Fabric{ ctx: ctx, @@ -62,7 +61,6 @@ func newTestFabric() (*Fabric, func()) { topic: "topic1", prefixShort: defaultPrefixShort, prefixLong: defaultPrefixLong, - callbacks: em, wsconn: wsm, } e.fireflyContract.chaincode = "firefly" @@ -945,7 +943,7 @@ func TestHandleMessageBatchPinOK(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: em, + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" @@ -992,7 +990,7 @@ func TestHandleMessageEmptyPayloadRef(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: em, + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" @@ -1039,7 +1037,7 @@ func TestHandleMessageBatchPinExit(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: em, + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" @@ -1071,7 +1069,9 @@ func TestHandleMessageBatchPinEmpty(t *testing.T) { ]`) em := &blockchainmocks.Callbacks{} - e := &Fabric{callbacks: em} + e := &Fabric{ + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + } e.fireflyContract.subscription = "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" var events []interface{} @@ -1095,7 +1095,9 @@ func TestHandleMessageUnknownEventName(t *testing.T) { ]`) em := &blockchainmocks.Callbacks{} - e := &Fabric{callbacks: em} + e := &Fabric{ + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + } e.fireflyContract.subscription = "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" var events []interface{} @@ -1108,7 +1110,9 @@ func TestHandleMessageUnknownEventName(t *testing.T) { func TestHandleMessageBatchPinBadBatchHash(t *testing.T) { em := &blockchainmocks.Callbacks{} - e := &Fabric{callbacks: em} + e := &Fabric{ + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + } e.fireflyContract.subscription = "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" data := []byte(`[{ "chaincodeId": "firefly", @@ -1128,7 +1132,9 @@ func TestHandleMessageBatchPinBadBatchHash(t *testing.T) { func TestHandleMessageBatchPinBadPin(t *testing.T) { em := &blockchainmocks.Callbacks{} - e := &Fabric{callbacks: em} + e := &Fabric{ + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + } e.fireflyContract.subscription = "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" data := []byte(`[{ "chaincodeId": "firefly", @@ -1148,7 +1154,9 @@ func TestHandleMessageBatchPinBadPin(t *testing.T) { func TestHandleMessageBatchPinBadPayloadEncoding(t *testing.T) { em := &blockchainmocks.Callbacks{} - e := &Fabric{callbacks: em} + e := &Fabric{ + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + } e.fireflyContract.subscription = "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" data := []byte(`[{ "chaincodeId": "firefly", @@ -1168,7 +1176,9 @@ func TestHandleMessageBatchPinBadPayloadEncoding(t *testing.T) { func TestHandleMessageBatchPinBadPayloadUUIDs(t *testing.T) { em := &blockchainmocks.Callbacks{} - e := &Fabric{callbacks: em} + e := &Fabric{ + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + } e.fireflyContract.subscription = "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" data := []byte(`[{ "chaincodeId": "firefly", @@ -1188,7 +1198,9 @@ func TestHandleMessageBatchPinBadPayloadUUIDs(t *testing.T) { func TestHandleMessageBatchBadJSON(t *testing.T) { em := &blockchainmocks.Callbacks{} - e := &Fabric{callbacks: em} + e := &Fabric{ + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, + } err := e.handleMessageBatch(context.Background(), []interface{}{10, 20}) assert.NoError(t, err) assert.Equal(t, 0, len(em.Calls)) @@ -1257,7 +1269,8 @@ func TestEventLoopUnexpectedMessage(t *testing.T) { "receivedAt": 1622428511616, "requestPayload": "{\"from\":\"0x91d2b4381a4cd5c7c0f27565a7d4b829844c8635\",\"gas\":0,\"gasPrice\":0,\"headers\":{\"id\":\"6fb94fff-81d3-4094-567d-e031b1871694\",\"type\":\"SendTransaction\"},\"method\":{\"inputs\":[{\"internalType\":\"bytes32\",\"name\":\"txnId\",\"type\":\"bytes32\"},{\"internalType\":\"bytes32\",\"name\":\"batchId\",\"type\":\"bytes32\"},{\"internalType\":\"bytes32\",\"name\":\"payloadRef\",\"type\":\"bytes32\"}],\"name\":\"broadcastBatch\",\"outputs\":[],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},\"params\":[\"12345\",\"!\",\"!\"],\"to\":\"0xd3266a857285fb75eb7df37353b4a15c8bb828f5\",\"value\":0}" }`) - em := e.callbacks.(*blockchainmocks.Callbacks) + em := &blockchainmocks.Callbacks{} + e.RegisterListener(em) txsu := em.On("BlockchainOpUpdate", e, "ns1:"+operationID.String(), @@ -1283,7 +1296,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) { e := &Fabric{ ctx: context.Background(), topic: "topic1", - callbacks: em, + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, wsconn: wsm, } @@ -1323,7 +1336,7 @@ func TestHandleReceiptNoRequestID(t *testing.T) { e := &Fabric{ ctx: context.Background(), topic: "topic1", - callbacks: em, + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, wsconn: wsm, } @@ -1340,7 +1353,7 @@ func TestHandleReceiptFailedTx(t *testing.T) { e := &Fabric{ ctx: context.Background(), topic: "topic1", - callbacks: em, + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, wsconn: wsm, } @@ -1532,7 +1545,7 @@ func TestHandleMessageContractEvent(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: em, + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" @@ -1590,7 +1603,7 @@ func TestHandleMessageContractEventBadPayload(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: em, + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" @@ -1618,7 +1631,7 @@ func TestHandleMessageContractEventError(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: em, + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-b5b97a4e-a317-4053-6400-1474650efcb5" @@ -2017,7 +2030,7 @@ func TestHandleNetworkAction(t *testing.T) { em := &blockchainmocks.Callbacks{} e := &Fabric{ - callbacks: em, + callbacks: callbacks{listeners: []blockchain.Callbacks{em}}, } e.fireflyContract.subscription = "sb-0910f6a8-7bd6-4ced-453e-2db68149ce8e" diff --git a/internal/events/batch_pin_complete.go b/internal/events/batch_pin_complete.go index 799722ec7..f8e7982c6 100644 --- a/internal/events/batch_pin_complete.go +++ b/internal/events/batch_pin_complete.go @@ -32,11 +32,14 @@ import ( // We must block here long enough to get the payload from the sharedstorage, persist the messages in the correct // sequence, and also persist all the data. func (em *eventManager) BatchPinComplete(bi blockchain.Plugin, batchPin *blockchain.BatchPin, signingKey *core.VerifierRef) error { + if batchPin.Namespace != em.namespace { + log.L(em.ctx).Debugf("Ignoring BatchPin from wrong namespace '%s'", batchPin.Namespace) + return nil // move on + } if batchPin.TransactionID == nil { log.L(em.ctx).Errorf("Invalid BatchPin transaction - ID is nil") return nil // move on } - if err := core.ValidateFFNameField(em.ctx, batchPin.Namespace, "namespace"); err != nil { log.L(em.ctx).Errorf("Invalid transaction ID='%s' - invalid namespace '%s': %a", batchPin.TransactionID, batchPin.Namespace, err) return nil // move on diff --git a/internal/events/blockchain_event.go b/internal/events/blockchain_event.go index 561a66ba6..4cee6ae4f 100644 --- a/internal/events/blockchain_event.go +++ b/internal/events/blockchain_event.go @@ -127,6 +127,10 @@ func (em *eventManager) BlockchainEvent(event *blockchain.EventWithSubscription) log.L(ctx).Warnf("Event received from unknown subscription %s", event.Subscription) return nil // no retry } + if sub.Namespace != em.namespace { + log.L(em.ctx).Debugf("Ignoring blockchain event from wrong namespace '%s'", sub.Namespace) + return nil + } chainEvent := buildBlockchainEvent(sub.Namespace, sub.ID, &event.Event, &core.BlockchainTransactionRef{ BlockchainID: event.BlockchainTXID, diff --git a/internal/events/blockchain_event_test.go b/internal/events/blockchain_event_test.go index 64d451bef..5c1ec86b9 100644 --- a/internal/events/blockchain_event_test.go +++ b/internal/events/blockchain_event_test.go @@ -49,7 +49,7 @@ func TestContractEventWithRetries(t *testing.T) { }, } sub := &core.ContractListener{ - Namespace: "ns", + Namespace: "ns1", ID: fftypes.NewUUID(), Topic: "topic1", } @@ -59,11 +59,11 @@ func TestContractEventWithRetries(t *testing.T) { mdi.On("GetContractListenerByBackendID", mock.Anything, "sb-1").Return(nil, fmt.Errorf("pop")).Once() mdi.On("GetContractListenerByBackendID", mock.Anything, "sb-1").Return(sub, nil).Times(1) // cached mth := em.txHelper.(*txcommonmocks.Helper) - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns", sub.ID, ev.ProtocolID).Return(nil, nil) + mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", sub.ID, ev.ProtocolID).Return(nil, nil) mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once() mth.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool { eventID = e.ID - return *e.Listener == *sub.ID && e.Name == "Changed" && e.Namespace == "ns" + return *e.Listener == *sub.ID && e.Name == "Changed" && e.Namespace == "ns1" })).Return(nil).Times(2) mdi.On("GetContractListenerByID", mock.Anything, sub.ID).Return(sub, nil) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once() @@ -111,7 +111,7 @@ func TestPersistBlockchainEventDuplicate(t *testing.T) { ev := &core.BlockchainEvent{ Name: "Changed", - Namespace: "ns", + Namespace: "ns1", ProtocolID: "10/20/30", Output: fftypes.JSONObject{ "value": "1", @@ -123,7 +123,7 @@ func TestPersistBlockchainEventDuplicate(t *testing.T) { } mdi := em.database.(*databasemocks.Plugin) - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns", ev.Listener, ev.ProtocolID).Return(&core.BlockchainEvent{}, nil) + mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", ev.Listener, ev.ProtocolID).Return(&core.BlockchainEvent{}, nil) err := em.maybePersistBlockchainEvent(em.ctx, ev) assert.NoError(t, err) @@ -137,7 +137,7 @@ func TestPersistBlockchainEventLookupFail(t *testing.T) { ev := &core.BlockchainEvent{ Name: "Changed", - Namespace: "ns", + Namespace: "ns1", ProtocolID: "10/20/30", Output: fftypes.JSONObject{ "value": "1", @@ -149,7 +149,7 @@ func TestPersistBlockchainEventLookupFail(t *testing.T) { } mdi := em.database.(*databasemocks.Plugin) - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns", ev.Listener, ev.ProtocolID).Return(nil, fmt.Errorf("pop")) + mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", ev.Listener, ev.ProtocolID).Return(nil, fmt.Errorf("pop")) err := em.maybePersistBlockchainEvent(em.ctx, ev) assert.EqualError(t, err, "pop") @@ -163,7 +163,7 @@ func TestPersistBlockchainEventChainListenerLookupFail(t *testing.T) { ev := &core.BlockchainEvent{ Name: "Changed", - Namespace: "ns", + Namespace: "ns1", ProtocolID: "10/20/30", Output: fftypes.JSONObject{ "value": "1", @@ -176,7 +176,7 @@ func TestPersistBlockchainEventChainListenerLookupFail(t *testing.T) { mdi := em.database.(*databasemocks.Plugin) mth := em.txHelper.(*txcommonmocks.Helper) - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns", ev.Listener, ev.ProtocolID).Return(nil, nil) + mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", ev.Listener, ev.ProtocolID).Return(nil, nil) mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(nil) mdi.On("GetContractListenerByID", mock.Anything, ev.Listener).Return(nil, fmt.Errorf("pop")) @@ -192,7 +192,7 @@ func TestGetTopicForChainListenerFallback(t *testing.T) { defer cancel() sub := &core.ContractListener{ - Namespace: "ns", + Namespace: "ns1", ID: fftypes.NewUUID(), Topic: "", } diff --git a/internal/events/network_action.go b/internal/events/network_action.go index 537e94a64..d3fe860d0 100644 --- a/internal/events/network_action.go +++ b/internal/events/network_action.go @@ -29,13 +29,11 @@ func (em *eventManager) actionTerminate(bi blockchain.Plugin, event *blockchain. if err != nil { return err } - contracts := &namespace.Contracts - if err := bi.TerminateContract(em.ctx, contracts, event); err != nil { + if err := bi.TerminateContract(em.ctx, &namespace.Contracts, event); err != nil { return err } // Currently, a termination event is implied to apply to ALL namespaces return em.database.RunAsGroup(em.ctx, func(ctx context.Context) error { - namespace.Contracts = *contracts if err := em.database.UpsertNamespace(em.ctx, namespace, true); err != nil { return err } diff --git a/internal/operations/manager.go b/internal/operations/manager.go index c5bde77b8..cfbf38501 100644 --- a/internal/operations/manager.go +++ b/internal/operations/manager.go @@ -57,20 +57,22 @@ const ( ) type operationsManager struct { - ctx context.Context - database database.Plugin - handlers map[core.OpType]OperationHandler - updater *operationUpdater + ctx context.Context + namespace string + database database.Plugin + handlers map[core.OpType]OperationHandler + updater *operationUpdater } -func NewOperationsManager(ctx context.Context, di database.Plugin, txHelper txcommon.Helper) (Manager, error) { +func NewOperationsManager(ctx context.Context, ns string, di database.Plugin, txHelper txcommon.Helper) (Manager, error) { if di == nil || txHelper == nil { return nil, i18n.NewError(ctx, coremsgs.MsgInitializationNilDepError, "OperationsManager") } om := &operationsManager{ - ctx: ctx, - database: di, - handlers: make(map[core.OpType]OperationHandler), + ctx: ctx, + namespace: ns, + database: di, + handlers: make(map[core.OpType]OperationHandler), } updater := newOperationUpdater(ctx, om, di, txHelper) om.updater = updater diff --git a/internal/operations/manager_test.go b/internal/operations/manager_test.go index cf369fb7a..70da40e9a 100644 --- a/internal/operations/manager_test.go +++ b/internal/operations/manager_test.go @@ -77,13 +77,13 @@ func newTestOperations(t *testing.T) (*operationsManager, func()) { } ctx, cancel := context.WithCancel(context.Background()) - om, err := NewOperationsManager(ctx, mdi, txHelper) + om, err := NewOperationsManager(ctx, "ns1", mdi, txHelper) assert.NoError(t, err) return om.(*operationsManager), cancel } func TestInitFail(t *testing.T) { - _, err := NewOperationsManager(context.Background(), nil, nil) + _, err := NewOperationsManager(context.Background(), "ns1", nil, nil) assert.Regexp(t, "FF10128", err) } diff --git a/internal/operations/operation_updater.go b/internal/operations/operation_updater.go index 6007773ac..4fea4cca2 100644 --- a/internal/operations/operation_updater.go +++ b/internal/operations/operation_updater.go @@ -107,11 +107,15 @@ func (ou *operationUpdater) pickWorker(ctx context.Context, id *fftypes.UUID, up } func (ou *operationUpdater) SubmitOperationUpdate(ctx context.Context, update *OperationUpdate) { - _, id, err := core.ParseNamespacedOpID(ctx, update.NamespacedOpID) + ns, id, err := core.ParseNamespacedOpID(ctx, update.NamespacedOpID) if err != nil { log.L(ctx).Warnf("Unable to update operation '%s' due to invalid ID: %s", update.NamespacedOpID, err) return } + if ns != ou.manager.namespace { + log.L(ou.ctx).Debugf("Ignoring operation update from wrong namespace '%s'", ns) + return + } if ou.conf.workerCount > 0 { select { diff --git a/internal/operations/operation_updater_test.go b/internal/operations/operation_updater_test.go index 0b2dbddb5..d99e2c5a9 100644 --- a/internal/operations/operation_updater_test.go +++ b/internal/operations/operation_updater_test.go @@ -55,7 +55,10 @@ func newTestOperationUpdaterCommon(t *testing.T, dbCapabilities *database.Capabi config.Set(coreconfig.OpUpdateWorkerBatchMaxInserts, 200) logrus.SetLevel(logrus.DebugLevel) - mom := &operationsManager{handlers: make(map[fftypes.FFEnum]OperationHandler)} + mom := &operationsManager{ + namespace: "ns1", + handlers: make(map[fftypes.FFEnum]OperationHandler), + } mdi := &databasemocks.Plugin{} mdi.On("Capabilities").Return(dbCapabilities) mdm := &datamocks.Manager{} diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index a8298f1d0..c1acfa086 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -21,6 +21,7 @@ import ( "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly-common/pkg/i18n" + "github.com/hyperledger/firefly-common/pkg/log" "github.com/hyperledger/firefly/internal/assets" "github.com/hyperledger/firefly/internal/batch" "github.com/hyperledger/firefly/internal/batchpin" @@ -196,21 +197,22 @@ type orchestrator struct { } func NewOrchestrator(ns string, config Config, plugins Plugins, metrics metrics.Manager, adminEvents spievents.Manager) Orchestrator { - return &orchestrator{ + or := &orchestrator{ namespace: ns, config: config, plugins: plugins, metrics: metrics, adminEvents: adminEvents, } + return or } func (or *orchestrator) Init(ctx context.Context, cancelCtx context.CancelFunc) (err error) { - or.ctx = ctx + or.ctx = log.WithLogField(ctx, "ns", or.namespace) or.cancelCtx = cancelCtx - err = or.initPlugins(ctx) + err = or.initPlugins(or.ctx) if err == nil { - err = or.initComponents(ctx) + err = or.initComponents(or.ctx) } // Bind together the blockchain interface callbacks, with the events manager or.bc.bi = or.plugins.Blockchain.Plugin @@ -414,7 +416,7 @@ func (or *orchestrator) initComponents(ctx context.Context) (err error) { } if or.operations == nil { - if or.operations, err = operations.NewOperationsManager(ctx, or.database(), or.txHelper); err != nil { + if or.operations, err = operations.NewOperationsManager(ctx, or.namespace, or.database(), or.txHelper); err != nil { return err } } diff --git a/internal/shareddownload/download_manager_test.go b/internal/shareddownload/download_manager_test.go index ff77abae2..5e8a1ddcc 100644 --- a/internal/shareddownload/download_manager_test.go +++ b/internal/shareddownload/download_manager_test.go @@ -54,7 +54,7 @@ func newTestDownloadManager(t *testing.T) (*downloadManager, func()) { mdi.On("Capabilities").Return(&database.Capabilities{ Concurrency: false, }) - operations, err := operations.NewOperationsManager(context.Background(), mdi, txHelper) + operations, err := operations.NewOperationsManager(context.Background(), "ns1", mdi, txHelper) assert.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) diff --git a/pkg/blockchain/plugin.go b/pkg/blockchain/plugin.go index 219e48766..4ae497d9b 100644 --- a/pkg/blockchain/plugin.go +++ b/pkg/blockchain/plugin.go @@ -36,7 +36,7 @@ type Plugin interface { Init(ctx context.Context, config config.Section, metrics metrics.Manager) error // RegisterListener registers a listener to receive callbacks - RegisterListener(callbacks Callbacks) + RegisterListener(listener Callbacks) // ConfigureContract initializes the subscription to the FireFly contract // - Checks the provided contract info against the plugin's configuration, and updates it as needed