diff --git a/common/cache/cache.go b/common/cache/cache.go index af4a354005e..f2da166e39c 100644 --- a/common/cache/cache.go +++ b/common/cache/cache.go @@ -26,6 +26,7 @@ import ( "time" "github.com/uber/cadence/common" + "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/metrics" ) @@ -93,6 +94,10 @@ type Options struct { // - when the cache is full // - when the item is accessed ActivelyEvict bool + + // TimeSource is used to get the current time + // It is optional and defaults to clock.NewRealTimeSource() + TimeSource clock.TimeSource } // SimpleOptions provides options that can be used to configure SimpleCache diff --git a/common/cache/lru.go b/common/cache/lru.go index ee00bc59ae6..3d3be426e8a 100644 --- a/common/cache/lru.go +++ b/common/cache/lru.go @@ -25,6 +25,8 @@ import ( "errors" "sync" "time" + + "github.com/uber/cadence/common/clock" ) var ( @@ -52,7 +54,7 @@ type ( isSizeBased bool activelyEvict bool // We use this instead of time.Now() in order to make testing easier - now func() time.Time + timeSource clock.TimeSource } iteratorImpl struct { @@ -117,7 +119,7 @@ func (c *lru) Iterator() Iterator { c.mut.Lock() iterator := &iteratorImpl{ lru: c, - createTime: c.now(), + createTime: c.timeSource.Now(), nextItem: c.byAccess.Front(), } iterator.prepareNext() @@ -143,6 +145,10 @@ func New(opts *Options) Cache { "MaxSize and GetCacheItemSizeFunc (size based) options must be provided for the LRU cache") } + timeSource := opts.TimeSource + if timeSource == nil { + timeSource = clock.NewRealTimeSource() + } cache := &lru{ byAccess: list.New(), byKey: make(map[interface{}]*list.Element, opts.InitialCapacity), @@ -150,7 +156,7 @@ func New(opts *Options) Cache { pin: opts.Pin, rmFunc: opts.RemovedFunc, activelyEvict: opts.ActivelyEvict, - now: time.Now, + timeSource: timeSource, } cache.isSizeBased = opts.GetCacheItemSizeFunc != nil && opts.MaxSize > 0 @@ -183,7 +189,7 @@ func (c *lru) Get(key interface{}) interface{} { entry := element.Value.(*entryImpl) - if c.isEntryExpired(entry, c.now()) { + if c.isEntryExpired(entry, c.timeSource.Now()) { // Entry has expired c.deleteInternal(element) return nil @@ -262,7 +268,7 @@ func (c *lru) evictExpiredItems() { return // do nothing if activelyEvict is not set } - now := c.now() + now := c.timeSource.Now() for elt := c.byAccess.Back(); elt != nil; elt = c.byAccess.Back() { if !c.isEntryExpired(elt.Value.(*entryImpl), now) { // List is sorted by item age, so we can stop as soon as we found first non expired item. @@ -284,7 +290,7 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool) elt := c.byKey[key] if elt != nil { entry := elt.Value.(*entryImpl) - if c.isEntryExpired(entry, c.now()) { + if c.isEntryExpired(entry, c.timeSource.Now()) { // Entry has expired c.deleteInternal(elt) } else { @@ -292,7 +298,7 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool) if allowUpdate { entry.value = value if c.ttl != 0 { - entry.createTime = c.now() + entry.createTime = c.timeSource.Now() } } @@ -314,7 +320,7 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool) } if c.ttl != 0 { - entry.createTime = c.now() + entry.createTime = c.timeSource.Now() } c.byKey[key] = c.byAccess.PushFront(entry) diff --git a/common/cache/lru_test.go b/common/cache/lru_test.go index 09ac0d33126..602f6ed303e 100644 --- a/common/cache/lru_test.go +++ b/common/cache/lru_test.go @@ -27,6 +27,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/uber/cadence/common/clock" ) type keyType struct { @@ -91,19 +93,17 @@ func TestGenerics(t *testing.T) { } func TestLRUWithTTL(t *testing.T) { + mockTimeSource := clock.NewMockedTimeSourceAt(time.UnixMilli(0)) cache := New(&Options{ - MaxCount: 5, - TTL: time.Millisecond * 100, + MaxCount: 5, + TTL: time.Millisecond * 100, + TimeSource: mockTimeSource, }).(*lru) - // We will capture this in the caches now function, and advance time as needed - currentTime := time.UnixMilli(0) - cache.now = func() time.Time { return currentTime } - cache.Put("A", "foo") assert.Equal(t, "foo", cache.Get("A")) - currentTime = currentTime.Add(time.Millisecond * 300) + mockTimeSource.Advance(time.Millisecond * 300) assert.Nil(t, cache.Get("A")) assert.Equal(t, 0, cache.Size()) @@ -188,6 +188,7 @@ func TestRemoveFunc(t *testing.T) { func TestRemovedFuncWithTTL(t *testing.T) { ch := make(chan bool) + mockTimeSource := clock.NewMockedTimeSourceAt(time.UnixMilli(0)) cache := New(&Options{ MaxCount: 5, TTL: time.Millisecond * 50, @@ -196,29 +197,27 @@ func TestRemovedFuncWithTTL(t *testing.T) { assert.True(t, ok) ch <- true }, + TimeSource: mockTimeSource, }).(*lru) - // We will capture this in the caches now function, and advance time as needed - currentTime := time.UnixMilli(0) - cache.now = func() time.Time { return currentTime } - cache.Put("A", t) assert.Equal(t, t, cache.Get("A")) - currentTime = currentTime.Add(time.Millisecond * 100) + mockTimeSource.Advance(time.Millisecond * 100) assert.Nil(t, cache.Get("A")) select { case b := <-ch: assert.True(t, b) - case <-time.After(100 * time.Millisecond): + case <-mockTimeSource.After(100 * time.Millisecond): t.Error("RemovedFunc did not send true on channel ch") } } func TestRemovedFuncWithTTL_Pin(t *testing.T) { ch := make(chan bool) + mockTimeSource := clock.NewMockedTimeSourceAt(time.UnixMilli(0)) cache := New(&Options{ MaxCount: 5, TTL: time.Millisecond * 50, @@ -228,16 +227,13 @@ func TestRemovedFuncWithTTL_Pin(t *testing.T) { assert.True(t, ok) ch <- true }, + TimeSource: mockTimeSource, }).(*lru) - // We will capture this in the caches now function, and advance time as needed - currentTime := time.UnixMilli(0) - cache.now = func() time.Time { return currentTime } - _, err := cache.PutIfNotExist("A", t) assert.NoError(t, err) assert.Equal(t, t, cache.Get("A")) - currentTime = currentTime.Add(time.Millisecond * 100) + mockTimeSource.Advance(time.Millisecond * 100) assert.Equal(t, t, cache.Get("A")) // release 3 time since put if not exist also increase the counter cache.Release("A") @@ -248,7 +244,7 @@ func TestRemovedFuncWithTTL_Pin(t *testing.T) { select { case b := <-ch: assert.True(t, b) - case <-time.After(300 * time.Millisecond): + case <-mockTimeSource.After(300 * time.Millisecond): t.Error("RemovedFunc did not send true on channel ch") } } @@ -403,24 +399,22 @@ func TestPanicOptionsIsNil(t *testing.T) { func TestEvictItemsPastTimeToLive_ActivelyEvict(t *testing.T) { // Create the cache with a TTL of 75s + mockTimeSource := clock.NewMockedTimeSourceAt(time.UnixMilli(0)) cache, ok := New(&Options{ MaxCount: 5, TTL: time.Second * 75, ActivelyEvict: true, + TimeSource: mockTimeSource, }).(*lru) require.True(t, ok) - // We will capture this in the caches now function, and advance time as needed - currentTime := time.UnixMilli(0) - cache.now = func() time.Time { return currentTime } - _, err := cache.PutIfNotExist("A", 1) require.NoError(t, err) _, err = cache.PutIfNotExist("B", 2) require.NoError(t, err) // Nothing is expired after 50s - currentTime = currentTime.Add(time.Second * 50) + mockTimeSource.Advance(time.Second * 50) assert.Equal(t, 2, cache.Size()) _, err = cache.PutIfNotExist("C", 3) @@ -432,10 +426,10 @@ func TestEvictItemsPastTimeToLive_ActivelyEvict(t *testing.T) { assert.Equal(t, 4, cache.Size()) // Advance time to 100s, so A and B should be expired - currentTime = currentTime.Add(time.Second * 50) + mockTimeSource.Advance(time.Second * 50) assert.Equal(t, 2, cache.Size()) // Advance time to 150s, so C and D should be expired as well - currentTime = currentTime.Add(time.Second * 50) + mockTimeSource.Advance(time.Second * 50) assert.Equal(t, 0, cache.Size()) } diff --git a/service/matching/handler/engine_integration_test.go b/service/matching/handler/engine_integration_test.go index e591fd9833d..5d3f7a75344 100644 --- a/service/matching/handler/engine_integration_test.go +++ b/service/matching/handler/engine_integration_test.go @@ -118,7 +118,8 @@ func (s *matchingEngineSuite) SetupTest() { s.mockExecutionManager = &mocks.ExecutionManager{} s.controller = gomock.NewController(s.T()) s.mockHistoryClient = history.NewMockClient(s.controller) - s.taskManager = tasklist.NewTestTaskManager(s.T(), s.logger) + s.mockTimeSource = clock.NewMockedTimeSourceAt(time.Now()) + s.taskManager = tasklist.NewTestTaskManager(s.T(), s.logger, s.mockTimeSource) s.mockDomainCache = cache.NewMockDomainCache(s.controller) s.mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(cache.CreateDomainCacheEntry(matchingTestDomainName), nil).AnyTimes() s.mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.CreateDomainCacheEntry(matchingTestDomainName), nil).AnyTimes() @@ -130,7 +131,6 @@ func (s *matchingEngineSuite) SetupTest() { dc := dynamicconfig.NewCollection(dcClient, s.logger) isolationGroupState, _ := defaultisolationgroupstate.NewDefaultIsolationGroupStateWatcherWithConfigStoreClient(s.logger, dc, s.mockDomainCache, s.mockIsolationStore, metrics.NewNoopMetricsClient()) s.partitioner = partition.NewDefaultPartitioner(s.logger, isolationGroupState) - s.mockTimeSource = clock.NewMockedTimeSourceAt(time.Unix(1000000, 0)) s.handlerContext = newHandlerContext( context.Background(), matchingTestDomainName, diff --git a/service/matching/poller/history.go b/service/matching/poller/history.go index 0e948df7b9b..f69adfc1206 100644 --- a/service/matching/poller/history.go +++ b/service/matching/poller/history.go @@ -25,6 +25,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/types" ) @@ -55,12 +56,13 @@ type History struct { // HistoryUpdatedFunc is a type for notifying applications when the poller history was updated type HistoryUpdatedFunc func() -func NewPollerHistory(historyUpdatedFunc HistoryUpdatedFunc) *History { +func NewPollerHistory(historyUpdatedFunc HistoryUpdatedFunc, timeSource clock.TimeSource) *History { opts := &cache.Options{ InitialCapacity: pollerHistoryInitSize, TTL: pollerHistoryTTL, Pin: false, MaxCount: pollerHistoryInitMaxSize, + TimeSource: timeSource, } return &History{ diff --git a/service/matching/tasklist/task_list_manager.go b/service/matching/tasklist/task_list_manager.go index 5acda7e78cc..f44c81d3f4d 100644 --- a/service/matching/tasklist/task_list_manager.go +++ b/service/matching/tasklist/task_list_manager.go @@ -213,7 +213,7 @@ func NewManager( tlMgr.pollerHistory = poller.NewPollerHistory(func() { taskListTypeMetricScope.UpdateGauge(metrics.PollerPerTaskListCounter, float64(len(tlMgr.pollerHistory.GetPollerInfo(time.Time{})))) - }) + }, timeSource) livenessInterval := taskListConfig.IdleTasklistCheckInterval() tlMgr.liveness = liveness.NewLiveness(clock.NewRealTimeSource(), livenessInterval, func() { diff --git a/service/matching/tasklist/task_list_manager_test.go b/service/matching/tasklist/task_list_manager_test.go index 0795fd36a6f..4abf7719a8a 100644 --- a/service/matching/tasklist/task_list_manager_test.go +++ b/service/matching/tasklist/task_list_manager_test.go @@ -157,7 +157,7 @@ func createTestTaskListManager(t *testing.T, logger log.Logger, controller *gomo } func createTestTaskListManagerWithConfig(t *testing.T, logger log.Logger, controller *gomock.Controller, cfg *config.Config) *taskListManagerImpl { - tm := NewTestTaskManager(t, logger) + tm := NewTestTaskManager(t, logger, clock.NewRealTimeSource()) mockPartitioner := partition.NewMockPartitioner(controller) mockPartitioner.EXPECT().GetIsolationGroupByDomainID(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return("", nil).AnyTimes() mockDomainCache := cache.NewMockDomainCache(controller) @@ -552,9 +552,9 @@ func TestTaskListManagerGetTaskBatch(t *testing.T) { mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(cache.CreateDomainCacheEntry("domainName"), nil).AnyTimes() mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("domainName", nil).AnyTimes() logger := testlogger.New(t) - tm := NewTestTaskManager(t, logger) - taskListID := NewTestTaskListID(t, "domainId", "tl", 0) timeSource := clock.NewRealTimeSource() + tm := NewTestTaskManager(t, logger, timeSource) + taskListID := NewTestTaskListID(t, "domainId", "tl", 0) cfg := defaultTestConfig() cfg.RangeSize = rangeSize tlMgr, err := NewManager( @@ -721,9 +721,9 @@ func TestTaskExpiryAndCompletion(t *testing.T) { mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(cache.CreateDomainCacheEntry("domainName"), nil).AnyTimes() mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("domainName", nil).AnyTimes() logger := testlogger.New(t) - tm := NewTestTaskManager(t, logger) - taskListID := NewTestTaskListID(t, "domainId", "tl", 0) timeSource := clock.NewRealTimeSource() + tm := NewTestTaskManager(t, logger, timeSource) + taskListID := NewTestTaskListID(t, "domainId", "tl", 0) cfg := defaultTestConfig() cfg.RangeSize = rangeSize cfg.MaxTaskDeleteBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(tc.batchSize) diff --git a/service/matching/tasklist/task_reader.go b/service/matching/tasklist/task_reader.go index 0410797b477..21a48051dd8 100644 --- a/service/matching/tasklist/task_reader.go +++ b/service/matching/tasklist/task_reader.go @@ -32,6 +32,7 @@ import ( "github.com/uber/cadence/common/backoff" "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" @@ -66,6 +67,7 @@ type ( taskAckManager messaging.AckManager domainCache cache.DomainCache clusterMetadata cluster.Metadata + timeSource clock.TimeSource // The cancel objects are to cancel the ratelimiter Wait in dispatchBufferedTasks. The ideal // approach is to use request-scoped contexts and use a unique one for each call to Wait. However // in order to cancel it on shutdown, we need a new goroutine for each call that would wait on @@ -111,6 +113,7 @@ func newTaskReader(tlMgr *taskListManagerImpl, isolationGroups []string) *taskRe taskBuffers: taskBuffers, domainCache: tlMgr.domainCache, clusterMetadata: tlMgr.clusterMetadata, + timeSource: tlMgr.timeSource, logger: tlMgr.logger, scope: tlMgr.scope, handleErr: tlMgr.handleErr, @@ -280,20 +283,12 @@ func (tr *taskReader) getTaskBatch() ([]*persistence.TaskInfo, int64, bool, erro return tasks, readLevel, readLevel == maxReadLevel, nil // caller will update readLevel when no task grabbed } -func (tr *taskReader) isTaskExpired(t *persistence.TaskInfo, now time.Time) bool { - return t.Expiry.After(epochStartTime) && time.Now().After(t.Expiry) +func (tr *taskReader) isTaskExpired(t *persistence.TaskInfo) bool { + return t.Expiry.After(epochStartTime) && tr.timeSource.Now().After(t.Expiry) } func (tr *taskReader) addTasksToBuffer(tasks []*persistence.TaskInfo) bool { - now := time.Now() for _, t := range tasks { - if tr.isTaskExpired(t, now) { - tr.scope.IncCounter(metrics.ExpiredTasksPerTaskListCounter) - // Also increment readLevel for expired tasks otherwise it could result in - // looping over the same tasks if all tasks read in the batch are expired - tr.taskAckManager.SetReadLevel(t.TaskID) - continue - } if !tr.addSingleTaskToBuffer(t) { return false // we are shutting down the task list } @@ -302,6 +297,13 @@ func (tr *taskReader) addTasksToBuffer(tasks []*persistence.TaskInfo) bool { } func (tr *taskReader) addSingleTaskToBuffer(task *persistence.TaskInfo) bool { + if tr.isTaskExpired(task) { + tr.scope.IncCounter(metrics.ExpiredTasksPerTaskListCounter) + // Also increment readLevel for expired tasks otherwise it could result in + // looping over the same tasks if all tasks read in the batch are expired + tr.taskAckManager.SetReadLevel(task.TaskID) + return true + } err := tr.taskAckManager.ReadItem(task.TaskID) if err != nil { tr.logger.Fatal("critical bug when adding item to ackManager", tag.Error(err)) diff --git a/service/matching/tasklist/testing.go b/service/matching/tasklist/testing.go index 742e889876f..fe88e450fe2 100644 --- a/service/matching/tasklist/testing.go +++ b/service/matching/tasklist/testing.go @@ -32,6 +32,7 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/emirpasic/gods/maps/treemap" + "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/persistence" ) @@ -41,9 +42,10 @@ var _ persistence.TaskManager = (*TestTaskManager)(nil) // Asserts that interfac type ( TestTaskManager struct { sync.Mutex - t *testing.T - taskLists map[Identifier]*testTaskListManager - logger log.Logger + t *testing.T + taskLists map[Identifier]*testTaskListManager + logger log.Logger + timeSource clock.TimeSource } testTaskListManager struct { @@ -59,11 +61,12 @@ func newTestTaskListManager() *testTaskListManager { return &testTaskListManager{tasks: treemap.NewWith(int64Comparator)} } -func NewTestTaskManager(t *testing.T, logger log.Logger) *TestTaskManager { +func NewTestTaskManager(t *testing.T, logger log.Logger, timeSource clock.TimeSource) *TestTaskManager { return &TestTaskManager{ - t: t, - taskLists: make(map[Identifier]*testTaskListManager), - logger: logger, + t: t, + taskLists: make(map[Identifier]*testTaskListManager), + logger: logger, + timeSource: timeSource, } } @@ -227,7 +230,7 @@ func (m *TestTaskManager) CreateTasks( PartitionConfig: task.Data.PartitionConfig, } if task.Data.ScheduleToStartTimeout != 0 { - info.Expiry = time.Now().Add(time.Duration(task.Data.ScheduleToStartTimeout) * time.Second) + info.Expiry = m.timeSource.Now().Add(time.Duration(task.Data.ScheduleToStartTimeout) * time.Second) } tlm.tasks.Put(task.TaskID, info) tlm.createTaskCount++