diff --git a/cmd/server/server.go b/cmd/server/server.go index dd48e3145a7..8647e197441 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -121,13 +121,13 @@ func (s *server) startService() common.Daemon { switch s.name { case frontendService: - daemon = frontend.NewService(¶ms, frontend.NewConfig()) + daemon = frontend.NewService(¶ms) case historyService: - daemon = history.NewService(¶ms, history.NewConfig(s.cfg.Cassandra.NumHistoryShards)) + daemon = history.NewService(¶ms) case matchingService: - daemon = matching.NewService(¶ms, matching.NewConfig()) + daemon = matching.NewService(¶ms) case workerService: - daemon = worker.NewService(¶ms, worker.NewConfig()) + daemon = worker.NewService(¶ms) } go execute(daemon, s.doneC) diff --git a/common/service/dynamicconfig/config.go b/common/service/dynamicconfig/config.go new file mode 100644 index 00000000000..a362b9cb0bf --- /dev/null +++ b/common/service/dynamicconfig/config.go @@ -0,0 +1,131 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package dynamicconfig + +import ( + "errors" + "time" +) + +// Client allows fetching values from a dynamic configuration system NOTE: This does not have async +// options right now. In the interest of keeping it minimal, we can add when requirement arises. +type Client interface { + GetValue(name Key) (interface{}, error) + GetValueWithFilters(name Key, filters map[Filter]interface{}) (interface{}, error) +} + +type nopClient struct{} + +func (mc *nopClient) GetValue(name Key) (interface{}, error) { + return nil, errors.New("unable to find key") +} + +func (mc *nopClient) GetValueWithFilters( + name Key, filters map[Filter]interface{}, +) (interface{}, error) { + return nil, errors.New("unable to find key") +} + +// NewNopCollection creates a new nop collection +func NewNopCollection() *Collection { + return NewCollection(&nopClient{}) +} + +// NewCollection creates a new collection +func NewCollection(client Client) *Collection { + return &Collection{client} +} + +// Collection wraps dynamic config client with a closure so that across the code, the config values +// can be directly accessed by calling the function without propagating the client everywhere in +// code +type Collection struct { + client Client +} + +// GetIntPropertyWithTaskList gets property with taskList filter and asserts that it's an integer +func (c *Collection) GetIntPropertyWithTaskList(key Key, defaultVal int) func(string) int { + return func(taskList string) int { + return c.getPropertyWithStringFilter(key, defaultVal, TaskListName)(taskList).(int) + } +} + +// GetDurationPropertyWithTaskList gets property with taskList filter and asserts that it's time.Duration +func (c *Collection) GetDurationPropertyWithTaskList( + key Key, defaultVal time.Duration, +) func(string) time.Duration { + return func(taskList string) time.Duration { + return c.getPropertyWithStringFilter(key, defaultVal, TaskListName)(taskList).(time.Duration) + } +} + +func (c *Collection) getPropertyWithStringFilter( + key Key, defaultVal interface{}, filter Filter, +) func(string) interface{} { + return func(filterVal string) interface{} { + filters := make(map[Filter]interface{}) + filters[filter] = filterVal + val, err := c.client.GetValueWithFilters(key, filters) + if err != nil { + return defaultVal + } + return val + } +} + +// GetProperty gets a eface property and returns defaultVal if property is not found +func (c *Collection) GetProperty(key Key, defaultVal interface{}) func() interface{} { + return func() interface{} { + val, err := c.client.GetValue(key) + if err != nil { + return defaultVal + } + return val + } +} + +// GetIntProperty gets property and asserts that it's an integer +func (c *Collection) GetIntProperty(key Key, defaultVal int) func() int { + return func() int { + return c.GetProperty(key, defaultVal)().(int) + } +} + +// GetFloat64Property gets property and asserts that it's a float64 +func (c *Collection) GetFloat64Property(key Key, defaultVal float64) func() float64 { + return func() float64 { + return c.GetProperty(key, defaultVal)().(float64) + } +} + +// GetDurationProperty gets property and asserts that it's a duration +func (c *Collection) GetDurationProperty(key Key, defaultVal time.Duration) func() time.Duration { + return func() time.Duration { + return c.GetProperty(key, defaultVal)().(time.Duration) + } +} + +// GetBoolProperty gets property and asserts that it's an bool +func (c *Collection) GetBoolProperty(key Key, defaultVal bool) func() bool { + return func() bool { + return c.GetProperty(key, defaultVal)().(bool) + } +} diff --git a/common/service/dynamicconfig/config_benchmark_test.go b/common/service/dynamicconfig/config_benchmark_test.go new file mode 100644 index 00000000000..7d7089e4c55 --- /dev/null +++ b/common/service/dynamicconfig/config_benchmark_test.go @@ -0,0 +1,37 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package dynamicconfig + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func BenchmarkGetIntProperty(b *testing.B) { + client := newInMemoryClient() + cln := NewCollection(client) + key := MaxTaskBatchSize + for i := 0; i < b.N; i++ { + size := cln.GetIntProperty(key, 10) + assert.Equal(b, 10, size()) + } +} diff --git a/common/service/dynamicconfig/config_test.go b/common/service/dynamicconfig/config_test.go new file mode 100644 index 00000000000..768818567ba --- /dev/null +++ b/common/service/dynamicconfig/config_test.go @@ -0,0 +1,96 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package dynamicconfig + +import ( + "errors" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/suite" +) + +type inMemoryClient struct { + values map[Key]interface{} + sync.RWMutex +} + +func newInMemoryClient() *inMemoryClient { + return &inMemoryClient{values: make(map[Key]interface{})} +} + +func (mc *inMemoryClient) SetValue(key Key, value interface{}) { + mc.Lock() + defer mc.Unlock() + mc.values[key] = value +} + +func (mc *inMemoryClient) GetValue(key Key) (interface{}, error) { + mc.RLock() + defer mc.RUnlock() + if val, ok := mc.values[key]; ok { + return val, nil + } + return nil, errors.New("unable to find key") +} + +func (mc *inMemoryClient) GetValueWithFilters( + name Key, filters map[Filter]interface{}, +) (interface{}, error) { + return mc.GetValue(name) +} + +type configSuite struct { + suite.Suite + client *inMemoryClient + cln *Collection +} + +func TestConfigSuite(t *testing.T) { + s := new(configSuite) + suite.Run(t, s) +} + +func (s *configSuite) SetupSuite() { + s.client = newInMemoryClient() + s.cln = NewCollection(s.client) +} + +func (s *configSuite) TestGetIntProperty() { + key := MaxTaskBatchSize + size := s.cln.GetIntProperty(key, 10) + s.Equal(10, size()) + s.client.SetValue(key, 50) + s.Equal(50, size()) + s.client.SetValue(key, "hello world") + s.Panics(func() { size() }, "Should panic") +} + +func (s *configSuite) TestGetDurationProperty() { + key := MatchingLongPollExpirationInterval + interval := s.cln.GetDurationProperty(key, time.Second) + s.Equal(time.Second, interval()) + s.client.SetValue(key, time.Minute) + s.Equal(time.Minute, interval()) + s.client.SetValue(key, 10) + s.Panics(func() { interval() }, "Should panic") +} diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go new file mode 100644 index 00000000000..bb5609aadec --- /dev/null +++ b/common/service/dynamicconfig/constants.go @@ -0,0 +1,83 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package dynamicconfig + +// Key represents a key/property stored in dynamic config +type Key int + +func (k Key) String() string { + if k <= unknownKey || int(k) >= len(keys) { + return keys[unknownKey] + } + return keys[k] +} + +const ( + _matchingRoot = "matching." + _matchingTaskListRoot = _matchingRoot + "taskList." + _historyRoot = "history." +) + +var keys = []string{ + "unknownKey", + _matchingTaskListRoot + "MinTaskThrottlingBurstSize", + _matchingTaskListRoot + "MaxTaskBatchSize", + _matchingTaskListRoot + "LongPollExpirationInterval", + _historyRoot + "LongPollExpirationInterval", +} + +const ( + // The order of constants is important. It should match the order in the keys array above. + unknownKey Key = iota + // Matching keys + + // MinTaskThrottlingBurstSize is the minimum burst size for task list throttling + MinTaskThrottlingBurstSize + // MaxTaskBatchSize is the maximum batch size to fetch from the task buffer + MaxTaskBatchSize + // MatchingLongPollExpirationInterval is the long poll expiration interval in the matching service + MatchingLongPollExpirationInterval + // HistoryLongPollExpirationInterval is the long poll expiration interval in the history service + HistoryLongPollExpirationInterval +) + +// Filter represents a filter on the dynamic config key +type Filter int + +func (k Filter) String() string { + keys := []string{ + "unknownFilter", + "domainName", + "taskListName", + } + if k <= unknownFilter || k > TaskListName { + return keys[unknownFilter] + } + return keys[k] +} + +const ( + unknownFilter Filter = iota + // DomainName is the domain name + DomainName + // TaskListName is the tasklist name + TaskListName +) diff --git a/common/service/service.go b/common/service/service.go index e92291fa3eb..60338b8da40 100644 --- a/common/service/service.go +++ b/common/service/service.go @@ -33,6 +33,7 @@ import ( "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/service/config" + "github.com/uber/cadence/common/service/dynamicconfig" "github.com/uber-common/bark" "github.com/uber-go/tally" @@ -61,6 +62,7 @@ type ( ClusterMetadata cluster.Metadata ReplicatorConfig config.Replicator MessagingClient messaging.Client + DynamicConfig dynamicconfig.Client } // RingpopFactory provides a bootstrapped ringpop @@ -87,6 +89,7 @@ type ( runtimeMetricsReporter *metrics.RuntimeMetricsReporter metricsClient metrics.Client clusterMetadata cluster.Metadata + dynamicCollection *dynamicconfig.Collection } ) @@ -102,6 +105,7 @@ func New(params *BootstrapParams) Service { metricsScope: params.MetricScope, numberOfHistoryShards: params.CassandraConfig.NumHistoryShards, clusterMetadata: params.ClusterMetadata, + dynamicCollection: dynamicconfig.NewCollection(params.DynamicConfig), } sVice.runtimeMetricsReporter = metrics.NewRuntimeMetricsReporter(params.MetricScope, time.Minute, sVice.logger) sVice.metricsClient = metrics.NewClient(params.MetricScope, getMetricsServiceIdx(params.Name, params.Logger)) diff --git a/host/onebox.go b/host/onebox.go index c04ed77705c..33322c6cb3d 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -37,6 +37,7 @@ import ( "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/service/config" + "github.com/uber/cadence/common/service/dynamicconfig" "github.com/uber/cadence/service/frontend" "github.com/uber/cadence/service/history" "github.com/uber/cadence/service/matching" @@ -224,7 +225,7 @@ func (c *cadenceImpl) startHistory(logger bark.Logger, shardMgr persistence.Shar params.ClusterMetadata = c.clusterMetadata params.CassandraConfig.NumHistoryShards = c.numberOfHistoryShards service := service.New(params) - historyConfig := history.NewConfig(c.numberOfHistoryShards) + historyConfig := history.NewConfig(dynamicconfig.NewNopCollection(), c.numberOfHistoryShards) historyConfig.HistoryMgrNumConns = c.numberOfHistoryShards historyConfig.ExecutionMgrNumConns = c.numberOfHistoryShards handler := history.NewHandler(service, historyConfig, shardMgr, metadataMgr, @@ -250,7 +251,9 @@ func (c *cadenceImpl) startMatching(logger bark.Logger, taskMgr persistence.Task params.ClusterMetadata = c.clusterMetadata params.CassandraConfig.NumHistoryShards = c.numberOfHistoryShards service := service.New(params) - c.matchingHandler = matching.NewHandler(service, matching.NewConfig(), taskMgr) + c.matchingHandler = matching.NewHandler( + service, matching.NewConfig(dynamicconfig.NewNopCollection()), taskMgr, + ) c.matchingHandler.Start() startWG.Done() <-c.shutdownCh diff --git a/service/frontend/service.go b/service/frontend/service.go index 05c1f256ec9..b8a122198bd 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -54,10 +54,10 @@ type Service struct { } // NewService builds a new cadence-frontend service -func NewService(params *service.BootstrapParams, config *Config) common.Daemon { +func NewService(params *service.BootstrapParams) common.Daemon { return &Service{ params: params, - config: config, + config: NewConfig(), stopC: make(chan struct{}), } } diff --git a/service/history/historyBuilder_test.go b/service/history/historyBuilder_test.go index c547b90efb9..1f8460e596f 100644 --- a/service/history/historyBuilder_test.go +++ b/service/history/historyBuilder_test.go @@ -33,6 +33,7 @@ import ( workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/service/dynamicconfig" ) type ( @@ -58,7 +59,7 @@ func (s *historyBuilderSuite) SetupTest() { // Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil s.Assertions = require.New(s.T()) s.domainID = "history-builder-test-domain" - s.msBuilder = newMutableStateBuilder(NewConfig(1), s.logger) + s.msBuilder = newMutableStateBuilder(NewConfig(dynamicconfig.NewNopCollection(), 1), s.logger) s.builder = newHistoryBuilder(s.msBuilder, s.logger) } diff --git a/service/history/historyCache_test.go b/service/history/historyCache_test.go index f957906f8ac..499571ceef5 100644 --- a/service/history/historyCache_test.go +++ b/service/history/historyCache_test.go @@ -36,6 +36,7 @@ import ( "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/service/dynamicconfig" ) type ( @@ -68,7 +69,7 @@ func (s *historyCacheSuite) SetupTest() { shardManager: &mocks.ShardManager{}, maxTransferSequenceNumber: 100000, closeCh: make(chan int, 100), - config: NewConfig(1), + config: NewConfig(dynamicconfig.NewNopCollection(), 1), logger: s.logger, metricsClient: metrics.NewClient(tally.NoopScope, metrics.History), } diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 9e73f98c1c6..4ae0ad2d7b3 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -406,7 +406,7 @@ func (e *historyEngineImpl) GetMutableState(ctx context.Context, return response, nil } - timer := time.NewTimer(e.shard.GetConfig().LongPollExpirationInterval) + timer := time.NewTimer(e.shard.GetConfig().LongPollExpirationInterval()) defer timer.Stop() for { select { diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go index 6b6c2cd3ffc..d23f0b81ac5 100644 --- a/service/history/historyEngine2_test.go +++ b/service/history/historyEngine2_test.go @@ -43,6 +43,7 @@ import ( "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/service/dynamicconfig" ) type ( @@ -79,7 +80,7 @@ func (s *engine2Suite) SetupSuite() { l := log.New() l.Level = log.DebugLevel s.logger = bark.NewLoggerFromLogrus(l) - s.config = NewConfig(1) + s.config = NewConfig(dynamicconfig.NewNopCollection(), 1) } func (s *engine2Suite) TearDownSuite() { diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index 0534df134ef..25729f2f77b 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -28,13 +28,6 @@ import ( "testing" "time" - "github.com/pborman/uuid" - log "github.com/sirupsen/logrus" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "github.com/uber-common/bark" - "github.com/uber-go/tally" "github.com/uber/cadence/.gen/go/history" workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" @@ -42,6 +35,15 @@ import ( "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/service/dynamicconfig" + + "github.com/pborman/uuid" + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/uber-common/bark" + "github.com/uber-go/tally" ) type ( @@ -77,7 +79,7 @@ func (s *engineSuite) SetupSuite() { } s.logger = bark.NewLoggerFromLogrus(log.New()) - s.config = NewConfig(1) + s.config = NewConfig(dynamicconfig.NewNopCollection(), 1) } func (s *engineSuite) TearDownSuite() { diff --git a/service/history/historyTestBase.go b/service/history/historyTestBase.go index 64c4ec3e741..390b3c39356 100644 --- a/service/history/historyTestBase.go +++ b/service/history/historyTestBase.go @@ -34,6 +34,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/service/dynamicconfig" ) const ( @@ -201,7 +202,7 @@ func (s *TestShardContext) GetTimeSource() common.TimeSource { func (s *TestBase) SetupWorkflowStoreWithOptions(options persistence.TestBaseOptions) { s.TestBase.SetupWorkflowStoreWithOptions(options) log := bark.NewLoggerFromLogrus(log.New()) - config := NewConfig(1) + config := NewConfig(dynamicconfig.NewNopCollection(), 1) domainCache := cache.NewDomainCache(s.MetadataManager, log) s.ShardContext = newTestShardContext(s.ShardInfo, 0, s.HistoryMgr, s.WorkflowMgr, domainCache, config, log) s.TestBase.TaskIDGenerator = s.ShardContext @@ -211,7 +212,7 @@ func (s *TestBase) SetupWorkflowStoreWithOptions(options persistence.TestBaseOpt func (s *TestBase) SetupWorkflowStore() { s.TestBase.SetupWorkflowStore() log := bark.NewLoggerFromLogrus(log.New()) - config := NewConfig(1) + config := NewConfig(dynamicconfig.NewNopCollection(), 1) domainCache := cache.NewDomainCache(s.MetadataManager, log) s.ShardContext = newTestShardContext(s.ShardInfo, 0, s.HistoryMgr, s.WorkflowMgr, domainCache, config, log) s.TestBase.TaskIDGenerator = s.ShardContext diff --git a/service/history/service.go b/service/history/service.go index eb4eb643d7f..085601cd3c3 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -27,6 +27,7 @@ import ( "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/service" + "github.com/uber/cadence/common/service/dynamicconfig" ) // Config represents configuration for cadence-history service @@ -69,11 +70,11 @@ type Config struct { // Time to hold a poll request before returning an empty response // right now only used by GetMutableState - LongPollExpirationInterval time.Duration + LongPollExpirationInterval func() time.Duration } // NewConfig returns new service config with default values -func NewConfig(numberOfShards int) *Config { +func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config { return &Config{ NumberOfShards: numberOfShards, HistoryCacheInitialSize: 128, @@ -99,7 +100,9 @@ func NewConfig(numberOfShards int) *Config { ExecutionMgrNumConns: 100, HistoryMgrNumConns: 100, // history client: client/history/client.go set the client timeout 30s - LongPollExpirationInterval: time.Second * 20, + LongPollExpirationInterval: dc.GetDurationProperty( + dynamicconfig.HistoryLongPollExpirationInterval, time.Second*20, + ), } } @@ -117,11 +120,13 @@ type Service struct { } // NewService builds a new cadence-history service -func NewService(params *service.BootstrapParams, config *Config) common.Daemon { +func NewService(params *service.BootstrapParams) common.Daemon { return &Service{ params: params, stopC: make(chan struct{}), - config: config, + config: NewConfig( + dynamicconfig.NewCollection(params.DynamicConfig), params.CassandraConfig.NumHistoryShards, + ), } } diff --git a/service/history/shardController_test.go b/service/history/shardController_test.go index bd3cd59c04f..2dfbacbf6fa 100644 --- a/service/history/shardController_test.go +++ b/service/history/shardController_test.go @@ -39,6 +39,7 @@ import ( "github.com/uber/cadence/common/metrics" mmocks "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/service/dynamicconfig" ) type ( @@ -65,7 +66,7 @@ func TestShardControllerSuite(t *testing.T) { func (s *shardControllerSuite) SetupTest() { s.logger = bark.NewLoggerFromLogrus(log.New()) - s.config = NewConfig(1) + s.config = NewConfig(dynamicconfig.NewNopCollection(), 1) s.metricsClient = metrics.NewClient(tally.NoopScope, metrics.History) s.hostInfo = membership.NewHostInfo("shardController-host-test", nil) s.mockShardManager = &mmocks.ShardManager{} diff --git a/service/history/timerBuilder_test.go b/service/history/timerBuilder_test.go index 981965c15d4..13780fecca8 100644 --- a/service/history/timerBuilder_test.go +++ b/service/history/timerBuilder_test.go @@ -27,6 +27,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/service/dynamicconfig" "encoding/hex" @@ -68,7 +69,7 @@ func (s *timerBuilderProcessorSuite) SetupSuite() { logger := log.New() //logger.Level = log.DebugLevel s.logger = bark.NewLoggerFromLogrus(logger) - s.config = NewConfig(1) + s.config = NewConfig(dynamicconfig.NewNopCollection(), 1) s.tb = newTimerBuilder(s.config, s.logger, &mockTimeSource{currTime: time.Now()}) } diff --git a/service/history/timerQueueProcessor2_test.go b/service/history/timerQueueProcessor2_test.go index 6320538550f..e079c55473b 100644 --- a/service/history/timerQueueProcessor2_test.go +++ b/service/history/timerQueueProcessor2_test.go @@ -31,6 +31,7 @@ import ( "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/service/dynamicconfig" "github.com/pborman/uuid" log "github.com/sirupsen/logrus" @@ -73,7 +74,7 @@ func (s *timerQueueProcessor2Suite) SetupSuite() { log2.Level = log.DebugLevel s.logger = bark.NewLoggerFromLogrus(log2) - s.config = NewConfig(1) + s.config = NewConfig(dynamicconfig.NewNopCollection(), 1) } func (s *timerQueueProcessor2Suite) SetupTest() { diff --git a/service/matching/handler.go b/service/matching/handler.go index 02962c81305..669c349f92e 100644 --- a/service/matching/handler.go +++ b/service/matching/handler.go @@ -68,7 +68,9 @@ func (h *Handler) Start() error { return err } h.metricsClient = h.Service.GetMetricsClient() - h.engine = NewEngine(h.taskPersistence, history, h.config, h.Service.GetLogger(), h.Service.GetMetricsClient()) + h.engine = NewEngine( + h.taskPersistence, history, h.config, h.Service.GetLogger(), h.Service.GetMetricsClient(), + ) h.startWG.Done() return nil } diff --git a/service/matching/matchingEngine.go b/service/matching/matchingEngine.go index 94329156d4c..88e4479d9cd 100644 --- a/service/matching/matchingEngine.go +++ b/service/matching/matchingEngine.go @@ -26,8 +26,6 @@ import ( "math" "sync" - "github.com/pborman/uuid" - "github.com/uber-common/bark" h "github.com/uber/cadence/.gen/go/history" m "github.com/uber/cadence/.gen/go/matching" workflow "github.com/uber/cadence/.gen/go/shared" @@ -37,6 +35,9 @@ import ( "github.com/uber/cadence/common/logging" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" + + "github.com/pborman/uuid" + "github.com/uber-common/bark" ) // Implements matching.Engine @@ -103,7 +104,8 @@ func NewEngine(taskManager persistence.TaskManager, historyService history.Client, config *Config, logger bark.Logger, - metricsClient metrics.Client) Engine { + metricsClient metrics.Client, +) Engine { return &matchingEngineImpl{ taskManager: taskManager, diff --git a/service/matching/matchingEngine_test.go b/service/matching/matchingEngine_test.go index 35b5eab7d6b..3ae883701aa 100644 --- a/service/matching/matchingEngine_test.go +++ b/service/matching/matchingEngine_test.go @@ -31,22 +31,23 @@ import ( "testing" "time" - "github.com/davecgh/go-spew/spew" - "github.com/emirpasic/gods/maps/treemap" - log "github.com/sirupsen/logrus" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/suite" - "github.com/uber-common/bark" - "github.com/uber/cadence/client/history" - - "github.com/uber-go/tally" gohistory "github.com/uber/cadence/.gen/go/history" "github.com/uber/cadence/.gen/go/matching" workflow "github.com/uber/cadence/.gen/go/shared" + "github.com/uber/cadence/client/history" "github.com/uber/cadence/common" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/service/dynamicconfig" + + "github.com/davecgh/go-spew/spew" + "github.com/emirpasic/gods/maps/treemap" + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "github.com/uber-common/bark" + "github.com/uber-go/tally" ) type ( @@ -195,7 +196,7 @@ func (s *matchingEngineSuite) TestPollForDecisionTasksEmptyResult() { func (s *matchingEngineSuite) PollForTasksEmptyResultTest(taskType int) { s.matchingEngine.config.RangeSize = 2 // to test that range is not updated without tasks - s.matchingEngine.config.LongPollExpirationInterval = 10 * time.Millisecond + s.matchingEngine.config.LongPollExpirationInterval = func(string) time.Duration { return 10 * time.Millisecond } domainID := "domainId" tl := "makeToast" @@ -354,7 +355,7 @@ func (s *matchingEngineSuite) TestTaskWriterShutdown() { } func (s *matchingEngineSuite) TestAddThenConsumeActivities() { - s.matchingEngine.config.LongPollExpirationInterval = 10 * time.Millisecond + s.matchingEngine.config.LongPollExpirationInterval = func(string) time.Duration { return 10 * time.Millisecond } runID := "run1" workflowID := "workflow1" @@ -468,7 +469,7 @@ func (s *matchingEngineSuite) TestAddThenConsumeActivities() { func (s *matchingEngineSuite) TestSyncMatchActivities() { // Set a short long poll expiration so we don't have to wait too long for 0 throttling cases - s.matchingEngine.config.LongPollExpirationInterval = 50 * time.Millisecond + s.matchingEngine.config.LongPollExpirationInterval = func(string) time.Duration { return 50 * time.Millisecond } runID := "run1" workflowID := "workflow1" @@ -491,7 +492,7 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() { dispatchTTL := time.Nanosecond dPtr := _defaultTaskDispatchRPS mgr := newTaskListManagerWithRateLimiter( - s.matchingEngine, tlID, tlKind, s.matchingEngine.config, + s.matchingEngine, tlID, tlKind, newTaskListConfig(tlID, s.matchingEngine.config), newRateLimiter(&dPtr, dispatchTTL, _minBurst), ) s.matchingEngine.updateTaskList(tlID, mgr) @@ -614,7 +615,7 @@ func (s *matchingEngineSuite) TestConcurrentPublishConsumeActivities() { func (s *matchingEngineSuite) TestConcurrentPublishConsumeActivitiesWithZeroDispatch() { // Set a short long poll expiration so we don't have to wait too long for 0 throttling cases - s.matchingEngine.config.LongPollExpirationInterval = 20 * time.Millisecond + s.matchingEngine.config.LongPollExpirationInterval = func(string) time.Duration { return 20 * time.Millisecond } dispatchLimitFn := func(wc int, tc int64) float64 { if tc%50 == 0 && wc%5 == 0 { // Gets triggered atleast 20 times return 0 @@ -650,7 +651,7 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities( s.matchingEngine.config.RangeSize = rangeSize // override to low number for the test dPtr := _defaultTaskDispatchRPS mgr := newTaskListManagerWithRateLimiter( - s.matchingEngine, tlID, tlKind, s.matchingEngine.config, + s.matchingEngine, tlID, tlKind, newTaskListConfig(tlID, s.matchingEngine.config), newRateLimiter(&dPtr, dispatchTTL, _minBurst), ) s.matchingEngine.updateTaskList(tlID, mgr) @@ -1656,7 +1657,7 @@ func validateTimeRange(t time.Time, expectedDuration time.Duration) bool { } func defaultTestConfig() *Config { - config := NewConfig() - config.LongPollExpirationInterval = 100 * time.Millisecond + config := NewConfig(dynamicconfig.NewNopCollection()) + config.LongPollExpirationInterval = func(string) time.Duration { return 100 * time.Millisecond } return config } diff --git a/service/matching/service.go b/service/matching/service.go index f0aee4d8bc2..b79428bbda0 100644 --- a/service/matching/service.go +++ b/service/matching/service.go @@ -26,20 +26,21 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/service" + "github.com/uber/cadence/common/service/dynamicconfig" ) // Config represents configuration for cadence-matching service type Config struct { EnableSyncMatch bool - // Time to hold a poll request before returning an empty response if there are no tasks - LongPollExpirationInterval time.Duration // taskListManager configuration - RangeSize int64 - GetTasksBatchSize int - UpdateAckInterval time.Duration - IdleTasklistCheckInterval time.Duration - MinTaskThrottlingBurstSize int + RangeSize int64 + GetTasksBatchSize int + UpdateAckInterval time.Duration + IdleTasklistCheckInterval time.Duration + // Time to hold a poll request before returning an empty response if there are no tasks + LongPollExpirationInterval func(string) time.Duration + MinTaskThrottlingBurstSize func(string) int // taskWriter configuration OutstandingTaskAppendsThreshold int @@ -47,17 +48,21 @@ type Config struct { } // NewConfig returns new service config with default values -func NewConfig() *Config { +func NewConfig(dc *dynamicconfig.Collection) *Config { return &Config{ - EnableSyncMatch: true, - LongPollExpirationInterval: time.Minute, - RangeSize: 100000, - GetTasksBatchSize: 1000, - UpdateAckInterval: 10 * time.Second, - IdleTasklistCheckInterval: 5 * time.Minute, + EnableSyncMatch: true, + RangeSize: 100000, + GetTasksBatchSize: 1000, + UpdateAckInterval: 10 * time.Second, + IdleTasklistCheckInterval: 5 * time.Minute, + LongPollExpirationInterval: dc.GetDurationPropertyWithTaskList( + dynamicconfig.MatchingLongPollExpirationInterval, time.Minute, + ), + MinTaskThrottlingBurstSize: dc.GetIntPropertyWithTaskList( + dynamicconfig.MinTaskThrottlingBurstSize, 1, + ), OutstandingTaskAppendsThreshold: 250, MaxTaskBatchSize: 100, - MinTaskThrottlingBurstSize: 1, } } @@ -69,10 +74,10 @@ type Service struct { } // NewService builds a new cadence-matching service -func NewService(params *service.BootstrapParams, config *Config) common.Daemon { +func NewService(params *service.BootstrapParams) common.Daemon { return &Service{ params: params, - config: config, + config: NewConfig(dynamicconfig.NewCollection(params.DynamicConfig)), stopC: make(chan struct{}), } } diff --git a/service/matching/taskListManager.go b/service/matching/taskListManager.go index 05b9cd0413c..c1a3997df16 100644 --- a/service/matching/taskListManager.go +++ b/service/matching/taskListManager.go @@ -65,6 +65,38 @@ type taskListManager interface { String() string } +type taskListConfig struct { + EnableSyncMatch bool + // Time to hold a poll request before returning an empty response if there are no tasks + LongPollExpirationInterval func() time.Duration + RangeSize int64 + GetTasksBatchSize int + UpdateAckInterval time.Duration + IdleTasklistCheckInterval time.Duration + MinTaskThrottlingBurstSize func() int + + // taskWriter configuration + OutstandingTaskAppendsThreshold int + MaxTaskBatchSize int +} + +func newTaskListConfig(id *taskListID, config *Config) *taskListConfig { + taskListName := id.taskListName + return &taskListConfig{ + RangeSize: config.RangeSize, + GetTasksBatchSize: config.GetTasksBatchSize, + UpdateAckInterval: config.UpdateAckInterval, + IdleTasklistCheckInterval: config.IdleTasklistCheckInterval, + MinTaskThrottlingBurstSize: func() int { + return config.MinTaskThrottlingBurstSize(taskListName) + }, + EnableSyncMatch: config.EnableSyncMatch, + LongPollExpirationInterval: func() time.Duration { + return config.LongPollExpirationInterval(taskListName) + }, + } +} + type rateLimiter struct { sync.RWMutex maxDispatchPerSecond *float64 @@ -138,12 +170,17 @@ func newTaskListManager( e *matchingEngineImpl, taskList *taskListID, taskListKind *s.TaskListKind, config *Config, ) taskListManager { dPtr := _defaultTaskDispatchRPS - rl := newRateLimiter(&dPtr, _defaultTaskDispatchRPSTTL, config.MinTaskThrottlingBurstSize) - return newTaskListManagerWithRateLimiter(e, taskList, taskListKind, config, rl) + rl := newRateLimiter( + &dPtr, _defaultTaskDispatchRPSTTL, config.MinTaskThrottlingBurstSize(taskList.taskListName), + ) + return newTaskListManagerWithRateLimiter( + e, taskList, taskListKind, newTaskListConfig(taskList, config), rl, + ) } func newTaskListManagerWithRateLimiter( - e *matchingEngineImpl, taskList *taskListID, taskListKind *s.TaskListKind, config *Config, rl rateLimiter, + e *matchingEngineImpl, taskList *taskListID, taskListKind *s.TaskListKind, config *taskListConfig, + rl rateLimiter, ) taskListManager { // To perform one db operation if there are no pollers taskBufferSize := config.GetTasksBatchSize - 1 @@ -196,7 +233,7 @@ type taskListManagerImpl struct { logger bark.Logger metricsClient metrics.Client engine *matchingEngineImpl - config *Config + config *taskListConfig // pollerHistory stores poller which poll from this tasklist in last few minutes pollerHistory *pollerHistory @@ -436,7 +473,7 @@ func (c *taskListManagerImpl) completeTaskPoll(taskID int64) (ackLevel int64) { // Loads task from taskBuffer (which is populated from persistence) or from sync match to add task call func (c *taskListManagerImpl) getTask(ctx context.Context) (*getTaskResult, error) { scope := metrics.MatchingTaskListMgrScope - timer := time.NewTimer(c.config.LongPollExpirationInterval) + timer := time.NewTimer(c.config.LongPollExpirationInterval()) defer timer.Stop() pollerID, ok := ctx.Value(pollerIDKey).(string) @@ -574,8 +611,8 @@ func (c *taskListManagerImpl) updateRangeIfNeededLocked(e *matchingEngineImpl) e tli := resp.TaskListInfo c.rangeID = tli.RangeID // Starts from 1 c.taskAckManager.setAckLevel(tli.AckLevel) - c.taskSequenceNumber = (tli.RangeID-1)*e.config.RangeSize + 1 - c.nextRangeSequenceNumber = (tli.RangeID)*e.config.RangeSize + 1 + c.taskSequenceNumber = (tli.RangeID-1)*c.config.RangeSize + 1 + c.nextRangeSequenceNumber = (tli.RangeID)*c.config.RangeSize + 1 c.logger.Debugf("updateRangeLocked rangeID=%v, c.taskSequenceNumber=%v, c.nextRangeSequenceNumber=%v", c.rangeID, c.taskSequenceNumber, c.nextRangeSequenceNumber) return nil diff --git a/service/matching/taskWriter.go b/service/matching/taskWriter.go index ea8c16b9a75..ef3dbc76de6 100644 --- a/service/matching/taskWriter.go +++ b/service/matching/taskWriter.go @@ -47,7 +47,7 @@ type ( // taskWriter writes tasks sequentially to persistence taskWriter struct { tlMgr *taskListManagerImpl - config *Config + config *taskListConfig taskListID *taskListID taskManager persistence.TaskManager appendCh chan *writeTaskRequest diff --git a/service/worker/service.go b/service/worker/service.go index 8a3f58ed2fb..d5303836b9a 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -45,10 +45,10 @@ type ( ) // NewService builds a new cadence-worker service -func NewService(params *service.BootstrapParams, config *Config) common.Daemon { +func NewService(params *service.BootstrapParams) common.Daemon { return &Service{ params: params, - config: config, + config: NewConfig(), stopC: make(chan struct{}), } }