-
Notifications
You must be signed in to change notification settings - Fork 112
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(pkg): add
ticker
package (#2617)
* Add `pkg/ticker` * Sample ticker usage in evm observer * Change naming * Address PR comments * Address PR comments
- Loading branch information
Showing
3 changed files
with
347 additions
and
28 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
// Package ticker provides a dynamic ticker that can change its interval at runtime. | ||
// The ticker can be stopped gracefully and handles context-based termination. | ||
// | ||
// This package is useful for scenarios where periodic execution of a function is needed | ||
// and the interval might need to change dynamically based on runtime conditions. | ||
// | ||
// It also invokes a first tick immediately after the ticker starts. It's safe to use it concurrently. | ||
// | ||
// It also terminates gracefully when the context is done (return ctx.Err()) or when the stop signal is received. | ||
// | ||
// Example usage: | ||
// | ||
// ticker := New(time.Second, func(ctx context.Context, t *Ticker) error { | ||
// resp, err := client.GetPrice(ctx) | ||
// if err != nil { | ||
// logger.Err(err).Error().Msg("failed to get price") | ||
// return nil | ||
// } | ||
// | ||
// observer.SetPrice(resp.GasPrice) | ||
// t.SetInterval(resp.GasPriceInterval) | ||
// | ||
// return nil | ||
// }) | ||
// | ||
// err := ticker.Run(ctx) | ||
package ticker | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"cosmossdk.io/errors" | ||
) | ||
|
||
// Ticker represents a ticker that will run a function periodically. | ||
// It also invokes BEFORE ticker starts. | ||
type Ticker struct { | ||
interval time.Duration | ||
ticker *time.Ticker | ||
task Task | ||
signalChan chan struct{} | ||
|
||
// runnerMu is a mutex to prevent double run | ||
runnerMu sync.Mutex | ||
|
||
// stateMu is a mutex to prevent concurrent SetInterval calls | ||
stateMu sync.Mutex | ||
|
||
stopped bool | ||
} | ||
|
||
// Task is a function that will be called by the Ticker | ||
type Task func(ctx context.Context, t *Ticker) error | ||
|
||
// New creates a new Ticker. | ||
func New(interval time.Duration, runner Task) *Ticker { | ||
return &Ticker{interval: interval, task: runner} | ||
} | ||
|
||
// Run creates and runs a new Ticker. | ||
func Run(ctx context.Context, interval time.Duration, task Task) error { | ||
return New(interval, task).Run(ctx) | ||
} | ||
|
||
// SecondsFromUint64 converts uint64 to time.Duration in seconds. | ||
func SecondsFromUint64(d uint64) time.Duration { | ||
return time.Duration(d) * time.Second | ||
} | ||
|
||
// Run runs the ticker by blocking current goroutine. It also invokes BEFORE ticker starts. | ||
// Stops when (if any): | ||
// - context is done (returns ctx.Err()) | ||
// - task returns an error or panics | ||
// - shutdown signal is received | ||
func (t *Ticker) Run(ctx context.Context) (err error) { | ||
defer func() { | ||
if r := recover(); r != nil { | ||
err = fmt.Errorf("panic during ticker run: %v", r) | ||
} | ||
}() | ||
|
||
// prevent concurrent runs | ||
t.runnerMu.Lock() | ||
defer t.runnerMu.Unlock() | ||
|
||
// setup | ||
t.ticker = time.NewTicker(t.interval) | ||
t.signalChan = make(chan struct{}) | ||
t.stopped = false | ||
|
||
// initial run | ||
if err := t.task(ctx, t); err != nil { | ||
return errors.Wrap(err, "ticker task failed") | ||
} | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
case <-t.ticker.C: | ||
if err := t.task(ctx, t); err != nil { | ||
return errors.Wrap(err, "ticker task failed") | ||
} | ||
case <-t.signalChan: | ||
return nil | ||
} | ||
} | ||
} | ||
|
||
// SetInterval updates the interval of the ticker. | ||
func (t *Ticker) SetInterval(interval time.Duration) { | ||
t.stateMu.Lock() | ||
defer t.stateMu.Unlock() | ||
|
||
// noop | ||
if t.interval == interval || t.ticker == nil { | ||
return | ||
} | ||
|
||
t.interval = interval | ||
t.ticker.Reset(interval) | ||
} | ||
|
||
// Stop stops the ticker. Safe to call concurrently or multiple times. | ||
func (t *Ticker) Stop() { | ||
t.stateMu.Lock() | ||
defer t.stateMu.Unlock() | ||
|
||
// noop | ||
if t.stopped || t.signalChan == nil { | ||
return | ||
} | ||
|
||
close(t.signalChan) | ||
t.stopped = true | ||
t.ticker.Stop() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,173 @@ | ||
package ticker | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestTicker(t *testing.T) { | ||
const ( | ||
dur = time.Millisecond * 100 | ||
durSmall = dur / 10 | ||
) | ||
|
||
t.Run("Basic case with context", func(t *testing.T) { | ||
// ARRANGE | ||
// Given a counter | ||
var counter int | ||
|
||
// And a context | ||
ctx, cancel := context.WithTimeout(context.Background(), dur+durSmall) | ||
defer cancel() | ||
|
||
// And a ticker | ||
ticker := New(dur, func(_ context.Context, t *Ticker) error { | ||
counter++ | ||
|
||
return nil | ||
}) | ||
|
||
// ACT | ||
err := ticker.Run(ctx) | ||
|
||
// ASSERT | ||
assert.ErrorIs(t, err, context.DeadlineExceeded) | ||
|
||
// two runs: start run + 1 tick | ||
assert.Equal(t, 2, counter) | ||
}) | ||
|
||
t.Run("Halts when error occurred", func(t *testing.T) { | ||
// ARRANGE | ||
// Given a counter | ||
var counter int | ||
|
||
ctx := context.Background() | ||
|
||
// And a ticker func that returns an error after 10 runs | ||
ticker := New(durSmall, func(_ context.Context, t *Ticker) error { | ||
counter++ | ||
if counter > 9 { | ||
return fmt.Errorf("oops") | ||
} | ||
|
||
return nil | ||
}) | ||
|
||
// ACT | ||
err := ticker.Run(ctx) | ||
|
||
// ASSERT | ||
assert.ErrorContains(t, err, "oops") | ||
assert.Equal(t, 10, counter) | ||
}) | ||
|
||
t.Run("Dynamic interval update", func(t *testing.T) { | ||
// ARRANGE | ||
// Given a counter | ||
var counter int | ||
|
||
// Given duration | ||
duration := dur * 10 | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), duration) | ||
defer cancel() | ||
|
||
// And a ticker what decreases the interval by 2 each time | ||
ticker := New(durSmall, func(_ context.Context, ticker *Ticker) error { | ||
t.Logf("Counter: %d, Duration: %s", counter, duration.String()) | ||
|
||
counter++ | ||
duration /= 2 | ||
|
||
ticker.SetInterval(duration) | ||
|
||
return nil | ||
}) | ||
|
||
// ACT | ||
err := ticker.Run(ctx) | ||
|
||
// ASSERT | ||
assert.ErrorIs(t, err, context.DeadlineExceeded) | ||
|
||
// It should have run at 2 times with ctxTimeout = tickerDuration (start + 1 tick), | ||
// But it should have run more than that because of the interval decrease | ||
assert.GreaterOrEqual(t, counter, 2) | ||
}) | ||
|
||
t.Run("Stop ticker", func(t *testing.T) { | ||
// ARRANGE | ||
// Given a counter | ||
var counter int | ||
|
||
// And a context | ||
ctx := context.Background() | ||
|
||
// And a ticker | ||
ticker := New(durSmall, func(_ context.Context, _ *Ticker) error { | ||
counter++ | ||
return nil | ||
}) | ||
|
||
// And a function with a stop signal | ||
go func() { | ||
time.Sleep(dur) | ||
ticker.Stop() | ||
}() | ||
|
||
// ACT | ||
err := ticker.Run(ctx) | ||
|
||
// ASSERT | ||
assert.NoError(t, err) | ||
assert.Greater(t, counter, 8) | ||
|
||
t.Run("Stop ticker for the second time", func(t *testing.T) { | ||
ticker.Stop() | ||
}) | ||
}) | ||
|
||
t.Run("Panic", func(t *testing.T) { | ||
// ARRANGE | ||
// Given a context | ||
ctx := context.Background() | ||
|
||
// And a ticker | ||
ticker := New(durSmall, func(_ context.Context, _ *Ticker) error { | ||
panic("oops") | ||
}) | ||
|
||
// ACT | ||
err := ticker.Run(ctx) | ||
|
||
// ASSERT | ||
assert.ErrorContains(t, err, "panic during ticker run: oops") | ||
}) | ||
|
||
t.Run("Run as a single call", func(t *testing.T) { | ||
// ARRANGE | ||
// Given a counter | ||
var counter int | ||
|
||
// Given a context | ||
ctx, cancel := context.WithTimeout(context.Background(), dur+durSmall) | ||
defer cancel() | ||
|
||
tick := func(ctx context.Context, t *Ticker) error { | ||
counter++ | ||
return nil | ||
} | ||
|
||
// ACT | ||
err := Run(ctx, dur, tick) | ||
|
||
// ASSERT | ||
assert.ErrorIs(t, err, context.DeadlineExceeded) | ||
assert.Equal(t, 2, counter) | ||
}) | ||
} |
Oops, something went wrong.