From 0e559460a4ee2c68a66c23a5ca108bc98e7dfded Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Thu, 18 Aug 2022 10:46:23 -0400 Subject: [PATCH] Use optimistic inserts for blockchain events Always attempt to insert first, then fall back to a query for duplicates. Signed-off-by: Andrew Richardson --- .../sqlcommon/blockchainevents_sql.go | 64 +++++++++++-------- .../sqlcommon/blockchainevents_sql_test.go | 23 +++++-- internal/events/batch_pin_complete_test.go | 34 ++++------ internal/events/blockchain_event.go | 5 +- internal/events/blockchain_event_test.go | 42 +++--------- internal/events/network_action_test.go | 5 +- internal/events/token_pool_created_test.go | 25 +++----- internal/events/tokens_approved_test.go | 15 ++--- internal/events/tokens_transferred_test.go | 25 +++----- internal/orchestrator/data_query.go | 2 +- internal/orchestrator/data_query_test.go | 2 +- internal/txcommon/txcommon.go | 16 +++-- internal/txcommon/txcommon_test.go | 33 ++++++++-- mocks/databasemocks/plugin.go | 37 +++++++---- mocks/txcommonmocks/helper.go | 25 +++++--- pkg/database/plugin.go | 5 +- 16 files changed, 192 insertions(+), 166 deletions(-) diff --git a/internal/database/sqlcommon/blockchainevents_sql.go b/internal/database/sqlcommon/blockchainevents_sql.go index cf3e0a4c2..0dbdf4035 100644 --- a/internal/database/sqlcommon/blockchainevents_sql.go +++ b/internal/database/sqlcommon/blockchainevents_sql.go @@ -55,38 +55,52 @@ var ( const blockchaineventsTable = "blockchainevents" -func (s *SQLCommon) InsertBlockchainEvent(ctx context.Context, event *core.BlockchainEvent) (err error) { +func (s *SQLCommon) setBlockchainEventInsertValues(query sq.InsertBuilder, event *core.BlockchainEvent) sq.InsertBuilder { + return query.Values( + event.ID, + event.Source, + event.Namespace, + event.Name, + event.ProtocolID, + event.Listener, + event.Output, + event.Info, + event.Timestamp, + event.TX.Type, + event.TX.ID, + event.TX.BlockchainID, + ) +} + +func (s *SQLCommon) attemptBlockchainEventInsert(ctx context.Context, tx *txWrapper, event *core.BlockchainEvent, requestConflictEmptyResult bool) (err error) { + _, err = s.insertTxExt(ctx, messagesTable, tx, + s.setBlockchainEventInsertValues(sq.Insert(blockchaineventsTable).Columns(blockchainEventColumns...), event), + func() { + s.callbacks.UUIDCollectionNSEvent(database.CollectionBlockchainEvents, core.ChangeEventTypeCreated, event.Namespace, event.ID) + }, requestConflictEmptyResult) + return err +} + +func (s *SQLCommon) InsertOrGetBlockchainEvent(ctx context.Context, event *core.BlockchainEvent) (existing *core.BlockchainEvent, err error) { ctx, tx, autoCommit, err := s.beginOrUseTx(ctx) if err != nil { - return err + return nil, err } defer s.rollbackTx(ctx, tx, autoCommit) - if _, err = s.insertTx(ctx, blockchaineventsTable, tx, - sq.Insert(blockchaineventsTable). - Columns(blockchainEventColumns...). - Values( - event.ID, - event.Source, - event.Namespace, - event.Name, - event.ProtocolID, - event.Listener, - event.Output, - event.Info, - event.Timestamp, - event.TX.Type, - event.TX.ID, - event.TX.BlockchainID, - ), - func() { - s.callbacks.UUIDCollectionNSEvent(database.CollectionBlockchainEvents, core.ChangeEventTypeCreated, event.Namespace, event.ID) - }, - ); err != nil { - return err + opErr := s.attemptBlockchainEventInsert(ctx, tx, event, true /* we want a failure here we can progress past */) + if opErr == nil { + return nil, s.commitTx(ctx, tx, autoCommit) + } + + // Do a select within the transaction to determine if the protocolID already exists + existing, err = s.GetBlockchainEventByProtocolID(ctx, event.Namespace, event.Listener, event.ProtocolID) + if err != nil || existing != nil { + return existing, err } - return s.commitTx(ctx, tx, autoCommit) + // Error was apparently not a protocolID conflict - must have been something else + return nil, opErr } func (s *SQLCommon) blockchainEventResult(ctx context.Context, row *sql.Rows) (*core.BlockchainEvent, error) { diff --git a/internal/database/sqlcommon/blockchainevents_sql_test.go b/internal/database/sqlcommon/blockchainevents_sql_test.go index d00ae84cc..3f73fc366 100644 --- a/internal/database/sqlcommon/blockchainevents_sql_test.go +++ b/internal/database/sqlcommon/blockchainevents_sql_test.go @@ -53,7 +53,7 @@ func TestBlockchainEventsE2EWithDB(t *testing.T) { s.callbacks.On("UUIDCollectionNSEvent", database.CollectionBlockchainEvents, core.ChangeEventTypeCreated, "ns", event.ID).Return() - err := s.InsertBlockchainEvent(ctx, event) + _, err := s.InsertOrGetBlockchainEvent(ctx, event) assert.NotNil(t, event.Timestamp) assert.NoError(t, err) eventJson, _ := json.Marshal(&event) @@ -82,12 +82,24 @@ func TestBlockchainEventsE2EWithDB(t *testing.T) { assert.NoError(t, err) eventReadJson, _ = json.Marshal(eventRead) assert.Equal(t, string(eventJson), string(eventReadJson)) + + // Try to insert again with a new ID (should return existing row) + event2 := &core.BlockchainEvent{ + ID: fftypes.NewUUID(), + Namespace: event.Namespace, + Listener: event.Listener, + ProtocolID: event.ProtocolID, + Timestamp: fftypes.Now(), + } + existing, err := s.InsertOrGetBlockchainEvent(ctx, event2) + assert.NoError(t, err) + assert.Equal(t, event.ID, existing.ID) } func TestInsertBlockchainEventFailBegin(t *testing.T) { s, mock := newMockProvider().init() mock.ExpectBegin().WillReturnError(fmt.Errorf("pop")) - err := s.InsertBlockchainEvent(context.Background(), &core.BlockchainEvent{}) + _, err := s.InsertOrGetBlockchainEvent(context.Background(), &core.BlockchainEvent{}) assert.Regexp(t, "FF10114", err) assert.NoError(t, mock.ExpectationsWereMet()) } @@ -96,8 +108,9 @@ func TestInsertBlockchainEventFailInsert(t *testing.T) { s, mock := newMockProvider().init() mock.ExpectBegin() mock.ExpectExec("INSERT .*").WillReturnError(fmt.Errorf("pop")) + mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{})) mock.ExpectRollback() - err := s.InsertBlockchainEvent(context.Background(), &core.BlockchainEvent{}) + _, err := s.InsertOrGetBlockchainEvent(context.Background(), &core.BlockchainEvent{}) assert.Regexp(t, "FF10116", err) assert.NoError(t, mock.ExpectationsWereMet()) } @@ -107,7 +120,7 @@ func TestInsertBlockchainEventFailCommit(t *testing.T) { mock.ExpectBegin() mock.ExpectExec("INSERT .*").WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectCommit().WillReturnError(fmt.Errorf("pop")) - err := s.InsertBlockchainEvent(context.Background(), &core.BlockchainEvent{}) + _, err := s.InsertOrGetBlockchainEvent(context.Background(), &core.BlockchainEvent{}) assert.Regexp(t, "FF10119", err) assert.NoError(t, mock.ExpectationsWereMet()) } @@ -122,7 +135,7 @@ func TestGetBlockchainEventByIDSelectFail(t *testing.T) { func TestGetBlockchainEventByIDNotFound(t *testing.T) { s, mock := newMockProvider().init() - mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"id"})) + mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{})) msg, err := s.GetBlockchainEventByID(context.Background(), "ns", fftypes.NewUUID()) assert.NoError(t, err) assert.Nil(t, msg) diff --git a/internal/events/batch_pin_complete_test.go b/internal/events/batch_pin_complete_test.go index 95019601f..f4a36edbc 100644 --- a/internal/events/batch_pin_complete_test.go +++ b/internal/events/batch_pin_complete_test.go @@ -121,16 +121,15 @@ func TestBatchPinCompleteOkBroadcast(t *testing.T) { } } - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil) - mth.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool { + mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool { return e.Name == batchPin.Event.Name - })).Return(fmt.Errorf("pop")).Once() - mth.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool { + })).Return(nil, fmt.Errorf("pop")).Once() + mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool { return e.Name == batchPin.Event.Name - })).Return(nil).Times(1) + })).Return(nil, nil).Once() mdi.On("InsertEvent", mock.Anything, mock.MatchedBy(func(e *core.Event) bool { return e.Type == core.EventTypeBlockchainEventReceived - })).Return(nil).Times(1) + })).Return(nil).Once() mdi.On("InsertPins", mock.Anything, mock.Anything).Return(nil).Once() msd := em.sharedDownload.(*shareddownloadmocks.Manager) mdi.On("GetBatchByID", mock.Anything, "ns1", mock.Anything).Return(nil, nil) @@ -191,16 +190,15 @@ func TestBatchPinCompleteOkBroadcastExistingBatch(t *testing.T) { } } - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil) - mth.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool { + mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool { return e.Name == batchPin.Event.Name - })).Return(fmt.Errorf("pop")).Once() - mth.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool { + })).Return(nil, fmt.Errorf("pop")).Once() + mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool { return e.Name == batchPin.Event.Name - })).Return(nil).Times(1) + })).Return(nil, nil).Once() mdi.On("InsertEvent", mock.Anything, mock.MatchedBy(func(e *core.Event) bool { return e.Type == core.EventTypeBlockchainEventReceived - })).Return(nil).Times(1) + })).Return(nil).Once() mdi.On("InsertPins", mock.Anything, mock.Anything).Return(nil).Once() mdi.On("GetBatchByID", mock.Anything, "ns1", mock.Anything).Return(batchPersisted, nil) @@ -235,8 +233,7 @@ func TestBatchPinCompleteOkPrivate(t *testing.T) { mdi.On("RunAsGroup", mock.Anything, mock.Anything).Return(nil) mdi.On("InsertPins", mock.Anything, mock.Anything).Return(fmt.Errorf("These pins have been seen before")) // simulate replay fallback mdi.On("UpsertPin", mock.Anything, mock.Anything).Return(nil) - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil) - mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(nil) + mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.Anything).Return(nil, nil) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) mdi.On("GetBatchByID", mock.Anything, "ns1", mock.Anything).Return(nil, nil) @@ -277,8 +274,7 @@ func TestBatchPinCompleteInsertPinsFail(t *testing.T) { mdi.On("RunAsGroup", mock.Anything, mock.Anything).Return(nil) mdi.On("InsertPins", mock.Anything, mock.Anything).Return(fmt.Errorf("optimization miss")) mdi.On("UpsertPin", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil) - mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(nil) + mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.Anything).Return(nil, nil) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) err := em.BatchPinComplete("ns1", batchPin, &core.VerifierRef{ @@ -311,8 +307,7 @@ func TestBatchPinCompleteGetBatchByIDFails(t *testing.T) { mdi := em.database.(*databasemocks.Plugin) mdi.On("RunAsGroup", mock.Anything, mock.Anything).Return(nil) mdi.On("InsertPins", mock.Anything, mock.Anything).Return(nil) - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil) - mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(nil) + mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.Anything).Return(nil, nil) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) mdi.On("GetBatchByID", mock.Anything, "ns1", mock.Anything).Return(nil, fmt.Errorf("batch lookup failed")) @@ -346,8 +341,7 @@ func TestSequencedBroadcastInitiateDownloadFail(t *testing.T) { mth.On("PersistTransaction", mock.Anything, batchPin.TransactionID, core.TransactionTypeBatchPin, "0x12345").Return(true, nil) mdi := em.database.(*databasemocks.Plugin) - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), batchPin.Event.ProtocolID).Return(nil, nil) - mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(nil) + mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.Anything).Return(nil, nil) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(nil) mdi.On("InsertPins", mock.Anything, mock.Anything).Return(nil) mdi.On("GetBatchByID", mock.Anything, "ns1", mock.Anything).Return(nil, nil) diff --git a/internal/events/blockchain_event.go b/internal/events/blockchain_event.go index 96c1fff90..b408a4667 100644 --- a/internal/events/blockchain_event.go +++ b/internal/events/blockchain_event.go @@ -78,7 +78,7 @@ func (em *eventManager) getTopicForChainListener(listener *core.ContractListener } func (em *eventManager) maybePersistBlockchainEvent(ctx context.Context, chainEvent *core.BlockchainEvent, listener *core.ContractListener) error { - if existing, err := em.database.GetBlockchainEventByProtocolID(ctx, chainEvent.Namespace, chainEvent.Listener, chainEvent.ProtocolID); err != nil { + if existing, err := em.txHelper.InsertOrGetBlockchainEvent(ctx, chainEvent); err != nil { return err } else if existing != nil { log.L(ctx).Debugf("Ignoring duplicate blockchain event %s", chainEvent.ProtocolID) @@ -86,9 +86,6 @@ func (em *eventManager) maybePersistBlockchainEvent(ctx context.Context, chainEv chainEvent.ID = existing.ID return nil } - if err := em.txHelper.InsertBlockchainEvent(ctx, chainEvent); err != nil { - return err - } topic := em.getTopicForChainListener(listener) ffEvent := core.NewEvent(core.EventTypeBlockchainEventReceived, chainEvent.Namespace, chainEvent.ID, chainEvent.TX.ID, topic) if err := em.database.InsertEvent(ctx, ffEvent); err != nil { diff --git a/internal/events/blockchain_event_test.go b/internal/events/blockchain_event_test.go index a9fe04176..e1b2294f2 100644 --- a/internal/events/blockchain_event_test.go +++ b/internal/events/blockchain_event_test.go @@ -59,12 +59,11 @@ func TestContractEventWithRetries(t *testing.T) { mdi.On("GetContractListenerByBackendID", mock.Anything, "ns1", "sb-1").Return(nil, fmt.Errorf("pop")).Once() mdi.On("GetContractListenerByBackendID", mock.Anything, "ns1", "sb-1").Return(sub, nil).Times(1) // cached mth := em.txHelper.(*txcommonmocks.Helper) - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", sub.ID, ev.ProtocolID).Return(nil, nil) - mth.On("InsertBlockchainEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once() - mth.On("InsertBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool { + mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("pop")).Once() + mth.On("InsertOrGetBlockchainEvent", mock.Anything, mock.MatchedBy(func(e *core.BlockchainEvent) bool { eventID = e.ID return *e.Listener == *sub.ID && e.Name == "Changed" && e.Namespace == "ns1" - })).Return(nil).Times(2) + })).Return(nil, nil).Times(2) mdi.On("InsertEvent", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")).Once() mdi.On("InsertEvent", mock.Anything, mock.MatchedBy(func(e *core.Event) bool { return e.Type == core.EventTypeBlockchainEventReceived && e.Reference != nil && e.Reference == eventID && e.Topic == "topic1" @@ -141,6 +140,7 @@ func TestPersistBlockchainEventDuplicate(t *testing.T) { defer cancel() ev := &core.BlockchainEvent{ + ID: fftypes.NewUUID(), Name: "Changed", Namespace: "ns1", ProtocolID: "10/20/30", @@ -152,40 +152,16 @@ func TestPersistBlockchainEventDuplicate(t *testing.T) { }, Listener: fftypes.NewUUID(), } + existingID := fftypes.NewUUID() - mdi := em.database.(*databasemocks.Plugin) - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", ev.Listener, ev.ProtocolID).Return(&core.BlockchainEvent{}, nil) + mth := em.txHelper.(*txcommonmocks.Helper) + mth.On("InsertOrGetBlockchainEvent", mock.Anything, ev).Return(&core.BlockchainEvent{ID: existingID}, nil) err := em.maybePersistBlockchainEvent(em.ctx, ev, nil) assert.NoError(t, err) + assert.Equal(t, existingID, ev.ID) - mdi.AssertExpectations(t) -} - -func TestPersistBlockchainEventLookupFail(t *testing.T) { - em, cancel := newTestEventManager(t) - defer cancel() - - ev := &core.BlockchainEvent{ - Name: "Changed", - Namespace: "ns1", - ProtocolID: "10/20/30", - Output: fftypes.JSONObject{ - "value": "1", - }, - Info: fftypes.JSONObject{ - "blockNumber": "10", - }, - Listener: fftypes.NewUUID(), - } - - mdi := em.database.(*databasemocks.Plugin) - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", ev.Listener, ev.ProtocolID).Return(nil, fmt.Errorf("pop")) - - err := em.maybePersistBlockchainEvent(em.ctx, ev, nil) - assert.EqualError(t, err, "pop") - - mdi.AssertExpectations(t) + mth.AssertExpectations(t) } func TestGetTopicForChainListenerFallback(t *testing.T) { diff --git a/internal/events/network_action_test.go b/internal/events/network_action_test.go index e001c1ee8..447e0c8c7 100644 --- a/internal/events/network_action_test.go +++ b/internal/events/network_action_test.go @@ -48,10 +48,9 @@ func TestNetworkAction(t *testing.T) { mii := em.identity.(*identitymanagermocks.Manager) mii.On("FindIdentityForVerifier", em.ctx, []core.IdentityType{core.IdentityTypeOrg}, verifier).Return(&core.Identity{}, nil) - mdi.On("GetBlockchainEventByProtocolID", em.ctx, "ns1", (*fftypes.UUID)(nil), "0001").Return(nil, nil) - mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(be *core.BlockchainEvent) bool { + mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(be *core.BlockchainEvent) bool { return be.ProtocolID == "0001" - })).Return(nil) + })).Return(nil, nil) mdi.On("InsertEvent", em.ctx, mock.Anything).Return(nil) mmp.On("TerminateContract", em.ctx, location, mock.AnythingOfType("*blockchain.Event")).Return(nil) diff --git a/internal/events/token_pool_created_test.go b/internal/events/token_pool_created_test.go index 7f7461e12..04a6307c6 100644 --- a/internal/events/token_pool_created_test.go +++ b/internal/events/token_pool_created_test.go @@ -133,10 +133,9 @@ func TestTokenPoolCreatedConfirm(t *testing.T) { mdi.On("GetTokenPoolByLocator", em.ctx, "ns1", "erc1155", "123").Return(nil, fmt.Errorf("pop")).Once() mdi.On("GetTokenPoolByLocator", em.ctx, "ns1", "erc1155", "123").Return(storedPool, nil).Once() - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), chainPool.Event.ProtocolID).Return(nil, nil) - mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { + mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { return e.Name == chainPool.Event.Name - })).Return(nil).Once() + })).Return(nil, nil).Once() mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *core.Event) bool { return e.Type == core.EventTypeBlockchainEventReceived })).Return(nil).Once() @@ -278,10 +277,9 @@ func TestTokenPoolCreatedMigrate(t *testing.T) { } mdi.On("GetTokenPoolByLocator", em.ctx, "ns1", "magic-tokens", "123").Return(storedPool, nil).Times(2) - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), chainPool.Event.ProtocolID).Return(nil, nil) - mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { + mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { return e.Name == chainPool.Event.Name - })).Return(nil).Once() + })).Return(nil, nil).Once() mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *core.Event) bool { return e.Type == core.EventTypeBlockchainEventReceived })).Return(nil).Once() @@ -324,10 +322,9 @@ func TestConfirmPoolBlockchainEventFail(t *testing.T) { ProtocolID: "tx1", } - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), event.ProtocolID).Return(nil, nil) - mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { + mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { return e.Name == event.Name - })).Return(fmt.Errorf("pop")) + })).Return(nil, fmt.Errorf("pop")) err := em.confirmPool(em.ctx, storedPool, event) assert.EqualError(t, err, "pop") @@ -359,10 +356,9 @@ func TestConfirmPoolTxFail(t *testing.T) { ProtocolID: "tx1", } - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), event.ProtocolID).Return(nil, nil) - mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { + mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { return e.Name == event.Name - })).Return(nil) + })).Return(nil, nil) mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *core.Event) bool { return e.Type == core.EventTypeBlockchainEventReceived })).Return(nil) @@ -397,10 +393,9 @@ func TestConfirmPoolUpsertFail(t *testing.T) { ProtocolID: "tx1", } - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), event.ProtocolID).Return(nil, nil) - mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { + mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { return e.Name == event.Name - })).Return(nil) + })).Return(nil, nil) mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(e *core.Event) bool { return e.Type == core.EventTypeBlockchainEventReceived })).Return(nil) diff --git a/internal/events/tokens_approved_test.go b/internal/events/tokens_approved_test.go index bcf14c9d5..0c0625069 100644 --- a/internal/events/tokens_approved_test.go +++ b/internal/events/tokens_approved_test.go @@ -77,10 +77,9 @@ func TestTokensApprovedSucceedWithRetries(t *testing.T) { mdi.On("GetTokenPoolByLocator", em.ctx, "ns1", "erc1155", "F1").Return(pool, nil).Times(4) mdi.On("GetTokenApprovalByProtocolID", em.ctx, "ns1", approval.Connector, approval.ProtocolID).Return(nil, fmt.Errorf("pop")).Once() mdi.On("GetTokenApprovalByProtocolID", em.ctx, "ns1", approval.Connector, approval.ProtocolID).Return(nil, nil).Times(3) - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), approval.Event.ProtocolID).Return(nil, nil) - mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { + mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { return e.Namespace == pool.Namespace && e.Name == approval.Event.Name - })).Return(nil).Times(3) + })).Return(nil, nil).Times(3) mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *core.Event) bool { return ev.Type == core.EventTypeBlockchainEventReceived && ev.Namespace == pool.Namespace })).Return(nil).Times(3) @@ -291,10 +290,9 @@ func TestApprovedWithTransactionRegenerateLocalID(t *testing.T) { mdi.On("GetOperations", em.ctx, "ns1", mock.Anything).Return(ops, nil, nil) mth.On("PersistTransaction", mock.Anything, approval.TX.ID, core.TransactionTypeTokenApproval, "0xffffeeee").Return(true, nil) mdi.On("GetTokenApprovalByID", em.ctx, "ns1", localID).Return(&core.TokenApproval{}, nil) - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), approval.Event.ProtocolID).Return(nil, nil) - mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { + mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { return e.Namespace == pool.Namespace && e.Name == approval.Event.Name - })).Return(nil) + })).Return(nil, nil) mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *core.Event) bool { return ev.Type == core.EventTypeBlockchainEventReceived && ev.Namespace == pool.Namespace })).Return(nil) @@ -338,10 +336,9 @@ func TestApprovedBlockchainEventFail(t *testing.T) { mdi.On("GetOperations", em.ctx, "ns1", mock.Anything).Return(ops, nil, nil) mth.On("PersistTransaction", mock.Anything, approval.TX.ID, core.TransactionTypeTokenApproval, "0xffffeeee").Return(true, nil) mdi.On("GetTokenApprovalByID", em.ctx, "ns1", localID).Return(&core.TokenApproval{}, nil) - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), approval.Event.ProtocolID).Return(nil, nil) - mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { + mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { return e.Namespace == pool.Namespace && e.Name == approval.Event.Name - })).Return(fmt.Errorf("pop")) + })).Return(nil, fmt.Errorf("pop")) valid, err := em.persistTokenApproval(em.ctx, approval) assert.False(t, valid) diff --git a/internal/events/tokens_transferred_test.go b/internal/events/tokens_transferred_test.go index 33dedfbf0..3f4a3dd35 100644 --- a/internal/events/tokens_transferred_test.go +++ b/internal/events/tokens_transferred_test.go @@ -77,10 +77,9 @@ func TestTokensTransferredSucceedWithRetries(t *testing.T) { mdi.On("GetTokenPoolByLocator", em.ctx, "ns1", "erc1155", "F1").Return(pool, nil).Times(4) mdi.On("GetTokenTransferByProtocolID", em.ctx, "ns1", "erc1155", "123").Return(nil, fmt.Errorf("pop")).Once() mdi.On("GetTokenTransferByProtocolID", em.ctx, "ns1", "erc1155", "123").Return(nil, nil).Times(3) - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), transfer.Event.ProtocolID).Return(nil, nil) - mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { + mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { return e.Namespace == pool.Namespace && e.Name == transfer.Event.Name - })).Return(nil).Times(3) + })).Return(nil, nil).Times(3) mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *core.Event) bool { return ev.Type == core.EventTypeBlockchainEventReceived && ev.Namespace == pool.Namespace })).Return(nil).Times(3) @@ -269,10 +268,9 @@ func TestPersistTransferBlockchainEventFail(t *testing.T) { mdi.On("GetOperations", em.ctx, "ns1", mock.Anything).Return(ops, nil, nil) mth.On("PersistTransaction", mock.Anything, transfer.TX.ID, core.TransactionTypeTokenTransfer, "0xffffeeee").Return(true, nil) mdi.On("GetTokenTransferByID", em.ctx, "ns1", localID).Return(nil, nil) - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), transfer.Event.ProtocolID).Return(nil, nil) - mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { + mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { return e.Namespace == pool.Namespace && e.Name == transfer.Event.Name - })).Return(fmt.Errorf("pop")) + })).Return(nil, fmt.Errorf("pop")) valid, err := em.persistTokenTransfer(em.ctx, transfer) assert.False(t, valid) @@ -308,10 +306,9 @@ func TestTokensTransferredWithTransactionRegenerateLocalID(t *testing.T) { mdi.On("GetOperations", em.ctx, "ns1", mock.Anything).Return(operations, nil, nil) mth.On("PersistTransaction", mock.Anything, transfer.TX.ID, core.TransactionTypeTokenTransfer, "0xffffeeee").Return(true, nil) mdi.On("GetTokenTransferByID", em.ctx, "ns1", localID).Return(&core.TokenTransfer{}, nil) - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), transfer.Event.ProtocolID).Return(nil, nil) - mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { + mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { return e.Namespace == pool.Namespace && e.Name == transfer.Event.Name - })).Return(nil) + })).Return(nil, nil) mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *core.Event) bool { return ev.Type == core.EventTypeBlockchainEventReceived && ev.Namespace == pool.Namespace })).Return(nil) @@ -386,10 +383,9 @@ func TestTokensTransferredWithMessageReceived(t *testing.T) { mdi.On("GetTokenTransferByProtocolID", em.ctx, "ns1", "erc1155", "123").Return(nil, nil).Times(2) mdi.On("GetTokenPoolByLocator", em.ctx, "ns1", "erc1155", "F1").Return(pool, nil).Times(2) - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), transfer.Event.ProtocolID).Return(nil, nil) - mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { + mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { return e.Namespace == pool.Namespace && e.Name == transfer.Event.Name - })).Return(nil).Times(2) + })).Return(nil, nil).Times(2) mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *core.Event) bool { return ev.Type == core.EventTypeBlockchainEventReceived && ev.Namespace == pool.Namespace })).Return(nil).Times(2) @@ -450,10 +446,9 @@ func TestTokensTransferredWithMessageSend(t *testing.T) { mdi.On("GetTokenTransferByProtocolID", em.ctx, "ns1", "erc1155", "123").Return(nil, nil).Times(2) mdi.On("GetTokenPoolByLocator", em.ctx, "ns1", "erc1155", "F1").Return(pool, nil).Times(2) - mdi.On("GetBlockchainEventByProtocolID", mock.Anything, "ns1", (*fftypes.UUID)(nil), transfer.Event.ProtocolID).Return(nil, nil) - mth.On("InsertBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { + mth.On("InsertOrGetBlockchainEvent", em.ctx, mock.MatchedBy(func(e *core.BlockchainEvent) bool { return e.Namespace == pool.Namespace && e.Name == transfer.Event.Name - })).Return(nil).Times(2) + })).Return(nil, nil).Times(2) mdi.On("InsertEvent", em.ctx, mock.MatchedBy(func(ev *core.Event) bool { return ev.Type == core.EventTypeBlockchainEventReceived && ev.Namespace == pool.Namespace })).Return(nil).Times(2) diff --git a/internal/orchestrator/data_query.go b/internal/orchestrator/data_query.go index ea42f1367..05c2a9145 100644 --- a/internal/orchestrator/data_query.go +++ b/internal/orchestrator/data_query.go @@ -251,7 +251,7 @@ func (or *orchestrator) GetBlockchainEventByID(ctx context.Context, id string) ( if err != nil { return nil, err } - return or.database().GetBlockchainEventByID(ctx, or.namespace.LocalName, u) + return or.txHelper.GetBlockchainEventByIDCached(ctx, u) } func (or *orchestrator) GetBlockchainEvents(ctx context.Context, filter database.AndFilter) ([]*core.BlockchainEvent, *database.FilterResult, error) { diff --git a/internal/orchestrator/data_query_test.go b/internal/orchestrator/data_query_test.go index f89f314a8..a8351257b 100644 --- a/internal/orchestrator/data_query_test.go +++ b/internal/orchestrator/data_query_test.go @@ -696,7 +696,7 @@ func TestGetBlockchainEventByID(t *testing.T) { defer or.cleanup(t) id := fftypes.NewUUID() - or.mdi.On("GetBlockchainEventByID", context.Background(), "ns", id).Return(&core.BlockchainEvent{ + or.mth.On("GetBlockchainEventByIDCached", context.Background(), id).Return(&core.BlockchainEvent{ Namespace: "ns", }, nil) diff --git a/internal/txcommon/txcommon.go b/internal/txcommon/txcommon.go index 69b34ea3a..1f6ca4adc 100644 --- a/internal/txcommon/txcommon.go +++ b/internal/txcommon/txcommon.go @@ -35,7 +35,7 @@ type Helper interface { SubmitNewTransaction(ctx context.Context, txType core.TransactionType) (*fftypes.UUID, error) PersistTransaction(ctx context.Context, id *fftypes.UUID, txType core.TransactionType, blockchainTXID string) (valid bool, err error) AddBlockchainTX(ctx context.Context, tx *core.Transaction, blockchainTXID string) error - InsertBlockchainEvent(ctx context.Context, chainEvent *core.BlockchainEvent) error + InsertOrGetBlockchainEvent(ctx context.Context, event *core.BlockchainEvent) (existing *core.BlockchainEvent, err error) GetTransactionByIDCached(ctx context.Context, id *fftypes.UUID) (*core.Transaction, error) GetBlockchainEventByIDCached(ctx context.Context, id *fftypes.UUID) (*core.BlockchainEvent, error) } @@ -183,11 +183,15 @@ func (t *transactionHelper) GetBlockchainEventByIDCached(ctx context.Context, id return chainEvent, nil } -func (t *transactionHelper) InsertBlockchainEvent(ctx context.Context, chainEvent *core.BlockchainEvent) error { - err := t.database.InsertBlockchainEvent(ctx, chainEvent) +func (t *transactionHelper) InsertOrGetBlockchainEvent(ctx context.Context, event *core.BlockchainEvent) (existing *core.BlockchainEvent, err error) { + existing, err = t.database.InsertOrGetBlockchainEvent(ctx, event) if err != nil { - return err + return nil, err } - t.addBlockchainEventToCache(chainEvent) - return nil + if existing != nil { + t.addBlockchainEventToCache(existing) + return existing, nil + } + t.addBlockchainEventToCache(event) + return nil, nil } diff --git a/internal/txcommon/txcommon_test.go b/internal/txcommon/txcommon_test.go index bfb33959a..c60a5f7ef 100644 --- a/internal/txcommon/txcommon_test.go +++ b/internal/txcommon/txcommon_test.go @@ -489,9 +489,9 @@ func TestInsertGetBlockchainEventCached(t *testing.T) { ID: evID, Namespace: "ns1", } - mdi.On("InsertBlockchainEvent", ctx, chainEvent).Return(nil) + mdi.On("InsertOrGetBlockchainEvent", ctx, chainEvent).Return(nil, nil) - err := txHelper.InsertBlockchainEvent(ctx, chainEvent) + _, err := txHelper.InsertOrGetBlockchainEvent(ctx, chainEvent) assert.NoError(t, err) cached, err := txHelper.GetBlockchainEventByIDCached(ctx, evID) @@ -502,7 +502,7 @@ func TestInsertGetBlockchainEventCached(t *testing.T) { } -func TestInsertGetBlockchainEventErr(t *testing.T) { +func TestInsertBlockchainEventDuplicate(t *testing.T) { mdi := &databasemocks.Plugin{} mdm := &datamocks.Manager{} @@ -514,9 +514,32 @@ func TestInsertGetBlockchainEventErr(t *testing.T) { ID: evID, Namespace: "ns1", } - mdi.On("InsertBlockchainEvent", ctx, chainEvent).Return(fmt.Errorf("pop")) + existingEvent := &core.BlockchainEvent{} + mdi.On("InsertOrGetBlockchainEvent", ctx, chainEvent).Return(existingEvent, nil) - err := txHelper.InsertBlockchainEvent(ctx, chainEvent) + result, err := txHelper.InsertOrGetBlockchainEvent(ctx, chainEvent) + assert.NoError(t, err) + assert.Equal(t, existingEvent, result) + + mdi.AssertExpectations(t) + +} + +func TestInsertBlockchainEventErr(t *testing.T) { + + mdi := &databasemocks.Plugin{} + mdm := &datamocks.Manager{} + txHelper := NewTransactionHelper("ns1", mdi, mdm) + ctx := context.Background() + + evID := fftypes.NewUUID() + chainEvent := &core.BlockchainEvent{ + ID: evID, + Namespace: "ns1", + } + mdi.On("InsertOrGetBlockchainEvent", ctx, chainEvent).Return(nil, fmt.Errorf("pop")) + + _, err := txHelper.InsertOrGetBlockchainEvent(ctx, chainEvent) assert.Regexp(t, "pop", err) mdi.AssertExpectations(t) diff --git a/mocks/databasemocks/plugin.go b/mocks/databasemocks/plugin.go index e9d592389..89b668018 100644 --- a/mocks/databasemocks/plugin.go +++ b/mocks/databasemocks/plugin.go @@ -2094,20 +2094,6 @@ func (_m *Plugin) InsertBlobs(ctx context.Context, blobs []*core.Blob) error { return r0 } -// InsertBlockchainEvent provides a mock function with given fields: ctx, event -func (_m *Plugin) InsertBlockchainEvent(ctx context.Context, event *core.BlockchainEvent) error { - ret := _m.Called(ctx, event) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *core.BlockchainEvent) error); ok { - r0 = rf(ctx, event) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // InsertContractListener provides a mock function with given fields: ctx, sub func (_m *Plugin) InsertContractListener(ctx context.Context, sub *core.ContractListener) error { ret := _m.Called(ctx, sub) @@ -2220,6 +2206,29 @@ func (_m *Plugin) InsertOperation(ctx context.Context, operation *core.Operation return r0 } +// InsertOrGetBlockchainEvent provides a mock function with given fields: ctx, event +func (_m *Plugin) InsertOrGetBlockchainEvent(ctx context.Context, event *core.BlockchainEvent) (*core.BlockchainEvent, error) { + ret := _m.Called(ctx, event) + + var r0 *core.BlockchainEvent + if rf, ok := ret.Get(0).(func(context.Context, *core.BlockchainEvent) *core.BlockchainEvent); ok { + r0 = rf(ctx, event) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*core.BlockchainEvent) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *core.BlockchainEvent) error); ok { + r1 = rf(ctx, event) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // InsertPins provides a mock function with given fields: ctx, pins func (_m *Plugin) InsertPins(ctx context.Context, pins []*core.Pin) error { ret := _m.Called(ctx, pins) diff --git a/mocks/txcommonmocks/helper.go b/mocks/txcommonmocks/helper.go index a5b91a892..b4f3e0545 100644 --- a/mocks/txcommonmocks/helper.go +++ b/mocks/txcommonmocks/helper.go @@ -76,18 +76,27 @@ func (_m *Helper) GetTransactionByIDCached(ctx context.Context, id *fftypes.UUID return r0, r1 } -// InsertBlockchainEvent provides a mock function with given fields: ctx, chainEvent -func (_m *Helper) InsertBlockchainEvent(ctx context.Context, chainEvent *core.BlockchainEvent) error { - ret := _m.Called(ctx, chainEvent) +// InsertOrGetBlockchainEvent provides a mock function with given fields: ctx, event +func (_m *Helper) InsertOrGetBlockchainEvent(ctx context.Context, event *core.BlockchainEvent) (*core.BlockchainEvent, error) { + ret := _m.Called(ctx, event) - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *core.BlockchainEvent) error); ok { - r0 = rf(ctx, chainEvent) + var r0 *core.BlockchainEvent + if rf, ok := ret.Get(0).(func(context.Context, *core.BlockchainEvent) *core.BlockchainEvent); ok { + r0 = rf(ctx, event) } else { - r0 = ret.Error(0) + if ret.Get(0) != nil { + r0 = ret.Get(0).(*core.BlockchainEvent) + } } - return r0 + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *core.BlockchainEvent) error); ok { + r1 = rf(ctx, event) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } // PersistTransaction provides a mock function with given fields: ctx, id, txType, blockchainTXID diff --git a/pkg/database/plugin.go b/pkg/database/plugin.go index 736b5400f..ad2e35795 100644 --- a/pkg/database/plugin.go +++ b/pkg/database/plugin.go @@ -479,8 +479,9 @@ type iContractListenerCollection interface { } type iBlockchainEventCollection interface { - // InsertBlockchainEvent - insert an event from the blockchain - InsertBlockchainEvent(ctx context.Context, event *core.BlockchainEvent) (err error) + // InsertOrGetBlockchainEvent - insert an event from the blockchain + // If the ProtocolID has already been recorded, it does not insert but returns the existing row + InsertOrGetBlockchainEvent(ctx context.Context, event *core.BlockchainEvent) (existing *core.BlockchainEvent, err error) // GetBlockchainEventByID - get blockchain event by ID GetBlockchainEventByID(ctx context.Context, namespace string, id *fftypes.UUID) (*core.BlockchainEvent, error)