diff --git a/internal/database/postgres/postgres.go b/internal/database/postgres/postgres.go index 90d5b067e1..a047ccd859 100644 --- a/internal/database/postgres/postgres.go +++ b/internal/database/postgres/postgres.go @@ -42,8 +42,8 @@ func (psql *Postgres) Init(ctx context.Context, config config.Section) error { return psql.SQLCommon.Init(ctx, psql, config, capabilities) } -func (psql *Postgres) RegisterListener(callbacks database.Callbacks) { - psql.SQLCommon.RegisterListener(callbacks) +func (psql *Postgres) RegisterListener(listener database.Callbacks) { + psql.SQLCommon.RegisterListener(listener) } func (psql *Postgres) Name() string { diff --git a/internal/database/sqlcommon/contractapis_sql_test.go b/internal/database/sqlcommon/contractapis_sql_test.go index fc601531fe..9a8868697f 100644 --- a/internal/database/sqlcommon/contractapis_sql_test.go +++ b/internal/database/sqlcommon/contractapis_sql_test.go @@ -126,7 +126,7 @@ func TestContractAPIDBFailUpdate(t *testing.T) { func TestUpsertContractAPIIDMismatch(t *testing.T) { s, db := newMockProvider().init() callbacks := &databasemocks.Callbacks{} - s.SQLCommon.callbacks = callbacks + s.RegisterListener(callbacks) apiID := fftypes.NewUUID() api := &core.ContractAPI{ ID: apiID, diff --git a/internal/database/sqlcommon/sqlcommon.go b/internal/database/sqlcommon/sqlcommon.go index fc5d45b0ee..770be69730 100644 --- a/internal/database/sqlcommon/sqlcommon.go +++ b/internal/database/sqlcommon/sqlcommon.go @@ -39,11 +39,45 @@ import ( type SQLCommon struct { db *sql.DB capabilities *database.Capabilities - callbacks database.Callbacks + callbacks callbacks provider Provider features SQLFeatures } +type callbacks struct { + listeners []database.Callbacks +} + +func (cb *callbacks) OrderedUUIDCollectionNSEvent(resType database.OrderedUUIDCollectionNS, eventType core.ChangeEventType, ns string, id *fftypes.UUID, sequence int64) { + for _, cb := range cb.listeners { + cb.OrderedUUIDCollectionNSEvent(resType, eventType, ns, id, sequence) + } +} + +func (cb *callbacks) OrderedCollectionNSEvent(resType database.OrderedCollectionNS, eventType core.ChangeEventType, ns string, sequence int64) { + for _, cb := range cb.listeners { + cb.OrderedCollectionNSEvent(resType, eventType, ns, sequence) + } +} + +func (cb *callbacks) UUIDCollectionNSEvent(resType database.UUIDCollectionNS, eventType core.ChangeEventType, ns string, id *fftypes.UUID) { + for _, cb := range cb.listeners { + cb.UUIDCollectionNSEvent(resType, eventType, ns, id) + } +} + +func (cb *callbacks) UUIDCollectionEvent(resType database.UUIDCollection, eventType core.ChangeEventType, id *fftypes.UUID) { + for _, cb := range cb.listeners { + cb.UUIDCollectionEvent(resType, eventType, id) + } +} + +func (cb *callbacks) HashCollectionNSEvent(resType database.HashCollectionNS, eventType core.ChangeEventType, ns string, hash *fftypes.Bytes32) { + for _, cb := range cb.listeners { + cb.HashCollectionNSEvent(resType, eventType, ns, hash) + } +} + type txContextKey struct{} type txWrapper struct { @@ -96,8 +130,8 @@ func (s *SQLCommon) Init(ctx context.Context, provider Provider, config config.S return nil } -func (s *SQLCommon) RegisterListener(callbacks database.Callbacks) { - s.callbacks = callbacks +func (s *SQLCommon) RegisterListener(listener database.Callbacks) { + s.callbacks.listeners = append(s.callbacks.listeners, listener) } func (s *SQLCommon) Capabilities() *database.Capabilities { return s.capabilities } diff --git a/internal/database/sqlcommon/tokenpool_sql_test.go b/internal/database/sqlcommon/tokenpool_sql_test.go index 8e8e60439b..db12d4bdf1 100644 --- a/internal/database/sqlcommon/tokenpool_sql_test.go +++ b/internal/database/sqlcommon/tokenpool_sql_test.go @@ -176,7 +176,7 @@ func TestUpsertTokenPoolFailCommit(t *testing.T) { func TestUpsertTokenPoolUpdateIDMismatch(t *testing.T) { s, db := newMockProvider().init() callbacks := &databasemocks.Callbacks{} - s.SQLCommon.callbacks = callbacks + s.RegisterListener(callbacks) poolID := fftypes.NewUUID() pool := &core.TokenPool{ ID: poolID, diff --git a/internal/database/sqlite3/sqlite3.go b/internal/database/sqlite3/sqlite3.go index 73879dd755..d0eafb3d42 100644 --- a/internal/database/sqlite3/sqlite3.go +++ b/internal/database/sqlite3/sqlite3.go @@ -58,8 +58,8 @@ func (sqlite *SQLite3) Init(ctx context.Context, config config.Section) error { return sqlite.SQLCommon.Init(ctx, sqlite, config, capabilities) } -func (sqlite *SQLite3) RegisterListener(callbacks database.Callbacks) { - sqlite.SQLCommon.RegisterListener(callbacks) +func (sqlite *SQLite3) RegisterListener(listener database.Callbacks) { + sqlite.SQLCommon.RegisterListener(listener) } func (sqlite *SQLite3) Name() string { diff --git a/internal/namespace/manager.go b/internal/namespace/manager.go index 3214fd7f6d..2248150590 100644 --- a/internal/namespace/manager.go +++ b/internal/namespace/manager.go @@ -199,7 +199,7 @@ func (nm *namespaceManager) Init(ctx context.Context, cancelCtx context.CancelFu func (nm *namespaceManager) initNamespace(name string, ns *namespace) error { or := nm.utOrchestrator if or == nil { - or = orchestrator.NewOrchestrator(name, ns.config, ns.plugins, nm.metrics, nm.adminEvents) + or = orchestrator.NewOrchestrator(name, ns.config, ns.plugins, nm.metrics) } if err := or.Init(nm.ctx, nm.cancelCtx); err != nil { return err @@ -553,6 +553,7 @@ func (nm *namespaceManager) initPlugins(ctx context.Context) (err error) { if err = entry.plugin.Init(ctx, entry.config); err != nil { return err } + entry.plugin.RegisterListener(nm) } for _, entry := range nm.plugins.blockchain { if err = entry.plugin.Init(ctx, entry.config, nm.metrics); err != nil { diff --git a/internal/namespace/manager_test.go b/internal/namespace/manager_test.go index 1226a6697d..2b3391c800 100644 --- a/internal/namespace/manager_test.go +++ b/internal/namespace/manager_test.go @@ -125,6 +125,7 @@ func TestInit(t *testing.T) { nm.utOrchestrator = mo nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) + nm.mdi.On("RegisterListener", mock.Anything).Return() nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) @@ -149,6 +150,7 @@ func TestInitVersion1(t *testing.T) { nm.utOrchestrator = mo nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) + nm.mdi.On("RegisterListener", mock.Anything).Return() nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) @@ -174,6 +176,7 @@ func TestInitVersion1Fail(t *testing.T) { nm.utOrchestrator = mo nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil) + nm.mdi.On("RegisterListener", mock.Anything).Return() nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil) nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil) nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil) diff --git a/internal/namespace/persistence_events.go b/internal/namespace/persistence_events.go new file mode 100644 index 0000000000..38772529c5 --- /dev/null +++ b/internal/namespace/persistence_events.go @@ -0,0 +1,73 @@ +// Copyright © 2022 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package namespace + +import ( + "github.com/hyperledger/firefly-common/pkg/fftypes" + "github.com/hyperledger/firefly/pkg/core" + "github.com/hyperledger/firefly/pkg/database" +) + +func (nm *namespaceManager) OrderedUUIDCollectionNSEvent(resType database.OrderedUUIDCollectionNS, eventType core.ChangeEventType, ns string, id *fftypes.UUID, sequence int64) { + var ces *int64 + if eventType == core.ChangeEventTypeCreated { + // Sequence is only provided on create events + ces = &sequence + } + nm.adminEvents.Dispatch(&core.ChangeEvent{ + Collection: string(resType), + Type: eventType, + Namespace: ns, + ID: id, + Sequence: ces, + }) +} + +func (nm *namespaceManager) OrderedCollectionNSEvent(resType database.OrderedCollectionNS, eventType core.ChangeEventType, ns string, sequence int64) { + nm.adminEvents.Dispatch(&core.ChangeEvent{ + Collection: string(resType), + Type: eventType, + Namespace: ns, + Sequence: &sequence, + }) +} + +func (nm *namespaceManager) UUIDCollectionNSEvent(resType database.UUIDCollectionNS, eventType core.ChangeEventType, ns string, id *fftypes.UUID) { + nm.adminEvents.Dispatch(&core.ChangeEvent{ + Collection: string(resType), + Type: eventType, + Namespace: ns, + ID: id, + }) +} + +func (nm *namespaceManager) UUIDCollectionEvent(resType database.UUIDCollection, eventType core.ChangeEventType, id *fftypes.UUID) { + nm.adminEvents.Dispatch(&core.ChangeEvent{ + Collection: string(resType), + Type: eventType, + ID: id, + }) +} + +func (nm *namespaceManager) HashCollectionNSEvent(resType database.HashCollectionNS, eventType core.ChangeEventType, ns string, hash *fftypes.Bytes32) { + nm.adminEvents.Dispatch(&core.ChangeEvent{ + Collection: string(resType), + Type: eventType, + Namespace: ns, + Hash: hash, + }) +} diff --git a/internal/namespace/persistence_events_test.go b/internal/namespace/persistence_events_test.go new file mode 100644 index 0000000000..4746b38a14 --- /dev/null +++ b/internal/namespace/persistence_events_test.go @@ -0,0 +1,107 @@ +// Copyright © 2021 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package namespace + +import ( + "testing" + + "github.com/hyperledger/firefly-common/pkg/fftypes" + "github.com/hyperledger/firefly/mocks/spieventsmocks" + "github.com/hyperledger/firefly/pkg/core" + "github.com/hyperledger/firefly/pkg/database" + "github.com/stretchr/testify/mock" +) + +func TestMessageCreated(t *testing.T) { + mae := &spieventsmocks.Manager{} + nm := &namespaceManager{ + adminEvents: mae, + } + mae.On("Dispatch", mock.Anything).Return() + nm.OrderedUUIDCollectionNSEvent(database.CollectionMessages, core.ChangeEventTypeCreated, "ns1", fftypes.NewUUID(), 12345) + mae.AssertExpectations(t) +} + +func TestPinCreated(t *testing.T) { + mae := &spieventsmocks.Manager{} + nm := &namespaceManager{ + adminEvents: mae, + } + mae.On("Dispatch", mock.Anything).Return() + nm.OrderedCollectionNSEvent(database.CollectionPins, core.ChangeEventTypeCreated, "ns1", 12345) + mae.AssertExpectations(t) +} + +func TestEventCreated(t *testing.T) { + mae := &spieventsmocks.Manager{} + nm := &namespaceManager{ + adminEvents: mae, + } + mae.On("Dispatch", mock.Anything).Return() + nm.OrderedUUIDCollectionNSEvent(database.CollectionEvents, core.ChangeEventTypeCreated, "ns1", fftypes.NewUUID(), 12345) + mae.AssertExpectations(t) +} + +func TestSubscriptionCreated(t *testing.T) { + mae := &spieventsmocks.Manager{} + nm := &namespaceManager{ + adminEvents: mae, + } + mae.On("Dispatch", mock.Anything).Return() + nm.UUIDCollectionNSEvent(database.CollectionSubscriptions, core.ChangeEventTypeCreated, "ns1", fftypes.NewUUID()) + mae.AssertExpectations(t) +} + +func TestSubscriptionUpdated(t *testing.T) { + mae := &spieventsmocks.Manager{} + nm := &namespaceManager{ + adminEvents: mae, + } + mae.On("Dispatch", mock.Anything).Return() + nm.UUIDCollectionNSEvent(database.CollectionSubscriptions, core.ChangeEventTypeUpdated, "ns1", fftypes.NewUUID()) + mae.AssertExpectations(t) +} + +func TestSubscriptionDeleted(t *testing.T) { + mae := &spieventsmocks.Manager{} + nm := &namespaceManager{ + adminEvents: mae, + } + mae.On("Dispatch", mock.Anything).Return() + nm.UUIDCollectionNSEvent(database.CollectionSubscriptions, core.ChangeEventTypeDeleted, "ns1", fftypes.NewUUID()) + mae.AssertExpectations(t) +} + +func TestUUIDCollectionEventFull(t *testing.T) { + mae := &spieventsmocks.Manager{} + nm := &namespaceManager{ + adminEvents: mae, + } + mae.On("Dispatch", mock.Anything).Return() + nm.UUIDCollectionEvent(database.CollectionNamespaces, core.ChangeEventTypeDeleted, fftypes.NewUUID()) + mae.AssertExpectations(t) +} + +func TestHashCollectionNSEventOk(t *testing.T) { + mae := &spieventsmocks.Manager{} + nm := &namespaceManager{ + adminEvents: mae, + } + mae.On("Dispatch", mock.Anything).Return() + nm.HashCollectionNSEvent(database.CollectionGroups, core.ChangeEventTypeDeleted, "ns1", fftypes.NewRandB32()) + mae.AssertExpectations(t) +} diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 111facbbde..aed19cefcd 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -37,7 +37,6 @@ import ( "github.com/hyperledger/firefly/internal/operations" "github.com/hyperledger/firefly/internal/privatemessaging" "github.com/hyperledger/firefly/internal/shareddownload" - "github.com/hyperledger/firefly/internal/spievents" "github.com/hyperledger/firefly/internal/syncasync" "github.com/hyperledger/firefly/internal/txcommon" "github.com/hyperledger/firefly/pkg/blockchain" @@ -193,16 +192,14 @@ type orchestrator struct { operations operations.Manager sharedDownload shareddownload.Manager txHelper txcommon.Helper - adminEvents spievents.Manager } -func NewOrchestrator(ns string, config Config, plugins Plugins, metrics metrics.Manager, adminEvents spievents.Manager) Orchestrator { +func NewOrchestrator(ns string, config Config, plugins Plugins, metrics metrics.Manager) Orchestrator { or := &orchestrator{ - namespace: ns, - config: config, - plugins: plugins, - metrics: metrics, - adminEvents: adminEvents, + namespace: ns, + config: config, + plugins: plugins, + metrics: metrics, } return or } diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go index 7fa087eb46..d85541e45c 100644 --- a/internal/orchestrator/orchestrator_test.go +++ b/internal/orchestrator/orchestrator_test.go @@ -149,7 +149,6 @@ func newTestOrchestrator() *testOrchestrator { tor.orchestrator.operations = tor.mom tor.orchestrator.batchpin = tor.mbp tor.orchestrator.sharedDownload = tor.msd - tor.orchestrator.adminEvents = tor.mae tor.orchestrator.txHelper = tor.mth tor.orchestrator.definitions = tor.mdh tor.orchestrator.plugins.Blockchain.Plugin = tor.mbi @@ -179,7 +178,6 @@ func TestNewOrchestrator(t *testing.T) { Config{}, Plugins{}, &metricsmocks.Manager{}, - &spieventsmocks.Manager{}, ) assert.NotNil(t, or) } diff --git a/internal/orchestrator/persistence_events.go b/internal/orchestrator/persistence_events.go index ab084913eb..1374682921 100644 --- a/internal/orchestrator/persistence_events.go +++ b/internal/orchestrator/persistence_events.go @@ -18,44 +18,39 @@ package orchestrator import ( "github.com/hyperledger/firefly-common/pkg/fftypes" + "github.com/hyperledger/firefly-common/pkg/log" "github.com/hyperledger/firefly/pkg/core" "github.com/hyperledger/firefly/pkg/database" ) func (or *orchestrator) OrderedUUIDCollectionNSEvent(resType database.OrderedUUIDCollectionNS, eventType core.ChangeEventType, ns string, id *fftypes.UUID, sequence int64) { + if ns != or.namespace { + log.L(or.ctx).Debugf("Ignoring database event from wrong namespace '%s'", ns) + return + } switch { case eventType == core.ChangeEventTypeCreated && resType == database.CollectionMessages: or.batch.NewMessages() <- sequence case eventType == core.ChangeEventTypeCreated && resType == database.CollectionEvents: or.events.NewEvents() <- sequence } - var ces *int64 - if eventType == core.ChangeEventTypeCreated { - // Sequence is only provided on create events - ces = &sequence - } - or.adminEvents.Dispatch(&core.ChangeEvent{ - Collection: string(resType), - Type: eventType, - Namespace: ns, - ID: id, - Sequence: ces, - }) } func (or *orchestrator) OrderedCollectionNSEvent(resType database.OrderedCollectionNS, eventType core.ChangeEventType, ns string, sequence int64) { + if ns != or.namespace { + log.L(or.ctx).Debugf("Ignoring database event from wrong namespace '%s'", ns) + return + } if eventType == core.ChangeEventTypeCreated && resType == database.CollectionPins { or.events.NewPins() <- sequence } - or.adminEvents.Dispatch(&core.ChangeEvent{ - Collection: string(resType), - Type: eventType, - Namespace: ns, - Sequence: &sequence, - }) } func (or *orchestrator) UUIDCollectionNSEvent(resType database.UUIDCollectionNS, eventType core.ChangeEventType, ns string, id *fftypes.UUID) { + if ns != or.namespace { + log.L(or.ctx).Debugf("Ignoring database event from wrong namespace '%s'", ns) + return + } switch { case eventType == core.ChangeEventTypeCreated && resType == database.CollectionSubscriptions: or.events.NewSubscriptions() <- id @@ -64,28 +59,12 @@ func (or *orchestrator) UUIDCollectionNSEvent(resType database.UUIDCollectionNS, case eventType == core.ChangeEventTypeUpdated && resType == database.CollectionSubscriptions: or.events.SubscriptionUpdates() <- id } - or.adminEvents.Dispatch(&core.ChangeEvent{ - Collection: string(resType), - Type: eventType, - Namespace: ns, - ID: id, - }) } func (or *orchestrator) UUIDCollectionEvent(resType database.UUIDCollection, eventType core.ChangeEventType, id *fftypes.UUID) { - or.adminEvents.Dispatch(&core.ChangeEvent{ - Collection: string(resType), - Type: eventType, - ID: id, - }) + // do nothing } func (or *orchestrator) HashCollectionNSEvent(resType database.HashCollectionNS, eventType core.ChangeEventType, ns string, hash *fftypes.Bytes32) { - or.adminEvents.Dispatch(&core.ChangeEvent{ - Collection: string(resType), - Type: eventType, - Namespace: ns, - Hash: hash, - }) - + // do nothing } diff --git a/internal/orchestrator/persistence_events_test.go b/internal/orchestrator/persistence_events_test.go index dd161f8606..e78d7e6e1f 100644 --- a/internal/orchestrator/persistence_events_test.go +++ b/internal/orchestrator/persistence_events_test.go @@ -22,112 +22,72 @@ import ( "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly/mocks/batchmocks" "github.com/hyperledger/firefly/mocks/eventmocks" - "github.com/hyperledger/firefly/mocks/spieventsmocks" "github.com/hyperledger/firefly/pkg/core" "github.com/hyperledger/firefly/pkg/database" - "github.com/stretchr/testify/mock" ) func TestMessageCreated(t *testing.T) { mb := &batchmocks.Manager{} - mae := &spieventsmocks.Manager{} o := &orchestrator{ - batch: mb, - adminEvents: mae, + namespace: "ns1", + batch: mb, } mb.On("NewMessages").Return((chan<- int64)(make(chan int64, 1))) - mae.On("Dispatch", mock.Anything).Return() o.OrderedUUIDCollectionNSEvent(database.CollectionMessages, core.ChangeEventTypeCreated, "ns1", fftypes.NewUUID(), 12345) mb.AssertExpectations(t) - mae.AssertExpectations(t) } func TestPinCreated(t *testing.T) { mem := &eventmocks.EventManager{} - mae := &spieventsmocks.Manager{} o := &orchestrator{ - adminEvents: mae, - events: mem, + namespace: "ns1", + events: mem, } - mae.On("Dispatch", mock.Anything).Return() mem.On("NewPins").Return((chan<- int64)(make(chan int64, 1))) o.OrderedCollectionNSEvent(database.CollectionPins, core.ChangeEventTypeCreated, "ns1", 12345) mem.AssertExpectations(t) - mae.AssertExpectations(t) } func TestEventCreated(t *testing.T) { mem := &eventmocks.EventManager{} - mae := &spieventsmocks.Manager{} o := &orchestrator{ - adminEvents: mae, - events: mem, + namespace: "ns1", + events: mem, } - mae.On("Dispatch", mock.Anything).Return() mem.On("NewEvents").Return((chan<- int64)(make(chan int64, 1))) o.OrderedUUIDCollectionNSEvent(database.CollectionEvents, core.ChangeEventTypeCreated, "ns1", fftypes.NewUUID(), 12345) mem.AssertExpectations(t) - mae.AssertExpectations(t) } func TestSubscriptionCreated(t *testing.T) { mem := &eventmocks.EventManager{} - mae := &spieventsmocks.Manager{} o := &orchestrator{ - adminEvents: mae, - events: mem, + namespace: "ns1", + events: mem, } - mae.On("Dispatch", mock.Anything).Return() mem.On("NewSubscriptions").Return((chan<- *fftypes.UUID)(make(chan *fftypes.UUID, 1))) o.UUIDCollectionNSEvent(database.CollectionSubscriptions, core.ChangeEventTypeCreated, "ns1", fftypes.NewUUID()) mem.AssertExpectations(t) - mae.AssertExpectations(t) } func TestSubscriptionUpdated(t *testing.T) { mem := &eventmocks.EventManager{} - mae := &spieventsmocks.Manager{} o := &orchestrator{ - adminEvents: mae, - events: mem, + namespace: "ns1", + events: mem, } - mae.On("Dispatch", mock.Anything).Return() mem.On("SubscriptionUpdates").Return((chan<- *fftypes.UUID)(make(chan *fftypes.UUID, 1))) o.UUIDCollectionNSEvent(database.CollectionSubscriptions, core.ChangeEventTypeUpdated, "ns1", fftypes.NewUUID()) mem.AssertExpectations(t) - mae.AssertExpectations(t) } func TestSubscriptionDeleted(t *testing.T) { mem := &eventmocks.EventManager{} - mae := &spieventsmocks.Manager{} o := &orchestrator{ - adminEvents: mae, - events: mem, + namespace: "ns1", + events: mem, } - mae.On("Dispatch", mock.Anything).Return() mem.On("DeletedSubscriptions").Return((chan<- *fftypes.UUID)(make(chan *fftypes.UUID, 1))) o.UUIDCollectionNSEvent(database.CollectionSubscriptions, core.ChangeEventTypeDeleted, "ns1", fftypes.NewUUID()) mem.AssertExpectations(t) - mae.AssertExpectations(t) -} - -func TestUUIDCollectionEventFull(t *testing.T) { - mae := &spieventsmocks.Manager{} - o := &orchestrator{ - adminEvents: mae, - } - mae.On("Dispatch", mock.Anything).Return() - o.UUIDCollectionEvent(database.CollectionNamespaces, core.ChangeEventTypeDeleted, fftypes.NewUUID()) - mae.AssertExpectations(t) -} - -func TestHashCollectionNSEventOk(t *testing.T) { - mae := &spieventsmocks.Manager{} - o := &orchestrator{ - adminEvents: mae, - } - mae.On("Dispatch", mock.Anything).Return() - o.HashCollectionNSEvent(database.CollectionGroups, core.ChangeEventTypeDeleted, "ns1", fftypes.NewRandB32()) - mae.AssertExpectations(t) } diff --git a/mocks/databasemocks/plugin.go b/mocks/databasemocks/plugin.go index 93c7c0afbc..37e33d2399 100644 --- a/mocks/databasemocks/plugin.go +++ b/mocks/databasemocks/plugin.go @@ -2460,9 +2460,9 @@ func (_m *Plugin) Name() string { return r0 } -// RegisterListener provides a mock function with given fields: callbacks -func (_m *Plugin) RegisterListener(callbacks database.Callbacks) { - _m.Called(callbacks) +// RegisterListener provides a mock function with given fields: listener +func (_m *Plugin) RegisterListener(listener database.Callbacks) { + _m.Called(listener) } // ReplaceMessage provides a mock function with given fields: ctx, message diff --git a/pkg/database/plugin.go b/pkg/database/plugin.go index 3cb940730b..b1ead72b20 100644 --- a/pkg/database/plugin.go +++ b/pkg/database/plugin.go @@ -54,7 +54,7 @@ type Plugin interface { Init(ctx context.Context, config config.Section) error // RegisterListener registers a listener to receive callbacks - RegisterListener(callbacks Callbacks) + RegisterListener(listener Callbacks) // Capabilities returns capabilities - not called until after Init Capabilities() *Capabilities