From 3f1420984d90426eb71c3125252502c7bc0c531f Mon Sep 17 00:00:00 2001 From: Madhu Ravichandran Date: Mon, 5 Mar 2018 12:19:32 -0800 Subject: [PATCH] Create type functions and filter options for dynamic config (#587) --- common/service/dynamicconfig/client.go | 97 +++++++++++++ common/service/dynamicconfig/config.go | 128 ++++++++---------- .../dynamicconfig/config_benchmark_test.go | 6 +- common/service/dynamicconfig/config_test.go | 74 ++++++---- common/service/dynamicconfig/constants.go | 67 ++++++--- common/service/service.go | 2 +- host/onebox.go | 1 + service/history/service.go | 5 +- service/matching/matchingEngine_test.go | 12 +- service/matching/service.go | 38 ++++-- service/matching/taskListManager.go | 55 +++++--- 11 files changed, 325 insertions(+), 160 deletions(-) create mode 100644 common/service/dynamicconfig/client.go diff --git a/common/service/dynamicconfig/client.go b/common/service/dynamicconfig/client.go new file mode 100644 index 00000000000..a6ea1a896ed --- /dev/null +++ b/common/service/dynamicconfig/client.go @@ -0,0 +1,97 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package dynamicconfig + +import ( + "errors" + "time" + + "github.com/sirupsen/logrus" + "github.com/uber-common/bark" +) + +// Client allows fetching values from a dynamic configuration system NOTE: This does not have async +// options right now. In the interest of keeping it minimal, we can add when requirement arises. +type Client interface { + GetValue(name Key, defaultValue interface{}) (interface{}, error) + GetValueWithFilters(name Key, filters map[Filter]interface{}, defaultValue interface{}) (interface{}, error) + + GetIntValue(name Key, filters map[Filter]interface{}, defaultValue int) (int, error) + GetFloatValue(name Key, filters map[Filter]interface{}, defaultValue float64) (float64, error) + GetBoolValue(name Key, filters map[Filter]interface{}, defaultValue bool) (bool, error) + GetStringValue(name Key, filters map[Filter]interface{}, defaultValue string) (string, error) + GetMapValue( + name Key, filters map[Filter]interface{}, defaultValue map[string]interface{}, + ) (map[string]interface{}, error) + GetDurationValue( + name Key, filters map[Filter]interface{}, defaultValue time.Duration, + ) (time.Duration, error) +} + +type nopClient struct{} + +func (mc *nopClient) GetValue(name Key, defaultValue interface{}) (interface{}, error) { + return nil, errors.New("unable to find key") +} + +func (mc *nopClient) GetValueWithFilters( + name Key, filters map[Filter]interface{}, defaultValue interface{}, +) (interface{}, error) { + return nil, errors.New("unable to find key") +} + +func (mc *nopClient) GetIntValue(name Key, filters map[Filter]interface{}, defaultValue int) (int, error) { + return defaultValue, errors.New("unable to find key") +} + +func (mc *nopClient) GetFloatValue(name Key, filters map[Filter]interface{}, defaultValue float64) (float64, error) { + return defaultValue, errors.New("unable to find key") +} + +func (mc *nopClient) GetBoolValue(name Key, filters map[Filter]interface{}, defaultValue bool) (bool, error) { + return defaultValue, errors.New("unable to find key") +} + +func (mc *nopClient) GetStringValue(name Key, filters map[Filter]interface{}, defaultValue string) (string, error) { + return defaultValue, errors.New("unable to find key") +} + +func (mc *nopClient) GetMapValue( + name Key, filters map[Filter]interface{}, defaultValue map[string]interface{}, +) (map[string]interface{}, error) { + return defaultValue, errors.New("unable to find key") +} + +func (mc *nopClient) GetDurationValue( + name Key, filters map[Filter]interface{}, defaultValue time.Duration, +) (time.Duration, error) { + return defaultValue, errors.New("unable to find key") +} + +// NewNopClient creates a nop client +func NewNopClient() Client { + return &nopClient{} +} + +// NewNopCollection creates a new nop collection +func NewNopCollection() *Collection { + return NewCollection(&nopClient{}, bark.NewLoggerFromLogrus(logrus.New())) +} diff --git a/common/service/dynamicconfig/config.go b/common/service/dynamicconfig/config.go index a362b9cb0bf..17b2b73bf76 100644 --- a/common/service/dynamicconfig/config.go +++ b/common/service/dynamicconfig/config.go @@ -21,37 +21,14 @@ package dynamicconfig import ( - "errors" "time" -) - -// Client allows fetching values from a dynamic configuration system NOTE: This does not have async -// options right now. In the interest of keeping it minimal, we can add when requirement arises. -type Client interface { - GetValue(name Key) (interface{}, error) - GetValueWithFilters(name Key, filters map[Filter]interface{}) (interface{}, error) -} -type nopClient struct{} - -func (mc *nopClient) GetValue(name Key) (interface{}, error) { - return nil, errors.New("unable to find key") -} - -func (mc *nopClient) GetValueWithFilters( - name Key, filters map[Filter]interface{}, -) (interface{}, error) { - return nil, errors.New("unable to find key") -} - -// NewNopCollection creates a new nop collection -func NewNopCollection() *Collection { - return NewCollection(&nopClient{}) -} + "github.com/uber-common/bark" +) // NewCollection creates a new collection -func NewCollection(client Client) *Collection { - return &Collection{client} +func NewCollection(client Client, logger bark.Logger) *Collection { + return &Collection{client, logger} } // Collection wraps dynamic config client with a closure so that across the code, the config values @@ -59,73 +36,88 @@ func NewCollection(client Client) *Collection { // code type Collection struct { client Client + logger bark.Logger } -// GetIntPropertyWithTaskList gets property with taskList filter and asserts that it's an integer -func (c *Collection) GetIntPropertyWithTaskList(key Key, defaultVal int) func(string) int { - return func(taskList string) int { - return c.getPropertyWithStringFilter(key, defaultVal, TaskListName)(taskList).(int) - } +func (c *Collection) logNoValue(key Key, err error) { + c.logger.Debugf("Failed to fetch key: %s from dynamic config with err: %s", key.String(), err.Error()) } -// GetDurationPropertyWithTaskList gets property with taskList filter and asserts that it's time.Duration -func (c *Collection) GetDurationPropertyWithTaskList( - key Key, defaultVal time.Duration, -) func(string) time.Duration { - return func(taskList string) time.Duration { - return c.getPropertyWithStringFilter(key, defaultVal, TaskListName)(taskList).(time.Duration) - } -} +// PropertyFn is a wrapper to get property from dynamic config +type PropertyFn func() interface{} + +// IntPropertyFn is a wrapper to get int property from dynamic config +type IntPropertyFn func(opts ...FilterOption) int + +// FloatPropertyFn is a wrapper to get float property from dynamic config +type FloatPropertyFn func(opts ...FilterOption) float64 + +// DurationPropertyFn is a wrapper to get duration property from dynamic config +type DurationPropertyFn func(opts ...FilterOption) time.Duration + +// BoolPropertyFn is a wrapper to get bool property from dynamic config +type BoolPropertyFn func(opts ...FilterOption) bool -func (c *Collection) getPropertyWithStringFilter( - key Key, defaultVal interface{}, filter Filter, -) func(string) interface{} { - return func(filterVal string) interface{} { - filters := make(map[Filter]interface{}) - filters[filter] = filterVal - val, err := c.client.GetValueWithFilters(key, filters) +// GetProperty gets a eface property and returns defaultValue if property is not found +func (c *Collection) GetProperty(key Key, defaultValue interface{}) PropertyFn { + return func() interface{} { + val, err := c.client.GetValue(key, defaultValue) if err != nil { - return defaultVal + c.logNoValue(key, err) } return val } } -// GetProperty gets a eface property and returns defaultVal if property is not found -func (c *Collection) GetProperty(key Key, defaultVal interface{}) func() interface{} { - return func() interface{} { - val, err := c.client.GetValue(key) - if err != nil { - return defaultVal - } - return val +func getFilterMap(opts ...FilterOption) map[Filter]interface{} { + l := len(opts) + m := make(map[Filter]interface{}, l) + for _, opt := range opts { + opt(m) } + return m } // GetIntProperty gets property and asserts that it's an integer -func (c *Collection) GetIntProperty(key Key, defaultVal int) func() int { - return func() int { - return c.GetProperty(key, defaultVal)().(int) +func (c *Collection) GetIntProperty(key Key, defaultValue int) IntPropertyFn { + return func(opts ...FilterOption) int { + val, err := c.client.GetIntValue(key, getFilterMap(opts...), defaultValue) + if err != nil { + c.logNoValue(key, err) + } + return val } } // GetFloat64Property gets property and asserts that it's a float64 -func (c *Collection) GetFloat64Property(key Key, defaultVal float64) func() float64 { - return func() float64 { - return c.GetProperty(key, defaultVal)().(float64) +func (c *Collection) GetFloat64Property(key Key, defaultValue float64) FloatPropertyFn { + return func(opts ...FilterOption) float64 { + val, err := c.client.GetFloatValue(key, getFilterMap(opts...), defaultValue) + if err != nil { + c.logNoValue(key, err) + } + return val } } // GetDurationProperty gets property and asserts that it's a duration -func (c *Collection) GetDurationProperty(key Key, defaultVal time.Duration) func() time.Duration { - return func() time.Duration { - return c.GetProperty(key, defaultVal)().(time.Duration) +func (c *Collection) GetDurationProperty(key Key, defaultValue time.Duration) DurationPropertyFn { + return func(opts ...FilterOption) time.Duration { + val, err := c.client.GetDurationValue(key, getFilterMap(opts...), defaultValue) + if err != nil { + c.logNoValue(key, err) + } + return val } } // GetBoolProperty gets property and asserts that it's an bool -func (c *Collection) GetBoolProperty(key Key, defaultVal bool) func() bool { - return func() bool { - return c.GetProperty(key, defaultVal)().(bool) +func (c *Collection) GetBoolProperty(key Key, defaultValue bool) BoolPropertyFn { + return func(opts ...FilterOption) bool { + val, err := c.client.GetBoolValue(key, getFilterMap(opts...), defaultValue) + if err != nil { + c.logNoValue(key, err) + } + return val } } diff --git a/common/service/dynamicconfig/config_benchmark_test.go b/common/service/dynamicconfig/config_benchmark_test.go index 7d7089e4c55..5c4a3b7259b 100644 --- a/common/service/dynamicconfig/config_benchmark_test.go +++ b/common/service/dynamicconfig/config_benchmark_test.go @@ -23,13 +23,15 @@ package dynamicconfig import ( "testing" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "github.com/uber-common/bark" ) func BenchmarkGetIntProperty(b *testing.B) { client := newInMemoryClient() - cln := NewCollection(client) - key := MaxTaskBatchSize + cln := NewCollection(client, bark.NewLoggerFromLogrus(logrus.New())) + key := MatchingMaxTaskBatchSize for i := 0; i < b.N; i++ { size := cln.GetIntProperty(key, 10) assert.Equal(b, 10, size()) diff --git a/common/service/dynamicconfig/config_test.go b/common/service/dynamicconfig/config_test.go index 768818567ba..86ef99ec191 100644 --- a/common/service/dynamicconfig/config_test.go +++ b/common/service/dynamicconfig/config_test.go @@ -22,41 +22,71 @@ package dynamicconfig import ( "errors" - "sync" + "sync/atomic" "testing" "time" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/suite" + "github.com/uber-common/bark" ) type inMemoryClient struct { - values map[Key]interface{} - sync.RWMutex + globalValues atomic.Value } func newInMemoryClient() *inMemoryClient { - return &inMemoryClient{values: make(map[Key]interface{})} + var globalValues atomic.Value + globalValues.Store(make(map[Key]interface{})) + return &inMemoryClient{globalValues: globalValues} } func (mc *inMemoryClient) SetValue(key Key, value interface{}) { - mc.Lock() - defer mc.Unlock() - mc.values[key] = value + v := mc.globalValues.Load().(map[Key]interface{}) + v[key] = value + mc.globalValues.Store(v) } -func (mc *inMemoryClient) GetValue(key Key) (interface{}, error) { - mc.RLock() - defer mc.RUnlock() - if val, ok := mc.values[key]; ok { +func (mc *inMemoryClient) GetValue(key Key, defaultValue interface{}) (interface{}, error) { + v := mc.globalValues.Load().(map[Key]interface{}) + if val, ok := v[key]; ok { return val, nil } - return nil, errors.New("unable to find key") + return defaultValue, errors.New("unable to find key") } func (mc *inMemoryClient) GetValueWithFilters( - name Key, filters map[Filter]interface{}, + name Key, filters map[Filter]interface{}, defaultValue interface{}, ) (interface{}, error) { - return mc.GetValue(name) + return mc.GetValue(name, defaultValue) +} + +func (mc *inMemoryClient) GetIntValue(name Key, filters map[Filter]interface{}, defaultValue int) (int, error) { + return defaultValue, errors.New("unable to find key") +} + +func (mc *inMemoryClient) GetFloatValue(name Key, filters map[Filter]interface{}, defaultValue float64) (float64, error) { + return defaultValue, errors.New("unable to find key") +} + +func (mc *inMemoryClient) GetBoolValue(name Key, filters map[Filter]interface{}, defaultValue bool) (bool, error) { + return defaultValue, errors.New("unable to find key") +} + +func (mc *inMemoryClient) GetStringValue(name Key, filters map[Filter]interface{}, defaultValue string) (string, error) { + return defaultValue, errors.New("unable to find key") +} + +func (mc *inMemoryClient) GetMapValue( + name Key, filters map[Filter]interface{}, defaultValue map[string]interface{}, +) (map[string]interface{}, error) { + return defaultValue, errors.New("unable to find key") +} + +func (mc *inMemoryClient) GetDurationValue( + name Key, filters map[Filter]interface{}, defaultValue time.Duration, +) (time.Duration, error) { + return defaultValue, errors.New("unable to find key") } type configSuite struct { @@ -72,25 +102,19 @@ func TestConfigSuite(t *testing.T) { func (s *configSuite) SetupSuite() { s.client = newInMemoryClient() - s.cln = NewCollection(s.client) + s.cln = NewCollection(s.client, bark.NewLoggerFromLogrus(logrus.New())) } -func (s *configSuite) TestGetIntProperty() { - key := MaxTaskBatchSize - size := s.cln.GetIntProperty(key, 10) +func (s *configSuite) TestGetPropertyInt() { + key := MatchingMaxTaskBatchSize + size := s.cln.GetProperty(key, 10) s.Equal(10, size()) s.client.SetValue(key, 50) - s.Equal(50, size()) - s.client.SetValue(key, "hello world") - s.Panics(func() { size() }, "Should panic") + s.Equal(50, size().(int)) } func (s *configSuite) TestGetDurationProperty() { key := MatchingLongPollExpirationInterval interval := s.cln.GetDurationProperty(key, time.Second) s.Equal(time.Second, interval()) - s.client.SetValue(key, time.Minute) - s.Equal(time.Minute, interval()) - s.client.SetValue(key, 10) - s.Panics(func() { interval() }, "Should panic") } diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index bb5609aadec..4d8be8847ac 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -31,17 +31,20 @@ func (k Key) String() string { } const ( - _matchingRoot = "matching." - _matchingTaskListRoot = _matchingRoot + "taskList." - _historyRoot = "history." + _matchingRoot = "matching." + _matchingDomainTaskListRoot = _matchingRoot + "domain." + "taskList." + _historyRoot = "history." ) var keys = []string{ "unknownKey", - _matchingTaskListRoot + "MinTaskThrottlingBurstSize", - _matchingTaskListRoot + "MaxTaskBatchSize", - _matchingTaskListRoot + "LongPollExpirationInterval", - _historyRoot + "LongPollExpirationInterval", + _matchingDomainTaskListRoot + "minTaskThrottlingBurstSize", + _matchingDomainTaskListRoot + "maxTaskBatchSize", + _matchingDomainTaskListRoot + "longPollExpirationInterval", + _matchingDomainTaskListRoot + "enableSyncMatch", + _matchingDomainTaskListRoot + "updateAckInterval", + _matchingDomainTaskListRoot + "idleTasklistCheckInterval", + _historyRoot + "longPollExpirationInterval", } const ( @@ -49,12 +52,18 @@ const ( unknownKey Key = iota // Matching keys - // MinTaskThrottlingBurstSize is the minimum burst size for task list throttling - MinTaskThrottlingBurstSize - // MaxTaskBatchSize is the maximum batch size to fetch from the task buffer - MaxTaskBatchSize + // MatchingMinTaskThrottlingBurstSize is the minimum burst size for task list throttling + MatchingMinTaskThrottlingBurstSize + // MatchingMaxTaskBatchSize is the maximum batch size to fetch from the task buffer + MatchingMaxTaskBatchSize // MatchingLongPollExpirationInterval is the long poll expiration interval in the matching service MatchingLongPollExpirationInterval + // MatchingEnableSyncMatch is to enable sync match + MatchingEnableSyncMatch + // MatchingUpdateAckInterval is the interval for update ack + MatchingUpdateAckInterval + // MatchingIdleTasklistCheckInterval is the IdleTasklistCheckInterval + MatchingIdleTasklistCheckInterval // HistoryLongPollExpirationInterval is the long poll expiration interval in the history service HistoryLongPollExpirationInterval ) @@ -62,16 +71,17 @@ const ( // Filter represents a filter on the dynamic config key type Filter int -func (k Filter) String() string { - keys := []string{ - "unknownFilter", - "domainName", - "taskListName", +func (f Filter) String() string { + if f <= unknownFilter || f > TaskListName { + return filters[unknownFilter] } - if k <= unknownFilter || k > TaskListName { - return keys[unknownFilter] - } - return keys[k] + return filters[f] +} + +var filters = []string{ + "unknownFilter", + "domainName", + "taskListName", } const ( @@ -81,3 +91,20 @@ const ( // TaskListName is the tasklist name TaskListName ) + +// FilterOption is used to provide filters for dynamic config keys +type FilterOption func(filterMap map[Filter]interface{}) + +// TaskListFilter filters by task list name +func TaskListFilter(name string) FilterOption { + return func(filterMap map[Filter]interface{}) { + filterMap[TaskListName] = name + } +} + +// DomainFilter filters by domain name +func DomainFilter(name string) FilterOption { + return func(filterMap map[Filter]interface{}) { + filterMap[DomainName] = name + } +} diff --git a/common/service/service.go b/common/service/service.go index e148c3f2b44..b40534fc5a3 100644 --- a/common/service/service.go +++ b/common/service/service.go @@ -107,7 +107,7 @@ func New(params *BootstrapParams) Service { numberOfHistoryShards: params.CassandraConfig.NumHistoryShards, clusterMetadata: params.ClusterMetadata, messagingClient: params.MessagingClient, - dynamicCollection: dynamicconfig.NewCollection(params.DynamicConfig), + dynamicCollection: dynamicconfig.NewCollection(params.DynamicConfig, params.Logger), } sVice.runtimeMetricsReporter = metrics.NewRuntimeMetricsReporter(params.MetricScope, time.Minute, sVice.logger) sVice.metricsClient = metrics.NewClient(params.MetricScope, getMetricsServiceIdx(params.Name, params.Logger)) diff --git a/host/onebox.go b/host/onebox.go index 02f2717f8a4..abf3d56d41b 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -198,6 +198,7 @@ func (c *cadenceImpl) startFrontend(logger bark.Logger, rpHosts []string, startW params.ClusterMetadata = c.clusterMetadata params.CassandraConfig.NumHistoryShards = c.numberOfHistoryShards params.CassandraConfig.Hosts = "127.0.0.1" + params.DynamicConfig = dynamicconfig.NewNopClient() // TODO when cross DC is public, remove this temporary override kafkaProducer := &mocks.KafkaProducer{} diff --git a/service/history/service.go b/service/history/service.go index 085601cd3c3..1be4123112e 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -70,7 +70,7 @@ type Config struct { // Time to hold a poll request before returning an empty response // right now only used by GetMutableState - LongPollExpirationInterval func() time.Duration + LongPollExpirationInterval dynamicconfig.DurationPropertyFn } // NewConfig returns new service config with default values @@ -125,7 +125,8 @@ func NewService(params *service.BootstrapParams) common.Daemon { params: params, stopC: make(chan struct{}), config: NewConfig( - dynamicconfig.NewCollection(params.DynamicConfig), params.CassandraConfig.NumHistoryShards, + dynamicconfig.NewCollection(params.DynamicConfig, params.Logger), + params.CassandraConfig.NumHistoryShards, ), } } diff --git a/service/matching/matchingEngine_test.go b/service/matching/matchingEngine_test.go index 3ae883701aa..94a7f29251c 100644 --- a/service/matching/matchingEngine_test.go +++ b/service/matching/matchingEngine_test.go @@ -196,7 +196,7 @@ func (s *matchingEngineSuite) TestPollForDecisionTasksEmptyResult() { func (s *matchingEngineSuite) PollForTasksEmptyResultTest(taskType int) { s.matchingEngine.config.RangeSize = 2 // to test that range is not updated without tasks - s.matchingEngine.config.LongPollExpirationInterval = func(string) time.Duration { return 10 * time.Millisecond } + s.matchingEngine.config.LongPollExpirationInterval = func(...dynamicconfig.FilterOption) time.Duration { return 10 * time.Millisecond } domainID := "domainId" tl := "makeToast" @@ -355,7 +355,7 @@ func (s *matchingEngineSuite) TestTaskWriterShutdown() { } func (s *matchingEngineSuite) TestAddThenConsumeActivities() { - s.matchingEngine.config.LongPollExpirationInterval = func(string) time.Duration { return 10 * time.Millisecond } + s.matchingEngine.config.LongPollExpirationInterval = func(...dynamicconfig.FilterOption) time.Duration { return 10 * time.Millisecond } runID := "run1" workflowID := "workflow1" @@ -469,7 +469,7 @@ func (s *matchingEngineSuite) TestAddThenConsumeActivities() { func (s *matchingEngineSuite) TestSyncMatchActivities() { // Set a short long poll expiration so we don't have to wait too long for 0 throttling cases - s.matchingEngine.config.LongPollExpirationInterval = func(string) time.Duration { return 50 * time.Millisecond } + s.matchingEngine.config.LongPollExpirationInterval = func(...dynamicconfig.FilterOption) time.Duration { return 50 * time.Millisecond } runID := "run1" workflowID := "workflow1" @@ -615,7 +615,7 @@ func (s *matchingEngineSuite) TestConcurrentPublishConsumeActivities() { func (s *matchingEngineSuite) TestConcurrentPublishConsumeActivitiesWithZeroDispatch() { // Set a short long poll expiration so we don't have to wait too long for 0 throttling cases - s.matchingEngine.config.LongPollExpirationInterval = func(string) time.Duration { return 20 * time.Millisecond } + s.matchingEngine.config.LongPollExpirationInterval = func(...dynamicconfig.FilterOption) time.Duration { return 20 * time.Millisecond } dispatchLimitFn := func(wc int, tc int64) float64 { if tc%50 == 0 && wc%5 == 0 { // Gets triggered atleast 20 times return 0 @@ -1650,7 +1650,7 @@ func validateTimeRange(t time.Time, expectedDuration time.Duration) bool { currentTime := time.Now() diff := time.Duration(currentTime.UnixNano() - t.UnixNano()) if diff > expectedDuration { - log.Infof("Current time: %v, Application time: %v, Differenrce: %v", currentTime, t, diff) + log.Infof("Current time: %v, Application time: %v, Difference: %v", currentTime, t, diff) return false } return true @@ -1658,6 +1658,6 @@ func validateTimeRange(t time.Time, expectedDuration time.Duration) bool { func defaultTestConfig() *Config { config := NewConfig(dynamicconfig.NewNopCollection()) - config.LongPollExpirationInterval = func(string) time.Duration { return 100 * time.Millisecond } + config.LongPollExpirationInterval = func(...dynamicconfig.FilterOption) time.Duration { return 100 * time.Millisecond } return config } diff --git a/service/matching/service.go b/service/matching/service.go index b79428bbda0..8c073b95f49 100644 --- a/service/matching/service.go +++ b/service/matching/service.go @@ -31,16 +31,16 @@ import ( // Config represents configuration for cadence-matching service type Config struct { - EnableSyncMatch bool + EnableSyncMatch dynamicconfig.BoolPropertyFn // taskListManager configuration RangeSize int64 - GetTasksBatchSize int - UpdateAckInterval time.Duration - IdleTasklistCheckInterval time.Duration + GetTasksBatchSize dynamicconfig.IntPropertyFn + UpdateAckInterval dynamicconfig.DurationPropertyFn + IdleTasklistCheckInterval dynamicconfig.DurationPropertyFn // Time to hold a poll request before returning an empty response if there are no tasks - LongPollExpirationInterval func(string) time.Duration - MinTaskThrottlingBurstSize func(string) int + LongPollExpirationInterval dynamicconfig.DurationPropertyFn + MinTaskThrottlingBurstSize dynamicconfig.IntPropertyFn // taskWriter configuration OutstandingTaskAppendsThreshold int @@ -50,16 +50,24 @@ type Config struct { // NewConfig returns new service config with default values func NewConfig(dc *dynamicconfig.Collection) *Config { return &Config{ - EnableSyncMatch: true, - RangeSize: 100000, - GetTasksBatchSize: 1000, - UpdateAckInterval: 10 * time.Second, - IdleTasklistCheckInterval: 5 * time.Minute, - LongPollExpirationInterval: dc.GetDurationPropertyWithTaskList( + EnableSyncMatch: dc.GetBoolProperty( + dynamicconfig.MatchingEnableSyncMatch, true, + ), + RangeSize: 100000, + GetTasksBatchSize: dc.GetIntProperty( + dynamicconfig.MatchingMaxTaskBatchSize, 1000, + ), + UpdateAckInterval: dc.GetDurationProperty( + dynamicconfig.MatchingUpdateAckInterval, 10*time.Second, + ), + IdleTasklistCheckInterval: dc.GetDurationProperty( + dynamicconfig.MatchingIdleTasklistCheckInterval, 5*time.Minute, + ), + LongPollExpirationInterval: dc.GetDurationProperty( dynamicconfig.MatchingLongPollExpirationInterval, time.Minute, ), - MinTaskThrottlingBurstSize: dc.GetIntPropertyWithTaskList( - dynamicconfig.MinTaskThrottlingBurstSize, 1, + MinTaskThrottlingBurstSize: dc.GetIntProperty( + dynamicconfig.MatchingMinTaskThrottlingBurstSize, 1, ), OutstandingTaskAppendsThreshold: 250, MaxTaskBatchSize: 100, @@ -77,7 +85,7 @@ type Service struct { func NewService(params *service.BootstrapParams) common.Daemon { return &Service{ params: params, - config: NewConfig(dynamicconfig.NewCollection(params.DynamicConfig)), + config: NewConfig(dynamicconfig.NewCollection(params.DynamicConfig, params.Logger)), stopC: make(chan struct{}), } } diff --git a/service/matching/taskListManager.go b/service/matching/taskListManager.go index c1a3997df16..a8e1a1e4e3b 100644 --- a/service/matching/taskListManager.go +++ b/service/matching/taskListManager.go @@ -29,6 +29,8 @@ import ( "sync/atomic" "time" + "github.com/uber/cadence/common/service/dynamicconfig" + h "github.com/uber/cadence/.gen/go/history" m "github.com/uber/cadence/.gen/go/matching" s "github.com/uber/cadence/.gen/go/shared" @@ -66,15 +68,14 @@ type taskListManager interface { } type taskListConfig struct { - EnableSyncMatch bool + EnableSyncMatch func() bool // Time to hold a poll request before returning an empty response if there are no tasks LongPollExpirationInterval func() time.Duration RangeSize int64 - GetTasksBatchSize int - UpdateAckInterval time.Duration - IdleTasklistCheckInterval time.Duration + GetTasksBatchSize func() int + UpdateAckInterval func() time.Duration + IdleTasklistCheckInterval func() time.Duration MinTaskThrottlingBurstSize func() int - // taskWriter configuration OutstandingTaskAppendsThreshold int MaxTaskBatchSize int @@ -82,18 +83,29 @@ type taskListConfig struct { func newTaskListConfig(id *taskListID, config *Config) *taskListConfig { taskListName := id.taskListName + tlOpt := dynamicconfig.TaskListFilter(taskListName) return &taskListConfig{ - RangeSize: config.RangeSize, - GetTasksBatchSize: config.GetTasksBatchSize, - UpdateAckInterval: config.UpdateAckInterval, - IdleTasklistCheckInterval: config.IdleTasklistCheckInterval, + RangeSize: config.RangeSize, + GetTasksBatchSize: func() int { + return config.GetTasksBatchSize(tlOpt) + }, + UpdateAckInterval: func() time.Duration { + return config.UpdateAckInterval(tlOpt) + }, + IdleTasklistCheckInterval: func() time.Duration { + return config.IdleTasklistCheckInterval(tlOpt) + }, MinTaskThrottlingBurstSize: func() int { - return config.MinTaskThrottlingBurstSize(taskListName) + return config.MinTaskThrottlingBurstSize(tlOpt) + }, + EnableSyncMatch: func() bool { + return config.EnableSyncMatch(tlOpt) }, - EnableSyncMatch: config.EnableSyncMatch, LongPollExpirationInterval: func() time.Duration { - return config.LongPollExpirationInterval(taskListName) + return config.LongPollExpirationInterval(tlOpt) }, + OutstandingTaskAppendsThreshold: config.OutstandingTaskAppendsThreshold, + MaxTaskBatchSize: config.MaxTaskBatchSize, } } @@ -170,11 +182,12 @@ func newTaskListManager( e *matchingEngineImpl, taskList *taskListID, taskListKind *s.TaskListKind, config *Config, ) taskListManager { dPtr := _defaultTaskDispatchRPS + taskListConfig := newTaskListConfig(taskList, config) rl := newRateLimiter( - &dPtr, _defaultTaskDispatchRPSTTL, config.MinTaskThrottlingBurstSize(taskList.taskListName), + &dPtr, _defaultTaskDispatchRPSTTL, taskListConfig.MinTaskThrottlingBurstSize(), ) return newTaskListManagerWithRateLimiter( - e, taskList, taskListKind, newTaskListConfig(taskList, config), rl, + e, taskList, taskListKind, taskListConfig, rl, ) } @@ -183,7 +196,7 @@ func newTaskListManagerWithRateLimiter( rl rateLimiter, ) taskListManager { // To perform one db operation if there are no pollers - taskBufferSize := config.GetTasksBatchSize - 1 + taskBufferSize := config.GetTasksBatchSize() - 1 ctx, cancel := context.WithCancel(context.Background()) tlMgr := &taskListManagerImpl{ engine: e, @@ -562,7 +575,7 @@ func (c *taskListManagerImpl) getTaskBatchWithRange(readLevel int64, maxReadLeve DomainID: c.taskListID.domainID, TaskList: c.taskListID.taskListName, TaskType: c.taskListID.taskType, - BatchSize: c.config.GetTasksBatchSize, + BatchSize: c.config.GetTasksBatchSize(), RangeID: rangeID, ReadLevel: readLevel, // exclusive MaxReadLevel: maxReadLevel, // inclusive @@ -653,7 +666,7 @@ func (c *taskListManagerImpl) GetAllPollerInfo() []*pollerInfo { // and sent to a poller. So it not necessary to persist it. // Returns (nil, nil) if there is no waiting poller which indicates that task has to be persisted. func (c *taskListManagerImpl) trySyncMatch(task *persistence.TaskInfo) (*persistence.CreateTasksResponse, error) { - if !c.config.EnableSyncMatch { + if !c.config.EnableSyncMatch() { return nil, nil } // Request from the point of view of Add(Activity|Decision)Task operation. @@ -712,8 +725,8 @@ func (c *taskListManagerImpl) getTasksPump() { c.startWG.Wait() go c.deliverBufferTasksForPoll() - updateAckTimer := time.NewTimer(c.config.UpdateAckInterval) - checkPollerTimer := time.NewTimer(c.config.IdleTasklistCheckInterval) + updateAckTimer := time.NewTimer(c.config.UpdateAckInterval()) + checkPollerTimer := time.NewTimer(c.config.IdleTasklistCheckInterval()) getTasksPumpLoop: for { select { @@ -765,7 +778,7 @@ getTasksPumpLoop: // keep going as saving ack is not critical } c.signalNewTask() // periodically signal pump to check persistence for tasks - updateAckTimer = time.NewTimer(c.config.UpdateAckInterval) + updateAckTimer = time.NewTimer(c.config.UpdateAckInterval()) } case <-checkPollerTimer.C: { @@ -773,7 +786,7 @@ getTasksPumpLoop: if len(pollers) == 0 { c.Stop() } - checkPollerTimer = time.NewTimer(c.config.IdleTasklistCheckInterval) + checkPollerTimer = time.NewTimer(c.config.IdleTasklistCheckInterval()) } } }