Skip to content

Commit

Permalink
Use optimistic inserts for blockchain events
Browse files Browse the repository at this point in the history
Always attempt to insert first, then fall back to a query for duplicates.

Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
  • Loading branch information
awrichar committed Aug 18, 2022
1 parent b8556a4 commit 0e55946
Show file tree
Hide file tree
Showing 16 changed files with 192 additions and 166 deletions.
64 changes: 39 additions & 25 deletions internal/database/sqlcommon/blockchainevents_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,38 +55,52 @@ var (

const blockchaineventsTable = "blockchainevents"

func (s *SQLCommon) InsertBlockchainEvent(ctx context.Context, event *core.BlockchainEvent) (err error) {
func (s *SQLCommon) setBlockchainEventInsertValues(query sq.InsertBuilder, event *core.BlockchainEvent) sq.InsertBuilder {
return query.Values(
event.ID,
event.Source,
event.Namespace,
event.Name,
event.ProtocolID,
event.Listener,
event.Output,
event.Info,
event.Timestamp,
event.TX.Type,
event.TX.ID,
event.TX.BlockchainID,
)
}

func (s *SQLCommon) attemptBlockchainEventInsert(ctx context.Context, tx *txWrapper, event *core.BlockchainEvent, requestConflictEmptyResult bool) (err error) {
_, err = s.insertTxExt(ctx, messagesTable, tx,
s.setBlockchainEventInsertValues(sq.Insert(blockchaineventsTable).Columns(blockchainEventColumns...), event),
func() {
s.callbacks.UUIDCollectionNSEvent(database.CollectionBlockchainEvents, core.ChangeEventTypeCreated, event.Namespace, event.ID)
}, requestConflictEmptyResult)
return err
}

func (s *SQLCommon) InsertOrGetBlockchainEvent(ctx context.Context, event *core.BlockchainEvent) (existing *core.BlockchainEvent, err error) {
ctx, tx, autoCommit, err := s.beginOrUseTx(ctx)
if err != nil {
return err
return nil, err
}
defer s.rollbackTx(ctx, tx, autoCommit)

if _, err = s.insertTx(ctx, blockchaineventsTable, tx,
sq.Insert(blockchaineventsTable).
Columns(blockchainEventColumns...).
Values(
event.ID,
event.Source,
event.Namespace,
event.Name,
event.ProtocolID,
event.Listener,
event.Output,
event.Info,
event.Timestamp,
event.TX.Type,
event.TX.ID,
event.TX.BlockchainID,
),
func() {
s.callbacks.UUIDCollectionNSEvent(database.CollectionBlockchainEvents, core.ChangeEventTypeCreated, event.Namespace, event.ID)
},
); err != nil {
return err
opErr := s.attemptBlockchainEventInsert(ctx, tx, event, true /* we want a failure here we can progress past */)
if opErr == nil {
return nil, s.commitTx(ctx, tx, autoCommit)
}

// Do a select within the transaction to determine if the protocolID already exists
existing, err = s.GetBlockchainEventByProtocolID(ctx, event.Namespace, event.Listener, event.ProtocolID)
if err != nil || existing != nil {
return existing, err
}

return s.commitTx(ctx, tx, autoCommit)
// Error was apparently not a protocolID conflict - must have been something else
return nil, opErr
}

func (s *SQLCommon) blockchainEventResult(ctx context.Context, row *sql.Rows) (*core.BlockchainEvent, error) {
Expand Down
23 changes: 18 additions & 5 deletions internal/database/sqlcommon/blockchainevents_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestBlockchainEventsE2EWithDB(t *testing.T) {

s.callbacks.On("UUIDCollectionNSEvent", database.CollectionBlockchainEvents, core.ChangeEventTypeCreated, "ns", event.ID).Return()

err := s.InsertBlockchainEvent(ctx, event)
_, err := s.InsertOrGetBlockchainEvent(ctx, event)
assert.NotNil(t, event.Timestamp)
assert.NoError(t, err)
eventJson, _ := json.Marshal(&event)
Expand Down Expand Up @@ -82,12 +82,24 @@ func TestBlockchainEventsE2EWithDB(t *testing.T) {
assert.NoError(t, err)
eventReadJson, _ = json.Marshal(eventRead)
assert.Equal(t, string(eventJson), string(eventReadJson))

// Try to insert again with a new ID (should return existing row)
event2 := &core.BlockchainEvent{
ID: fftypes.NewUUID(),
Namespace: event.Namespace,
Listener: event.Listener,
ProtocolID: event.ProtocolID,
Timestamp: fftypes.Now(),
}
existing, err := s.InsertOrGetBlockchainEvent(ctx, event2)
assert.NoError(t, err)
assert.Equal(t, event.ID, existing.ID)
}

func TestInsertBlockchainEventFailBegin(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectBegin().WillReturnError(fmt.Errorf("pop"))
err := s.InsertBlockchainEvent(context.Background(), &core.BlockchainEvent{})
_, err := s.InsertOrGetBlockchainEvent(context.Background(), &core.BlockchainEvent{})
assert.Regexp(t, "FF10114", err)
assert.NoError(t, mock.ExpectationsWereMet())
}
Expand All @@ -96,8 +108,9 @@ func TestInsertBlockchainEventFailInsert(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectBegin()
mock.ExpectExec("INSERT .*").WillReturnError(fmt.Errorf("pop"))
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{}))
mock.ExpectRollback()
err := s.InsertBlockchainEvent(context.Background(), &core.BlockchainEvent{})
_, err := s.InsertOrGetBlockchainEvent(context.Background(), &core.BlockchainEvent{})
assert.Regexp(t, "FF10116", err)
assert.NoError(t, mock.ExpectationsWereMet())
}
Expand All @@ -107,7 +120,7 @@ func TestInsertBlockchainEventFailCommit(t *testing.T) {
mock.ExpectBegin()
mock.ExpectExec("INSERT .*").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit().WillReturnError(fmt.Errorf("pop"))
err := s.InsertBlockchainEvent(context.Background(), &core.BlockchainEvent{})
_, err := s.InsertOrGetBlockchainEvent(context.Background(), &core.BlockchainEvent{})
assert.Regexp(t, "FF10119", err)
assert.NoError(t, mock.ExpectationsWereMet())
}
Expand All @@ -122,7 +135,7 @@ func TestGetBlockchainEventByIDSelectFail(t *testing.T) {

func TestGetBlockchainEventByIDNotFound(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"id"}))
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{}))
msg, err := s.GetBlockchainEventByID(context.Background(), "ns", fftypes.NewUUID())
assert.NoError(t, err)
assert.Nil(t, msg)
Expand Down
34 changes: 14 additions & 20 deletions internal/events/batch_pin_complete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,15 @@ func TestBatchPinCompleteOkBroadcast(t *testing.T) {
}
}

mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == batchPin.Event.Name
})).Return(fmt.Errorf("pop")).Once()
mth.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
})).Return(nil, fmt.Errorf("pop")).Once()
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == batchPin.Event.Name
})).Return(nil).Times(1)
})).Return(nil, nil).Once()
mdi.On("InsertEvent", mock.Anything, mock.MatchedBy(func(e *core.Event) bool {
return e.Type == core.EventTypeBlockchainEventReceived
})).Return(nil).Times(1)
})).Return(nil).Once()
mdi.On("InsertPins", mock.Anything, mock.Anything).Return(nil).Once()
msd := em.sharedDownload.(*shareddownloadmocks.Manager)
mdi.On("GetBatchByID", mock.Anything, "ns1", mock.Anything).Return(nil, nil)
Expand Down Expand Up @@ -191,16 +190,15 @@ func TestBatchPinCompleteOkBroadcastExistingBatch(t *testing.T) {
}
}

mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == batchPin.Event.Name
})).Return(fmt.Errorf("pop")).Once()
mth.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
})).Return(nil, fmt.Errorf("pop")).Once()
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == batchPin.Event.Name
})).Return(nil).Times(1)
})).Return(nil, nil).Once()
mdi.On("InsertEvent", mock.Anything, mock.MatchedBy(func(e *core.Event) bool {
return e.Type == core.EventTypeBlockchainEventReceived
})).Return(nil).Times(1)
})).Return(nil).Once()
mdi.On("InsertPins", mock.Anything, mock.Anything).Return(nil).Once()
mdi.On("GetBatchByID", mock.Anything, "ns1", mock.Anything).Return(batchPersisted, nil)

Expand Down Expand Up @@ -235,8 +233,7 @@ func TestBatchPinCompleteOkPrivate(t *testing.T) {
mdi.On("RunAsGroup", mock.Anything, mock.Anything).Return(nil)
mdi.On("InsertPins", mock.Anything, mock.Anything).Return(fmt.Errorf("These pins have been seen before")) // simulate replay fallback
mdi.On("UpsertPin", mock.Anything, mock.Anything).Return(nil)
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(nil)
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.Anything).Return(nil, nil)
mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
mdi.On("GetBatchByID", mock.Anything, "ns1", mock.Anything).Return(nil, nil)

Expand Down Expand Up @@ -277,8 +274,7 @@ func TestBatchPinCompleteInsertPinsFail(t *testing.T) {
mdi.On("RunAsGroup", mock.Anything, mock.Anything).Return(nil)
mdi.On("InsertPins", mock.Anything, mock.Anything).Return(fmt.Errorf("optimization miss"))
mdi.On("UpsertPin", mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(nil)
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.Anything).Return(nil, nil)
mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)

err := em.BatchPinComplete("ns1", batchPin, &core.VerifierRef{
Expand Down Expand Up @@ -311,8 +307,7 @@ func TestBatchPinCompleteGetBatchByIDFails(t *testing.T) {
mdi := em.database.(*databasemocks.Plugin)
mdi.On("RunAsGroup", mock.Anything, mock.Anything).Return(nil)
mdi.On("InsertPins", mock.Anything, mock.Anything).Return(nil)
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(nil)
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.Anything).Return(nil, nil)
mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
mdi.On("GetBatchByID", mock.Anything, "ns1", mock.Anything).Return(nil, fmt.Errorf("batch lookup failed"))

Expand Down Expand Up @@ -346,8 +341,7 @@ func TestSequencedBroadcastInitiateDownloadFail(t *testing.T) {
mth.On("PersistTransaction", mock.Anything, batchPin.TransactionID, core.TransactionTypeBatchPin, "0x12345").Return(true, nil)

mdi := em.database.(*databasemocks.Plugin)
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(nil)
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.Anything).Return(nil, nil)
mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil)
mdi.On("InsertPins", mock.Anything, mock.Anything).Return(nil)
mdi.On("GetBatchByID", mock.Anything, "ns1", mock.Anything).Return(nil, nil)
Expand Down
5 changes: 1 addition & 4 deletions internal/events/blockchain_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,14 @@ func (em *eventManager) getTopicForChainListener(listener *core.ContractListener
}

func (em *eventManager) maybePersistBlockchainEvent(ctx context.Context, chainEvent *core.BlockchainEvent, listener *core.ContractListener) error {
if existing, err := em.database.GetBlockchainEventByProtocolID(ctx, chainEvent.Namespace, chainEvent.Listener, chainEvent.ProtocolID); err != nil {
if existing, err := em.txHelper.InsertOrGetBlockchainEvent(ctx, chainEvent); err != nil {
return err
} else if existing != nil {
log.L(ctx).Debugf("Ignoring duplicate blockchain event %s", chainEvent.ProtocolID)
// Return the ID of the existing event
chainEvent.ID = existing.ID
return nil
}
if err := em.txHelper.InsertBlockchainEvent(ctx, chainEvent); err != nil {
return err
}
topic := em.getTopicForChainListener(listener)
ffEvent := core.NewEvent(core.EventTypeBlockchainEventReceived, chainEvent.Namespace, chainEvent.ID, chainEvent.TX.ID, topic)
if err := em.database.InsertEvent(ctx, ffEvent); err != nil {
Expand Down
42 changes: 9 additions & 33 deletions internal/events/blockchain_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,11 @@ func TestContractEventWithRetries(t *testing.T) {
mdi.On("GetContractListenerByBackendID", mock.Anything, "ns1", "sb-1").Return(nil, fmt.Errorf("pop")).Once()
mdi.On("GetContractListenerByBackendID", mock.Anything, "ns1", "sb-1").Return(sub, nil).Times(1) // cached
mth := em.txHelper.(*txcommonmocks.Helper)
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", sub.ID, ev.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once()
mth.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("pop")).Once()
mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
eventID = e.ID
return *e.Listener == *sub.ID && e.Name == "Changed" && e.Namespace == "ns1"
})).Return(nil).Times(2)
})).Return(nil, nil).Times(2)
mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once()
mdi.On("InsertEvent", mock.Anything, mock.MatchedBy(func(e *core.Event) bool {
return e.Type == core.EventTypeBlockchainEventReceived && e.Reference != nil && e.Reference == eventID && e.Topic == "topic1"
Expand Down Expand Up @@ -141,6 +140,7 @@ func TestPersistBlockchainEventDuplicate(t *testing.T) {
defer cancel()

ev := &core.BlockchainEvent{
ID: fftypes.NewUUID(),
Name: "Changed",
Namespace: "ns1",
ProtocolID: "10/20/30",
Expand All @@ -152,40 +152,16 @@ func TestPersistBlockchainEventDuplicate(t *testing.T) {
},
Listener: fftypes.NewUUID(),
}
existingID := fftypes.NewUUID()

mdi := em.database.(*databasemocks.Plugin)
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", ev.Listener, ev.ProtocolID).Return(&core.BlockchainEvent{}, nil)
mth := em.txHelper.(*txcommonmocks.Helper)
mth.On("InsertOrGetBlockchainEvent", mock.Anything, ev).Return(&core.BlockchainEvent{ID: existingID}, nil)

err := em.maybePersistBlockchainEvent(em.ctx, ev, nil)
assert.NoError(t, err)
assert.Equal(t, existingID, ev.ID)

mdi.AssertExpectations(t)
}

func TestPersistBlockchainEventLookupFail(t *testing.T) {
em, cancel := newTestEventManager(t)
defer cancel()

ev := &core.BlockchainEvent{
Name: "Changed",
Namespace: "ns1",
ProtocolID: "10/20/30",
Output: fftypes.JSONObject{
"value": "1",
},
Info: fftypes.JSONObject{
"blockNumber": "10",
},
Listener: fftypes.NewUUID(),
}

mdi := em.database.(*databasemocks.Plugin)
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", ev.Listener, ev.ProtocolID).Return(nil, fmt.Errorf("pop"))

err := em.maybePersistBlockchainEvent(em.ctx, ev, nil)
assert.EqualError(t, err, "pop")

mdi.AssertExpectations(t)
mth.AssertExpectations(t)
}

func TestGetTopicForChainListenerFallback(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions internal/events/network_action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@ func TestNetworkAction(t *testing.T) {
mii := em.identity.(*identitymanagermocks.Manager)

mii.On("FindIdentityForVerifier", em.ctx, []core.IdentityType{core.IdentityTypeOrg}, verifier).Return(&core.Identity{}, nil)
mdi.On("GetBlockchainEventByProtocolID", em.ctx, "ns1", (*fftypes.UUID)(nil), "0001").Return(nil, nil)
mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(be *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(be *core.BlockchainEvent) bool {
return be.ProtocolID == "0001"
})).Return(nil)
})).Return(nil, nil)
mdi.On("InsertEvent", em.ctx, mock.Anything).Return(nil)
mmp.On("TerminateContract", em.ctx, location, mock.AnythingOfType("*blockchain.Event")).Return(nil)

Expand Down
25 changes: 10 additions & 15 deletions internal/events/token_pool_created_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,9 @@ func TestTokenPoolCreatedConfirm(t *testing.T) {

mdi.On("GetTokenPoolByLocator", em.ctx, "ns1", "erc1155", "123").Return(nil, fmt.Errorf("pop")).Once()
mdi.On("GetTokenPoolByLocator", em.ctx, "ns1", "erc1155", "123").Return(storedPool, nil).Once()
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), chainPool.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == chainPool.Event.Name
})).Return(nil).Once()
})).Return(nil, nil).Once()
mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *core.Event) bool {
return e.Type == core.EventTypeBlockchainEventReceived
})).Return(nil).Once()
Expand Down Expand Up @@ -278,10 +277,9 @@ func TestTokenPoolCreatedMigrate(t *testing.T) {
}

mdi.On("GetTokenPoolByLocator", em.ctx, "ns1", "magic-tokens", "123").Return(storedPool, nil).Times(2)
mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), chainPool.Event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == chainPool.Event.Name
})).Return(nil).Once()
})).Return(nil, nil).Once()
mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *core.Event) bool {
return e.Type == core.EventTypeBlockchainEventReceived
})).Return(nil).Once()
Expand Down Expand Up @@ -324,10 +322,9 @@ func TestConfirmPoolBlockchainEventFail(t *testing.T) {
ProtocolID: "tx1",
}

mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == event.Name
})).Return(fmt.Errorf("pop"))
})).Return(nil, fmt.Errorf("pop"))

err := em.confirmPool(em.ctx, storedPool, event)
assert.EqualError(t, err, "pop")
Expand Down Expand Up @@ -359,10 +356,9 @@ func TestConfirmPoolTxFail(t *testing.T) {
ProtocolID: "tx1",
}

mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == event.Name
})).Return(nil)
})).Return(nil, nil)
mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *core.Event) bool {
return e.Type == core.EventTypeBlockchainEventReceived
})).Return(nil)
Expand Down Expand Up @@ -397,10 +393,9 @@ func TestConfirmPoolUpsertFail(t *testing.T) {
ProtocolID: "tx1",
}

mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), event.ProtocolID).Return(nil, nil)
mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool {
return e.Name == event.Name
})).Return(nil)
})).Return(nil, nil)
mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *core.Event) bool {
return e.Type == core.EventTypeBlockchainEventReceived
})).Return(nil)
Expand Down
Loading

0 comments on commit 0e55946

Please sign in to comment.