Skip to content

Commit

Permalink
Separate liveness of task list into a dedicated entity (#5105)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Feb 21, 2023
1 parent 35e2212 commit f9d04ea
Show file tree
Hide file tree
Showing 7 changed files with 331 additions and 65 deletions.
7 changes: 4 additions & 3 deletions common/clock/time_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package clock

import (
"sync/atomic"
"time"

// clockwork is not currently used but it is useful to have the option to use this in testing code
Expand All @@ -41,7 +42,7 @@ type (

// EventTimeSource serves fake controlled time
EventTimeSource struct {
now time.Time
now int64
}
)

Expand All @@ -64,11 +65,11 @@ func NewEventTimeSource() *EventTimeSource {

// Now returns the fake current time
func (ts *EventTimeSource) Now() time.Time {
return ts.now
return time.Unix(0, atomic.LoadInt64(&ts.now)).UTC()
}

// Update updates the fake current time
func (ts *EventTimeSource) Update(now time.Time) *EventTimeSource {
ts.now = now
atomic.StoreInt64(&ts.now, now.UnixNano())
return ts
}
125 changes: 125 additions & 0 deletions service/matching/liveness.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Modifications Copyright (c) 2020 Uber Technologies Inc.

// Copyright (c) 2020 Temporal Technologies, Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package matching

import (
"sync"
"sync/atomic"
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
)

// liveness tracks when the last event was observed and shuts itself down
// (notifying its owner through broadcastShutdownFn) once no event has been
// recorded for longer than ttl.
type liveness struct {
	// status is the daemon lifecycle state; Start and Stop transition it
	// with atomic compare-and-swap.
	status int32
	// timeSource supplies the current time that isAlive compares against.
	timeSource clock.TimeSource
	// ttl is both the period of the liveness check and the maximum allowed
	// gap since lastEventTime.
	ttl time.Duration
	// shutdownChan is the internal shutdown signal; Stop closes it and
	// eventLoop exits when it is closed.
	shutdownChan chan struct{}

	// broadcastShutdownFn is invoked by Stop after shutdownChan is closed.
	broadcastShutdownFn func()

	// The embedded mutex guards lastEventTime.
	sync.Mutex
	// lastEventTime is the latest event time recorded through markAlive,
	// initialised to the construction time.
	lastEventTime time.Time
}

// Compile-time assertion that *liveness satisfies the common.Daemon interface.
var _ common.Daemon = (*liveness)(nil)

// newLiveness builds a liveness checker in the initialized (not yet started)
// state. The last event time is seeded with the current time of timeSource,
// so the checker is considered alive for one ttl after construction.
// broadcastShutdownFn is the callback invoked when the checker is stopped.
func newLiveness(
	timeSource clock.TimeSource,
	ttl time.Duration,
	broadcastShutdownFn func(),
) *liveness {
	l := new(liveness)
	l.status = common.DaemonStatusInitialized
	l.timeSource = timeSource
	l.ttl = ttl
	l.shutdownChan = make(chan struct{})
	l.broadcastShutdownFn = broadcastShutdownFn
	l.lastEventTime = timeSource.Now().UTC()
	return l
}

// Start launches the background event loop. Only the first call on an
// initialized checker has an effect; any other call is a no-op.
func (l *liveness) Start() {
	started := atomic.CompareAndSwapInt32(
		&l.status,
		common.DaemonStatusInitialized,
		common.DaemonStatusStarted,
	)
	if started {
		go l.eventLoop()
	}
}

// Stop transitions a started checker to stopped, closes the internal
// shutdown channel and then invokes the broadcast shutdown callback.
// Calls made when the checker is not in the started state are no-ops, so
// the shutdown sequence runs at most once.
func (l *liveness) Stop() {
	stopped := atomic.CompareAndSwapInt32(
		&l.status,
		common.DaemonStatusStarted,
		common.DaemonStatusStopped,
	)
	if stopped {
		close(l.shutdownChan)
		l.broadcastShutdownFn()
	}
}

// eventLoop re-evaluates liveness once every ttl and stops the checker the
// first time it is found to be no longer alive. It returns once the
// shutdown channel has been closed.
func (l *liveness) eventLoop() {
	ticker := time.NewTicker(l.ttl)
	defer ticker.Stop()

	for {
		select {
		case <-l.shutdownChan:
			return

		case <-ticker.C:
			if l.isAlive() {
				continue
			}
			// Stop closes shutdownChan, so the next iteration exits the loop.
			l.Stop()
		}
	}
}

// isAlive reports whether the current time of the time source is still
// strictly before lastEventTime + ttl.
func (l *liveness) isAlive() bool {
	l.Lock()
	defer l.Unlock()

	deadline := l.lastEventTime.Add(l.ttl)
	return l.timeSource.Now().Before(deadline)
}

// markAlive records now as the latest event time. The stored time only ever
// moves forward: a value that is not after the current lastEventTime is
// ignored.
func (l *liveness) markAlive(
	now time.Time,
) {
	l.Lock()
	defer l.Unlock()

	if !now.After(l.lastEventTime) {
		return
	}
	l.lastEventTime = now.UTC()
}
124 changes: 124 additions & 0 deletions service/matching/liveness_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Modifications Copyright (c) 2020 Uber Technologies Inc.

// Copyright (c) 2020 Temporal Technologies, Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package matching

import (
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/uber/cadence/common/clock"
)

// livenessSuite holds the per-test fixtures for the liveness tests.
type livenessSuite struct {
	suite.Suite
	*require.Assertions

	// timeSource is the fake clock handed to the liveness under test.
	timeSource *clock.EventTimeSource
	// ttl is the liveness time-to-live used by every test.
	ttl time.Duration
	// shutdownFlag is flipped from 0 to 1 by the shutdown callback the tests
	// pass to newLiveness; it is accessed atomically.
	shutdownFlag int32
}

func TestLivenessSuite(t *testing.T) {
s := new(livenessSuite)
suite.Run(t, s)
}

// SetupSuite performs no suite-wide initialisation; per-test state is built
// in SetupTest.
func (s *livenessSuite) SetupSuite() {
}

// TearDownSuite is intentionally empty: the suite holds nothing that needs
// suite-wide cleanup.
func (s *livenessSuite) TearDownSuite() {
}

// SetupTest resets the fixtures before each test: a 2s ttl, a fresh fake
// clock set to the current wall-clock time, and a cleared shutdown flag.
func (s *livenessSuite) SetupTest() {
	s.shutdownFlag = 0
	s.ttl = 2 * time.Second

	fakeClock := clock.NewEventTimeSource()
	fakeClock.Update(time.Now())
	s.timeSource = fakeClock

	s.Assertions = require.New(s.T())
}

// TearDownTest is intentionally empty: no per-test cleanup is performed here.
func (s *livenessSuite) TearDownTest() {

}

// TestIsAlive_No checks that isAlive is false once the fake clock has moved
// two ttls past the current time.
func (s *livenessSuite) TestIsAlive_No() {
	onShutdown := func() { atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1) }
	checker := newLiveness(s.timeSource, s.ttl, onShutdown)

	expired := time.Now().Add(s.ttl * 2)
	s.timeSource.Update(expired)

	s.False(checker.isAlive())
}

// TestIsAlive_Yes checks that isAlive is true while the fake clock is only
// half a ttl past the current time.
func (s *livenessSuite) TestIsAlive_Yes() {
	onShutdown := func() { atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1) }
	checker := newLiveness(s.timeSource, s.ttl, onShutdown)

	withinTTL := time.Now().Add(s.ttl / 2)
	s.timeSource.Update(withinTTL)

	s.True(checker.isAlive())
}

// TestMarkAlive_Noop checks that marking alive with a time earlier than the
// recorded last event time leaves the recorded time unchanged.
func (s *livenessSuite) TestMarkAlive_Noop() {
	onShutdown := func() { atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1) }
	checker := newLiveness(s.timeSource, s.ttl, onShutdown)
	before := checker.lastEventTime

	// One nanosecond in the past relative to the fake clock.
	stale := s.timeSource.Now().Add(-1)
	checker.markAlive(stale)

	s.True(before.Equal(checker.lastEventTime))
}

// TestMarkAlive_Updated checks that marking alive with a time later than the
// recorded last event time replaces the recorded time.
func (s *livenessSuite) TestMarkAlive_Updated() {
	onShutdown := func() { atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1) }
	checker := newLiveness(s.timeSource, s.ttl, onShutdown)

	// One nanosecond in the future relative to the fake clock.
	fresher := s.timeSource.Now().Add(1)
	checker.markAlive(fresher)

	s.True(fresher.Equal(checker.lastEventTime))
}

// TestEventLoop_Noop verifies that a running liveness checker does not shut
// down while the latest recorded event is still within the ttl.
func (s *livenessSuite) TestEventLoop_Noop() {
	liveness := newLiveness(s.timeSource, s.ttl, func() { atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1) })
	liveness.Start()
	// Stop the event loop when the test finishes; otherwise its goroutine and
	// ticker keep running for the rest of the test binary's lifetime.
	defer liveness.Stop()

	now := time.Now().Add(s.ttl * 4)
	s.timeSource.Update(now)
	liveness.markAlive(now)

	// Wait for two ttl periods so at least one liveness check has run.
	timer := time.NewTimer(s.ttl * 2)
	defer timer.Stop()
	select {
	case <-liveness.shutdownChan:
		s.Fail("should not shutdown")
	case <-timer.C:
		s.Equal(int32(0), atomic.LoadInt32(&s.shutdownFlag))
	}
}

// TestEventLoop_Shutdown verifies that a running liveness checker shuts
// itself down and invokes the shutdown callback once the fake clock has moved
// past the ttl without any new event being recorded.
func (s *livenessSuite) TestEventLoop_Shutdown() {
	liveness := newLiveness(s.timeSource, s.ttl, func() { atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1) })
	liveness.Start()
	// No-op when the checker already stopped itself; prevents a leaked event
	// loop if the test fails before shutdown happens.
	defer liveness.Stop()

	s.timeSource.Update(time.Now().Add(s.ttl * 4))

	// Bound the wait so a regression fails the test instead of hanging it.
	timeout := time.NewTimer(s.ttl * 4)
	defer timeout.Stop()
	select {
	case <-liveness.shutdownChan:
	case <-timeout.C:
		s.Fail("liveness did not shut down after ttl elapsed")
	}

	// Stop closes shutdownChan before it invokes the shutdown callback, so the
	// flag may not be set yet at the moment the channel receive returns. Poll
	// briefly instead of asserting immediately.
	s.Eventually(func() bool {
		return atomic.LoadInt32(&s.shutdownFlag) == 1
	}, time.Second, time.Millisecond)
}
2 changes: 1 addition & 1 deletion service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1584,7 +1584,7 @@ func (s *matchingEngineSuite) TestTaskExpiryAndCompletion() {
s.matchingEngine.config.MaxTaskDeleteBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(2)
// set idle timer check to a really small value to assert that we don't accidentally drop tasks while blocking
// on enqueuing a task to task buffer
s.matchingEngine.config.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(time.Microsecond)
s.matchingEngine.config.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond)

testCases := []struct {
batchSize int
Expand Down
24 changes: 18 additions & 6 deletions service/matching/taskListManager.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Copyright (c) 2017-2020 Uber Technologies Inc.

// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package matching

Expand All @@ -32,6 +34,7 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/messaging"
Expand Down Expand Up @@ -96,6 +99,7 @@ type (
engine *matchingEngineImpl
taskWriter *taskWriter
taskReader *taskReader // reads tasks from db and async matches it with poller
liveness *liveness
taskGC *taskGC
taskAckManager messaging.AckManager // tracks ackLevel for delivered messages
matcher *TaskMatcher // for matching a task producer with a poller
Expand Down Expand Up @@ -173,6 +177,7 @@ func newTaskListManager(
taskListTypeMetricScope.UpdateGauge(metrics.PollerPerTaskListCounter,
float64(len(tlMgr.pollerHistory.getPollerInfo(time.Time{}))))
})
tlMgr.liveness = newLiveness(clock.NewRealTimeSource(), taskListConfig.IdleTasklistCheckInterval(), tlMgr.Stop)
tlMgr.taskWriter = newTaskWriter(tlMgr)
tlMgr.taskReader = newTaskReader(tlMgr)
var fwdr *Forwarder
Expand All @@ -189,6 +194,7 @@ func newTaskListManager(
func (c *taskListManagerImpl) Start() error {
defer c.startWG.Done()

c.liveness.Start()
if err := c.taskWriter.Start(); err != nil {
c.Stop()
return err
Expand All @@ -205,6 +211,7 @@ func (c *taskListManagerImpl) Stop() {
}
c.engine.removeTaskListManager(c)
close(c.shutdownCh)
c.liveness.Stop()
c.taskWriter.Stop()
c.taskReader.Stop()
c.logger.Info("Task list manager state changed", tag.LifeCycleStopped)
Expand All @@ -230,6 +237,10 @@ func (c *taskListManagerImpl) handleErr(err error) error {
// be written to database and later asynchronously matched with a poller
func (c *taskListManagerImpl) AddTask(ctx context.Context, params addTaskParams) (bool, error) {
c.startWG.Wait()
if params.forwardedFrom == "" {
// request sent by history service
c.liveness.markAlive(time.Now())
}
var syncMatch bool
_, err := c.executeWithRetry(func() (interface{}, error) {
if err := ctx.Err(); err != nil {
Expand Down Expand Up @@ -312,6 +323,7 @@ func (c *taskListManagerImpl) GetTask(
ctx context.Context,
maxDispatchPerSecond *float64,
) (*InternalTask, error) {
c.liveness.markAlive(time.Now())
task, err := c.getTask(ctx, maxDispatchPerSecond)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit f9d04ea

Please sign in to comment.