Skip to content

Commit

Permalink
Fix DrainBacklogNoPollersIsolationGroup tests (#6080)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Jun 3, 2024
1 parent 0aba895 commit f0f7efd
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 61 deletions.
5 changes: 5 additions & 0 deletions common/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/metrics"
)

Expand Down Expand Up @@ -93,6 +94,10 @@ type Options struct {
// - when the cache is full
// - when the item is accessed
ActivelyEvict bool

// TimeSource is used to get the current time
// It is optional and defaults to clock.NewRealTimeSource()
TimeSource clock.TimeSource
}

// SimpleOptions provides options that can be used to configure SimpleCache
Expand Down
22 changes: 14 additions & 8 deletions common/cache/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"errors"
"sync"
"time"

"github.com/uber/cadence/common/clock"
)

var (
Expand Down Expand Up @@ -52,7 +54,7 @@ type (
isSizeBased bool
activelyEvict bool
// We use this instead of time.Now() in order to make testing easier
now func() time.Time
timeSource clock.TimeSource
}

iteratorImpl struct {
Expand Down Expand Up @@ -117,7 +119,7 @@ func (c *lru) Iterator() Iterator {
c.mut.Lock()
iterator := &iteratorImpl{
lru: c,
createTime: c.now(),
createTime: c.timeSource.Now(),
nextItem: c.byAccess.Front(),
}
iterator.prepareNext()
Expand All @@ -143,14 +145,18 @@ func New(opts *Options) Cache {
"MaxSize and GetCacheItemSizeFunc (size based) options must be provided for the LRU cache")
}

timeSource := opts.TimeSource
if timeSource == nil {
timeSource = clock.NewRealTimeSource()
}
cache := &lru{
byAccess: list.New(),
byKey: make(map[interface{}]*list.Element, opts.InitialCapacity),
ttl: opts.TTL,
pin: opts.Pin,
rmFunc: opts.RemovedFunc,
activelyEvict: opts.ActivelyEvict,
now: time.Now,
timeSource: timeSource,
}

cache.isSizeBased = opts.GetCacheItemSizeFunc != nil && opts.MaxSize > 0
Expand Down Expand Up @@ -183,7 +189,7 @@ func (c *lru) Get(key interface{}) interface{} {

entry := element.Value.(*entryImpl)

if c.isEntryExpired(entry, c.now()) {
if c.isEntryExpired(entry, c.timeSource.Now()) {
// Entry has expired
c.deleteInternal(element)
return nil
Expand Down Expand Up @@ -262,7 +268,7 @@ func (c *lru) evictExpiredItems() {
return // do nothing if activelyEvict is not set
}

now := c.now()
now := c.timeSource.Now()
for elt := c.byAccess.Back(); elt != nil; elt = c.byAccess.Back() {
if !c.isEntryExpired(elt.Value.(*entryImpl), now) {
// List is sorted by item age, so we can stop as soon as we found first non expired item.
Expand All @@ -284,15 +290,15 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
elt := c.byKey[key]
if elt != nil {
entry := elt.Value.(*entryImpl)
if c.isEntryExpired(entry, c.now()) {
if c.isEntryExpired(entry, c.timeSource.Now()) {
// Entry has expired
c.deleteInternal(elt)
} else {
existing := entry.value
if allowUpdate {
entry.value = value
if c.ttl != 0 {
entry.createTime = c.now()
entry.createTime = c.timeSource.Now()
}
}

Expand All @@ -314,7 +320,7 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
}

if c.ttl != 0 {
entry.createTime = c.now()
entry.createTime = c.timeSource.Now()
}

c.byKey[key] = c.byAccess.PushFront(entry)
Expand Down
46 changes: 20 additions & 26 deletions common/cache/lru_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/uber/cadence/common/clock"
)

type keyType struct {
Expand Down Expand Up @@ -91,19 +93,17 @@ func TestGenerics(t *testing.T) {
}

func TestLRUWithTTL(t *testing.T) {
mockTimeSource := clock.NewMockedTimeSourceAt(time.UnixMilli(0))
cache := New(&Options{
MaxCount: 5,
TTL: time.Millisecond * 100,
MaxCount: 5,
TTL: time.Millisecond * 100,
TimeSource: mockTimeSource,
}).(*lru)

// We will capture this in the caches now function, and advance time as needed
currentTime := time.UnixMilli(0)
cache.now = func() time.Time { return currentTime }

cache.Put("A", "foo")
assert.Equal(t, "foo", cache.Get("A"))

currentTime = currentTime.Add(time.Millisecond * 300)
mockTimeSource.Advance(time.Millisecond * 300)

assert.Nil(t, cache.Get("A"))
assert.Equal(t, 0, cache.Size())
Expand Down Expand Up @@ -188,6 +188,7 @@ func TestRemoveFunc(t *testing.T) {

func TestRemovedFuncWithTTL(t *testing.T) {
ch := make(chan bool)
mockTimeSource := clock.NewMockedTimeSourceAt(time.UnixMilli(0))
cache := New(&Options{
MaxCount: 5,
TTL: time.Millisecond * 50,
Expand All @@ -196,29 +197,27 @@ func TestRemovedFuncWithTTL(t *testing.T) {
assert.True(t, ok)
ch <- true
},
TimeSource: mockTimeSource,
}).(*lru)

// We will capture this in the caches now function, and advance time as needed
currentTime := time.UnixMilli(0)
cache.now = func() time.Time { return currentTime }

cache.Put("A", t)
assert.Equal(t, t, cache.Get("A"))

currentTime = currentTime.Add(time.Millisecond * 100)
mockTimeSource.Advance(time.Millisecond * 100)

assert.Nil(t, cache.Get("A"))

select {
case b := <-ch:
assert.True(t, b)
case <-time.After(100 * time.Millisecond):
case <-mockTimeSource.After(100 * time.Millisecond):
t.Error("RemovedFunc did not send true on channel ch")
}
}

func TestRemovedFuncWithTTL_Pin(t *testing.T) {
ch := make(chan bool)
mockTimeSource := clock.NewMockedTimeSourceAt(time.UnixMilli(0))
cache := New(&Options{
MaxCount: 5,
TTL: time.Millisecond * 50,
Expand All @@ -228,16 +227,13 @@ func TestRemovedFuncWithTTL_Pin(t *testing.T) {
assert.True(t, ok)
ch <- true
},
TimeSource: mockTimeSource,
}).(*lru)

// We will capture this in the caches now function, and advance time as needed
currentTime := time.UnixMilli(0)
cache.now = func() time.Time { return currentTime }

_, err := cache.PutIfNotExist("A", t)
assert.NoError(t, err)
assert.Equal(t, t, cache.Get("A"))
currentTime = currentTime.Add(time.Millisecond * 100)
mockTimeSource.Advance(time.Millisecond * 100)
assert.Equal(t, t, cache.Get("A"))
// release 3 time since put if not exist also increase the counter
cache.Release("A")
Expand All @@ -248,7 +244,7 @@ func TestRemovedFuncWithTTL_Pin(t *testing.T) {
select {
case b := <-ch:
assert.True(t, b)
case <-time.After(300 * time.Millisecond):
case <-mockTimeSource.After(300 * time.Millisecond):
t.Error("RemovedFunc did not send true on channel ch")
}
}
Expand Down Expand Up @@ -403,24 +399,22 @@ func TestPanicOptionsIsNil(t *testing.T) {

func TestEvictItemsPastTimeToLive_ActivelyEvict(t *testing.T) {
// Create the cache with a TTL of 75s
mockTimeSource := clock.NewMockedTimeSourceAt(time.UnixMilli(0))
cache, ok := New(&Options{
MaxCount: 5,
TTL: time.Second * 75,
ActivelyEvict: true,
TimeSource: mockTimeSource,
}).(*lru)
require.True(t, ok)

// We will capture this in the caches now function, and advance time as needed
currentTime := time.UnixMilli(0)
cache.now = func() time.Time { return currentTime }

_, err := cache.PutIfNotExist("A", 1)
require.NoError(t, err)
_, err = cache.PutIfNotExist("B", 2)
require.NoError(t, err)

// Nothing is expired after 50s
currentTime = currentTime.Add(time.Second * 50)
mockTimeSource.Advance(time.Second * 50)
assert.Equal(t, 2, cache.Size())

_, err = cache.PutIfNotExist("C", 3)
Expand All @@ -432,10 +426,10 @@ func TestEvictItemsPastTimeToLive_ActivelyEvict(t *testing.T) {
assert.Equal(t, 4, cache.Size())

// Advance time to 100s, so A and B should be expired
currentTime = currentTime.Add(time.Second * 50)
mockTimeSource.Advance(time.Second * 50)
assert.Equal(t, 2, cache.Size())

// Advance time to 150s, so C and D should be expired as well
currentTime = currentTime.Add(time.Second * 50)
mockTimeSource.Advance(time.Second * 50)
assert.Equal(t, 0, cache.Size())
}
4 changes: 2 additions & 2 deletions service/matching/handler/engine_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ func (s *matchingEngineSuite) SetupTest() {
s.mockExecutionManager = &mocks.ExecutionManager{}
s.controller = gomock.NewController(s.T())
s.mockHistoryClient = history.NewMockClient(s.controller)
s.taskManager = tasklist.NewTestTaskManager(s.T(), s.logger)
s.mockTimeSource = clock.NewMockedTimeSourceAt(time.Now())
s.taskManager = tasklist.NewTestTaskManager(s.T(), s.logger, s.mockTimeSource)
s.mockDomainCache = cache.NewMockDomainCache(s.controller)
s.mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(cache.CreateDomainCacheEntry(matchingTestDomainName), nil).AnyTimes()
s.mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.CreateDomainCacheEntry(matchingTestDomainName), nil).AnyTimes()
Expand All @@ -130,7 +131,6 @@ func (s *matchingEngineSuite) SetupTest() {
dc := dynamicconfig.NewCollection(dcClient, s.logger)
isolationGroupState, _ := defaultisolationgroupstate.NewDefaultIsolationGroupStateWatcherWithConfigStoreClient(s.logger, dc, s.mockDomainCache, s.mockIsolationStore, metrics.NewNoopMetricsClient())
s.partitioner = partition.NewDefaultPartitioner(s.logger, isolationGroupState)
s.mockTimeSource = clock.NewMockedTimeSourceAt(time.Unix(1000000, 0))
s.handlerContext = newHandlerContext(
context.Background(),
matchingTestDomainName,
Expand Down
4 changes: 3 additions & 1 deletion service/matching/poller/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/types"
)

Expand Down Expand Up @@ -55,12 +56,13 @@ type History struct {
// HistoryUpdatedFunc is a type for notifying applications when the poller history was updated
type HistoryUpdatedFunc func()

func NewPollerHistory(historyUpdatedFunc HistoryUpdatedFunc) *History {
func NewPollerHistory(historyUpdatedFunc HistoryUpdatedFunc, timeSource clock.TimeSource) *History {
opts := &cache.Options{
InitialCapacity: pollerHistoryInitSize,
TTL: pollerHistoryTTL,
Pin: false,
MaxCount: pollerHistoryInitMaxSize,
TimeSource: timeSource,
}

return &History{
Expand Down
2 changes: 1 addition & 1 deletion service/matching/tasklist/task_list_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func NewManager(
tlMgr.pollerHistory = poller.NewPollerHistory(func() {
taskListTypeMetricScope.UpdateGauge(metrics.PollerPerTaskListCounter,
float64(len(tlMgr.pollerHistory.GetPollerInfo(time.Time{}))))
})
}, timeSource)

livenessInterval := taskListConfig.IdleTasklistCheckInterval()
tlMgr.liveness = liveness.NewLiveness(clock.NewRealTimeSource(), livenessInterval, func() {
Expand Down
10 changes: 5 additions & 5 deletions service/matching/tasklist/task_list_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func createTestTaskListManager(t *testing.T, logger log.Logger, controller *gomo
}

func createTestTaskListManagerWithConfig(t *testing.T, logger log.Logger, controller *gomock.Controller, cfg *config.Config) *taskListManagerImpl {
tm := NewTestTaskManager(t, logger)
tm := NewTestTaskManager(t, logger, clock.NewRealTimeSource())
mockPartitioner := partition.NewMockPartitioner(controller)
mockPartitioner.EXPECT().GetIsolationGroupByDomainID(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return("", nil).AnyTimes()
mockDomainCache := cache.NewMockDomainCache(controller)
Expand Down Expand Up @@ -552,9 +552,9 @@ func TestTaskListManagerGetTaskBatch(t *testing.T) {
mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(cache.CreateDomainCacheEntry("domainName"), nil).AnyTimes()
mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("domainName", nil).AnyTimes()
logger := testlogger.New(t)
tm := NewTestTaskManager(t, logger)
taskListID := NewTestTaskListID(t, "domainId", "tl", 0)
timeSource := clock.NewRealTimeSource()
tm := NewTestTaskManager(t, logger, timeSource)
taskListID := NewTestTaskListID(t, "domainId", "tl", 0)
cfg := defaultTestConfig()
cfg.RangeSize = rangeSize
tlMgr, err := NewManager(
Expand Down Expand Up @@ -721,9 +721,9 @@ func TestTaskExpiryAndCompletion(t *testing.T) {
mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(cache.CreateDomainCacheEntry("domainName"), nil).AnyTimes()
mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("domainName", nil).AnyTimes()
logger := testlogger.New(t)
tm := NewTestTaskManager(t, logger)
taskListID := NewTestTaskListID(t, "domainId", "tl", 0)
timeSource := clock.NewRealTimeSource()
tm := NewTestTaskManager(t, logger, timeSource)
taskListID := NewTestTaskListID(t, "domainId", "tl", 0)
cfg := defaultTestConfig()
cfg.RangeSize = rangeSize
cfg.MaxTaskDeleteBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(tc.batchSize)
Expand Down
22 changes: 12 additions & 10 deletions service/matching/tasklist/task_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand Down Expand Up @@ -66,6 +67,7 @@ type (
taskAckManager messaging.AckManager
domainCache cache.DomainCache
clusterMetadata cluster.Metadata
timeSource clock.TimeSource
// The cancel objects are to cancel the ratelimiter Wait in dispatchBufferedTasks. The ideal
// approach is to use request-scoped contexts and use a unique one for each call to Wait. However
// in order to cancel it on shutdown, we need a new goroutine for each call that would wait on
Expand Down Expand Up @@ -111,6 +113,7 @@ func newTaskReader(tlMgr *taskListManagerImpl, isolationGroups []string) *taskRe
taskBuffers: taskBuffers,
domainCache: tlMgr.domainCache,
clusterMetadata: tlMgr.clusterMetadata,
timeSource: tlMgr.timeSource,
logger: tlMgr.logger,
scope: tlMgr.scope,
handleErr: tlMgr.handleErr,
Expand Down Expand Up @@ -280,20 +283,12 @@ func (tr *taskReader) getTaskBatch() ([]*persistence.TaskInfo, int64, bool, erro
return tasks, readLevel, readLevel == maxReadLevel, nil // caller will update readLevel when no task grabbed
}

func (tr *taskReader) isTaskExpired(t *persistence.TaskInfo, now time.Time) bool {
return t.Expiry.After(epochStartTime) && time.Now().After(t.Expiry)
func (tr *taskReader) isTaskExpired(t *persistence.TaskInfo) bool {
return t.Expiry.After(epochStartTime) && tr.timeSource.Now().After(t.Expiry)
}

func (tr *taskReader) addTasksToBuffer(tasks []*persistence.TaskInfo) bool {
now := time.Now()
for _, t := range tasks {
if tr.isTaskExpired(t, now) {
tr.scope.IncCounter(metrics.ExpiredTasksPerTaskListCounter)
// Also increment readLevel for expired tasks otherwise it could result in
// looping over the same tasks if all tasks read in the batch are expired
tr.taskAckManager.SetReadLevel(t.TaskID)
continue
}
if !tr.addSingleTaskToBuffer(t) {
return false // we are shutting down the task list
}
Expand All @@ -302,6 +297,13 @@ func (tr *taskReader) addTasksToBuffer(tasks []*persistence.TaskInfo) bool {
}

func (tr *taskReader) addSingleTaskToBuffer(task *persistence.TaskInfo) bool {
if tr.isTaskExpired(task) {
tr.scope.IncCounter(metrics.ExpiredTasksPerTaskListCounter)
// Also increment readLevel for expired tasks otherwise it could result in
// looping over the same tasks if all tasks read in the batch are expired
tr.taskAckManager.SetReadLevel(task.TaskID)
return true
}
err := tr.taskAckManager.ReadItem(task.TaskID)
if err != nil {
tr.logger.Fatal("critical bug when adding item to ackManager", tag.Error(err))
Expand Down
Loading

0 comments on commit f0f7efd

Please sign in to comment.