Skip to content

Commit

Permalink
Address issues
Browse files Browse the repository at this point in the history
More tricky test of DaemonStatus
Lock/Unlock are not longer exposed from CallbackDebounce
Moved Debounce to membership package so we can be more opinionated in
decisions
  • Loading branch information
dkrotx committed Oct 10, 2024
1 parent ab36cff commit dd0eede
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 198 deletions.
14 changes: 14 additions & 0 deletions common/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,31 +52,45 @@ func TestDaemonStatusRaceCondition(t *testing.T) {
var dm DaemonStatusManager
var successes atomic.Int32
var wg sync.WaitGroup
var started sync.WaitGroup

started.Add(1) // make sure we wait until all goroutines are started

// try to issue multiple Start-s at once
for i := 0; i < 10; i++ {
wg.Add(1)
started.Add(1)
go func() {
defer wg.Done()
started.Done() // mark as "waiting"
started.Wait() // wait for other goroutines to catch up
if dm.TransitionToStart() {
successes.Add(1)
}
}()
}

started.Done() // allows resuming goroutines once they're all started
wg.Wait()
assert.Equal(t, 1, int(successes.Load()), "only one Start call should succeed")

// now do the same for stops
successes.Store(0)
started.Add(1) // make sure we wait until all goroutines are started
for i := 0; i < 10; i++ {
wg.Add(1)
started.Add(1)
go func() {
defer wg.Done()
started.Done() // mark as "waiting"
started.Wait() // wait for other goroutines to catch up
if dm.TransitionToStop() {
successes.Add(1)
}
}()
}

started.Done() // allows resuming goroutines once they're all started
wg.Wait()
assert.Equal(t, 1, int(successes.Load()), "only one Stop call should succeed")
}
126 changes: 0 additions & 126 deletions common/debounce/callback_test.go

This file was deleted.

55 changes: 0 additions & 55 deletions common/debounce/channel.go

This file was deleted.

49 changes: 37 additions & 12 deletions common/debounce/callback.go → common/membership/debounce.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package debounce
package membership

import (
"context"
Expand All @@ -32,14 +32,13 @@ import (
)

type DebouncedCallback struct {
sync.Mutex

status common.DaemonStatusManager
lastHandlerCall time.Time
callback func()
interval time.Duration
timeSource clock.TimeSource
updateCh chan struct{}
status common.DaemonStatusManager
lastHandlerCallMutex sync.Mutex
lastHandlerCall time.Time
callback func()
interval time.Duration
timeSource clock.TimeSource
updateCh chan struct{}

loopContext context.Context
cancelLoop context.CancelFunc
Expand All @@ -48,7 +47,7 @@ type DebouncedCallback struct {

// NewDebouncedCallback creates a new DebouncedCallback which will
// accumulate calls to .Handler function and will only call `callback` on given interval if there were any calls to
// .Handler in between. It is almost like rate limiter, but it never ignores calls to handler - only postpone.
// .Handler in between. It is almost like rate limiter, but it never ignores calls to handler - only postpone calling the callback once.
func NewDebouncedCallback(timeSource clock.TimeSource, interval time.Duration, callback func()) *DebouncedCallback {
ctx, cancelFn := context.WithCancel(context.Background())

Expand Down Expand Up @@ -91,8 +90,8 @@ func (d *DebouncedCallback) Handler() {

// special case for the handler call which wasn't issued for some time
// in this case we call the callback immediately
d.Lock()
defer d.Unlock()
d.lastHandlerCallMutex.Lock()
defer d.lastHandlerCallMutex.Unlock()
if d.timeSource.Since(d.lastHandlerCall) > 2*d.interval {
d.callbackIfRequired()
}
Expand Down Expand Up @@ -120,3 +119,29 @@ func (d *DebouncedCallback) callbackIfRequired() {
default:
}
}

// DebouncedChannel is a wrapper around DebouncedCallback
// providing channel instead of callback
type DebouncedChannel struct {
*DebouncedCallback
signalCh chan struct{}
}

// NewDebouncedChannel creates a new NewDebouncedChannel which will
// accumulate calls to .Handler function and will write data to Chan() on given interval if there were any calls to
// .Handler in between. It is almost like rate limiter, but it never ignores calls to handler - only postpone.
func NewDebouncedChannel(timeSource clock.TimeSource, interval time.Duration) *DebouncedChannel {
res := &DebouncedChannel{}

res.signalCh = make(chan struct{}, 1)
callback := func() {
select {
case res.signalCh <- struct{}{}:
default:
}
}
res.DebouncedCallback = NewDebouncedCallback(timeSource, interval, callback)
return res
}

func (d *DebouncedChannel) Chan() <-chan struct{} { return d.signalCh }
Loading

0 comments on commit dd0eede

Please sign in to comment.