Skip to content

Commit

Permalink
HashRing: proper debounce instead of sleep-if-seen-in4s
Browse files Browse the repository at this point in the history
The current mechanism of HashRing handling updates from PeerProvider
(ringpop/uns) works like ratelimiter: it allows the first update, but then it
drop another updates withtin the next 4s. This leads to a situation
which often takes place in production:
1. Some host dissapeared from membership
   HashRing refresh()-es the state and notifies listeners
2. In half a second another host appears
   HashRing throttles this update
3. After periodic forced refresh (10 seconds) HashRing finally keeps up
   - it performs refresh() and notifies listeners

This leads to 10s delay for matching instances to realise the ownership
has changed.

I believe the idea behind initial 4s throttling was "we don't want to
have updates 100/s because of massive restarts since we think
grabbing ownership of tasklists and especially history is bad".
Therefore, the change brings the fair debounce mechanism (do a single,
postponed call in a second instead of hundreds of calls which could take
place). We also preserve the initial behaviour - to perform an immediate
call if there were no calls for a long time (twice as long as debounce
interval).

The debouncer itself is implemented in a library with two ways of using
it: via calling handler or via wrapper that implements a signal-channel pattern we use in HashRing.
  • Loading branch information
dkrotx committed Oct 8, 2024
1 parent fd46d4c commit 243fd2d
Show file tree
Hide file tree
Showing 6 changed files with 491 additions and 42 deletions.
112 changes: 112 additions & 0 deletions common/debounce/callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package debounce

import (
"context"
"sync"
"time"

"github.com/uber/cadence/common/clock"
)

type DebouncedCallback struct {
sync.Mutex

lastHandlerCall time.Time
callback func()
interval time.Duration
timeSource clock.TimeSource
updateCh chan struct{}

loopContext context.Context
cancelLoop context.CancelFunc
waitGroup sync.WaitGroup
}

// NewDebouncedCallback creates a new DebouncedCallback which will
// accumulate calls to .Handler function and will only call `callback` on given interval if there were any calls to
// .Handler in between. It is almost like rate limiter, but it never ignores calls to handler - only postpone.
func NewDebouncedCallback(timeSource clock.TimeSource, interval time.Duration, callback func()) *DebouncedCallback {
ctx, cancelFn := context.WithCancel(context.Background())

return &DebouncedCallback{
timeSource: timeSource,
interval: interval,
callback: callback,
updateCh: make(chan struct{}, 1),
loopContext: ctx,
cancelLoop: cancelFn,
}
}

func (d *DebouncedCallback) Start() {
d.waitGroup.Add(1)
go func() {
defer d.waitGroup.Done()
d.backgroundLoop()
}()
}

func (d *DebouncedCallback) Stop() {
d.cancelLoop()
d.waitGroup.Wait()
}

func (d *DebouncedCallback) Handler() {
select {
case d.updateCh <- struct{}{}:
default:
}

// special case for the handler call which wasn't issued for some time
// in this case we call the callback immediately
d.Lock()
defer d.Unlock()
if d.timeSource.Since(d.lastHandlerCall) > 2*d.interval {
d.callbackIfRequired()
}
d.lastHandlerCall = d.timeSource.Now()
}

func (d *DebouncedCallback) backgroundLoop() {
ticker := d.timeSource.NewTicker(d.interval)
defer ticker.Stop()

for {
select {
case <-d.loopContext.Done():
return
case <-ticker.Chan():
d.callbackIfRequired()
}
}
}

func (d *DebouncedCallback) callbackIfRequired() {
select {
case <-d.updateCh:
d.callback()
default:
}
}
125 changes: 125 additions & 0 deletions common/debounce/callback_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package debounce

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/uber/cadence/common/clock"
)

const (
testDebounceInterval = time.Second
// this one is required since we're often testing for absence of extra calls, so we have to wait
testSleepAmount = time.Second / 2
)

type callbackTestData struct {
mockedTimeSource clock.MockedTimeSource
debouncedCallback *DebouncedCallback
calls int
}

func waitCondition(fn func() bool, duration time.Duration) bool {
started := time.Now()

for time.Since(started) < duration {
if fn() {
return true
}
time.Sleep(time.Millisecond)
}
return false
}

func newCallbackTestData(t *testing.T) *callbackTestData {
var td callbackTestData

t.Cleanup(
func() {
td.debouncedCallback.Stop()
goleak.VerifyNone(t)
},
)

td.mockedTimeSource = clock.NewMockedTimeSourceAt(time.Now())
callback := func() {
td.calls++
}

td.debouncedCallback = NewDebouncedCallback(td.mockedTimeSource, testDebounceInterval, callback)
td.debouncedCallback.Start()
td.mockedTimeSource.BlockUntil(1) // we should wait until ticker is created

return &td
}

func TestDebouncedCallbackWorks(t *testing.T) {
td := newCallbackTestData(t)

td.debouncedCallback.Handler()
require.True(
t,
waitCondition(func() bool { return td.calls > 0 }, testTimeout),
"first callback is expected to be issued immediately after handler",
)
assert.Equal(t, 1, td.calls, "should be just once call since handler() called once")

// issue more calls to handler(); they all should be postponed to testDebounceInterval
for i := 0; i < 10; i++ {
td.debouncedCallback.Handler()
}

td.mockedTimeSource.Advance(testDebounceInterval)
time.Sleep(testSleepAmount)
assert.Equal(t, 2, td.calls)

// now call handler again, but advance time only by little - no callbacks are expected
for i := 0; i < 10; i++ {
td.debouncedCallback.Handler()
}

td.mockedTimeSource.Advance(testDebounceInterval / 2)
time.Sleep(testSleepAmount)
assert.Equal(t, 2, td.calls, "should not have new callbacks")
}

func TestDebouncedCallbackDoesntCallHandlerIfThereWereNoUpdates(t *testing.T) {
td := newCallbackTestData(t)

td.mockedTimeSource.Advance(2 * testDebounceInterval)
time.Sleep(testSleepAmount)
assert.Equal(t, 0, td.calls)
}

func TestDebouncedCallbackDoubleStopIsOK(t *testing.T) {
td := newCallbackTestData(t)

td.debouncedCallback.Stop()
assert.NotPanics(t, func() { td.debouncedCallback.Stop() }, "double stop should be OK")
}
55 changes: 55 additions & 0 deletions common/debounce/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package debounce

import (
"time"

"github.com/uber/cadence/common/clock"
)

// DebouncedChannel is a wrapper around DebouncedCallback
// providing channel instead of callback
type DebouncedChannel struct {
*DebouncedCallback
signalCh chan struct{}
}

// NewDebouncedChannel creates a new NewDebouncedChannel which will
// accumulate calls to .Handler function and will write data to Chan() on given interval if there were any calls to
// .Handler in between. It is almost like rate limiter, but it never ignores calls to handler - only postpone.
func NewDebouncedChannel(timeSource clock.TimeSource, interval time.Duration) *DebouncedChannel {
res := &DebouncedChannel{}

res.signalCh = make(chan struct{}, 1)
callback := func() {
select {
case res.signalCh <- struct{}{}:
default:
}
}
res.DebouncedCallback = NewDebouncedCallback(timeSource, interval, callback)
return res
}

func (d *DebouncedChannel) Chan() <-chan struct{} { return d.signalCh }
Loading

0 comments on commit 243fd2d

Please sign in to comment.