Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DrainBacklogNoPollersIsolationGroup tests #6080

Merged
merged 1 commit into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/metrics"
)

Expand Down Expand Up @@ -93,6 +94,10 @@ type Options struct {
// - when the cache is full
// - when the item is accessed
ActivelyEvict bool

// TimeSource is used to get the current time.
// It is optional and defaults to clock.NewRealTimeSource().
TimeSource clock.TimeSource
}

// SimpleOptions provides options that can be used to configure SimpleCache
Expand Down
22 changes: 14 additions & 8 deletions common/cache/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
"errors"
"sync"
"time"

"github.com/uber/cadence/common/clock"
)

var (
Expand Down Expand Up @@ -52,7 +54,7 @@
isSizeBased bool
activelyEvict bool
// We use this instead of time.Now() in order to make testing easier
now func() time.Time
timeSource clock.TimeSource
}

iteratorImpl struct {
Expand Down Expand Up @@ -117,7 +119,7 @@
c.mut.Lock()
iterator := &iteratorImpl{
lru: c,
createTime: c.now(),
createTime: c.timeSource.Now(),
nextItem: c.byAccess.Front(),
}
iterator.prepareNext()
Expand All @@ -143,14 +145,18 @@
"MaxSize and GetCacheItemSizeFunc (size based) options must be provided for the LRU cache")
}

timeSource := opts.TimeSource
if timeSource == nil {
timeSource = clock.NewRealTimeSource()
}
cache := &lru{
byAccess: list.New(),
byKey: make(map[interface{}]*list.Element, opts.InitialCapacity),
ttl: opts.TTL,
pin: opts.Pin,
rmFunc: opts.RemovedFunc,
activelyEvict: opts.ActivelyEvict,
now: time.Now,
timeSource: timeSource,
}

cache.isSizeBased = opts.GetCacheItemSizeFunc != nil && opts.MaxSize > 0
Expand Down Expand Up @@ -183,7 +189,7 @@

entry := element.Value.(*entryImpl)

if c.isEntryExpired(entry, c.now()) {
if c.isEntryExpired(entry, c.timeSource.Now()) {
// Entry has expired
c.deleteInternal(element)
return nil
Expand Down Expand Up @@ -262,7 +268,7 @@
return // do nothing if activelyEvict is not set
}

now := c.now()
now := c.timeSource.Now()
for elt := c.byAccess.Back(); elt != nil; elt = c.byAccess.Back() {
if !c.isEntryExpired(elt.Value.(*entryImpl), now) {
// List is sorted by item age, so we can stop as soon as we find the first non-expired item.
Expand All @@ -284,15 +290,15 @@
elt := c.byKey[key]
if elt != nil {
entry := elt.Value.(*entryImpl)
if c.isEntryExpired(entry, c.now()) {
if c.isEntryExpired(entry, c.timeSource.Now()) {
// Entry has expired
c.deleteInternal(elt)
} else {
existing := entry.value
if allowUpdate {
entry.value = value
if c.ttl != 0 {
entry.createTime = c.now()
entry.createTime = c.timeSource.Now()

Check warning on line 301 in common/cache/lru.go

View check run for this annotation

Codecov / codecov/patch

common/cache/lru.go#L301

Added line #L301 was not covered by tests
}
}

Expand All @@ -314,7 +320,7 @@
}

if c.ttl != 0 {
entry.createTime = c.now()
entry.createTime = c.timeSource.Now()
}

c.byKey[key] = c.byAccess.PushFront(entry)
Expand Down
46 changes: 20 additions & 26 deletions common/cache/lru_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/uber/cadence/common/clock"
)

type keyType struct {
Expand Down Expand Up @@ -91,19 +93,17 @@ func TestGenerics(t *testing.T) {
}

func TestLRUWithTTL(t *testing.T) {
mockTimeSource := clock.NewMockedTimeSourceAt(time.UnixMilli(0))
cache := New(&Options{
MaxCount: 5,
TTL: time.Millisecond * 100,
MaxCount: 5,
TTL: time.Millisecond * 100,
TimeSource: mockTimeSource,
}).(*lru)

// We will capture this in the caches now function, and advance time as needed
currentTime := time.UnixMilli(0)
cache.now = func() time.Time { return currentTime }

cache.Put("A", "foo")
assert.Equal(t, "foo", cache.Get("A"))

currentTime = currentTime.Add(time.Millisecond * 300)
mockTimeSource.Advance(time.Millisecond * 300)

assert.Nil(t, cache.Get("A"))
assert.Equal(t, 0, cache.Size())
Expand Down Expand Up @@ -188,6 +188,7 @@ func TestRemoveFunc(t *testing.T) {

func TestRemovedFuncWithTTL(t *testing.T) {
ch := make(chan bool)
mockTimeSource := clock.NewMockedTimeSourceAt(time.UnixMilli(0))
cache := New(&Options{
MaxCount: 5,
TTL: time.Millisecond * 50,
Expand All @@ -196,29 +197,27 @@ func TestRemovedFuncWithTTL(t *testing.T) {
assert.True(t, ok)
ch <- true
},
TimeSource: mockTimeSource,
}).(*lru)

// We will capture this in the caches now function, and advance time as needed
currentTime := time.UnixMilli(0)
cache.now = func() time.Time { return currentTime }

cache.Put("A", t)
assert.Equal(t, t, cache.Get("A"))

currentTime = currentTime.Add(time.Millisecond * 100)
mockTimeSource.Advance(time.Millisecond * 100)

assert.Nil(t, cache.Get("A"))

select {
case b := <-ch:
assert.True(t, b)
case <-time.After(100 * time.Millisecond):
case <-mockTimeSource.After(100 * time.Millisecond):
t.Error("RemovedFunc did not send true on channel ch")
}
}

func TestRemovedFuncWithTTL_Pin(t *testing.T) {
ch := make(chan bool)
mockTimeSource := clock.NewMockedTimeSourceAt(time.UnixMilli(0))
cache := New(&Options{
MaxCount: 5,
TTL: time.Millisecond * 50,
Expand All @@ -228,16 +227,13 @@ func TestRemovedFuncWithTTL_Pin(t *testing.T) {
assert.True(t, ok)
ch <- true
},
TimeSource: mockTimeSource,
}).(*lru)

// We will capture this in the caches now function, and advance time as needed
currentTime := time.UnixMilli(0)
cache.now = func() time.Time { return currentTime }

_, err := cache.PutIfNotExist("A", t)
assert.NoError(t, err)
assert.Equal(t, t, cache.Get("A"))
currentTime = currentTime.Add(time.Millisecond * 100)
mockTimeSource.Advance(time.Millisecond * 100)
assert.Equal(t, t, cache.Get("A"))
// Release 3 times, since PutIfNotExist also increases the counter
cache.Release("A")
Expand All @@ -248,7 +244,7 @@ func TestRemovedFuncWithTTL_Pin(t *testing.T) {
select {
case b := <-ch:
assert.True(t, b)
case <-time.After(300 * time.Millisecond):
case <-mockTimeSource.After(300 * time.Millisecond):
t.Error("RemovedFunc did not send true on channel ch")
}
}
Expand Down Expand Up @@ -403,24 +399,22 @@ func TestPanicOptionsIsNil(t *testing.T) {

func TestEvictItemsPastTimeToLive_ActivelyEvict(t *testing.T) {
// Create the cache with a TTL of 75s
mockTimeSource := clock.NewMockedTimeSourceAt(time.UnixMilli(0))
cache, ok := New(&Options{
MaxCount: 5,
TTL: time.Second * 75,
ActivelyEvict: true,
TimeSource: mockTimeSource,
}).(*lru)
require.True(t, ok)

// We will capture this in the caches now function, and advance time as needed
currentTime := time.UnixMilli(0)
cache.now = func() time.Time { return currentTime }

_, err := cache.PutIfNotExist("A", 1)
require.NoError(t, err)
_, err = cache.PutIfNotExist("B", 2)
require.NoError(t, err)

// Nothing is expired after 50s
currentTime = currentTime.Add(time.Second * 50)
mockTimeSource.Advance(time.Second * 50)
assert.Equal(t, 2, cache.Size())

_, err = cache.PutIfNotExist("C", 3)
Expand All @@ -432,10 +426,10 @@ func TestEvictItemsPastTimeToLive_ActivelyEvict(t *testing.T) {
assert.Equal(t, 4, cache.Size())

// Advance time to 100s, so A and B should be expired
currentTime = currentTime.Add(time.Second * 50)
mockTimeSource.Advance(time.Second * 50)
assert.Equal(t, 2, cache.Size())

// Advance time to 150s, so C and D should be expired as well
currentTime = currentTime.Add(time.Second * 50)
mockTimeSource.Advance(time.Second * 50)
assert.Equal(t, 0, cache.Size())
}
4 changes: 2 additions & 2 deletions service/matching/handler/engine_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ func (s *matchingEngineSuite) SetupTest() {
s.mockExecutionManager = &mocks.ExecutionManager{}
s.controller = gomock.NewController(s.T())
s.mockHistoryClient = history.NewMockClient(s.controller)
s.taskManager = tasklist.NewTestTaskManager(s.T(), s.logger)
s.mockTimeSource = clock.NewMockedTimeSourceAt(time.Now())
s.taskManager = tasklist.NewTestTaskManager(s.T(), s.logger, s.mockTimeSource)
s.mockDomainCache = cache.NewMockDomainCache(s.controller)
s.mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(cache.CreateDomainCacheEntry(matchingTestDomainName), nil).AnyTimes()
s.mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.CreateDomainCacheEntry(matchingTestDomainName), nil).AnyTimes()
Expand All @@ -130,7 +131,6 @@ func (s *matchingEngineSuite) SetupTest() {
dc := dynamicconfig.NewCollection(dcClient, s.logger)
isolationGroupState, _ := defaultisolationgroupstate.NewDefaultIsolationGroupStateWatcherWithConfigStoreClient(s.logger, dc, s.mockDomainCache, s.mockIsolationStore, metrics.NewNoopMetricsClient())
s.partitioner = partition.NewDefaultPartitioner(s.logger, isolationGroupState)
s.mockTimeSource = clock.NewMockedTimeSourceAt(time.Unix(1000000, 0))
s.handlerContext = newHandlerContext(
context.Background(),
matchingTestDomainName,
Expand Down
4 changes: 3 additions & 1 deletion service/matching/poller/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/types"
)

Expand Down Expand Up @@ -55,12 +56,13 @@ type History struct {
// HistoryUpdatedFunc is a type for notifying applications when the poller history was updated
type HistoryUpdatedFunc func()

func NewPollerHistory(historyUpdatedFunc HistoryUpdatedFunc) *History {
func NewPollerHistory(historyUpdatedFunc HistoryUpdatedFunc, timeSource clock.TimeSource) *History {
opts := &cache.Options{
InitialCapacity: pollerHistoryInitSize,
TTL: pollerHistoryTTL,
Pin: false,
MaxCount: pollerHistoryInitMaxSize,
TimeSource: timeSource,
}

return &History{
Expand Down
2 changes: 1 addition & 1 deletion service/matching/tasklist/task_list_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func NewManager(
tlMgr.pollerHistory = poller.NewPollerHistory(func() {
taskListTypeMetricScope.UpdateGauge(metrics.PollerPerTaskListCounter,
float64(len(tlMgr.pollerHistory.GetPollerInfo(time.Time{}))))
})
}, timeSource)

livenessInterval := taskListConfig.IdleTasklistCheckInterval()
tlMgr.liveness = liveness.NewLiveness(clock.NewRealTimeSource(), livenessInterval, func() {
Expand Down
10 changes: 5 additions & 5 deletions service/matching/tasklist/task_list_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func createTestTaskListManager(t *testing.T, logger log.Logger, controller *gomo
}

func createTestTaskListManagerWithConfig(t *testing.T, logger log.Logger, controller *gomock.Controller, cfg *config.Config) *taskListManagerImpl {
tm := NewTestTaskManager(t, logger)
tm := NewTestTaskManager(t, logger, clock.NewRealTimeSource())
mockPartitioner := partition.NewMockPartitioner(controller)
mockPartitioner.EXPECT().GetIsolationGroupByDomainID(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return("", nil).AnyTimes()
mockDomainCache := cache.NewMockDomainCache(controller)
Expand Down Expand Up @@ -552,9 +552,9 @@ func TestTaskListManagerGetTaskBatch(t *testing.T) {
mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(cache.CreateDomainCacheEntry("domainName"), nil).AnyTimes()
mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("domainName", nil).AnyTimes()
logger := testlogger.New(t)
tm := NewTestTaskManager(t, logger)
taskListID := NewTestTaskListID(t, "domainId", "tl", 0)
timeSource := clock.NewRealTimeSource()
tm := NewTestTaskManager(t, logger, timeSource)
taskListID := NewTestTaskListID(t, "domainId", "tl", 0)
cfg := defaultTestConfig()
cfg.RangeSize = rangeSize
tlMgr, err := NewManager(
Expand Down Expand Up @@ -721,9 +721,9 @@ func TestTaskExpiryAndCompletion(t *testing.T) {
mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(cache.CreateDomainCacheEntry("domainName"), nil).AnyTimes()
mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return("domainName", nil).AnyTimes()
logger := testlogger.New(t)
tm := NewTestTaskManager(t, logger)
taskListID := NewTestTaskListID(t, "domainId", "tl", 0)
timeSource := clock.NewRealTimeSource()
tm := NewTestTaskManager(t, logger, timeSource)
taskListID := NewTestTaskListID(t, "domainId", "tl", 0)
cfg := defaultTestConfig()
cfg.RangeSize = rangeSize
cfg.MaxTaskDeleteBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(tc.batchSize)
Expand Down
22 changes: 12 additions & 10 deletions service/matching/tasklist/task_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand Down Expand Up @@ -66,6 +67,7 @@ type (
taskAckManager messaging.AckManager
domainCache cache.DomainCache
clusterMetadata cluster.Metadata
timeSource clock.TimeSource
// The cancel objects are to cancel the ratelimiter Wait in dispatchBufferedTasks. The ideal
// approach is to use request-scoped contexts and use a unique one for each call to Wait. However
// in order to cancel it on shutdown, we need a new goroutine for each call that would wait on
Expand Down Expand Up @@ -111,6 +113,7 @@ func newTaskReader(tlMgr *taskListManagerImpl, isolationGroups []string) *taskRe
taskBuffers: taskBuffers,
domainCache: tlMgr.domainCache,
clusterMetadata: tlMgr.clusterMetadata,
timeSource: tlMgr.timeSource,
logger: tlMgr.logger,
scope: tlMgr.scope,
handleErr: tlMgr.handleErr,
Expand Down Expand Up @@ -280,20 +283,12 @@ func (tr *taskReader) getTaskBatch() ([]*persistence.TaskInfo, int64, bool, erro
return tasks, readLevel, readLevel == maxReadLevel, nil // caller will update readLevel when no task grabbed
}

func (tr *taskReader) isTaskExpired(t *persistence.TaskInfo, now time.Time) bool {
return t.Expiry.After(epochStartTime) && time.Now().After(t.Expiry)
func (tr *taskReader) isTaskExpired(t *persistence.TaskInfo) bool {
return t.Expiry.After(epochStartTime) && tr.timeSource.Now().After(t.Expiry)
}

func (tr *taskReader) addTasksToBuffer(tasks []*persistence.TaskInfo) bool {
now := time.Now()
for _, t := range tasks {
if tr.isTaskExpired(t, now) {
tr.scope.IncCounter(metrics.ExpiredTasksPerTaskListCounter)
// Also increment readLevel for expired tasks otherwise it could result in
// looping over the same tasks if all tasks read in the batch are expired
tr.taskAckManager.SetReadLevel(t.TaskID)
continue
}
if !tr.addSingleTaskToBuffer(t) {
return false // we are shutting down the task list
}
Expand All @@ -302,6 +297,13 @@ func (tr *taskReader) addTasksToBuffer(tasks []*persistence.TaskInfo) bool {
}

func (tr *taskReader) addSingleTaskToBuffer(task *persistence.TaskInfo) bool {
if tr.isTaskExpired(task) {
tr.scope.IncCounter(metrics.ExpiredTasksPerTaskListCounter)
// Also increment readLevel for expired tasks otherwise it could result in
// looping over the same tasks if all tasks read in the batch are expired
tr.taskAckManager.SetReadLevel(task.TaskID)
return true
}
err := tr.taskAckManager.ReadItem(task.TaskID)
if err != nil {
tr.logger.Fatal("critical bug when adding item to ackManager", tag.Error(err))
Expand Down
Loading
Loading