Skip to content

Commit

Permalink
[matching] Simplity poller extraction in task list manager (cadence-w…
Browse files Browse the repository at this point in the history
…orkflow#6333)

* [matching] Simplity poller extraction in task list manager
  • Loading branch information
3vilhamster authored Oct 8, 2024
1 parent e5304f6 commit 2757192
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 68 deletions.
70 changes: 55 additions & 15 deletions service/matching/poller/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,27 @@ type (
IsolationGroup string
}

History struct {
History interface {
UpdatePollerInfo(id Identity, info Info)
HasPollerAfter(earliestAccessTime time.Time) bool
GetPollerInfo(earliestAccessTime time.Time) []*types.PollerInfo
GetPollerIsolationGroups(earliestAccessTime time.Time) map[string]int
}

history struct {
// poller ID -> pollerInfo
// pollers map[pollerID]pollerInfo
history cache.Cache
historyCache cache.Cache

// OnHistoryUpdatedFunc is a function called when the poller history was updated
// OnHistoryUpdatedFunc is a function called when the poller historyCache was updated
onHistoryUpdatedFunc HistoryUpdatedFunc
}

// HistoryUpdatedFunc is a type for notifying applications when the poller history was updated
// HistoryUpdatedFunc is a type for notifying applications when the poller historyCache was updated
HistoryUpdatedFunc func()
)

func NewPollerHistory(historyUpdatedFunc HistoryUpdatedFunc, timeSource clock.TimeSource) *History {
func NewPollerHistory(historyUpdatedFunc HistoryUpdatedFunc, timeSource clock.TimeSource) History {
opts := &cache.Options{
InitialCapacity: pollerHistoryInitSize,
TTL: pollerHistoryTTL,
Expand All @@ -65,30 +72,60 @@ func NewPollerHistory(historyUpdatedFunc HistoryUpdatedFunc, timeSource clock.Ti
TimeSource: timeSource,
}

return &History{
history: cache.New(opts),
return &history{
historyCache: cache.New(opts),
onHistoryUpdatedFunc: historyUpdatedFunc,
}
}

func (pollers *History) UpdatePollerInfo(id Identity, info Info) {
pollers.history.Put(id, &info)
func (pollers *history) UpdatePollerInfo(id Identity, info Info) {
pollers.historyCache.Put(id, &info)
if pollers.onHistoryUpdatedFunc != nil {
pollers.onHistoryUpdatedFunc()
}
}

func (pollers *History) GetPollerInfo(earliestAccessTime time.Time) []*types.PollerInfo {
func (pollers *history) HasPollerAfter(earliestAccessTime time.Time) bool {
if pollers.historyCache.Size() == 0 {
return false
}

noTimeFilter := earliestAccessTime.IsZero()

ite := pollers.historyCache.Iterator()
defer ite.Close()

for ite.HasNext() {
entry := ite.Next()
lastAccessTime := entry.CreateTime()
if noTimeFilter || earliestAccessTime.Before(lastAccessTime) {
return true
}
}

return false
}

func (pollers *history) GetPollerInfo(earliestAccessTime time.Time) []*types.PollerInfo {
var result []*types.PollerInfo
ite := pollers.history.Iterator()
// optimistic size get, it can change before Iterator call.
size := pollers.historyCache.Size()

ite := pollers.historyCache.Iterator()
defer ite.Close()

noTimeFilter := earliestAccessTime.IsZero()
if noTimeFilter {
result = make([]*types.PollerInfo, 0, size)
}

for ite.HasNext() {
entry := ite.Next()
key := entry.Key().(Identity)
value := entry.Value().(*Info)
// TODO add IP, T1396795
lastAccessTime := entry.CreateTime()
if earliestAccessTime.Before(lastAccessTime) {
if noTimeFilter || earliestAccessTime.Before(lastAccessTime) {
result = append(result, &types.PollerInfo{
Identity: string(key),
LastAccessTime: common.Int64Ptr(lastAccessTime.UnixNano()),
Expand All @@ -99,15 +136,18 @@ func (pollers *History) GetPollerInfo(earliestAccessTime time.Time) []*types.Pol
return result
}

func (pollers *History) GetPollerIsolationGroups(earliestAccessTime time.Time) map[string]int {
func (pollers *history) GetPollerIsolationGroups(earliestAccessTime time.Time) map[string]int {
groupSet := make(map[string]int)
ite := pollers.history.Iterator()
ite := pollers.historyCache.Iterator()
defer ite.Close()

noTimeFilter := earliestAccessTime.IsZero()

for ite.HasNext() {
entry := ite.Next()
value := entry.Value().(*Info)
lastAccessTime := entry.CreateTime()
if earliestAccessTime.Before(lastAccessTime) {
if noTimeFilter || earliestAccessTime.Before(lastAccessTime) {
if value.IsolationGroup != "" {
groupSet[value.IsolationGroup]++
}
Expand Down
176 changes: 141 additions & 35 deletions service/matching/poller/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
func TestNewPollerHistory(t *testing.T) {
p := NewPollerHistory(nil, nil)
assert.NotNil(t, p)
assert.NotNil(t, p.history)
assert.NotNil(t, p.(*history).historyCache)
}

func TestUpdatePollerInfo(t *testing.T) {
Expand All @@ -48,46 +48,152 @@ func TestUpdatePollerInfo(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockCache := cache.NewMockCache(mockCtrl)
mockCache.EXPECT().Put(Identity("test"), &Info{IsolationGroup: "dca1"}).Return(nil)
p := &History{
p := &history{
onHistoryUpdatedFunc: updateFn,
history: mockCache,
historyCache: mockCache,
}
p.UpdatePollerInfo(Identity("test"), Info{IsolationGroup: "dca1"})
assert.True(t, updated)
}

func TestHistory_HasPollerAfter(t *testing.T) {
t.Run("empty historyCache", func(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockCache := cache.NewMockCache(mockCtrl)
mockCache.EXPECT().Size().Return(0)

p := &history{
historyCache: mockCache,
}
assert.False(t, p.HasPollerAfter(time.Now()))
})
t.Run("no poller after", func(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockCache := cache.NewMockCache(mockCtrl)
mockCache.EXPECT().Size().Return(2)
mockIter := cache.NewMockIterator(mockCtrl)
mockEntry := cache.NewMockEntry(mockCtrl)

now := time.Now()

mockCache.EXPECT().Iterator().Return(mockIter)
gomock.InOrder(
mockIter.EXPECT().HasNext().Return(true),
mockIter.EXPECT().Next().Return(mockEntry),
mockEntry.EXPECT().CreateTime().Return(now.Add(-time.Second)),
mockIter.EXPECT().HasNext().Return(true),
mockIter.EXPECT().Next().Return(mockEntry),
mockEntry.EXPECT().CreateTime().Return(now.Add(-2*time.Second)),
mockIter.EXPECT().HasNext().Return(false),
mockIter.EXPECT().Close(),
)

p := &history{
historyCache: mockCache,
}
assert.False(t, p.HasPollerAfter(now))
})
t.Run("has poller after", func(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockCache := cache.NewMockCache(mockCtrl)
mockCache.EXPECT().Size().Return(2)
mockIter := cache.NewMockIterator(mockCtrl)
mockEntry := cache.NewMockEntry(mockCtrl)

now := time.Now()

mockCache.EXPECT().Iterator().Return(mockIter)
gomock.InOrder(
mockIter.EXPECT().HasNext().Return(true),
mockIter.EXPECT().Next().Return(mockEntry),
mockEntry.EXPECT().CreateTime().Return(now.Add(-time.Second)),
mockIter.EXPECT().HasNext().Return(true),
mockIter.EXPECT().Next().Return(mockEntry),
mockEntry.EXPECT().CreateTime().Return(now.Add(time.Second)),
mockIter.EXPECT().Close(),
)

p := &history{
historyCache: mockCache,
}
assert.True(t, p.HasPollerAfter(now))
})
}

func TestGetPollerInfo(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockCache := cache.NewMockCache(mockCtrl)
mockIter := cache.NewMockIterator(mockCtrl)
mockEntry := cache.NewMockEntry(mockCtrl)
t.Run("with_time_filter", func(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockCache := cache.NewMockCache(mockCtrl)
mockIter := cache.NewMockIterator(mockCtrl)
mockEntry := cache.NewMockEntry(mockCtrl)

mockCache.EXPECT().Iterator().Return(mockIter)
gomock.InOrder(
mockIter.EXPECT().HasNext().Return(true),
mockIter.EXPECT().Next().Return(mockEntry),
mockEntry.EXPECT().Key().Return(Identity("test0")),
mockEntry.EXPECT().Value().Return(&Info{IsolationGroup: "dca1", RatePerSecond: 1.0}),
mockEntry.EXPECT().CreateTime().Return(time.UnixMilli(1000)),
mockIter.EXPECT().HasNext().Return(true),
mockIter.EXPECT().Next().Return(mockEntry),
mockEntry.EXPECT().Key().Return(Identity("test1")),
mockEntry.EXPECT().Value().Return(&Info{IsolationGroup: "dca2", RatePerSecond: 2.0}),
mockEntry.EXPECT().CreateTime().Return(time.UnixMilli(0)),
mockIter.EXPECT().HasNext().Return(false),
mockIter.EXPECT().Close(),
)
p := &History{
history: mockCache,
}
info := p.GetPollerInfo(time.UnixMilli(500))
assert.Equal(t, []*types.PollerInfo{
{
Identity: "test0",
LastAccessTime: common.Ptr(time.UnixMilli(1000).UnixNano()),
RatePerSecond: 1.0,
},
}, info)
mockCache.EXPECT().Size().Return(2)

mockCache.EXPECT().Iterator().Return(mockIter)
gomock.InOrder(
mockIter.EXPECT().HasNext().Return(true),
mockIter.EXPECT().Next().Return(mockEntry),
mockEntry.EXPECT().Key().Return(Identity("test0")),
mockEntry.EXPECT().Value().Return(&Info{IsolationGroup: "dca1", RatePerSecond: 1.0}),
mockEntry.EXPECT().CreateTime().Return(time.UnixMilli(1000)),
mockIter.EXPECT().HasNext().Return(true),
mockIter.EXPECT().Next().Return(mockEntry),
mockEntry.EXPECT().Key().Return(Identity("test1")),
mockEntry.EXPECT().Value().Return(&Info{IsolationGroup: "dca2", RatePerSecond: 2.0}),
mockEntry.EXPECT().CreateTime().Return(time.UnixMilli(0)),
mockIter.EXPECT().HasNext().Return(false),
mockIter.EXPECT().Close(),
)
p := &history{
historyCache: mockCache,
}
info := p.GetPollerInfo(time.UnixMilli(500))
assert.Equal(t, []*types.PollerInfo{
{
Identity: "test0",
LastAccessTime: common.Ptr(time.UnixMilli(1000).UnixNano()),
RatePerSecond: 1.0,
},
}, info)
})
t.Run("no_time_filter", func(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockCache := cache.NewMockCache(mockCtrl)
mockIter := cache.NewMockIterator(mockCtrl)
mockEntry := cache.NewMockEntry(mockCtrl)
mockCache.EXPECT().Size().Return(2)
mockCache.EXPECT().Iterator().Return(mockIter)
gomock.InOrder(
mockIter.EXPECT().HasNext().Return(true),
mockIter.EXPECT().Next().Return(mockEntry),
mockEntry.EXPECT().Key().Return(Identity("test0")),
mockEntry.EXPECT().Value().Return(&Info{IsolationGroup: "dca1", RatePerSecond: 1.0}),
mockEntry.EXPECT().CreateTime().Return(time.UnixMilli(1000)),
mockIter.EXPECT().HasNext().Return(true),
mockIter.EXPECT().Next().Return(mockEntry),
mockEntry.EXPECT().Key().Return(Identity("test1")),
mockEntry.EXPECT().Value().Return(&Info{IsolationGroup: "dca2", RatePerSecond: 2.0}),
mockEntry.EXPECT().CreateTime().Return(time.UnixMilli(0)),
mockIter.EXPECT().HasNext().Return(false),
mockIter.EXPECT().Close(),
)
p := &history{
historyCache: mockCache,
}
info := p.GetPollerInfo(time.Time{})
assert.Equal(t, []*types.PollerInfo{
{
Identity: "test0",
LastAccessTime: common.Ptr(time.UnixMilli(1000).UnixNano()),
RatePerSecond: 1.0,
},
{
Identity: "test1",
LastAccessTime: common.Ptr(time.UnixMilli(0).UnixNano()),
RatePerSecond: 2.0,
},
}, info)
})
}

func TestGetPollerIsolationGroups(t *testing.T) {
Expand All @@ -109,8 +215,8 @@ func TestGetPollerIsolationGroups(t *testing.T) {
mockIter.EXPECT().HasNext().Return(false),
mockIter.EXPECT().Close(),
)
p := &History{
history: mockCache,
p := &history{
historyCache: mockCache,
}
groups := p.GetPollerIsolationGroups(time.UnixMilli(500))
assert.Equal(t, map[string]int{"dca1": 1}, groups)
Expand Down
42 changes: 24 additions & 18 deletions service/matching/tasklist/task_list_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type (
timeSource clock.TimeSource
domainName string
// pollerHistory stores poller which poll from this tasklist in last few minutes
pollerHistory *poller.History
pollerHistory poller.History
// outstandingPollsMap is needed to keep track of all outstanding pollers for a
// particular tasklist. PollerID generated by frontend is used as the key and
// CancelFunc is the value. This is used to cancel the context to unblock any
Expand Down Expand Up @@ -400,14 +400,8 @@ func (c *taskListManagerImpl) getTask(ctx context.Context, maxDispatchPerSecond
if pollerID != "" {
// Found pollerID on context, add it to the map to allow it to be canceled in
// response to CancelPoller call
c.outstandingPollsLock.Lock()
c.outstandingPollsMap[pollerID] = outstandingPollerInfo{isolationGroup: isolationGroup, cancel: cancel}
c.outstandingPollsLock.Unlock()
defer func() {
c.outstandingPollsLock.Lock()
delete(c.outstandingPollsMap, pollerID)
c.outstandingPollsLock.Unlock()
}()
c.addOutstandingPoller(pollerID, isolationGroup, cancel)
defer c.removeOutstandingPoller(pollerID)()
}

identity := IdentityFromContext(ctx)
Expand Down Expand Up @@ -452,15 +446,7 @@ func (c *taskListManagerImpl) GetAllPollerInfo() []*types.PollerInfo {

// HasPollerAfter checks if there is any poller after a timestamp
func (c *taskListManagerImpl) HasPollerAfter(accessTime time.Time) bool {
inflightPollerCount := 0
c.outstandingPollsLock.Lock()
inflightPollerCount = len(c.outstandingPollsMap)
c.outstandingPollsLock.Unlock()
if inflightPollerCount > 0 {
return true
}
recentPollers := c.pollerHistory.GetPollerInfo(accessTime)
return len(recentPollers) > 0
return c.hasOutstandingPolls() || c.pollerHistory.HasPollerAfter(accessTime)
}

func (c *taskListManagerImpl) CancelPoller(pollerID string) {
Expand Down Expand Up @@ -528,6 +514,26 @@ func (c *taskListManagerImpl) TaskListID() *Identifier {
return c.taskListID
}

func (c *taskListManagerImpl) addOutstandingPoller(pollerID string, isolationGroup string, cancel context.CancelFunc) {
c.outstandingPollsLock.Lock()
defer c.outstandingPollsLock.Unlock()
c.outstandingPollsMap[pollerID] = outstandingPollerInfo{isolationGroup: isolationGroup, cancel: cancel}
}

func (c *taskListManagerImpl) removeOutstandingPoller(pollerID string) func() {
return func() {
c.outstandingPollsLock.Lock()
defer c.outstandingPollsLock.Unlock()
delete(c.outstandingPollsMap, pollerID)
}
}

func (c *taskListManagerImpl) hasOutstandingPolls() bool {
c.outstandingPollsLock.Lock()
defer c.outstandingPollsLock.Unlock()
return len(c.outstandingPollsMap) > 0
}

// Retry operation on transient error. On rangeID update by another process calls c.Stop().
func (c *taskListManagerImpl) executeWithRetry(
operation func() (interface{}, error),
Expand Down
Loading

0 comments on commit 2757192

Please sign in to comment.