diff --git a/internal/data/data_manager.go b/internal/data/data_manager.go index b9ffaf5a6f..18d60f437a 100644 --- a/internal/data/data_manager.go +++ b/internal/data/data_manager.go @@ -51,6 +51,7 @@ type Manager interface { UploadBlob(ctx context.Context, inData *core.DataRefOrValue, blob *ffapi.Multipart, autoMeta bool) (*core.Data, error) DownloadBlob(ctx context.Context, dataID string) (*core.Blob, io.ReadCloser, error) HydrateBatch(ctx context.Context, persistedBatch *core.BatchPersisted) (*core.Batch, error) + Start() WaitStop() } @@ -123,10 +124,13 @@ func NewDataManager(ctx context.Context, ns core.NamespaceRef, di database.Plugi batchTimeout: config.GetDuration(coreconfig.MessageWriterBatchTimeout), maxInserts: config.GetInt(coreconfig.MessageWriterBatchMaxInserts), }) - dm.messageWriter.start() return dm, nil } +func (dm *dataManager) Start() { + dm.messageWriter.start() +} + func (dm *dataManager) CheckDatatype(ctx context.Context, datatype *core.Datatype) error { _, err := newJSONValidator(ctx, dm.namespace.LocalName, datatype) return err diff --git a/internal/data/data_manager_test.go b/internal/data/data_manager_test.go index 86477172b2..5493401334 100644 --- a/internal/data/data_manager_test.go +++ b/internal/data/data_manager_test.go @@ -44,6 +44,7 @@ func newTestDataManager(t *testing.T) (*dataManager, context.Context, func()) { ns := core.NamespaceRef{LocalName: "ns1", RemoteName: "ns1"} dm, err := NewDataManager(ctx, ns, mdi, mdx) assert.NoError(t, err) + dm.Start() return dm.(*dataManager), ctx, func() { cancel() dm.WaitStop() diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index f58dd7212b..10ab18cb88 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -245,6 +245,7 @@ func (or *orchestrator) tokens() map[string]tokens.Plugin { } func (or *orchestrator) Start() (err error) { + or.data.Start() if or.config.Multiparty.Enabled { err = or.multiparty.ConfigureContract(or.ctx) if err == nil { @@ -288,6 +289,10 @@ func (or *orchestrator) WaitStop() { or.sharedDownload.WaitStop() or.sharedDownload = nil } + if or.events != nil { + or.events.WaitStop() + or.events = nil + } if or.operations != nil { or.operations.WaitStop() or.operations = nil diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go index 3b08c7290b..5158da1d7e 100644 --- a/internal/orchestrator/orchestrator_test.go +++ b/internal/orchestrator/orchestrator_test.go @@ -367,6 +367,7 @@ func TestStartBatchFail(t *testing.T) { coreconfig.Reset() or := newTestOrchestrator() defer or.cleanup(t) + or.mdm.On("Start").Return(nil) or.mmp.On("ConfigureContract", mock.Anything).Return(nil) or.mba.On("Start").Return(fmt.Errorf("pop")) err := or.Start() @@ -377,6 +378,7 @@ func TestStartBlockchainsConfigureFail(t *testing.T) { coreconfig.Reset() or := newTestOrchestrator() defer or.cleanup(t) + or.mdm.On("Start").Return(nil) or.mmp.On("ConfigureContract", mock.Anything).Return(fmt.Errorf("pop")) err := or.Start() assert.EqualError(t, err, "pop") @@ -387,6 +389,7 @@ func TestStartStopOk(t *testing.T) { or := newTestOrchestrator() defer or.cleanup(t) or.mmp.On("ConfigureContract", mock.Anything).Return(nil) + or.mdm.On("Start").Return(nil) or.mba.On("Start").Return(nil) or.mem.On("Start").Return(nil) or.mbm.On("Start").Return(nil) @@ -397,6 +400,7 @@ func TestStartStopOk(t *testing.T) { or.mdm.On("WaitStop").Return(nil) or.msd.On("WaitStop").Return(nil) or.mom.On("WaitStop").Return(nil) + or.mem.On("WaitStop").Return(nil) err := or.Start() assert.NoError(t, err) or.WaitStop() diff --git a/mocks/datamocks/manager.go b/mocks/datamocks/manager.go index c857acfef0..6d98dc05d1 100644 --- a/mocks/datamocks/manager.go +++ b/mocks/datamocks/manager.go @@ -220,6 +220,11 @@ func (_m *Manager) ResolveInlineData(ctx context.Context, msg *data.NewMessage) return r0 } +// Start provides a mock function with given fields: +func (_m *Manager) Start() { + _m.Called() +} + // UpdateMessageCache provides a mock function with given fields: msg, _a1 func (_m *Manager) UpdateMessageCache(msg *core.Message, _a1 core.DataArray) { _m.Called(msg, _a1)