Skip to content

Commit

Permalink
Fix check idle tasklist to include active task writers
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu committed Jul 3, 2018
1 parent 7eaf360 commit 904dba0
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 7 deletions.
20 changes: 14 additions & 6 deletions service/matching/taskListManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const (
const (
_defaultTaskDispatchRPS = 100000.0
_defaultTaskDispatchRPSTTL = 60 * time.Second

maxAddTasksIdleTime = 5 * time.Minute
)

var errAddTasklistThrottled = errors.New("cannot add to tasklist, limit exceeded")
Expand Down Expand Up @@ -742,14 +744,17 @@ func (c *taskListManagerImpl) getTasksPump() {

go c.deliverBufferTasksForPoll()
updateAckTimer := time.NewTimer(c.config.UpdateAckInterval())
checkPollerTimer := time.NewTimer(c.config.IdleTasklistCheckInterval())
checkIdleTaskListTimer := time.NewTimer(c.config.IdleTasklistCheckInterval())
lastTimeWriteTask := time.Time{}
getTasksPumpLoop:
for {
select {
case <-c.shutdownCh:
break getTasksPumpLoop
case <-c.notifyCh:
{
lastTimeWriteTask = time.Now()

tasks, readLevel, err := c.getTaskBatch()
if err != nil {
c.signalNewTask() // re-enqueue the event
Expand Down Expand Up @@ -796,19 +801,18 @@ getTasksPumpLoop:
c.signalNewTask() // periodically signal pump to check persistence for tasks
updateAckTimer = time.NewTimer(c.config.UpdateAckInterval())
}
case <-checkPollerTimer.C:
case <-checkIdleTaskListTimer.C:
{
pollers := c.GetAllPollerInfo()
if len(pollers) == 0 {
if !isTaskAddedRecently(lastTimeWriteTask) && len(c.GetAllPollerInfo()) == 0 {
c.Stop()
}
checkPollerTimer = time.NewTimer(c.config.IdleTasklistCheckInterval())
checkIdleTaskListTimer = time.NewTimer(c.config.IdleTasklistCheckInterval())
}
}
}

updateAckTimer.Stop()
checkPollerTimer.Stop()
checkIdleTaskListTimer.Stop()
}

// Retry operation on transient error and on rangeID change. On rangeID update by another process calls c.Stop().
Expand Down Expand Up @@ -953,3 +957,7 @@ func (c *taskContext) completeTask(err error) {
func createServiceBusyError(msg string) *s.ServiceBusyError {
return &s.ServiceBusyError{Message: msg}
}

func isTaskAddedRecently(lastAddTime time.Time) bool {
return time.Now().Sub(lastAddTime) <= maxAddTasksIdleTime
}
52 changes: 51 additions & 1 deletion service/matching/taskListManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package matching

import (
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -32,11 +33,13 @@ import (

log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/uber-common/bark"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service/dynamicconfig"
)

const _minBurst = 10000
Expand Down Expand Up @@ -83,9 +86,12 @@ func TestNewRateLimiter(t *testing.T) {
}

func createTestTaskListManager() *taskListManagerImpl {
return createTestTaskListManagerWithConfig(defaultTestConfig())
}

func createTestTaskListManagerWithConfig(cfg *Config) *taskListManagerImpl {
logger := bark.NewLoggerFromLogrus(log.New())
tm := newTestTaskManager(logger)
cfg := defaultTestConfig()
mockDomainCache := &cache.DomainCacheMock{}
mockDomainCache.On("GetDomainByID", mock.Anything).Return(cache.CreateDomainCacheEntry("domainName"), nil)
me := newMatchingEngine(
Expand All @@ -101,3 +107,47 @@ func createTestTaskListManager() *taskListManagerImpl {
}
return tlMgr.(*taskListManagerImpl)
}

func TestIsTaskAddedRecently(t *testing.T) {
require.True(t, isTaskAddedRecently(time.Now()))
require.False(t, isTaskAddedRecently(time.Now().Add(-maxAddTasksIdleTime)))
require.True(t, isTaskAddedRecently(time.Now().Add(1*time.Second)))
require.False(t, isTaskAddedRecently(time.Time{}))
}

func tlMgrStartWithoutNotifyEvent(tlm *taskListManagerImpl) {
// mimic tlm.Start() but avoid calling notifyEvent
tlm.startWG.Done()
go tlm.getTasksPump()
}

func TestCheckIdleTaskList(t *testing.T) {
cfg := NewConfig(dynamicconfig.NewNopCollection())
cfg.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond)

// Idle
tlm := createTestTaskListManagerWithConfig(cfg)
tlMgrStartWithoutNotifyEvent(tlm)
time.Sleep(20 * time.Millisecond)
require.False(t, atomic.CompareAndSwapInt32(&tlm.stopped, 0, 1))

// Active poll-er
tlm = createTestTaskListManagerWithConfig(cfg)
tlm.updatePollerInfo(pollerIdentity{identity: "test-poll"})
require.Equal(t, 1, len(tlm.GetAllPollerInfo()))
tlMgrStartWithoutNotifyEvent(tlm)
time.Sleep(20 * time.Millisecond)
require.Equal(t, int32(0), tlm.stopped)
tlm.Stop()
require.Equal(t, int32(1), tlm.stopped)

// Active adding task
tlm = createTestTaskListManagerWithConfig(cfg)
require.Equal(t, 0, len(tlm.GetAllPollerInfo()))
tlMgrStartWithoutNotifyEvent(tlm)
tlm.signalNewTask()
time.Sleep(20 * time.Millisecond)
require.Equal(t, int32(0), tlm.stopped)
tlm.Stop()
require.Equal(t, int32(1), tlm.stopped)
}

0 comments on commit 904dba0

Please sign in to comment.