diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index 4dfe26d76e1..df515327e22 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -60,6 +60,7 @@ var keys = map[Key]string{ MatchingEnableSyncMatch: "matching.enableSyncMatch", MatchingUpdateAckInterval: "matching.updateAckInterval", MatchingIdleTasklistCheckInterval: "matching.idleTasklistCheckInterval", + MaxTasklistIdleTime: "matching.maxTasklistIdleTime", MatchingOutstandingTaskAppendsThreshold: "matching.outstandingTaskAppendsThreshold", MatchingMaxTaskBatchSize: "matching.maxTaskBatchSize", MatchingRPS: "matching.rps", @@ -149,6 +150,8 @@ const ( MatchingUpdateAckInterval // MatchingIdleTasklistCheckInterval is the IdleTasklistCheckInterval MatchingIdleTasklistCheckInterval + // MaxTasklistIdleTime is the max time tasklist being idle + MaxTasklistIdleTime // MatchingOutstandingTaskAppendsThreshold is the threshold for outstanding task appends MatchingOutstandingTaskAppendsThreshold // MatchingMaxTaskBatchSize is max batch size for task writer diff --git a/service/matching/service.go b/service/matching/service.go index 33fefdc23a2..772a334dc33 100644 --- a/service/matching/service.go +++ b/service/matching/service.go @@ -39,6 +39,7 @@ type Config struct { GetTasksBatchSize dynamicconfig.IntPropertyFnWithTaskListInfoFilters UpdateAckInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters IdleTasklistCheckInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters + MaxTasklistIdleTime dynamicconfig.DurationPropertyFnWithTaskListInfoFilters // Time to hold a poll request before returning an empty response if there are no tasks LongPollExpirationInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters MinTaskThrottlingBurstSize dynamicconfig.IntPropertyFnWithTaskListInfoFilters @@ -57,6 +58,7 @@ func NewConfig(dc *dynamicconfig.Collection) *Config { GetTasksBatchSize: 
dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingGetTasksBatchSize, 1000), UpdateAckInterval: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.MatchingUpdateAckInterval, 1*time.Minute), IdleTasklistCheckInterval: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.MatchingIdleTasklistCheckInterval, 5*time.Minute), + MaxTasklistIdleTime: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.MaxTasklistIdleTime, 5*time.Minute), LongPollExpirationInterval: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.MatchingLongPollExpirationInterval, time.Minute), MinTaskThrottlingBurstSize: dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingMinTaskThrottlingBurstSize, 1), OutstandingTaskAppendsThreshold: dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingOutstandingTaskAppendsThreshold, 250), diff --git a/service/matching/taskListManager.go b/service/matching/taskListManager.go index ff758a300d2..5220bafac31 100644 --- a/service/matching/taskListManager.go +++ b/service/matching/taskListManager.go @@ -74,6 +74,7 @@ type taskListConfig struct { GetTasksBatchSize func() int UpdateAckInterval func() time.Duration IdleTasklistCheckInterval func() time.Duration + MaxTasklistIdleTime func() time.Duration MinTaskThrottlingBurstSize func() int // taskWriter configuration OutstandingTaskAppendsThreshold func() int @@ -100,6 +101,9 @@ func newTaskListConfig(id *taskListID, config *Config, domainCache cache.DomainC IdleTasklistCheckInterval: func() time.Duration { return config.IdleTasklistCheckInterval(domain, taskListName, taskType) }, + MaxTasklistIdleTime: func() time.Duration { + return config.MaxTasklistIdleTime(domain, taskListName, taskType) + }, MinTaskThrottlingBurstSize: func() int { return config.MinTaskThrottlingBurstSize(domain, taskListName, taskType) }, @@ -742,7 +746,8 @@ func (c *taskListManagerImpl) getTasksPump() { go c.deliverBufferTasksForPoll() updateAckTimer := 
time.NewTimer(c.config.UpdateAckInterval()) - checkPollerTimer := time.NewTimer(c.config.IdleTasklistCheckInterval()) + checkIdleTaskListTimer := time.NewTimer(c.config.IdleTasklistCheckInterval()) + lastTimeWriteTask := time.Time{} getTasksPumpLoop: for { select { @@ -750,6 +755,8 @@ getTasksPumpLoop: break getTasksPumpLoop case <-c.notifyCh: { + lastTimeWriteTask = time.Now() + tasks, readLevel, err := c.getTaskBatch() if err != nil { c.signalNewTask() // re-enqueue the event @@ -796,19 +803,18 @@ getTasksPumpLoop: c.signalNewTask() // periodically signal pump to check persistence for tasks updateAckTimer = time.NewTimer(c.config.UpdateAckInterval()) } - case <-checkPollerTimer.C: + case <-checkIdleTaskListTimer.C: { - pollers := c.GetAllPollerInfo() - if len(pollers) == 0 { + if !c.isTaskAddedRecently(lastTimeWriteTask) && len(c.GetAllPollerInfo()) == 0 { c.Stop() } - checkPollerTimer = time.NewTimer(c.config.IdleTasklistCheckInterval()) + checkIdleTaskListTimer = time.NewTimer(c.config.IdleTasklistCheckInterval()) } } } updateAckTimer.Stop() - checkPollerTimer.Stop() + checkIdleTaskListTimer.Stop() } // Retry operation on transient error and on rangeID change. On rangeID update by another process calls c.Stop(). 
@@ -953,3 +959,7 @@ func (c *taskContext) completeTask(err error) { func createServiceBusyError(msg string) *s.ServiceBusyError { return &s.ServiceBusyError{Message: msg} } + +func (c *taskListManagerImpl) isTaskAddedRecently(lastAddTime time.Time) bool { + return time.Now().Sub(lastAddTime) <= c.config.MaxTasklistIdleTime() +} diff --git a/service/matching/taskListManager_test.go b/service/matching/taskListManager_test.go index c17d40444df..bb745605240 100644 --- a/service/matching/taskListManager_test.go +++ b/service/matching/taskListManager_test.go @@ -22,6 +22,7 @@ package matching import ( "sync" + "sync/atomic" "testing" "time" @@ -32,11 +33,13 @@ import ( log "github.com/sirupsen/logrus" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "github.com/uber-common/bark" workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/service/dynamicconfig" ) const _minBurst = 10000 @@ -83,9 +86,12 @@ func TestNewRateLimiter(t *testing.T) { } func createTestTaskListManager() *taskListManagerImpl { + return createTestTaskListManagerWithConfig(defaultTestConfig()) +} + +func createTestTaskListManagerWithConfig(cfg *Config) *taskListManagerImpl { logger := bark.NewLoggerFromLogrus(log.New()) tm := newTestTaskManager(logger) - cfg := defaultTestConfig() mockDomainCache := &cache.DomainCacheMock{} mockDomainCache.On("GetDomainByID", mock.Anything).Return(cache.CreateDomainCacheEntry("domainName"), nil) me := newMatchingEngine( @@ -101,3 +107,48 @@ func createTestTaskListManager() *taskListManagerImpl { } return tlMgr.(*taskListManagerImpl) } + +func TestIsTaskAddedRecently(t *testing.T) { + tlm := createTestTaskListManager() + require.True(t, tlm.isTaskAddedRecently(time.Now())) + require.False(t, tlm.isTaskAddedRecently(time.Now().Add(-tlm.config.MaxTasklistIdleTime()))) + require.True(t, 
tlm.isTaskAddedRecently(time.Now().Add(1*time.Second))) + require.False(t, tlm.isTaskAddedRecently(time.Time{})) +} + +func tlMgrStartWithoutNotifyEvent(tlm *taskListManagerImpl) { + // mimic tlm.Start() but avoid calling notifyEvent + tlm.startWG.Done() + go tlm.getTasksPump() +} + +func TestCheckIdleTaskList(t *testing.T) { + cfg := NewConfig(dynamicconfig.NewNopCollection()) + cfg.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond) + + // Idle + tlm := createTestTaskListManagerWithConfig(cfg) + tlMgrStartWithoutNotifyEvent(tlm) + time.Sleep(20 * time.Millisecond) + require.False(t, atomic.CompareAndSwapInt32(&tlm.stopped, 0, 1)) + + // Active poll-er + tlm = createTestTaskListManagerWithConfig(cfg) + tlm.updatePollerInfo(pollerIdentity{identity: "test-poll"}) + require.Equal(t, 1, len(tlm.GetAllPollerInfo())) + tlMgrStartWithoutNotifyEvent(tlm) + time.Sleep(20 * time.Millisecond) + require.Equal(t, int32(0), atomic.LoadInt32(&tlm.stopped)) + tlm.Stop() + require.Equal(t, int32(1), atomic.LoadInt32(&tlm.stopped)) + + // Active adding task + tlm = createTestTaskListManagerWithConfig(cfg) + require.Equal(t, 0, len(tlm.GetAllPollerInfo())) + tlMgrStartWithoutNotifyEvent(tlm) + tlm.signalNewTask() + time.Sleep(20 * time.Millisecond) + require.Equal(t, int32(0), atomic.LoadInt32(&tlm.stopped)) + tlm.Stop() + require.Equal(t, int32(1), atomic.LoadInt32(&tlm.stopped)) +}