Skip to content

Commit

Permalink
fix(externalclock): fix deadlock when sending time on unstarted Clock
Browse files Browse the repository at this point in the history
Calling `SetTimestamp` on a clock will block until the `Run` goroutine
has read on channel and acked on another channel. If the Clock isn't
started (or has been closed) calling `SetTimestamp` blocks
forever (unless clock is started later).

This commit fixes this issue by setting time and signaling tickers in
the calling goroutine (as opposed to `Run` goroutine).

This change also means that `Run` doesn't do anything, so it is marked
as deprecated.
  • Loading branch information
ericwenn committed Jun 14, 2023
1 parent e6c3b46 commit 9f8389a
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 47 deletions.
83 changes: 37 additions & 46 deletions externalclock/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,49 @@ import (
)

type Clock struct {
Logger logr.Logger
timestampChan chan time.Time // synchronization SetTimestamp -> Run
timestampUpdated chan struct{} // synchronization Run -> SetTimestamp
timeMutex sync.Mutex
currentTime time.Time
tickerMutex sync.RWMutex
tickers map[string]*ticker
Logger logr.Logger
timeMutex sync.Mutex
currentTime time.Time
tickerMutex sync.RWMutex
tickers map[string]*ticker
}

func New(logger logr.Logger, initialTime time.Time) *Clock {
c := &Clock{
Logger: logger,
timestampChan: make(chan time.Time),
tickers: map[string]*ticker{},
timestampUpdated: make(chan struct{}),
Logger: logger,
tickers: map[string]*ticker{},
}
c.currentTime = initialTime
return c
}

func (g *Clock) SetTimestamp(t time.Time) {
g.timestampChan <- t
<-g.timestampUpdated // wait for time to be set before returning
g.timeMutex.Lock()
g.currentTime = t
g.timeMutex.Unlock()

g.signalTickers(t)
}

func (g *Clock) signalTickers(t time.Time) {
g.tickerMutex.RLock()
for _, tickerInstance := range g.tickers {
if !tickerInstance.IsDurationReached(t) {
continue
}
tickerInstance.SetLastTimestamp(t)
if !tickerInstance.isPeriodic {
g.tickerMutex.RUnlock()
tickerInstance.Stop()
g.tickerMutex.RLock()
}
select {
case tickerInstance.timeChan <- t:
case <-time.After(20 * time.Millisecond):
g.Logger.V(1).Info("ticker dropped message", "caller", tickerInstance.caller)
}
}
g.tickerMutex.RUnlock()
}

func (g *Clock) NumberOfTriggers() int {
Expand All @@ -46,40 +66,11 @@ func (g *Clock) NumberOfTriggers() int {
return len(g.tickers)
}

// Deprecated: Calling Run is not necessary anymore. The method only blocks until
// context is cancelled.
func (g *Clock) Run(ctx context.Context) error {
ctxDone := ctx.Done()
g.Logger.V(1).Info("clock started")
for {
select {
case <-ctxDone:
return nil
case recvTime := <-g.timestampChan:
g.timeMutex.Lock()
g.tickerMutex.RLock()

g.currentTime = recvTime // ok to set early since reading is locked while in here.
g.timestampUpdated <- struct{}{} // notify SetTimestamp.

for _, tickerInstance := range g.tickers {
if !tickerInstance.IsDurationReached(recvTime) {
continue
}
tickerInstance.SetLastTimestamp(recvTime)
if !tickerInstance.isPeriodic {
g.tickerMutex.RUnlock()
tickerInstance.Stop()
g.tickerMutex.RLock()
}
select {
case tickerInstance.timeChan <- recvTime:
case <-time.After(20 * time.Millisecond):
g.Logger.V(1).Info("ticker dropped message", "caller", tickerInstance.caller)
}
}
g.timeMutex.Unlock()
g.tickerMutex.RUnlock()
}
}
<-ctx.Done()
return nil
}

func (g *Clock) getTime() time.Time {
Expand Down
18 changes: 18 additions & 0 deletions externalclock/clock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,24 @@ func TestExternalClock_NewTicker_Tick_Periodically(t *testing.T) {
}
}

func TestExternalClock_SendBeforeRun(t *testing.T) {
// test verifies that sending time on an unstarted clock does not deadlock
c := externalclock.New(testr.New(t), time.Unix(0, 0))
c.SetTimestamp(time.Unix(1, 0))
}

func TestExternalClock_SendAfterRun(t *testing.T) {
// test verifies that sending time on a cancelled clock does not deadlock
c := externalclock.New(testr.New(t), time.Unix(0, 0))
// start clock with a deadline
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
t.Cleanup(cancel)
assert.NilError(t, c.Run(ctx))

// sending time
c.SetTimestamp(time.Unix(1, 0))
}

func TestExternalClock_TestLooper(t *testing.T) {
externalClock := newTestFixture(t)
const target = 1000
Expand Down
5 changes: 4 additions & 1 deletion externalclock/ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ func (g *Clock) NewTicker(d time.Duration) clock.Ticker {
}

func (g *Clock) newTickerInternal(caller string, endFunc func(), d time.Duration, periodic bool) clock.Ticker {
c := make(chan time.Time)
// Give the channel a 1-element time buffer.
// If the client falls behind while reading, we drop ticks
// on the floor until the client catches up.
c := make(chan time.Time, 1)
uuid := makeUUID()
intervalTicker := &ticker{
caller: caller,
Expand Down
1 change: 1 addition & 0 deletions externalclock/ticker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func TestExternalClock_TickerReset(t *testing.T) {
for i := count / 2; i < count; i++ {
externalTime = externalTime.Add(delta)
externalClock.SetTimestamp(externalTime)
time.Sleep(time.Millisecond)
}
cancel <- struct{}{}
assert.DeepEqual(t, receivedTime, []time.Time{time.UnixMilli(3), time.UnixMilli(8)})
Expand Down

0 comments on commit 9f8389a

Please sign in to comment.