Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use optimistic inserts for blockchain events #977

Merged
merged 1 commit into from
Aug 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 39 additions & 25 deletions internal/database/sqlcommon/blockchainevents_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,38 +55,52 @@ var (

const blockchaineventsTable = "blockchainevents"

func (s *SQLCommon) InsertBlockchainEvent(ctx context.Context, event *core.BlockchainEvent) (err error) {
func (s *SQLCommon) setBlockchainEventInsertValues(query sq.InsertBuilder, event *core.BlockchainEvent) sq.InsertBuilder {
return query.Values(
event.ID,
event.Source,
event.Namespace,
event.Name,
event.ProtocolID,
event.Listener,
event.Output,
event.Info,
event.Timestamp,
event.TX.Type,
event.TX.ID,
event.TX.BlockchainID,
)
}

func (s *SQLCommon) attemptBlockchainEventInsert(ctx context.Context, tx *txWrapper, event *core.BlockchainEvent, requestConflictEmptyResult bool) (err error) {
_, err = s.insertTxExt(ctx, messagesTable, tx,
s.setBlockchainEventInsertValues(sq.Insert(blockchaineventsTable).Columns(blockchainEventColumns...), event),
func() {
s.callbacks.UUIDCollectionNSEvent(database.CollectionBlockchainEvents, core.ChangeEventTypeCreated, event.Namespace, event.ID)
}, requestConflictEmptyResult)
return err
}

func (s *SQLCommon) InsertOrGetBlockchainEvent(ctx context.Context, event *core.BlockchainEvent) (existing *core.BlockchainEvent, err error) {
ctx, tx, autoCommit, err := s.beginOrUseTx(ctx)
if err != nil {
return err
return nil, err
}
defer s.rollbackTx(ctx, tx, autoCommit)

if _, err = s.insertTx(ctx, blockchaineventsTable, tx,
sq.Insert(blockchaineventsTable).
Columns(blockchainEventColumns...).
Values(
event.ID,
event.Source,
event.Namespace,
event.Name,
event.ProtocolID,
event.Listener,
event.Output,
event.Info,
event.Timestamp,
event.TX.Type,
event.TX.ID,
event.TX.BlockchainID,
),
func() {
s.callbacks.UUIDCollectionNSEvent(database.CollectionBlockchainEvents, core.ChangeEventTypeCreated, event.Namespace, event.ID)
},
); err != nil {
return err
opErr := s.attemptBlockchainEventInsert(ctx, tx, event, true /* we want a failure here we can progress past */)
if opErr == nil {
return nil, s.commitTx(ctx, tx, autoCommit)
}

// Do a select within the transaction to determine if the protocolID already exists
existing, err = s.GetBlockchainEventByProtocolID(ctx, event.Namespace, event.Listener, event.ProtocolID)
if err != nil || existing != nil {
return existing, err
}

return s.commitTx(ctx, tx, autoCommit)
// Error was apparently not a protocolID conflict - must have been something else
return nil, opErr
}

func (s *SQLCommon) blockchainEventResult(ctx context.Context, row *sql.Rows) (*core.BlockchainEvent, error) {
Expand Down
23 changes: 18 additions & 5 deletions internal/database/sqlcommon/blockchainevents_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestBlockchainEventsE2EWithDB(t *testing.T) {

s.callbacks.On("UUIDCollectionNSEvent", database.CollectionBlockchainEvents, core.ChangeEventTypeCreated, "ns", event.ID).Return()

err := s.InsertBlockchainEvent(ctx, event)
_, err := s.InsertOrGetBlockchainEvent(ctx, event)
assert.NotNil(t, event.Timestamp)
assert.NoError(t, err)
eventJson, _ := json.Marshal(&event)
Expand Down Expand Up @@ -82,12 +82,24 @@ func TestBlockchainEventsE2EWithDB(t *testing.T) {
assert.NoError(t, err)
eventReadJson, _ = json.Marshal(eventRead)
assert.Equal(t, string(eventJson), string(eventReadJson))

// Try to insert again with a new ID (should return existing row)
event2 := &core.BlockchainEvent{
ID: fftypes.NewUUID(),
Namespace: event.Namespace,
Listener: event.Listener,
ProtocolID: event.ProtocolID,
Timestamp: fftypes.Now(),
}
existing, err := s.InsertOrGetBlockchainEvent(ctx, event2)
assert.NoError(t, err)
assert.Equal(t, event.ID, existing.ID)
}

func TestInsertBlockchainEventFailBegin(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectBegin().WillReturnError(fmt.Errorf("pop"))
err := s.InsertBlockchainEvent(context.Background(), &core.BlockchainEvent{})
_, err := s.InsertOrGetBlockchainEvent(context.Background(), &core.BlockchainEvent{})
assert.Regexp(t, "FF10114", err)
assert.NoError(t, mock.ExpectationsWereMet())
}
Expand All @@ -96,8 +108,9 @@ func TestInsertBlockchainEventFailInsert(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectBegin()
mock.ExpectExec("INSERT .*").WillReturnError(fmt.Errorf("pop"))
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{}))
mock.ExpectRollback()
err := s.InsertBlockchainEvent(context.Background(), &core.BlockchainEvent{})
_, err := s.InsertOrGetBlockchainEvent(context.Background(), &core.BlockchainEvent{})
assert.Regexp(t, "FF10116", err)
assert.NoError(t, mock.ExpectationsWereMet())
}
Expand All @@ -107,7 +120,7 @@ func TestInsertBlockchainEventFailCommit(t *testing.T) {
mock.ExpectBegin()
mock.ExpectExec("INSERT .*").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit().WillReturnError(fmt.Errorf("pop"))
err := s.InsertBlockchainEvent(context.Background(), &core.BlockchainEvent{})
_, err := s.InsertOrGetBlockchainEvent(context.Background(), &core.BlockchainEvent{})
assert.Regexp(t, "FF10119", err)
assert.NoError(t, mock.ExpectationsWereMet())
}
Expand All @@ -122,7 +135,7 @@ func TestGetBlockchainEventByIDSelectFail(t *testing.T) {

func TestGetBlockchainEventByIDNotFound(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"id"}))
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{}))
msg, err := s.GetBlockchainEventByID(context.Background(), "ns", fftypes.NewUUID())
assert.NoError(t, err)
assert.Nil(t, msg)
Expand Down
34 changes: 14 additions & 20 deletions internal/events/batch_pin_complete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,15 @@ func TestBatchPinCompleteOkBroadcast(t *testing.T) {
}
}

mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == batchPin.Event.Name
})).Return(fmt.Errorf("pop")).Once()
mth.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
})).Return(nil, fmt.Errorf("pop")).Once()
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == batchPin.Event.Name
})).Return(nil).Times(1)
})).Return(nil, nil).Once()
mdi.On("InsertEvent", mock.Anything, mock.MatchedBy(func(e *core.Event) bool {
return e.Type == core.EventTypeBlockchainEventReceived
})).Return(nil).Times(1)
})).Return(nil).Once()
mdi.On("InsertPins", mock.Anything, mock.Anything).Return(nil).Once()
msd := em.sharedDownload.(*shareddownloadmocks.Manager)
mdi.On("GetBatchByID", mock.Anything, "ns1", mock.Anything).Return(nil, nil)
Expand Down Expand Up @@ -191,16 +190,15 @@ func TestBatchPinCompleteOkBroadcastExistingBatch(t *testing.T) {
}
}

mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == batchPin.Event.Name
})).Return(fmt.Errorf("pop")).Once()
mth.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
})).Return(nil, fmt.Errorf("pop")).Once()
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == batchPin.Event.Name
})).Return(nil).Times(1)
})).Return(nil, nil).Once()
mdi.On("InsertEvent", mock.Anything, mock.MatchedBy(func(e *core.Event) bool {
return e.Type == core.EventTypeBlockchainEventReceived
})).Return(nil).Times(1)
})).Return(nil).Once()
mdi.On("InsertPins", mock.Anything, mock.Anything).Return(nil).Once()
mdi.On("GetBatchByID", mock.Anything, "ns1", mock.Anything).Return(batchPersisted, nil)

Expand Down Expand Up @@ -235,8 +233,7 @@ func TestBatchPinCompleteOkPrivate(t *testing.T) {
mdi.On("RunAsGroup", mock.Anything, mock.Anything).Return(nil)
mdi.On("InsertPins", mock.Anything, mock.Anything).Return(fmt.Errorf("These pins have been seen before")) // simulate replay fallback
mdi.On("UpsertPin", mock.Anything, mock.Anything).Return(nil)
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(nil)
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.Anything).Return(nil, nil)
mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
mdi.On("GetBatchByID", mock.Anything, "ns1", mock.Anything).Return(nil, nil)

Expand Down Expand Up @@ -277,8 +274,7 @@ func TestBatchPinCompleteInsertPinsFail(t *testing.T) {
mdi.On("RunAsGroup", mock.Anything, mock.Anything).Return(nil)
mdi.On("InsertPins", mock.Anything, mock.Anything).Return(fmt.Errorf("optimization miss"))
mdi.On("UpsertPin", mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(nil)
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.Anything).Return(nil, nil)
mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)

err := em.BatchPinComplete("ns1", batchPin, &core.VerifierRef{
Expand Down Expand Up @@ -311,8 +307,7 @@ func TestBatchPinCompleteGetBatchByIDFails(t *testing.T) {
mdi := em.database.(*databasemocks.Plugin)
mdi.On("RunAsGroup", mock.Anything, mock.Anything).Return(nil)
mdi.On("InsertPins", mock.Anything, mock.Anything).Return(nil)
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(nil)
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.Anything).Return(nil, nil)
mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
mdi.On("GetBatchByID", mock.Anything, "ns1", mock.Anything).Return(nil, fmt.Errorf("batch lookup failed"))

Expand Down Expand Up @@ -346,8 +341,7 @@ func TestSequencedBroadcastInitiateDownloadFail(t *testing.T) {
mth.On("PersistTransaction", mock.Anything, batchPin.TransactionID, core.TransactionTypeBatchPin, "0x12345").Return(true, nil)

mdi := em.database.(*databasemocks.Plugin)
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(nil)
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.Anything).Return(nil, nil)
mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
mdi.On("InsertPins", mock.Anything, mock.Anything).Return(nil)
mdi.On("GetBatchByID", mock.Anything, "ns1", mock.Anything).Return(nil, nil)
Expand Down
5 changes: 1 addition & 4 deletions internal/events/blockchain_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,14 @@ func (em *eventManager) getTopicForChainListener(listener *core.ContractListener
}

func (em *eventManager) maybePersistBlockchainEvent(ctx context.Context, chainEvent *core.BlockchainEvent, listener *core.ContractListener) error {
if existing, err := em.database.GetBlockchainEventByProtocolID(ctx, chainEvent.Namespace, chainEvent.Listener, chainEvent.ProtocolID); err != nil {
if existing, err := em.txHelper.InsertOrGetBlockchainEvent(ctx, chainEvent); err != nil {
return err
} else if existing != nil {
log.L(ctx).Debugf("Ignoring duplicate blockchain event %s", chainEvent.ProtocolID)
// Return the ID of the existing event
chainEvent.ID = existing.ID
return nil
}
if err := em.txHelper.InsertBlockchainEvent(ctx, chainEvent); err != nil {
return err
}
topic := em.getTopicForChainListener(listener)
ffEvent := core.NewEvent(core.EventTypeBlockchainEventReceived, chainEvent.Namespace, chainEvent.ID, chainEvent.TX.ID, topic)
if err := em.database.InsertEvent(ctx, ffEvent); err != nil {
Expand Down
42 changes: 9 additions & 33 deletions internal/events/blockchain_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,11 @@ func TestContractEventWithRetries(t *testing.T) {
mdi.On("GetContractListenerByBackendID", mock.Anything, "ns1", "sb-1").Return(nil, fmt.Errorf("pop")).Once()
mdi.On("GetContractListenerByBackendID", mock.Anything, "ns1", "sb-1").Return(sub, nil).Times(1) // cached
mth := em.txHelper.(*txcommonmocks.Helper)
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", sub.ID, ev.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once()
mth.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("pop")).Once()
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
eventID = e.ID
return *e.Listener == *sub.ID && e.Name == "Changed" && e.Namespace == "ns1"
})).Return(nil).Times(2)
})).Return(nil, nil).Times(2)
mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once()
mdi.On("InsertEvent", mock.Anything, mock.MatchedBy(func(e *core.Event) bool {
return e.Type == core.EventTypeBlockchainEventReceived && e.Reference != nil && e.Reference == eventID && e.Topic == "topic1"
Expand Down Expand Up @@ -141,6 +140,7 @@ func TestPersistBlockchainEventDuplicate(t *testing.T) {
defer cancel()

ev := &core.BlockchainEvent{
ID: fftypes.NewUUID(),
Name: "Changed",
Namespace: "ns1",
ProtocolID: "10/20/30",
Expand All @@ -152,40 +152,16 @@ func TestPersistBlockchainEventDuplicate(t *testing.T) {
},
Listener: fftypes.NewUUID(),
}
existingID := fftypes.NewUUID()

mdi := em.database.(*databasemocks.Plugin)
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", ev.Listener, ev.ProtocolID).Return(&core.BlockchainEvent{}, nil)
mth := em.txHelper.(*txcommonmocks.Helper)
mth.On("InsertOrGetBlockchainEvent", mock.Anything, ev).Return(&core.BlockchainEvent{ID: existingID}, nil)

err := em.maybePersistBlockchainEvent(em.ctx, ev, nil)
assert.NoError(t, err)
assert.Equal(t, existingID, ev.ID)

mdi.AssertExpectations(t)
}

func TestPersistBlockchainEventLookupFail(t *testing.T) {
em, cancel := newTestEventManager(t)
defer cancel()

ev := &core.BlockchainEvent{
Name: "Changed",
Namespace: "ns1",
ProtocolID: "10/20/30",
Output: fftypes.JSONObject{
"value": "1",
},
Info: fftypes.JSONObject{
"blockNumber": "10",
},
Listener: fftypes.NewUUID(),
}

mdi := em.database.(*databasemocks.Plugin)
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", ev.Listener, ev.ProtocolID).Return(nil, fmt.Errorf("pop"))

err := em.maybePersistBlockchainEvent(em.ctx, ev, nil)
assert.EqualError(t, err, "pop")

mdi.AssertExpectations(t)
mth.AssertExpectations(t)
}

func TestGetTopicForChainListenerFallback(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions internal/events/network_action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@ func TestNetworkAction(t *testing.T) {
mii := em.identity.(*identitymanagermocks.Manager)

mii.On("FindIdentityForVerifier", em.ctx, []core.IdentityType{core.IdentityTypeOrg}, verifier).Return(&core.Identity{}, nil)
mdi.On("GetBlockchainEventByProtocolID", em.ctx, "ns1", (*fftypes.UUID)(nil), "0001").Return(nil, nil)
mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(be *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(be *core.BlockchainEvent) bool {
return be.ProtocolID == "0001"
})).Return(nil)
})).Return(nil, nil)
mdi.On("InsertEvent", em.ctx, mock.Anything).Return(nil)
mmp.On("TerminateContract", em.ctx, location, mock.AnythingOfType("*blockchain.Event")).Return(nil)

Expand Down
25 changes: 10 additions & 15 deletions internal/events/token_pool_created_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,9 @@ func TestTokenPoolCreatedConfirm(t *testing.T) {

mdi.On("GetTokenPoolByLocator", em.ctx, "ns1", "erc1155", "123").Return(nil, fmt.Errorf("pop")).Once()
mdi.On("GetTokenPoolByLocator", em.ctx, "ns1", "erc1155", "123").Return(storedPool, nil).Once()
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), chainPool.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == chainPool.Event.Name
})).Return(nil).Once()
})).Return(nil, nil).Once()
mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *core.Event) bool {
return e.Type == core.EventTypeBlockchainEventReceived
})).Return(nil).Once()
Expand Down Expand Up @@ -278,10 +277,9 @@ func TestTokenPoolCreatedMigrate(t *testing.T) {
}

mdi.On("GetTokenPoolByLocator", em.ctx, "ns1", "magic-tokens", "123").Return(storedPool, nil).Times(2)
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), chainPool.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == chainPool.Event.Name
})).Return(nil).Once()
})).Return(nil, nil).Once()
mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *core.Event) bool {
return e.Type == core.EventTypeBlockchainEventReceived
})).Return(nil).Once()
Expand Down Expand Up @@ -324,10 +322,9 @@ func TestConfirmPoolBlockchainEventFail(t *testing.T) {
ProtocolID: "tx1",
}

mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == event.Name
})).Return(fmt.Errorf("pop"))
})).Return(nil, fmt.Errorf("pop"))

err := em.confirmPool(em.ctx, storedPool, event)
assert.EqualError(t, err, "pop")
Expand Down Expand Up @@ -359,10 +356,9 @@ func TestConfirmPoolTxFail(t *testing.T) {
ProtocolID: "tx1",
}

mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == event.Name
})).Return(nil)
})).Return(nil, nil)
mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *core.Event) bool {
return e.Type == core.EventTypeBlockchainEventReceived
})).Return(nil)
Expand Down Expand Up @@ -397,10 +393,9 @@ func TestConfirmPoolUpsertFail(t *testing.T) {
ProtocolID: "tx1",
}

mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == event.Name
})).Return(nil)
})).Return(nil, nil)
mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *core.Event) bool {
return e.Type == core.EventTypeBlockchainEventReceived
})).Return(nil)
Expand Down
Loading