Skip to content

Commit

Permalink
Database plugin can track multiple listeners
Browse files Browse the repository at this point in the history
Each listener will ignore events outside its namespace.

Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
  • Loading branch information
awrichar committed Jun 9, 2022
1 parent 615893b commit 25d3b20
Show file tree
Hide file tree
Showing 15 changed files with 264 additions and 112 deletions.
4 changes: 2 additions & 2 deletions internal/database/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ func (psql *Postgres) Init(ctx context.Context, config config.Section) error {
return psql.SQLCommon.Init(ctx, psql, config, capabilities)
}

func (psql *Postgres) RegisterListener(callbacks database.Callbacks) {
psql.SQLCommon.RegisterListener(callbacks)
func (psql *Postgres) RegisterListener(listener database.Callbacks) {
psql.SQLCommon.RegisterListener(listener)
}

func (psql *Postgres) Name() string {
Expand Down
2 changes: 1 addition & 1 deletion internal/database/sqlcommon/contractapis_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestContractAPIDBFailUpdate(t *testing.T) {
func TestUpsertContractAPIIDMismatch(t *testing.T) {
s, db := newMockProvider().init()
callbacks := &databasemocks.Callbacks{}
s.SQLCommon.callbacks = callbacks
s.RegisterListener(callbacks)
apiID := fftypes.NewUUID()
api := &core.ContractAPI{
ID: apiID,
Expand Down
40 changes: 37 additions & 3 deletions internal/database/sqlcommon/sqlcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,45 @@ import (
type SQLCommon struct {
db *sql.DB
capabilities *database.Capabilities
callbacks database.Callbacks
callbacks callbacks
provider Provider
features SQLFeatures
}

type callbacks struct {
listeners []database.Callbacks
}

func (cb *callbacks) OrderedUUIDCollectionNSEvent(resType database.OrderedUUIDCollectionNS, eventType core.ChangeEventType, ns string, id *fftypes.UUID, sequence int64) {
for _, cb := range cb.listeners {
cb.OrderedUUIDCollectionNSEvent(resType, eventType, ns, id, sequence)
}
}

func (cb *callbacks) OrderedCollectionNSEvent(resType database.OrderedCollectionNS, eventType core.ChangeEventType, ns string, sequence int64) {
for _, cb := range cb.listeners {
cb.OrderedCollectionNSEvent(resType, eventType, ns, sequence)
}
}

func (cb *callbacks) UUIDCollectionNSEvent(resType database.UUIDCollectionNS, eventType core.ChangeEventType, ns string, id *fftypes.UUID) {
for _, cb := range cb.listeners {
cb.UUIDCollectionNSEvent(resType, eventType, ns, id)
}
}

func (cb *callbacks) UUIDCollectionEvent(resType database.UUIDCollection, eventType core.ChangeEventType, id *fftypes.UUID) {
for _, cb := range cb.listeners {
cb.UUIDCollectionEvent(resType, eventType, id)
}
}

func (cb *callbacks) HashCollectionNSEvent(resType database.HashCollectionNS, eventType core.ChangeEventType, ns string, hash *fftypes.Bytes32) {
for _, cb := range cb.listeners {
cb.HashCollectionNSEvent(resType, eventType, ns, hash)
}
}

type txContextKey struct{}

type txWrapper struct {
Expand Down Expand Up @@ -96,8 +130,8 @@ func (s *SQLCommon) Init(ctx context.Context, provider Provider, config config.S
return nil
}

func (s *SQLCommon) RegisterListener(callbacks database.Callbacks) {
s.callbacks = callbacks
func (s *SQLCommon) RegisterListener(listener database.Callbacks) {
s.callbacks.listeners = append(s.callbacks.listeners, listener)
}

func (s *SQLCommon) Capabilities() *database.Capabilities { return s.capabilities }
Expand Down
2 changes: 1 addition & 1 deletion internal/database/sqlcommon/tokenpool_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestUpsertTokenPoolFailCommit(t *testing.T) {
func TestUpsertTokenPoolUpdateIDMismatch(t *testing.T) {
s, db := newMockProvider().init()
callbacks := &databasemocks.Callbacks{}
s.SQLCommon.callbacks = callbacks
s.RegisterListener(callbacks)
poolID := fftypes.NewUUID()
pool := &core.TokenPool{
ID: poolID,
Expand Down
4 changes: 2 additions & 2 deletions internal/database/sqlite3/sqlite3.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func (sqlite *SQLite3) Init(ctx context.Context, config config.Section) error {
return sqlite.SQLCommon.Init(ctx, sqlite, config, capabilities)
}

func (sqlite *SQLite3) RegisterListener(callbacks database.Callbacks) {
sqlite.SQLCommon.RegisterListener(callbacks)
func (sqlite *SQLite3) RegisterListener(listener database.Callbacks) {
sqlite.SQLCommon.RegisterListener(listener)
}

func (sqlite *SQLite3) Name() string {
Expand Down
3 changes: 2 additions & 1 deletion internal/namespace/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (nm *namespaceManager) Init(ctx context.Context, cancelCtx context.CancelFu
func (nm *namespaceManager) initNamespace(name string, ns *namespace) error {
or := nm.utOrchestrator
if or == nil {
or = orchestrator.NewOrchestrator(name, ns.config, ns.plugins, nm.metrics, nm.adminEvents)
or = orchestrator.NewOrchestrator(name, ns.config, ns.plugins, nm.metrics)
}
if err := or.Init(nm.ctx, nm.cancelCtx); err != nil {
return err
Expand Down Expand Up @@ -553,6 +553,7 @@ func (nm *namespaceManager) initPlugins(ctx context.Context) (err error) {
if err = entry.plugin.Init(ctx, entry.config); err != nil {
return err
}
entry.plugin.RegisterListener(nm)
}
for _, entry := range nm.plugins.blockchain {
if err = entry.plugin.Init(ctx, entry.config, nm.metrics); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions internal/namespace/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func TestInit(t *testing.T) {
nm.utOrchestrator = mo

nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil)
nm.mdi.On("RegisterListener", mock.Anything).Return()
nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil)
nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil)
nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil)
Expand All @@ -149,6 +150,7 @@ func TestInitVersion1(t *testing.T) {
nm.utOrchestrator = mo

nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil)
nm.mdi.On("RegisterListener", mock.Anything).Return()
nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil)
nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil)
nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil)
Expand All @@ -174,6 +176,7 @@ func TestInitVersion1Fail(t *testing.T) {
nm.utOrchestrator = mo

nm.mdi.On("Init", mock.Anything, mock.Anything).Return(nil)
nm.mdi.On("RegisterListener", mock.Anything).Return()
nm.mbi.On("Init", mock.Anything, mock.Anything, nm.mmi).Return(nil)
nm.mdx.On("Init", mock.Anything, mock.Anything).Return(nil)
nm.mps.On("Init", mock.Anything, mock.Anything).Return(nil)
Expand Down
73 changes: 73 additions & 0 deletions internal/namespace/persistence_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package namespace

import (
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly/pkg/core"
"github.com/hyperledger/firefly/pkg/database"
)

func (nm *namespaceManager) OrderedUUIDCollectionNSEvent(resType database.OrderedUUIDCollectionNS, eventType core.ChangeEventType, ns string, id *fftypes.UUID, sequence int64) {
var ces *int64
if eventType == core.ChangeEventTypeCreated {
// Sequence is only provided on create events
ces = &sequence
}
nm.adminEvents.Dispatch(&core.ChangeEvent{
Collection: string(resType),
Type: eventType,
Namespace: ns,
ID: id,
Sequence: ces,
})
}

func (nm *namespaceManager) OrderedCollectionNSEvent(resType database.OrderedCollectionNS, eventType core.ChangeEventType, ns string, sequence int64) {
nm.adminEvents.Dispatch(&core.ChangeEvent{
Collection: string(resType),
Type: eventType,
Namespace: ns,
Sequence: &sequence,
})
}

func (nm *namespaceManager) UUIDCollectionNSEvent(resType database.UUIDCollectionNS, eventType core.ChangeEventType, ns string, id *fftypes.UUID) {
nm.adminEvents.Dispatch(&core.ChangeEvent{
Collection: string(resType),
Type: eventType,
Namespace: ns,
ID: id,
})
}

func (nm *namespaceManager) UUIDCollectionEvent(resType database.UUIDCollection, eventType core.ChangeEventType, id *fftypes.UUID) {
nm.adminEvents.Dispatch(&core.ChangeEvent{
Collection: string(resType),
Type: eventType,
ID: id,
})
}

func (nm *namespaceManager) HashCollectionNSEvent(resType database.HashCollectionNS, eventType core.ChangeEventType, ns string, hash *fftypes.Bytes32) {
nm.adminEvents.Dispatch(&core.ChangeEvent{
Collection: string(resType),
Type: eventType,
Namespace: ns,
Hash: hash,
})
}
107 changes: 107 additions & 0 deletions internal/namespace/persistence_events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright © 2021 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package namespace

import (
"testing"

"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly/mocks/spieventsmocks"
"github.com/hyperledger/firefly/pkg/core"
"github.com/hyperledger/firefly/pkg/database"
"github.com/stretchr/testify/mock"
)

func TestMessageCreated(t *testing.T) {
mae := &spieventsmocks.Manager{}
nm := &namespaceManager{
adminEvents: mae,
}
mae.On("Dispatch", mock.Anything).Return()
nm.OrderedUUIDCollectionNSEvent(database.CollectionMessages, core.ChangeEventTypeCreated, "ns1", fftypes.NewUUID(), 12345)
mae.AssertExpectations(t)
}

func TestPinCreated(t *testing.T) {
mae := &spieventsmocks.Manager{}
nm := &namespaceManager{
adminEvents: mae,
}
mae.On("Dispatch", mock.Anything).Return()
nm.OrderedCollectionNSEvent(database.CollectionPins, core.ChangeEventTypeCreated, "ns1", 12345)
mae.AssertExpectations(t)
}

func TestEventCreated(t *testing.T) {
mae := &spieventsmocks.Manager{}
nm := &namespaceManager{
adminEvents: mae,
}
mae.On("Dispatch", mock.Anything).Return()
nm.OrderedUUIDCollectionNSEvent(database.CollectionEvents, core.ChangeEventTypeCreated, "ns1", fftypes.NewUUID(), 12345)
mae.AssertExpectations(t)
}

func TestSubscriptionCreated(t *testing.T) {
mae := &spieventsmocks.Manager{}
nm := &namespaceManager{
adminEvents: mae,
}
mae.On("Dispatch", mock.Anything).Return()
nm.UUIDCollectionNSEvent(database.CollectionSubscriptions, core.ChangeEventTypeCreated, "ns1", fftypes.NewUUID())
mae.AssertExpectations(t)
}

func TestSubscriptionUpdated(t *testing.T) {
mae := &spieventsmocks.Manager{}
nm := &namespaceManager{
adminEvents: mae,
}
mae.On("Dispatch", mock.Anything).Return()
nm.UUIDCollectionNSEvent(database.CollectionSubscriptions, core.ChangeEventTypeUpdated, "ns1", fftypes.NewUUID())
mae.AssertExpectations(t)
}

func TestSubscriptionDeleted(t *testing.T) {
mae := &spieventsmocks.Manager{}
nm := &namespaceManager{
adminEvents: mae,
}
mae.On("Dispatch", mock.Anything).Return()
nm.UUIDCollectionNSEvent(database.CollectionSubscriptions, core.ChangeEventTypeDeleted, "ns1", fftypes.NewUUID())
mae.AssertExpectations(t)
}

func TestUUIDCollectionEventFull(t *testing.T) {
mae := &spieventsmocks.Manager{}
nm := &namespaceManager{
adminEvents: mae,
}
mae.On("Dispatch", mock.Anything).Return()
nm.UUIDCollectionEvent(database.CollectionNamespaces, core.ChangeEventTypeDeleted, fftypes.NewUUID())
mae.AssertExpectations(t)
}

func TestHashCollectionNSEventOk(t *testing.T) {
mae := &spieventsmocks.Manager{}
nm := &namespaceManager{
adminEvents: mae,
}
mae.On("Dispatch", mock.Anything).Return()
nm.HashCollectionNSEvent(database.CollectionGroups, core.ChangeEventTypeDeleted, "ns1", fftypes.NewRandB32())
mae.AssertExpectations(t)
}
13 changes: 5 additions & 8 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/hyperledger/firefly/internal/operations"
"github.com/hyperledger/firefly/internal/privatemessaging"
"github.com/hyperledger/firefly/internal/shareddownload"
"github.com/hyperledger/firefly/internal/spievents"
"github.com/hyperledger/firefly/internal/syncasync"
"github.com/hyperledger/firefly/internal/txcommon"
"github.com/hyperledger/firefly/pkg/blockchain"
Expand Down Expand Up @@ -193,16 +192,14 @@ type orchestrator struct {
operations operations.Manager
sharedDownload shareddownload.Manager
txHelper txcommon.Helper
adminEvents spievents.Manager
}

func NewOrchestrator(ns string, config Config, plugins Plugins, metrics metrics.Manager, adminEvents spievents.Manager) Orchestrator {
func NewOrchestrator(ns string, config Config, plugins Plugins, metrics metrics.Manager) Orchestrator {
or := &orchestrator{
namespace: ns,
config: config,
plugins: plugins,
metrics: metrics,
adminEvents: adminEvents,
namespace: ns,
config: config,
plugins: plugins,
metrics: metrics,
}
return or
}
Expand Down
2 changes: 0 additions & 2 deletions internal/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ func newTestOrchestrator() *testOrchestrator {
tor.orchestrator.operations = tor.mom
tor.orchestrator.batchpin = tor.mbp
tor.orchestrator.sharedDownload = tor.msd
tor.orchestrator.adminEvents = tor.mae
tor.orchestrator.txHelper = tor.mth
tor.orchestrator.definitions = tor.mdh
tor.orchestrator.plugins.Blockchain.Plugin = tor.mbi
Expand Down Expand Up @@ -179,7 +178,6 @@ func TestNewOrchestrator(t *testing.T) {
Config{},
Plugins{},
&metricsmocks.Manager{},
&spieventsmocks.Manager{},
)
assert.NotNil(t, or)
}
Expand Down
Loading

0 comments on commit 25d3b20

Please sign in to comment.