Skip to content

Commit

Permalink
Merge pull request #977 from kaleido-io/events
Browse files Browse the repository at this point in the history
Use optimistic inserts for blockchain events
  • Loading branch information
shorsher authored Aug 18, 2022
2 parents b8556a4 + 0e55946 commit cec7b98
Show file tree
Hide file tree
Showing 16 changed files with 192 additions and 166 deletions.
64 changes: 39 additions & 25 deletions internal/database/sqlcommon/blockchainevents_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,38 +55,52 @@ var (

const blockchaineventsTable = "blockchainevents"

func (s *SQLCommon) InsertBlockchainEvent(ctx context.Context, event *core.BlockchainEvent) (err error) {
func (s *SQLCommon) setBlockchainEventInsertValues(query sq.InsertBuilder, event *core.BlockchainEvent) sq.InsertBuilder {
return query.Values(
event.ID,
event.Source,
event.Namespace,
event.Name,
event.ProtocolID,
event.Listener,
event.Output,
event.Info,
event.Timestamp,
event.TX.Type,
event.TX.ID,
event.TX.BlockchainID,
)
}

func (s *SQLCommon) attemptBlockchainEventInsert(ctx context.Context, tx *txWrapper, event *core.BlockchainEvent, requestConflictEmptyResult bool) (err error) {
_, err = s.insertTxExt(ctx, messagesTable, tx,
s.setBlockchainEventInsertValues(sq.Insert(blockchaineventsTable).Columns(blockchainEventColumns...), event),
func() {
s.callbacks.UUIDCollectionNSEvent(database.CollectionBlockchainEvents, core.ChangeEventTypeCreated, event.Namespace, event.ID)
}, requestConflictEmptyResult)
return err
}

func (s *SQLCommon) InsertOrGetBlockchainEvent(ctx context.Context, event *core.BlockchainEvent) (existing *core.BlockchainEvent, err error) {
ctx, tx, autoCommit, err := s.beginOrUseTx(ctx)
if err != nil {
return err
return nil, err
}
defer s.rollbackTx(ctx, tx, autoCommit)

if _, err = s.insertTx(ctx, blockchaineventsTable, tx,
sq.Insert(blockchaineventsTable).
Columns(blockchainEventColumns...).
Values(
event.ID,
event.Source,
event.Namespace,
event.Name,
event.ProtocolID,
event.Listener,
event.Output,
event.Info,
event.Timestamp,
event.TX.Type,
event.TX.ID,
event.TX.BlockchainID,
),
func() {
s.callbacks.UUIDCollectionNSEvent(database.CollectionBlockchainEvents, core.ChangeEventTypeCreated, event.Namespace, event.ID)
},
); err != nil {
return err
opErr := s.attemptBlockchainEventInsert(ctx, tx, event, true /* we want a failure here we can progress past */)
if opErr == nil {
return nil, s.commitTx(ctx, tx, autoCommit)
}

// Do a select within the transaction to determine if the protocolID already exists
existing, err = s.GetBlockchainEventByProtocolID(ctx, event.Namespace, event.Listener, event.ProtocolID)
if err != nil || existing != nil {
return existing, err
}

return s.commitTx(ctx, tx, autoCommit)
// Error was apparently not a protocolID conflict - must have been something else
return nil, opErr
}

func (s *SQLCommon) blockchainEventResult(ctx context.Context, row *sql.Rows) (*core.BlockchainEvent, error) {
Expand Down
23 changes: 18 additions & 5 deletions internal/database/sqlcommon/blockchainevents_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestBlockchainEventsE2EWithDB(t *testing.T) {

s.callbacks.On("UUIDCollectionNSEvent", database.CollectionBlockchainEvents, core.ChangeEventTypeCreated, "ns", event.ID).Return()

err := s.InsertBlockchainEvent(ctx, event)
_, err := s.InsertOrGetBlockchainEvent(ctx, event)
assert.NotNil(t, event.Timestamp)
assert.NoError(t, err)
eventJson, _ := json.Marshal(&event)
Expand Down Expand Up @@ -82,12 +82,24 @@ func TestBlockchainEventsE2EWithDB(t *testing.T) {
assert.NoError(t, err)
eventReadJson, _ = json.Marshal(eventRead)
assert.Equal(t, string(eventJson), string(eventReadJson))

// Try to insert again with a new ID (should return existing row)
event2 := &core.BlockchainEvent{
ID: fftypes.NewUUID(),
Namespace: event.Namespace,
Listener: event.Listener,
ProtocolID: event.ProtocolID,
Timestamp: fftypes.Now(),
}
existing, err := s.InsertOrGetBlockchainEvent(ctx, event2)
assert.NoError(t, err)
assert.Equal(t, event.ID, existing.ID)
}

func TestInsertBlockchainEventFailBegin(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectBegin().WillReturnError(fmt.Errorf("pop"))
err := s.InsertBlockchainEvent(context.Background(), &core.BlockchainEvent{})
_, err := s.InsertOrGetBlockchainEvent(context.Background(), &core.BlockchainEvent{})
assert.Regexp(t, "FF10114", err)
assert.NoError(t, mock.ExpectationsWereMet())
}
Expand All @@ -96,8 +108,9 @@ func TestInsertBlockchainEventFailInsert(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectBegin()
mock.ExpectExec("INSERT .*").WillReturnError(fmt.Errorf("pop"))
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{}))
mock.ExpectRollback()
err := s.InsertBlockchainEvent(context.Background(), &core.BlockchainEvent{})
_, err := s.InsertOrGetBlockchainEvent(context.Background(), &core.BlockchainEvent{})
assert.Regexp(t, "FF10116", err)
assert.NoError(t, mock.ExpectationsWereMet())
}
Expand All @@ -107,7 +120,7 @@ func TestInsertBlockchainEventFailCommit(t *testing.T) {
mock.ExpectBegin()
mock.ExpectExec("INSERT .*").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit().WillReturnError(fmt.Errorf("pop"))
err := s.InsertBlockchainEvent(context.Background(), &core.BlockchainEvent{})
_, err := s.InsertOrGetBlockchainEvent(context.Background(), &core.BlockchainEvent{})
assert.Regexp(t, "FF10119", err)
assert.NoError(t, mock.ExpectationsWereMet())
}
Expand All @@ -122,7 +135,7 @@ func TestGetBlockchainEventByIDSelectFail(t *testing.T) {

func TestGetBlockchainEventByIDNotFound(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"id"}))
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{}))
msg, err := s.GetBlockchainEventByID(context.Background(), "ns", fftypes.NewUUID())
assert.NoError(t, err)
assert.Nil(t, msg)
Expand Down
34 changes: 14 additions & 20 deletions internal/events/batch_pin_complete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,15 @@ func TestBatchPinCompleteOkBroadcast(t *testing.T) {
}
}

mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == batchPin.Event.Name
})).Return(fmt.Errorf("pop")).Once()
mth.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
})).Return(nil, fmt.Errorf("pop")).Once()
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == batchPin.Event.Name
})).Return(nil).Times(1)
})).Return(nil, nil).Once()
mdi.On("InsertEvent", mock.Anything, mock.MatchedBy(func(e *core.Event) bool {
return e.Type == core.EventTypeBlockchainEventReceived
})).Return(nil).Times(1)
})).Return(nil).Once()
mdi.On("InsertPins", mock.Anything, mock.Anything).Return(nil).Once()
msd := em.sharedDownload.(*shareddownloadmocks.Manager)
mdi.On("GetBatchByID", mock.Anything, "ns1", mock.Anything).Return(nil, nil)
Expand Down Expand Up @@ -191,16 +190,15 @@ func TestBatchPinCompleteOkBroadcastExistingBatch(t *testing.T) {
}
}

mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == batchPin.Event.Name
})).Return(fmt.Errorf("pop")).Once()
mth.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
})).Return(nil, fmt.Errorf("pop")).Once()
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == batchPin.Event.Name
})).Return(nil).Times(1)
})).Return(nil, nil).Once()
mdi.On("InsertEvent", mock.Anything, mock.MatchedBy(func(e *core.Event) bool {
return e.Type == core.EventTypeBlockchainEventReceived
})).Return(nil).Times(1)
})).Return(nil).Once()
mdi.On("InsertPins", mock.Anything, mock.Anything).Return(nil).Once()
mdi.On("GetBatchByID", mock.Anything, "ns1", mock.Anything).Return(batchPersisted, nil)

Expand Down Expand Up @@ -235,8 +233,7 @@ func TestBatchPinCompleteOkPrivate(t *testing.T) {
mdi.On("RunAsGroup", mock.Anything, mock.Anything).Return(nil)
mdi.On("InsertPins", mock.Anything, mock.Anything).Return(fmt.Errorf("These pins have been seen before")) // simulate replay fallback
mdi.On("UpsertPin", mock.Anything, mock.Anything).Return(nil)
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(nil)
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.Anything).Return(nil, nil)
mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
mdi.On("GetBatchByID", mock.Anything, "ns1", mock.Anything).Return(nil, nil)

Expand Down Expand Up @@ -277,8 +274,7 @@ func TestBatchPinCompleteInsertPinsFail(t *testing.T) {
mdi.On("RunAsGroup", mock.Anything, mock.Anything).Return(nil)
mdi.On("InsertPins", mock.Anything, mock.Anything).Return(fmt.Errorf("optimization miss"))
mdi.On("UpsertPin", mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(nil)
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.Anything).Return(nil, nil)
mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)

err := em.BatchPinComplete("ns1", batchPin, &core.VerifierRef{
Expand Down Expand Up @@ -311,8 +307,7 @@ func TestBatchPinCompleteGetBatchByIDFails(t *testing.T) {
mdi := em.database.(*databasemocks.Plugin)
mdi.On("RunAsGroup", mock.Anything, mock.Anything).Return(nil)
mdi.On("InsertPins", mock.Anything, mock.Anything).Return(nil)
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(nil)
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.Anything).Return(nil, nil)
mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
mdi.On("GetBatchByID", mock.Anything, "ns1", mock.Anything).Return(nil, fmt.Errorf("batch lookup failed"))

Expand Down Expand Up @@ -346,8 +341,7 @@ func TestSequencedBroadcastInitiateDownloadFail(t *testing.T) {
mth.On("PersistTransaction", mock.Anything, batchPin.TransactionID, core.TransactionTypeBatchPin, "0x12345").Return(true, nil)

mdi := em.database.(*databasemocks.Plugin)
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(nil)
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.Anything).Return(nil, nil)
mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
mdi.On("InsertPins", mock.Anything, mock.Anything).Return(nil)
mdi.On("GetBatchByID", mock.Anything, "ns1", mock.Anything).Return(nil, nil)
Expand Down
5 changes: 1 addition & 4 deletions internal/events/blockchain_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,14 @@ func (em *eventManager) getTopicForChainListener(listener *core.ContractListener
}

func (em *eventManager) maybePersistBlockchainEvent(ctx context.Context, chainEvent *core.BlockchainEvent, listener *core.ContractListener) error {
if existing, err := em.database.GetBlockchainEventByProtocolID(ctx, chainEvent.Namespace, chainEvent.Listener, chainEvent.ProtocolID); err != nil {
if existing, err := em.txHelper.InsertOrGetBlockchainEvent(ctx, chainEvent); err != nil {
return err
} else if existing != nil {
log.L(ctx).Debugf("Ignoring duplicate blockchain event %s", chainEvent.ProtocolID)
// Return the ID of the existing event
chainEvent.ID = existing.ID
return nil
}
if err := em.txHelper.InsertBlockchainEvent(ctx, chainEvent); err != nil {
return err
}
topic := em.getTopicForChainListener(listener)
ffEvent := core.NewEvent(core.EventTypeBlockchainEventReceived, chainEvent.Namespace, chainEvent.ID, chainEvent.TX.ID, topic)
if err := em.database.InsertEvent(ctx, ffEvent); err != nil {
Expand Down
42 changes: 9 additions & 33 deletions internal/events/blockchain_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,11 @@ func TestContractEventWithRetries(t *testing.T) {
mdi.On("GetContractListenerByBackendID", mock.Anything, "ns1", "sb-1").Return(nil, fmt.Errorf("pop")).Once()
mdi.On("GetContractListenerByBackendID", mock.Anything, "ns1", "sb-1").Return(sub, nil).Times(1) // cached
mth := em.txHelper.(*txcommonmocks.Helper)
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", sub.ID, ev.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once()
mth.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("pop")).Once()
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
eventID = e.ID
return *e.Listener == *sub.ID && e.Name == "Changed" && e.Namespace == "ns1"
})).Return(nil).Times(2)
})).Return(nil, nil).Times(2)
mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once()
mdi.On("InsertEvent", mock.Anything, mock.MatchedBy(func(e *core.Event) bool {
return e.Type == core.EventTypeBlockchainEventReceived && e.Reference != nil && e.Reference == eventID && e.Topic == "topic1"
Expand Down Expand Up @@ -141,6 +140,7 @@ func TestPersistBlockchainEventDuplicate(t *testing.T) {
defer cancel()

ev := &core.BlockchainEvent{
ID: fftypes.NewUUID(),
Name: "Changed",
Namespace: "ns1",
ProtocolID: "10/20/30",
Expand All @@ -152,40 +152,16 @@ func TestPersistBlockchainEventDuplicate(t *testing.T) {
},
Listener: fftypes.NewUUID(),
}
existingID := fftypes.NewUUID()

mdi := em.database.(*databasemocks.Plugin)
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", ev.Listener, ev.ProtocolID).Return(&core.BlockchainEvent{}, nil)
mth := em.txHelper.(*txcommonmocks.Helper)
mth.On("InsertOrGetBlockchainEvent", mock.Anything, ev).Return(&core.BlockchainEvent{ID: existingID}, nil)

err := em.maybePersistBlockchainEvent(em.ctx, ev, nil)
assert.NoError(t, err)
assert.Equal(t, existingID, ev.ID)

mdi.AssertExpectations(t)
}

func TestPersistBlockchainEventLookupFail(t *testing.T) {
em, cancel := newTestEventManager(t)
defer cancel()

ev := &core.BlockchainEvent{
Name: "Changed",
Namespace: "ns1",
ProtocolID: "10/20/30",
Output: fftypes.JSONObject{
"value": "1",
},
Info: fftypes.JSONObject{
"blockNumber": "10",
},
Listener: fftypes.NewUUID(),
}

mdi := em.database.(*databasemocks.Plugin)
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", ev.Listener, ev.ProtocolID).Return(nil, fmt.Errorf("pop"))

err := em.maybePersistBlockchainEvent(em.ctx, ev, nil)
assert.EqualError(t, err, "pop")

mdi.AssertExpectations(t)
mth.AssertExpectations(t)
}

func TestGetTopicForChainListenerFallback(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions internal/events/network_action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@ func TestNetworkAction(t *testing.T) {
mii := em.identity.(*identitymanagermocks.Manager)

mii.On("FindIdentityForVerifier", em.ctx, []core.IdentityType{core.IdentityTypeOrg}, verifier).Return(&core.Identity{}, nil)
mdi.On("GetBlockchainEventByProtocolID", em.ctx, "ns1", (*fftypes.UUID)(nil), "0001").Return(nil, nil)
mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(be *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(be *core.BlockchainEvent) bool {
return be.ProtocolID == "0001"
})).Return(nil)
})).Return(nil, nil)
mdi.On("InsertEvent", em.ctx, mock.Anything).Return(nil)
mmp.On("TerminateContract", em.ctx, location, mock.AnythingOfType("*blockchain.Event")).Return(nil)

Expand Down
25 changes: 10 additions & 15 deletions internal/events/token_pool_created_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,9 @@ func TestTokenPoolCreatedConfirm(t *testing.T) {

mdi.On("GetTokenPoolByLocator", em.ctx, "ns1", "erc1155", "123").Return(nil, fmt.Errorf("pop")).Once()
mdi.On("GetTokenPoolByLocator", em.ctx, "ns1", "erc1155", "123").Return(storedPool, nil).Once()
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), chainPool.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == chainPool.Event.Name
})).Return(nil).Once()
})).Return(nil, nil).Once()
mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *core.Event) bool {
return e.Type == core.EventTypeBlockchainEventReceived
})).Return(nil).Once()
Expand Down Expand Up @@ -278,10 +277,9 @@ func TestTokenPoolCreatedMigrate(t *testing.T) {
}

mdi.On("GetTokenPoolByLocator", em.ctx, "ns1", "magic-tokens", "123").Return(storedPool, nil).Times(2)
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), chainPool.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == chainPool.Event.Name
})).Return(nil).Once()
})).Return(nil, nil).Once()
mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *core.Event) bool {
return e.Type == core.EventTypeBlockchainEventReceived
})).Return(nil).Once()
Expand Down Expand Up @@ -324,10 +322,9 @@ func TestConfirmPoolBlockchainEventFail(t *testing.T) {
ProtocolID: "tx1",
}

mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == event.Name
})).Return(fmt.Errorf("pop"))
})).Return(nil, fmt.Errorf("pop"))

err := em.confirmPool(em.ctx, storedPool, event)
assert.EqualError(t, err, "pop")
Expand Down Expand Up @@ -359,10 +356,9 @@ func TestConfirmPoolTxFail(t *testing.T) {
ProtocolID: "tx1",
}

mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == event.Name
})).Return(nil)
})).Return(nil, nil)
mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *core.Event) bool {
return e.Type == core.EventTypeBlockchainEventReceived
})).Return(nil)
Expand Down Expand Up @@ -397,10 +393,9 @@ func TestConfirmPoolUpsertFail(t *testing.T) {
ProtocolID: "tx1",
}

mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == event.Name
})).Return(nil)
})).Return(nil, nil)
mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *core.Event) bool {
return e.Type == core.EventTypeBlockchainEventReceived
})).Return(nil)
Expand Down
Loading

0 comments on commit cec7b98

Please sign in to comment.