Skip to content

Commit

Permalink
Move SystemEvents interface into events/system package
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
  • Loading branch information
awrichar committed Aug 4, 2022
1 parent c17cea8 commit 2eedf09
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 64 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ $(eval $(call makemock, pkg/tokens, Plugin, tokenmock
$(eval $(call makemock, pkg/tokens, Callbacks, tokenmocks))
$(eval $(call makemock, internal/txcommon, Helper, txcommonmocks))
$(eval $(call makemock, internal/identity, Manager, identitymanagermocks))
$(eval $(call makemock, internal/sysmessaging, SystemEvents, sysmessagingmocks))
$(eval $(call makemock, internal/sysmessaging, MessageSender, sysmessagingmocks))
$(eval $(call makemock, internal/syncasync, Bridge, syncasyncmocks))
$(eval $(call makemock, internal/data, Manager, datamocks))
Expand All @@ -65,6 +64,7 @@ $(eval $(call makemock, internal/shareddownload, Callbacks, shareddow
$(eval $(call makemock, internal/definitions, Handler, definitionsmocks))
$(eval $(call makemock, internal/definitions, Sender, definitionsmocks))
$(eval $(call makemock, internal/events, EventManager, eventmocks))
$(eval $(call makemock, internal/events/system, EventInterface, systemeventmocks))
$(eval $(call makemock, internal/namespace, Manager, namespacemocks))
$(eval $(call makemock, internal/networkmap, Manager, networkmapmocks))
$(eval $(call makemock, internal/assets, Manager, assetmocks))
Expand Down
3 changes: 1 addition & 2 deletions internal/events/event_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/hyperledger/firefly/internal/multiparty"
"github.com/hyperledger/firefly/internal/privatemessaging"
"github.com/hyperledger/firefly/internal/shareddownload"
"github.com/hyperledger/firefly/internal/sysmessaging"
"github.com/hyperledger/firefly/internal/txcommon"
"github.com/hyperledger/firefly/pkg/blockchain"
"github.com/hyperledger/firefly/pkg/core"
Expand Down Expand Up @@ -84,7 +83,7 @@ type EventManager interface {
GetPlugins() []*core.NamespaceStatusPlugin

// Internal events
sysmessaging.SystemEvents
system.EventInterface
}

type eventManager struct {
Expand Down
4 changes: 4 additions & 0 deletions internal/events/system/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type Events struct {
readAhead uint16
}

type EventInterface interface {
AddSystemEventListener(ns string, el EventListener) error
}

type EventListener func(event *core.EventDelivery) error

func (se *Events) Name() string { return SystemEventsTransport }
Expand Down
10 changes: 5 additions & 5 deletions internal/syncasync/sync_async_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ import (
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly/internal/coremsgs"
"github.com/hyperledger/firefly/internal/data"
"github.com/hyperledger/firefly/internal/sysmessaging"
"github.com/hyperledger/firefly/internal/events/system"
"github.com/hyperledger/firefly/pkg/core"
"github.com/hyperledger/firefly/pkg/database"
)

// Bridge translates synchronous (HTTP API) calls, into asynchronously sending a
// message and blocking until a correlating response is received, or we hit a timeout.
type Bridge interface {
// Init is required as there's a bi-directional relationship between sysmessaging and syncasync bridge
Init(sysevents sysmessaging.SystemEvents)
// Init is required as there's a bi-directional relationship between event manager and syncasync bridge
Init(sysevents system.EventInterface)

// The following "WaitFor*" methods all wait for a particular type of event callback, and block until it is received.
// To use them, invoke the appropriate method, and pass a "send" callback that is expected to trigger the relevant event.
Expand Down Expand Up @@ -92,7 +92,7 @@ type syncAsyncBridge struct {
namespace string
database database.Plugin
data data.Manager
sysevents sysmessaging.SystemEvents
sysevents system.EventInterface
inflightMux sync.Mutex
inflight inflightRequestMap
}
Expand All @@ -108,7 +108,7 @@ func NewSyncAsyncBridge(ctx context.Context, ns string, di database.Plugin, dm d
return sa
}

func (sa *syncAsyncBridge) Init(sysevents sysmessaging.SystemEvents) {
func (sa *syncAsyncBridge) Init(sysevents system.EventInterface) {
sa.sysevents = sysevents
}

Expand Down
48 changes: 24 additions & 24 deletions internal/syncasync/sync_async_bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly/mocks/databasemocks"
"github.com/hyperledger/firefly/mocks/datamocks"
"github.com/hyperledger/firefly/mocks/sysmessagingmocks"
"github.com/hyperledger/firefly/mocks/systemeventmocks"
"github.com/hyperledger/firefly/pkg/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand All @@ -35,7 +35,7 @@ func newTestSyncAsyncBridge(t *testing.T) (*syncAsyncBridge, func()) {
ctx, cancel := context.WithCancel(context.Background())
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mse := &sysmessagingmocks.SystemEvents{}
mse := &systemeventmocks.EventInterface{}
sa := NewSyncAsyncBridge(ctx, "ns1", mdi, mdm)
sa.Init(mse)
return sa.(*syncAsyncBridge), cancel
Expand All @@ -50,7 +50,7 @@ func TestRequestReplyOk(t *testing.T) {
replyID := fftypes.NewUUID()
dataID := fftypes.NewUUID()

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

mdi := sa.database.(*databasemocks.Plugin)
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestAwaitConfirmationOk(t *testing.T) {
requestID := fftypes.NewUUID()
dataID := fftypes.NewUUID()

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

mdi := sa.database.(*databasemocks.Plugin)
Expand Down Expand Up @@ -152,7 +152,7 @@ func TestAwaitConfirmationRejected(t *testing.T) {
requestID := fftypes.NewUUID()
dataID := fftypes.NewUUID()

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

mdi := sa.database.(*databasemocks.Plugin)
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestRequestReplyTimeout(t *testing.T) {
sa, cancel := newTestSyncAsyncBridge(t)
cancel()

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

_, err := sa.WaitForReply(sa.ctx, fftypes.NewUUID(), func(ctx context.Context) error {
Expand All @@ -209,7 +209,7 @@ func TestRequestSetupSystemListenerFail(t *testing.T) {
sa, cancel := newTestSyncAsyncBridge(t)
defer cancel()

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(fmt.Errorf("pop"))

_, err := sa.WaitForReply(sa.ctx, fftypes.NewUUID(), func(ctx context.Context) error {
Expand All @@ -224,7 +224,7 @@ func TestEventCallbackNotInflight(t *testing.T) {
sa, cancel := newTestSyncAsyncBridge(t)
defer cancel()

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

err := sa.eventCallback(&core.EventDelivery{
Expand Down Expand Up @@ -762,7 +762,7 @@ func TestAwaitTokenPoolConfirmation(t *testing.T) {

requestID := fftypes.NewUUID()

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

mdi := sa.database.(*databasemocks.Plugin)
Expand Down Expand Up @@ -802,7 +802,7 @@ func TestAwaitTokenPoolConfirmationSendFail(t *testing.T) {
sa, cancel := newTestSyncAsyncBridge(t)
defer cancel()

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

_, err := sa.WaitForTokenPool(sa.ctx, fftypes.NewUUID(), func(ctx context.Context) error {
Expand Down Expand Up @@ -837,7 +837,7 @@ func TestAwaitTokenPoolConfirmationRejected(t *testing.T) {
},
}

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

mdi := sa.database.(*databasemocks.Plugin)
Expand Down Expand Up @@ -870,7 +870,7 @@ func TestAwaitTokenTransferConfirmation(t *testing.T) {

requestID := fftypes.NewUUID()

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

mdi := sa.database.(*databasemocks.Plugin)
Expand Down Expand Up @@ -912,7 +912,7 @@ func TestAwaitTokenApprovalConfirmation(t *testing.T) {

requestID := fftypes.NewUUID()

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

mdi := sa.database.(*databasemocks.Plugin)
Expand Down Expand Up @@ -952,7 +952,7 @@ func TestAwaitTokenApprovalConfirmationSendFail(t *testing.T) {
sa, cancel := newTestSyncAsyncBridge(t)
defer cancel()

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

_, err := sa.WaitForTokenApproval(sa.ctx, fftypes.NewUUID(), func(ctx context.Context) error {
Expand All @@ -966,7 +966,7 @@ func TestAwaitTokenTransferConfirmationSendFail(t *testing.T) {
sa, cancel := newTestSyncAsyncBridge(t)
defer cancel()

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

_, err := sa.WaitForTokenTransfer(sa.ctx, fftypes.NewUUID(), func(ctx context.Context) error {
Expand All @@ -989,7 +989,7 @@ func TestAwaitFailedTokenPool(t *testing.T) {
Error: "pop",
}

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

mdi := sa.database.(*databasemocks.Plugin)
Expand Down Expand Up @@ -1028,7 +1028,7 @@ func TestAwaitFailedTokenTransfer(t *testing.T) {
Error: "pop",
}

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

mdi := sa.database.(*databasemocks.Plugin)
Expand Down Expand Up @@ -1067,7 +1067,7 @@ func TestAwaitFailedTokenApproval(t *testing.T) {
Error: "pop",
}

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

mdi := sa.database.(*databasemocks.Plugin)
Expand Down Expand Up @@ -1380,7 +1380,7 @@ func TestAwaitIdentityConfirmed(t *testing.T) {
},
}

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

mdi := sa.database.(*databasemocks.Plugin)
Expand Down Expand Up @@ -1412,7 +1412,7 @@ func TestAwaitIdentityFail(t *testing.T) {

requestID := fftypes.NewUUID()

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

_, err := sa.WaitForIdentity(sa.ctx, requestID, func(ctx context.Context) error {
Expand All @@ -1432,7 +1432,7 @@ func TestAwaitInvokeOpSucceeded(t *testing.T) {
Status: core.OpStatusSucceeded,
}

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

mdi := sa.database.(*databasemocks.Plugin)
Expand Down Expand Up @@ -1471,7 +1471,7 @@ func TestAwaitInvokeOpSucceededLookupFail(t *testing.T) {
},
}

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

mdi := sa.database.(*databasemocks.Plugin)
Expand Down Expand Up @@ -1502,7 +1502,7 @@ func TestAwaitInvokeOpFailed(t *testing.T) {
Error: "pop",
}

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

mdi := sa.database.(*databasemocks.Plugin)
Expand Down Expand Up @@ -1540,7 +1540,7 @@ func TestAwaitInvokeOpFailedLookupFail(t *testing.T) {
},
}

mse := sa.sysevents.(*sysmessagingmocks.SystemEvents)
mse := sa.sysevents.(*systemeventmocks.EventInterface)
mse.On("AddSystemEventListener", "ns1", mock.Anything).Return(nil)

mdi := sa.database.(*databasemocks.Plugin)
Expand Down
24 changes: 0 additions & 24 deletions internal/sysmessaging/sysevents.go

This file was deleted.

4 changes: 2 additions & 2 deletions mocks/syncasyncmocks/bridge.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 2eedf09

Please sign in to comment.