From e03dcee4389e6f00a8b64268c61bf5664671e6cf Mon Sep 17 00:00:00 2001 From: Jakob Haahr Taankvist Date: Fri, 20 Sep 2024 14:46:39 +0200 Subject: [PATCH 1/6] The first files for the shard manager service created --- cmd/server/cadence/server.go | 3 + common/metrics/defs.go | 1 + common/service/metrics.go | 2 + common/service/name.go | 4 +- common/service/name_test.go | 4 +- config/development.yaml | 11 +++ service/shardmanager/config/config.go | 49 ++++++++++++ service/shardmanager/service.go | 106 ++++++++++++++++++++++++++ 8 files changed, 177 insertions(+), 3 deletions(-) create mode 100644 service/shardmanager/config/config.go create mode 100644 service/shardmanager/service.go diff --git a/cmd/server/cadence/server.go b/cmd/server/cadence/server.go index 599e7418d69..f1a0829b1ec 100644 --- a/cmd/server/cadence/server.go +++ b/cmd/server/cadence/server.go @@ -53,6 +53,7 @@ import ( "github.com/uber/cadence/service/frontend" "github.com/uber/cadence/service/history" "github.com/uber/cadence/service/matching" + "github.com/uber/cadence/service/shardmanager" "github.com/uber/cadence/service/worker" ) @@ -322,6 +323,8 @@ func (s *server) startService() common.Daemon { daemon, err = matching.NewService(¶ms) case service.Worker: daemon, err = worker.NewService(¶ms) + case service.ShardManager: + daemon, err = shardmanager.NewService(¶ms) } if err != nil { params.Logger.Fatal("Fail to start "+s.name+" service ", tag.Error(err)) diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 7d5a4194272..edeb33614d1 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -67,6 +67,7 @@ const ( History Matching Worker + ShardManager NumServices ) diff --git a/common/service/metrics.go b/common/service/metrics.go index 4f7068b29d6..cc3baf9283c 100644 --- a/common/service/metrics.go +++ b/common/service/metrics.go @@ -36,6 +36,8 @@ func GetMetricsServiceIdx(serviceName string, logger log.Logger) metrics.Service return metrics.Matching case Worker: return metrics.Worker + case ShardManager: + return metrics.ShardManager default: logger.Fatal("Unknown service name for metrics!") } diff --git a/common/service/name.go b/common/service/name.go index 2bc8c8bee92..a7821625f94 100644 --- a/common/service/name.go +++ b/common/service/name.go @@ -33,10 +33,12 @@ const ( Matching = "cadence-matching" // Worker is the name of the worker service Worker = "cadence-worker" + // ShardManager is the name of the shard manager service + ShardManager = "cadence-shard-manager" ) // List contains the list of all cadence services -var List = []string{Frontend, History, Matching, Worker} +var List = []string{Frontend, History, Matching, Worker, ShardManager} // ShortName returns cadence service name without "cadence-" prefix func ShortName(name string) string { diff --git a/common/service/name_test.go b/common/service/name_test.go index f8faea48fcd..1f683e7a1a9 100644 --- a/common/service/name_test.go +++ b/common/service/name_test.go @@ -36,6 +36,6 @@ func TestServiceNames(t *testing.T) { assert.Equal(t, fullName, FullName(shortName)) assert.Equal(t, fullName, FullName(fullName)) - assert.Equal(t, []string{"cadence-frontend", "cadence-history", "cadence-matching", "cadence-worker"}, List) - assert.Equal(t, []string{"frontend", "history", "matching", "worker"}, ShortNames(List)) + assert.Equal(t, []string{"cadence-frontend", "cadence-history", "cadence-matching", "cadence-worker", "cadence-shard-manager"}, List) + assert.Equal(t, []string{"frontend", "history", "matching", "worker", "shard-manager"}, ShortNames(List)) } diff --git a/config/development.yaml b/config/development.yaml index 2ff127df3bf..08e6eb59931 100644 --- a/config/development.yaml +++ b/config/development.yaml @@ -75,6 +75,17 @@ services: pprof: port: 7940 + shard-manager: + rpc: + port: 7941 + bindOnLocalHost: true + metrics: + statsd: + hostPort: "127.0.0.1:8125" + prefix: "cadence" + pprof: + port: 7942 + clusterGroupMetadata: failoverVersionIncrement: 10 primaryClusterName: "cluster0" diff --git a/service/shardmanager/config/config.go b/service/shardmanager/config/config.go new file mode 100644 index 00000000000..384d77ca1cf --- /dev/null +++ b/service/shardmanager/config/config.go @@ -0,0 +1,49 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package config + +import ( + "github.com/uber/cadence/common/dynamicconfig" +) + +type ( + // Config represents configuration for shard manager service + Config struct { + PersistenceMaxQPS dynamicconfig.IntPropertyFn + PersistenceGlobalMaxQPS dynamicconfig.IntPropertyFn + ThrottledLogRPS dynamicconfig.IntPropertyFn + + // hostname info + HostName string + } +) + +// NewConfig returns new service config with default values +func NewConfig(dc *dynamicconfig.Collection, hostName string) *Config { + return &Config{ + PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.MatchingPersistenceMaxQPS), + PersistenceGlobalMaxQPS: dc.GetIntProperty(dynamicconfig.MatchingPersistenceGlobalMaxQPS), + ThrottledLogRPS: dc.GetIntProperty(dynamicconfig.MatchingThrottledLogRPS), + HostName: hostName, + } +} diff --git a/service/shardmanager/service.go b/service/shardmanager/service.go new file mode 100644 index 00000000000..96d1ab4906b --- /dev/null +++ b/service/shardmanager/service.go @@ -0,0 +1,106 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package shardmanager + +import ( + "sync/atomic" + + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/resource" + "github.com/uber/cadence/common/service" + "github.com/uber/cadence/service/shardmanager/config" +) + +// Service represents the shard manager service +type Service struct { + resource.Resource + + status int32 + // handler handler.Handler + stopC chan struct{} + config *config.Config +} + +// NewService builds a new task manager service +func NewService( + params *resource.Params, +) (resource.Resource, error) { + + serviceConfig := config.NewConfig( + dynamicconfig.NewCollection( + params.DynamicConfig, + params.Logger, + dynamicconfig.ClusterNameFilter(params.ClusterMetadata.GetCurrentClusterName()), + ), + params.HostName, + ) + + serviceResource, err := resource.New( + params, + service.ShardManager, + &service.Config{ + PersistenceMaxQPS: serviceConfig.PersistenceMaxQPS, + PersistenceGlobalMaxQPS: serviceConfig.PersistenceGlobalMaxQPS, + ThrottledLoggerMaxRPS: serviceConfig.ThrottledLogRPS, + IsErrorRetryableFunction: common.IsServiceTransientError, + // matching doesn't need visibility config as it never read or write visibility + }, + ) + if err != nil { + return nil, err + } + + return &Service{ + Resource: serviceResource, + status: common.DaemonStatusInitialized, + config: serviceConfig, + stopC: make(chan struct{}), + }, nil +} + +// Start starts the service +func (s *Service) Start() { + if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) { + return + } + + logger := s.GetLogger() + logger.Info("shard manager starting") + + // setup the handler + + s.Resource.Start() + + <-s.stopC +} + +func (s *Service) Stop() { + if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusStarted, common.DaemonStatusStopped) { + return + } + + close(s.stopC) + + s.Resource.Stop() + + s.GetLogger().Info("shard manager stopped") +} From e1fb30977585cef6343a75eab03f74abb89c2f44 Mon Sep 17 00:00:00 2001 From: Jakob Haahr Taankvist Date: Thu, 26 Sep 2024 12:48:56 +0200 Subject: [PATCH 2/6] Added testing for the shard manager startup --- common/resource/resource.go | 10 + common/resource/resourceImpl.go | 9 + common/resource/resource_mock.go | 683 +++++++++++++++++++++++++++ common/rpc.go | 2 + common/rpc_mock.go | 63 +++ service/shardmanager/service.go | 5 +- service/shardmanager/service_test.go | 61 +++ 7 files changed, 831 insertions(+), 2 deletions(-) create mode 100644 common/resource/resource_mock.go create mode 100644 common/rpc_mock.go create mode 100644 service/shardmanager/service_test.go diff --git a/common/resource/resource.go b/common/resource/resource.go index 75c70b1c257..80dfb8070e0 100644 --- a/common/resource/resource.go +++ b/common/resource/resource.go @@ -18,9 +18,12 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination resource_mock.go -self_package github.com/uber/cadence/common/resource + package resource import ( + "github.com/uber/cadence/common/service" "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" "go.uber.org/yarpc" @@ -50,6 +53,13 @@ import ( "github.com/uber/cadence/common/quotas/global/rpc" ) +type ResourceFactory interface { + NewResource(params *Params, + serviceName string, + serviceConfig *service.Config, + ) (resource Resource, err error) +} + type ( // Resource is the interface which expose common resources Resource interface { diff --git a/common/resource/resourceImpl.go b/common/resource/resourceImpl.go index ab842b113f7..91ca8c660f6 100644 --- a/common/resource/resourceImpl.go +++ b/common/resource/resourceImpl.go @@ -63,6 +63,15 @@ import ( "github.com/uber/cadence/common/service" ) +type resourceImplFactory struct{} + +func (*resourceImplFactory) NewResource(params *Params, + serviceName string, + serviceConfig *service.Config, +) (resource Resource, err error) { + return New(params, serviceName, serviceConfig) +} + type ( // VisibilityManagerInitializer is the function each service should implement diff --git a/common/resource/resource_mock.go b/common/resource/resource_mock.go new file mode 100644 index 00000000000..980e5a6cb3f --- /dev/null +++ b/common/resource/resource_mock.go @@ -0,0 +1,683 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: resource.go + +// Package resource is a generated GoMock package. +package resource + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + client "github.com/uber/cadence/client" + admin "github.com/uber/cadence/client/admin" + frontend "github.com/uber/cadence/client/frontend" + history "github.com/uber/cadence/client/history" + matching "github.com/uber/cadence/client/matching" + archiver "github.com/uber/cadence/common/archiver" + provider "github.com/uber/cadence/common/archiver/provider" + queue "github.com/uber/cadence/common/asyncworkflow/queue" + blobstore "github.com/uber/cadence/common/blobstore" + cache "github.com/uber/cadence/common/cache" + clock "github.com/uber/cadence/common/clock" + cluster "github.com/uber/cadence/common/cluster" + domain "github.com/uber/cadence/common/domain" + configstore "github.com/uber/cadence/common/dynamicconfig/configstore" + isolationgroup "github.com/uber/cadence/common/isolationgroup" + log "github.com/uber/cadence/common/log" + membership "github.com/uber/cadence/common/membership" + messaging "github.com/uber/cadence/common/messaging" + metrics "github.com/uber/cadence/common/metrics" + partition "github.com/uber/cadence/common/partition" + persistence "github.com/uber/cadence/common/persistence" + client0 "github.com/uber/cadence/common/persistence/client" + rpc "github.com/uber/cadence/common/quotas/global/rpc" + service "github.com/uber/cadence/common/service" + workflowserviceclient "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + yarpc "go.uber.org/yarpc" +) + +// MockResourceFactory is a mock of ResourceFactory interface. +type MockResourceFactory struct { + ctrl *gomock.Controller + recorder *MockResourceFactoryMockRecorder +} + +// MockResourceFactoryMockRecorder is the mock recorder for MockResourceFactory. +type MockResourceFactoryMockRecorder struct { + mock *MockResourceFactory +} + +// NewMockResourceFactory creates a new mock instance. +func NewMockResourceFactory(ctrl *gomock.Controller) *MockResourceFactory { + mock := &MockResourceFactory{ctrl: ctrl} + mock.recorder = &MockResourceFactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockResourceFactory) EXPECT() *MockResourceFactoryMockRecorder { + return m.recorder +} + +// NewResource mocks base method. +func (m *MockResourceFactory) NewResource(params *Params, serviceName string, serviceConfig *service.Config) (Resource, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewResource", params, serviceName, serviceConfig) + ret0, _ := ret[0].(Resource) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewResource indicates an expected call of NewResource. +func (mr *MockResourceFactoryMockRecorder) NewResource(params, serviceName, serviceConfig interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewResource", reflect.TypeOf((*MockResourceFactory)(nil).NewResource), params, serviceName, serviceConfig) +} + +// MockResource is a mock of Resource interface. +type MockResource struct { + ctrl *gomock.Controller + recorder *MockResourceMockRecorder +} + +// MockResourceMockRecorder is the mock recorder for MockResource. +type MockResourceMockRecorder struct { + mock *MockResource +} + +// NewMockResource creates a new mock instance. +func NewMockResource(ctrl *gomock.Controller) *MockResource { + mock := &MockResource{ctrl: ctrl} + mock.recorder = &MockResourceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockResource) EXPECT() *MockResourceMockRecorder { + return m.recorder +} + +// GetArchivalMetadata mocks base method. +func (m *MockResource) GetArchivalMetadata() archiver.ArchivalMetadata { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetArchivalMetadata") + ret0, _ := ret[0].(archiver.ArchivalMetadata) + return ret0 +} + +// GetArchivalMetadata indicates an expected call of GetArchivalMetadata. +func (mr *MockResourceMockRecorder) GetArchivalMetadata() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetArchivalMetadata", reflect.TypeOf((*MockResource)(nil).GetArchivalMetadata)) +} + +// GetArchiverProvider mocks base method. +func (m *MockResource) GetArchiverProvider() provider.ArchiverProvider { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetArchiverProvider") + ret0, _ := ret[0].(provider.ArchiverProvider) + return ret0 +} + +// GetArchiverProvider indicates an expected call of GetArchiverProvider. +func (mr *MockResourceMockRecorder) GetArchiverProvider() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetArchiverProvider", reflect.TypeOf((*MockResource)(nil).GetArchiverProvider)) +} + +// GetAsyncWorkflowQueueProvider mocks base method. +func (m *MockResource) GetAsyncWorkflowQueueProvider() queue.Provider { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAsyncWorkflowQueueProvider") + ret0, _ := ret[0].(queue.Provider) + return ret0 +} + +// GetAsyncWorkflowQueueProvider indicates an expected call of GetAsyncWorkflowQueueProvider. +func (mr *MockResourceMockRecorder) GetAsyncWorkflowQueueProvider() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAsyncWorkflowQueueProvider", reflect.TypeOf((*MockResource)(nil).GetAsyncWorkflowQueueProvider)) +} + +// GetBlobstoreClient mocks base method. +func (m *MockResource) GetBlobstoreClient() blobstore.Client { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetBlobstoreClient") + ret0, _ := ret[0].(blobstore.Client) + return ret0 +} + +// GetBlobstoreClient indicates an expected call of GetBlobstoreClient. +func (mr *MockResourceMockRecorder) GetBlobstoreClient() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBlobstoreClient", reflect.TypeOf((*MockResource)(nil).GetBlobstoreClient)) +} + +// GetClientBean mocks base method. +func (m *MockResource) GetClientBean() client.Bean { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetClientBean") + ret0, _ := ret[0].(client.Bean) + return ret0 +} + +// GetClientBean indicates an expected call of GetClientBean. +func (mr *MockResourceMockRecorder) GetClientBean() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetClientBean", reflect.TypeOf((*MockResource)(nil).GetClientBean)) +} + +// GetClusterMetadata mocks base method. +func (m *MockResource) GetClusterMetadata() cluster.Metadata { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetClusterMetadata") + ret0, _ := ret[0].(cluster.Metadata) + return ret0 +} + +// GetClusterMetadata indicates an expected call of GetClusterMetadata. +func (mr *MockResourceMockRecorder) GetClusterMetadata() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetClusterMetadata", reflect.TypeOf((*MockResource)(nil).GetClusterMetadata)) +} + +// GetDispatcher mocks base method. +func (m *MockResource) GetDispatcher() *yarpc.Dispatcher { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetDispatcher") + ret0, _ := ret[0].(*yarpc.Dispatcher) + return ret0 +} + +// GetDispatcher indicates an expected call of GetDispatcher. +func (mr *MockResourceMockRecorder) GetDispatcher() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDispatcher", reflect.TypeOf((*MockResource)(nil).GetDispatcher)) +} + +// GetDomainCache mocks base method. +func (m *MockResource) GetDomainCache() cache.DomainCache { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetDomainCache") + ret0, _ := ret[0].(cache.DomainCache) + return ret0 +} + +// GetDomainCache indicates an expected call of GetDomainCache. +func (mr *MockResourceMockRecorder) GetDomainCache() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDomainCache", reflect.TypeOf((*MockResource)(nil).GetDomainCache)) +} + +// GetDomainManager mocks base method. +func (m *MockResource) GetDomainManager() persistence.DomainManager { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetDomainManager") + ret0, _ := ret[0].(persistence.DomainManager) + return ret0 +} + +// GetDomainManager indicates an expected call of GetDomainManager. +func (mr *MockResourceMockRecorder) GetDomainManager() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDomainManager", reflect.TypeOf((*MockResource)(nil).GetDomainManager)) +} + +// GetDomainMetricsScopeCache mocks base method. +func (m *MockResource) GetDomainMetricsScopeCache() cache.DomainMetricsScopeCache { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetDomainMetricsScopeCache") + ret0, _ := ret[0].(cache.DomainMetricsScopeCache) + return ret0 +} + +// GetDomainMetricsScopeCache indicates an expected call of GetDomainMetricsScopeCache. +func (mr *MockResourceMockRecorder) GetDomainMetricsScopeCache() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDomainMetricsScopeCache", reflect.TypeOf((*MockResource)(nil).GetDomainMetricsScopeCache)) +} + +// GetDomainReplicationQueue mocks base method. +func (m *MockResource) GetDomainReplicationQueue() domain.ReplicationQueue { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetDomainReplicationQueue") + ret0, _ := ret[0].(domain.ReplicationQueue) + return ret0 +} + +// GetDomainReplicationQueue indicates an expected call of GetDomainReplicationQueue. +func (mr *MockResourceMockRecorder) GetDomainReplicationQueue() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDomainReplicationQueue", reflect.TypeOf((*MockResource)(nil).GetDomainReplicationQueue)) +} + +// GetExecutionManager mocks base method. +func (m *MockResource) GetExecutionManager(arg0 int) (persistence.ExecutionManager, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetExecutionManager", arg0) + ret0, _ := ret[0].(persistence.ExecutionManager) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetExecutionManager indicates an expected call of GetExecutionManager. +func (mr *MockResourceMockRecorder) GetExecutionManager(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetExecutionManager", reflect.TypeOf((*MockResource)(nil).GetExecutionManager), arg0) +} + +// GetFrontendClient mocks base method. +func (m *MockResource) GetFrontendClient() frontend.Client { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetFrontendClient") + ret0, _ := ret[0].(frontend.Client) + return ret0 +} + +// GetFrontendClient indicates an expected call of GetFrontendClient. +func (mr *MockResourceMockRecorder) GetFrontendClient() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFrontendClient", reflect.TypeOf((*MockResource)(nil).GetFrontendClient)) +} + +// GetFrontendRawClient mocks base method. +func (m *MockResource) GetFrontendRawClient() frontend.Client { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetFrontendRawClient") + ret0, _ := ret[0].(frontend.Client) + return ret0 +} + +// GetFrontendRawClient indicates an expected call of GetFrontendRawClient. +func (mr *MockResourceMockRecorder) GetFrontendRawClient() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFrontendRawClient", reflect.TypeOf((*MockResource)(nil).GetFrontendRawClient)) +} + +// GetHistoryClient mocks base method. +func (m *MockResource) GetHistoryClient() history.Client { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetHistoryClient") + ret0, _ := ret[0].(history.Client) + return ret0 +} + +// GetHistoryClient indicates an expected call of GetHistoryClient. +func (mr *MockResourceMockRecorder) GetHistoryClient() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHistoryClient", reflect.TypeOf((*MockResource)(nil).GetHistoryClient)) +} + +// GetHistoryManager mocks base method. +func (m *MockResource) GetHistoryManager() persistence.HistoryManager { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetHistoryManager") + ret0, _ := ret[0].(persistence.HistoryManager) + return ret0 +} + +// GetHistoryManager indicates an expected call of GetHistoryManager. +func (mr *MockResourceMockRecorder) GetHistoryManager() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHistoryManager", reflect.TypeOf((*MockResource)(nil).GetHistoryManager)) +} + +// GetHistoryRawClient mocks base method. +func (m *MockResource) GetHistoryRawClient() history.Client { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetHistoryRawClient") + ret0, _ := ret[0].(history.Client) + return ret0 +} + +// GetHistoryRawClient indicates an expected call of GetHistoryRawClient. +func (mr *MockResourceMockRecorder) GetHistoryRawClient() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHistoryRawClient", reflect.TypeOf((*MockResource)(nil).GetHistoryRawClient)) +} + +// GetHostInfo mocks base method. +func (m *MockResource) GetHostInfo() membership.HostInfo { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetHostInfo") + ret0, _ := ret[0].(membership.HostInfo) + return ret0 +} + +// GetHostInfo indicates an expected call of GetHostInfo. +func (mr *MockResourceMockRecorder) GetHostInfo() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHostInfo", reflect.TypeOf((*MockResource)(nil).GetHostInfo)) +} + +// GetHostName mocks base method. +func (m *MockResource) GetHostName() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetHostName") + ret0, _ := ret[0].(string) + return ret0 +} + +// GetHostName indicates an expected call of GetHostName. +func (mr *MockResourceMockRecorder) GetHostName() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHostName", reflect.TypeOf((*MockResource)(nil).GetHostName)) +} + +// GetIsolationGroupState mocks base method. +func (m *MockResource) GetIsolationGroupState() isolationgroup.State { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetIsolationGroupState") + ret0, _ := ret[0].(isolationgroup.State) + return ret0 +} + +// GetIsolationGroupState indicates an expected call of GetIsolationGroupState. +func (mr *MockResourceMockRecorder) GetIsolationGroupState() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetIsolationGroupState", reflect.TypeOf((*MockResource)(nil).GetIsolationGroupState)) +} + +// GetIsolationGroupStore mocks base method. +func (m *MockResource) GetIsolationGroupStore() configstore.Client { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetIsolationGroupStore") + ret0, _ := ret[0].(configstore.Client) + return ret0 +} + +// GetIsolationGroupStore indicates an expected call of GetIsolationGroupStore. +func (mr *MockResourceMockRecorder) GetIsolationGroupStore() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetIsolationGroupStore", reflect.TypeOf((*MockResource)(nil).GetIsolationGroupStore)) +} + +// GetLogger mocks base method. +func (m *MockResource) GetLogger() log.Logger { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetLogger") + ret0, _ := ret[0].(log.Logger) + return ret0 +} + +// GetLogger indicates an expected call of GetLogger. +func (mr *MockResourceMockRecorder) GetLogger() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLogger", reflect.TypeOf((*MockResource)(nil).GetLogger)) +} + +// GetMatchingClient mocks base method. +func (m *MockResource) GetMatchingClient() matching.Client { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMatchingClient") + ret0, _ := ret[0].(matching.Client) + return ret0 +} + +// GetMatchingClient indicates an expected call of GetMatchingClient. +func (mr *MockResourceMockRecorder) GetMatchingClient() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMatchingClient", reflect.TypeOf((*MockResource)(nil).GetMatchingClient)) +} + +// GetMatchingRawClient mocks base method. +func (m *MockResource) GetMatchingRawClient() matching.Client { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMatchingRawClient") + ret0, _ := ret[0].(matching.Client) + return ret0 +} + +// GetMatchingRawClient indicates an expected call of GetMatchingRawClient. +func (mr *MockResourceMockRecorder) GetMatchingRawClient() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMatchingRawClient", reflect.TypeOf((*MockResource)(nil).GetMatchingRawClient)) +} + +// GetMembershipResolver mocks base method. +func (m *MockResource) GetMembershipResolver() membership.Resolver { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMembershipResolver") + ret0, _ := ret[0].(membership.Resolver) + return ret0 +} + +// GetMembershipResolver indicates an expected call of GetMembershipResolver. +func (mr *MockResourceMockRecorder) GetMembershipResolver() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMembershipResolver", reflect.TypeOf((*MockResource)(nil).GetMembershipResolver)) +} + +// GetMessagingClient mocks base method. +func (m *MockResource) GetMessagingClient() messaging.Client { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMessagingClient") + ret0, _ := ret[0].(messaging.Client) + return ret0 +} + +// GetMessagingClient indicates an expected call of GetMessagingClient. +func (mr *MockResourceMockRecorder) GetMessagingClient() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMessagingClient", reflect.TypeOf((*MockResource)(nil).GetMessagingClient)) +} + +// GetMetricsClient mocks base method. +func (m *MockResource) GetMetricsClient() metrics.Client { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMetricsClient") + ret0, _ := ret[0].(metrics.Client) + return ret0 +} + +// GetMetricsClient indicates an expected call of GetMetricsClient. +func (mr *MockResourceMockRecorder) GetMetricsClient() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricsClient", reflect.TypeOf((*MockResource)(nil).GetMetricsClient)) +} + +// GetPartitioner mocks base method. +func (m *MockResource) GetPartitioner() partition.Partitioner { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPartitioner") + ret0, _ := ret[0].(partition.Partitioner) + return ret0 +} + +// GetPartitioner indicates an expected call of GetPartitioner. +func (mr *MockResourceMockRecorder) GetPartitioner() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPartitioner", reflect.TypeOf((*MockResource)(nil).GetPartitioner)) +} + +// GetPayloadSerializer mocks base method. +func (m *MockResource) GetPayloadSerializer() persistence.PayloadSerializer { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPayloadSerializer") + ret0, _ := ret[0].(persistence.PayloadSerializer) + return ret0 +} + +// GetPayloadSerializer indicates an expected call of GetPayloadSerializer. +func (mr *MockResourceMockRecorder) GetPayloadSerializer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPayloadSerializer", reflect.TypeOf((*MockResource)(nil).GetPayloadSerializer)) +} + +// GetPersistenceBean mocks base method. +func (m *MockResource) GetPersistenceBean() client0.Bean { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPersistenceBean") + ret0, _ := ret[0].(client0.Bean) + return ret0 +} + +// GetPersistenceBean indicates an expected call of GetPersistenceBean. +func (mr *MockResourceMockRecorder) GetPersistenceBean() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPersistenceBean", reflect.TypeOf((*MockResource)(nil).GetPersistenceBean)) +} + +// GetRatelimiterAggregatorsClient mocks base method. +func (m *MockResource) GetRatelimiterAggregatorsClient() rpc.Client { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetRatelimiterAggregatorsClient") + ret0, _ := ret[0].(rpc.Client) + return ret0 +} + +// GetRatelimiterAggregatorsClient indicates an expected call of GetRatelimiterAggregatorsClient. +func (mr *MockResourceMockRecorder) GetRatelimiterAggregatorsClient() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRatelimiterAggregatorsClient", reflect.TypeOf((*MockResource)(nil).GetRatelimiterAggregatorsClient)) +} + +// GetRemoteAdminClient mocks base method. +func (m *MockResource) GetRemoteAdminClient(cluster string) admin.Client { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetRemoteAdminClient", cluster) + ret0, _ := ret[0].(admin.Client) + return ret0 +} + +// GetRemoteAdminClient indicates an expected call of GetRemoteAdminClient. +func (mr *MockResourceMockRecorder) GetRemoteAdminClient(cluster interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRemoteAdminClient", reflect.TypeOf((*MockResource)(nil).GetRemoteAdminClient), cluster) +} + +// GetRemoteFrontendClient mocks base method. +func (m *MockResource) GetRemoteFrontendClient(cluster string) frontend.Client { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetRemoteFrontendClient", cluster) + ret0, _ := ret[0].(frontend.Client) + return ret0 +} + +// GetRemoteFrontendClient indicates an expected call of GetRemoteFrontendClient. +func (mr *MockResourceMockRecorder) GetRemoteFrontendClient(cluster interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRemoteFrontendClient", reflect.TypeOf((*MockResource)(nil).GetRemoteFrontendClient), cluster) +} + +// GetSDKClient mocks base method. +func (m *MockResource) GetSDKClient() workflowserviceclient.Interface { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSDKClient") + ret0, _ := ret[0].(workflowserviceclient.Interface) + return ret0 +} + +// GetSDKClient indicates an expected call of GetSDKClient. +func (mr *MockResourceMockRecorder) GetSDKClient() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSDKClient", reflect.TypeOf((*MockResource)(nil).GetSDKClient)) +} + +// GetServiceName mocks base method. +func (m *MockResource) GetServiceName() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetServiceName") + ret0, _ := ret[0].(string) + return ret0 +} + +// GetServiceName indicates an expected call of GetServiceName. +func (mr *MockResourceMockRecorder) GetServiceName() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetServiceName", reflect.TypeOf((*MockResource)(nil).GetServiceName)) +} + +// GetShardManager mocks base method. +func (m *MockResource) GetShardManager() persistence.ShardManager { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetShardManager") + ret0, _ := ret[0].(persistence.ShardManager) + return ret0 +} + +// GetShardManager indicates an expected call of GetShardManager. +func (mr *MockResourceMockRecorder) GetShardManager() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetShardManager", reflect.TypeOf((*MockResource)(nil).GetShardManager)) +} + +// GetTaskManager mocks base method. +func (m *MockResource) GetTaskManager() persistence.TaskManager { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTaskManager") + ret0, _ := ret[0].(persistence.TaskManager) + return ret0 +} + +// GetTaskManager indicates an expected call of GetTaskManager. +func (mr *MockResourceMockRecorder) GetTaskManager() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTaskManager", reflect.TypeOf((*MockResource)(nil).GetTaskManager)) +} + +// GetThrottledLogger mocks base method. +func (m *MockResource) GetThrottledLogger() log.Logger { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetThrottledLogger") + ret0, _ := ret[0].(log.Logger) + return ret0 +} + +// GetThrottledLogger indicates an expected call of GetThrottledLogger. +func (mr *MockResourceMockRecorder) GetThrottledLogger() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetThrottledLogger", reflect.TypeOf((*MockResource)(nil).GetThrottledLogger)) +} + +// GetTimeSource mocks base method. +func (m *MockResource) GetTimeSource() clock.TimeSource { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTimeSource") + ret0, _ := ret[0].(clock.TimeSource) + return ret0 +} + +// GetTimeSource indicates an expected call of GetTimeSource. +func (mr *MockResourceMockRecorder) GetTimeSource() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTimeSource", reflect.TypeOf((*MockResource)(nil).GetTimeSource)) +} + +// GetVisibilityManager mocks base method. +func (m *MockResource) GetVisibilityManager() persistence.VisibilityManager { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetVisibilityManager") + ret0, _ := ret[0].(persistence.VisibilityManager) + return ret0 +} + +// GetVisibilityManager indicates an expected call of GetVisibilityManager. +func (mr *MockResourceMockRecorder) GetVisibilityManager() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetVisibilityManager", reflect.TypeOf((*MockResource)(nil).GetVisibilityManager)) +} + +// Start mocks base method. +func (m *MockResource) Start() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Start") +} + +// Start indicates an expected call of Start. +func (mr *MockResourceMockRecorder) Start() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockResource)(nil).Start)) +} + +// Stop mocks base method. +func (m *MockResource) Stop() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Stop") +} + +// Stop indicates an expected call of Stop. +func (mr *MockResourceMockRecorder) Stop() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockResource)(nil).Stop)) +} diff --git a/common/rpc.go b/common/rpc.go index 4d48eeac8e4..3ac42837049 100644 --- a/common/rpc.go +++ b/common/rpc.go @@ -18,6 +18,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination rpc_mock.go -self_package github.com/uber/cadence/common + package common import ( diff --git a/common/rpc_mock.go b/common/rpc_mock.go new file mode 100644 index 00000000000..95d00863ffa --- /dev/null +++ b/common/rpc_mock.go @@ -0,0 +1,63 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: rpc.go + +// Package common is a generated GoMock package. +package common + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + yarpc "go.uber.org/yarpc" +) + +// MockRPCFactory is a mock of RPCFactory interface. +type MockRPCFactory struct { + ctrl *gomock.Controller + recorder *MockRPCFactoryMockRecorder +} + +// MockRPCFactoryMockRecorder is the mock recorder for MockRPCFactory. +type MockRPCFactoryMockRecorder struct { + mock *MockRPCFactory +} + +// NewMockRPCFactory creates a new mock instance. +func NewMockRPCFactory(ctrl *gomock.Controller) *MockRPCFactory { + mock := &MockRPCFactory{ctrl: ctrl} + mock.recorder = &MockRPCFactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockRPCFactory) EXPECT() *MockRPCFactoryMockRecorder { + return m.recorder +} + +// GetDispatcher mocks base method. +func (m *MockRPCFactory) GetDispatcher() *yarpc.Dispatcher { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetDispatcher") + ret0, _ := ret[0].(*yarpc.Dispatcher) + return ret0 +} + +// GetDispatcher indicates an expected call of GetDispatcher. +func (mr *MockRPCFactoryMockRecorder) GetDispatcher() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDispatcher", reflect.TypeOf((*MockRPCFactory)(nil).GetDispatcher)) +} + +// GetMaxMessageSize mocks base method. +func (m *MockRPCFactory) GetMaxMessageSize() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMaxMessageSize") + ret0, _ := ret[0].(int) + return ret0 +} + +// GetMaxMessageSize indicates an expected call of GetMaxMessageSize. +func (mr *MockRPCFactoryMockRecorder) GetMaxMessageSize() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMaxMessageSize", reflect.TypeOf((*MockRPCFactory)(nil).GetMaxMessageSize)) +} diff --git a/service/shardmanager/service.go b/service/shardmanager/service.go index 96d1ab4906b..24fe6e6dfac 100644 --- a/service/shardmanager/service.go +++ b/service/shardmanager/service.go @@ -43,6 +43,7 @@ type Service struct { // NewService builds a new task manager service func NewService( params *resource.Params, + factory resource.ResourceFactory, ) (resource.Resource, error) { serviceConfig := config.NewConfig( @@ -54,7 +55,7 @@ func NewService( params.HostName, ) - serviceResource, err := resource.New( + serviceResource, err := factory.NewResource( params, service.ShardManager, &service.Config{ @@ -62,7 +63,7 @@ func NewService( PersistenceGlobalMaxQPS: serviceConfig.PersistenceGlobalMaxQPS, ThrottledLoggerMaxRPS: serviceConfig.ThrottledLogRPS, IsErrorRetryableFunction: common.IsServiceTransientError, - // matching doesn't need visibility config as it never read or write visibility + // shard manager doesn't need visibility config as it never read or write visibility }, ) if err != nil { diff --git a/service/shardmanager/service_test.go b/service/shardmanager/service_test.go new file mode 100644 index 00000000000..9a596c17795 --- /dev/null +++ b/service/shardmanager/service_test.go @@ -0,0 +1,61 @@ +package shardmanager + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/resource" +) + +func TestNewService(t *testing.T) { + ctrl := gomock.NewController(t) + resourceMock := resource.NewMockResource(ctrl) + factoryMock := resource.NewMockResourceFactory(ctrl) + factoryMock.EXPECT().NewResource(gomock.Any(), gomock.Any(), gomock.Any()).Return(resourceMock, nil).AnyTimes() + + service, err := NewService(&resource.Params{}, factoryMock) + assert.NoError(t, err) + assert.NotNil(t, service) +} + +func TestServiceStartStop(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + resourceMock := resource.NewMockResource(ctrl) + resourceMock.EXPECT().GetLogger().Return(log.NewNoop()).AnyTimes() + resourceMock.EXPECT().Start().Return() + resourceMock.EXPECT().Stop().Return() + + service := &Service{ + Resource: resourceMock, + status: common.DaemonStatusInitialized, + stopC: make(chan struct{}), + } + + go service.Start() + + time.Sleep(100 * time.Millisecond) // The code assumes the service is started when calling Stop + assert.Equal(t, int32(common.DaemonStatusStarted), atomic.LoadInt32(&service.status)) + + service.Stop() + assert.Equal(t, int32(common.DaemonStatusStopped), atomic.LoadInt32(&service.status)) +} + +func TestStartAndStopReturnsImmediatelyWhenAlreadyStopped(t *testing.T) { + + service := &Service{ + status: common.DaemonStatusStopped, + } + + service.Start() + assert.Equal(t, int32(common.DaemonStatusStopped), atomic.LoadInt32(&service.status)) + + service.Stop() + assert.Equal(t, int32(common.DaemonStatusStopped), atomic.LoadInt32(&service.status)) +} From ac547df66d02810155646d9d217f5149975786be Mon Sep 17 00:00:00 2001 From: Jakob Haahr Taankvist Date: Thu, 26 Sep 2024 13:29:41 +0200 Subject: [PATCH 3/6] make pr: --- common/resource/resource.go | 2 +- common/resource/resource_mock.go | 27 +++++++++++++++++++++++++-- common/rpc_mock.go | 22 ++++++++++++++++++++++ service/shardmanager/service_test.go | 23 +++++++++++++++++++++++ 4 files changed, 71 insertions(+), 3 deletions(-) diff --git a/common/resource/resource.go b/common/resource/resource.go index 80dfb8070e0..796b0846313 100644 --- a/common/resource/resource.go +++ b/common/resource/resource.go @@ -23,7 +23,6 @@ package resource import ( - "github.com/uber/cadence/common/service" "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" "go.uber.org/yarpc" @@ -51,6 +50,7 @@ import ( "github.com/uber/cadence/common/persistence" persistenceClient "github.com/uber/cadence/common/persistence/client" "github.com/uber/cadence/common/quotas/global/rpc" + "github.com/uber/cadence/common/service" ) type ResourceFactory interface { diff --git a/common/resource/resource_mock.go b/common/resource/resource_mock.go index 980e5a6cb3f..3e6c1a8d40d 100644 --- a/common/resource/resource_mock.go +++ b/common/resource/resource_mock.go @@ -1,3 +1,25 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + // Code generated by MockGen. DO NOT EDIT. // Source: resource.go @@ -8,6 +30,9 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" + workflowserviceclient "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + yarpc "go.uber.org/yarpc" + client "github.com/uber/cadence/client" admin "github.com/uber/cadence/client/admin" frontend "github.com/uber/cadence/client/frontend" @@ -32,8 +57,6 @@ import ( client0 "github.com/uber/cadence/common/persistence/client" rpc "github.com/uber/cadence/common/quotas/global/rpc" service "github.com/uber/cadence/common/service" - workflowserviceclient "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" - yarpc "go.uber.org/yarpc" ) // MockResourceFactory is a mock of ResourceFactory interface. diff --git a/common/rpc_mock.go b/common/rpc_mock.go index 95d00863ffa..ad16acb2ab5 100644 --- a/common/rpc_mock.go +++ b/common/rpc_mock.go @@ -1,3 +1,25 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + // Code generated by MockGen. DO NOT EDIT. // Source: rpc.go diff --git a/service/shardmanager/service_test.go b/service/shardmanager/service_test.go index 9a596c17795..cfd4f00c26a 100644 --- a/service/shardmanager/service_test.go +++ b/service/shardmanager/service_test.go @@ -1,3 +1,25 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + package shardmanager import ( @@ -7,6 +29,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "github.com/uber/cadence/common" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/resource" From fb1324877829f126af9d7e9760636240a1f92bcc Mon Sep 17 00:00:00 2001 From: Jakob Haahr Taankvist Date: Fri, 27 Sep 2024 08:48:49 +0200 Subject: [PATCH 4/6] forgot to pass the new factory --- cmd/server/cadence/server.go | 2 +- common/resource/resourceImpl.go | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/cmd/server/cadence/server.go b/cmd/server/cadence/server.go index f1a0829b1ec..fe6df173d27 100644 --- a/cmd/server/cadence/server.go +++ b/cmd/server/cadence/server.go @@ -324,7 +324,7 @@ func (s *server) startService() common.Daemon { case service.Worker: daemon, err = worker.NewService(¶ms) case service.ShardManager: - daemon, err = shardmanager.NewService(¶ms) + daemon, err = shardmanager.NewService(¶ms, resource.NewResourceFactory()) } if err != nil { params.Logger.Fatal("Fail to start "+s.name+" service ", tag.Error(err)) diff --git a/common/resource/resourceImpl.go b/common/resource/resourceImpl.go index 91ca8c660f6..36895d76e7d 100644 --- a/common/resource/resourceImpl.go +++ b/common/resource/resourceImpl.go @@ -63,6 +63,10 @@ import ( "github.com/uber/cadence/common/service" ) +func NewResourceFactory() ResourceFactory { + return &resourceImplFactory{} +} + type resourceImplFactory struct{} func (*resourceImplFactory) NewResource(params *Params, From b99ed900034f8ebe70e949ed70d6c77b5348096d Mon Sep 17 00:00:00 2001 From: Jakob Haahr Taankvist Date: Fri, 27 Sep 2024 09:46:23 +0200 Subject: [PATCH 5/6] Added obligatory dynamic configs for the shard manager service --- common/dynamicconfig/constants.go | 38 +++++++++++++++++++++++++++ service/shardmanager/config/config.go | 6 ++--- 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index c46aee7b232..048fda94cd2 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -1466,6 +1466,29 @@ const ( // Value type: Int // Default value: 100 ESAnalyzerMinNumWorkflowsForAvg + + // key for shard manager + + // ShardManagerPersistenceMaxQPS is the max qps a shard manager host can query DB + // KeyName: shardManager.persistenceMaxQPS + // Value type: Int + // Default value: 3000 + // Allowed filters: N/A + ShardManagerPersistenceMaxQPS + // ShardManagerPersistenceGlobalMaxQPS is the max qps matching cluster can query DB + // KeyName: shardManager.persistenceGlobalMaxQPS + // Value type: Int + // Default value: 0 + // Allowed filters: N/A + ShardManagerPersistenceGlobalMaxQPS + + // ShardManagerThrottledLogRPS is the rate limit on number of log messages emitted per second for throttled logger + // KeyName: shardManager.throttledLogRPS + // Value type: Int + // Default value: 20 + // Allowed filters: N/A + ShardManagerThrottledLogRPS + // Usage: VisibilityArchivalQueryMaxRangeInDays is the maximum number of days for a visibility archival query // KeyName: N/A // Default value: N/A @@ -3921,6 +3944,21 @@ var IntKeys = map[IntKey]DynamicInt{ Description: "ESAnalyzerMinNumWorkflowsForAvg controls how many workflows to have at least to rely on workflow run time avg per type", DefaultValue: 100, }, + ShardManagerPersistenceMaxQPS: { + KeyName: "shardManager.persistenceMaxQPS", + Description: "ShardManagerPersistenceMaxQPS is the max qps shard manager host can query DB", + DefaultValue: 3000, + }, + ShardManagerPersistenceGlobalMaxQPS: { + KeyName: "shardManager.persistenceGlobalMaxQPS", + Description: "ShardManagerPersistenceGlobalMaxQPS is the max qps shard manager cluster can query DB", + DefaultValue: 0, + }, + ShardManagerThrottledLogRPS: { + KeyName: "shardManager.throttledLogRPS", + Description: "ShardManagerThrottledLogRPS is the rate limit on number of log messages emitted per second for throttled logger", + DefaultValue: 20, + }, VisibilityArchivalQueryMaxRangeInDays: { KeyName: "frontend.visibilityArchivalQueryMaxRangeInDays", Description: "VisibilityArchivalQueryMaxRangeInDays is the maximum number of days for a visibility archival query", diff --git a/service/shardmanager/config/config.go b/service/shardmanager/config/config.go index 384d77ca1cf..7444267adb4 100644 --- a/service/shardmanager/config/config.go +++ b/service/shardmanager/config/config.go @@ -41,9 +41,9 @@ type ( // NewConfig returns new service config with default values func NewConfig(dc *dynamicconfig.Collection, hostName string) *Config { return &Config{ - PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.MatchingPersistenceMaxQPS), - PersistenceGlobalMaxQPS: dc.GetIntProperty(dynamicconfig.MatchingPersistenceGlobalMaxQPS), - ThrottledLogRPS: dc.GetIntProperty(dynamicconfig.MatchingThrottledLogRPS), + PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.ShardManagerPersistenceMaxQPS), + PersistenceGlobalMaxQPS: dc.GetIntProperty(dynamicconfig.ShardManagerPersistenceGlobalMaxQPS), + ThrottledLogRPS: dc.GetIntProperty(dynamicconfig.ShardManagerThrottledLogRPS), HostName: hostName, } } From a8709afecd96a1d8f2b2ae35f72b1afd6933f9b6 Mon Sep 17 00:00:00 2001 From: Jakob Haahr Taankvist Date: Mon, 30 Sep 2024 13:35:32 +0200 Subject: [PATCH 6/6] - Added goleak - Renamed test --- service/shardmanager/service_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/service/shardmanager/service_test.go b/service/shardmanager/service_test.go index cfd4f00c26a..dd757fadf16 100644 --- a/service/shardmanager/service_test.go +++ b/service/shardmanager/service_test.go @@ -29,6 +29,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "go.uber.org/goleak" "github.com/uber/cadence/common" "github.com/uber/cadence/common/log" @@ -47,6 +48,8 @@ func TestNewService(t *testing.T) { } func TestServiceStartStop(t *testing.T) { + defer goleak.VerifyNone(t) + ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -70,7 +73,7 @@ func TestServiceStartStop(t *testing.T) { assert.Equal(t, int32(common.DaemonStatusStopped), atomic.LoadInt32(&service.status)) } -func TestStartAndStopReturnsImmediatelyWhenAlreadyStopped(t *testing.T) { +func TestStartAndStopDoesNotChangeStatusWhenAlreadyStopped(t *testing.T) { service := &Service{ status: common.DaemonStatusStopped,