Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

periodic: Add TriggerRun method. #2389

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 50 additions & 27 deletions go/lib/periodic/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@ type Task interface {

// Runner runs a task periodically.
type Runner struct {
task Task
ticker Ticker
timeout time.Duration
stop chan struct{}
stopped chan struct{}
ctx context.Context
cancelF context.CancelFunc
task Task
ticker Ticker
timeout time.Duration
stop chan struct{}
loopFinished chan struct{}
ctx context.Context
cancelF context.CancelFunc
trigger chan struct{}
}

// StartPeriodicTask creates and starts a new Runner to run the given task peridiocally.
Expand All @@ -66,13 +67,14 @@ type Runner struct {
func StartPeriodicTask(task Task, ticker Ticker, timeout time.Duration) *Runner {
ctx, cancelF := context.WithCancel(context.Background())
runner := &Runner{
task: task,
ticker: ticker,
timeout: timeout,
stop: make(chan struct{}),
stopped: make(chan struct{}),
ctx: ctx,
cancelF: cancelF,
task: task,
ticker: ticker,
timeout: timeout,
stop: make(chan struct{}),
loopFinished: make(chan struct{}),
ctx: ctx,
cancelF: cancelF,
trigger: make(chan struct{}),
}
go func() {
defer log.LogPanicAndExit()
Expand All @@ -86,35 +88,56 @@ func StartPeriodicTask(task Task, ticker Ticker, timeout time.Duration) *Runner
func (r *Runner) Stop() {
r.ticker.Stop()
close(r.stop)
<-r.stopped
<-r.loopFinished
}

// Kill is like stop but it also cancels the context of the current running method.
func (r *Runner) Kill() {
r.ticker.Stop()
close(r.stop)
r.cancelF()
<-r.stopped
<-r.loopFinished
}

// TriggerRun triggers the periodic task to run now.
// This does not impact the normal periodicity of this task.
// That means if the periodicity is 5m and you call TriggerNow() after 2 minutes,
// the next execution will be in 3 minutes.
//
// The method blocks until either the triggered run was started or the runner was stopped,
// in which case the triggered run will not be executed.
func (r *Runner) TriggerRun() {
select {
// Either we were stopped or we can put something in the trigger channel.
case <-r.stop:
case r.trigger <- struct{}{}:
}
}

func (r *Runner) runLoop() {
defer close(r.stopped)
defer close(r.loopFinished)
defer r.cancelF()
for {
select {
case <-r.stop:
return
case <-r.ticker.Chan():
select {
// Make sure that stop case is evaluated first,
// so that when we kill and both channels are ready we always go into stop first.
case <-r.stop:
return
default:
ctx, cancelF := context.WithTimeout(r.ctx, r.timeout)
r.task.Run(ctx)
cancelF()
}
r.onTick()
case <-r.trigger:
r.onTick()
}
}
}

func (r *Runner) onTick() {
select {
// Make sure that stop case is evaluated first,
// so that when we kill and both channels are ready we always go into stop first.
case <-r.stop:
return
default:
ctx, cancelF := context.WithTimeout(r.ctx, r.timeout)
r.task.Run(ctx)
cancelF()
}
}
24 changes: 24 additions & 0 deletions go/lib/periodic/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,27 @@ func TestTaskDoesntRunAfterKill(t *testing.T) {
r.Kill()
})
}

func TestTriggerNow(t *testing.T) {
Convey("TestTriggerNow", t, func() {
done := make(chan struct{})
cnt := 0
fn := taskFunc(func(ctx context.Context) {
cnt++
done <- struct{}{}
})
tickC := make(chan time.Time)
ticker := &testTicker{C: tickC}
r := StartPeriodicTask(fn, ticker, time.Microsecond)
r.TriggerRun()
xtest.AssertReadReturnsBefore(t, done, 50*time.Millisecond)
tickC <- time.Now()
xtest.AssertReadReturnsBefore(t, done, 50*time.Millisecond)
r.TriggerRun()
xtest.AssertReadReturnsBefore(t, done, 50*time.Millisecond)
r.Stop()
// check that a trigger after stop doesn't do anything.
r.TriggerRun()
SoMsg("Must have executed 3 times", cnt, ShouldEqual, 3)
})
}