jobs: Ensure schedules are cancelled when scheduler disabled. #77306

Merged 1 commit on Mar 3, 2022
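In short, the diff below registers a callback on the scheduler-enabled cluster setting that invokes a mutex-guarded context.CancelFunc, so turning the setting off cancels any schedule batch that is currently executing; the enabled state is re-checked after the cancel function is installed so a concurrent change is not missed. A minimal, self-contained Go sketch of that pattern, independent of the CockroachDB settings machinery (the cancelOnDisable type and its Disable/Run methods are illustrative names, not from this PR):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// cancelOnDisable mirrors the syncCancelFunc idea from the diff: a mutex-guarded
// cancel function that a "disabled" notification can invoke to interrupt the
// currently running work.
type cancelOnDisable struct {
	mu       sync.Mutex
	cancel   context.CancelFunc
	disabled bool
}

// Disable marks the component as disabled and cancels any in-flight Run.
func (c *cancelOnDisable) Disable() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.disabled = true
	if c.cancel != nil {
		c.cancel()
	}
}

// Run executes f with a context that is cancelled if Disable is called, whether
// that happens before f starts or while it is running.
func (c *cancelOnDisable) Run(ctx context.Context, f func(context.Context) error) error {
	ctx, cancel := context.WithCancel(ctx)
	c.mu.Lock()
	c.cancel = cancel
	if c.disabled {
		// Re-check after publishing the cancel function, just as the PR re-reads
		// the enabled setting: this closes the window where Disable runs between
		// creating the context and installing the cancel function.
		cancel()
	}
	c.mu.Unlock()

	defer func() {
		c.mu.Lock()
		cancel()
		c.cancel = nil
		c.mu.Unlock()
	}()
	return f(ctx)
}

func main() {
	c := &cancelOnDisable{}
	go func() {
		time.Sleep(50 * time.Millisecond)
		c.Disable() // stands in for flipping the scheduler-enabled setting to false
	}()
	err := c.Run(context.Background(), func(ctx context.Context) error {
		<-ctx.Done() // stands in for a long-running schedule execution
		return ctx.Err()
	})
	fmt.Println("run finished:", err) // prints "run finished: context canceled"
}
```

In the actual change, the equivalent of Run wraps the transaction that calls executeSchedules inside runDaemon, as shown in the job_scheduler.go hunk below.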
60 changes: 56 additions & 4 deletions pkg/jobs/job_scheduler.go
@@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)
@@ -375,6 +376,54 @@ func (s *jobScheduler) schedulerEnabledOnThisNode(ctx context.Context) bool {
return enabled
}

type syncCancelFunc struct {
syncutil.Mutex
context.CancelFunc
}

// newCancelWhenDisabled arranges for the scheduler-enabled setting's change callback
// to cancel the currently executing context.
func newCancelWhenDisabled(sv *settings.Values) *syncCancelFunc {
sf := &syncCancelFunc{}
schedulerEnabledSetting.SetOnChange(sv, func(ctx context.Context) {
if !schedulerEnabledSetting.Get(sv) {
sf.Lock()
if sf.CancelFunc != nil {
sf.CancelFunc()
}
sf.Unlock()
}
})
return sf
}

// withCancelOnDisabled executes the provided function with a context that is cancelled
// if the scheduler is disabled.
func (sf *syncCancelFunc) withCancelOnDisabled(
ctx context.Context, sv *settings.Values, f func(ctx context.Context) error,
) error {
ctx, cancel := func() (context.Context, context.CancelFunc) {
sf.Lock()
defer sf.Unlock()

ctx, cancel := context.WithCancel(ctx)
sf.CancelFunc = cancel

if !schedulerEnabledSetting.Get(sv) {
cancel()
}

return ctx, func() {
sf.Lock()
defer sf.Unlock()
cancel()
sf.CancelFunc = nil
}
}()
defer cancel()
return f(ctx)
}

func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) {
_ = stopper.RunAsyncTask(ctx, "job-scheduler", func(ctx context.Context) {
initialDelay := getInitialScanDelay(s.TestingKnobs)
@@ -384,6 +433,8 @@ func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) {
log.Errorf(ctx, "error registering executor metrics: %+v", err)
}

whenDisabled := newCancelWhenDisabled(&s.Settings.SV)

for timer := time.NewTimer(initialDelay); ; timer.Reset(
getWaitPeriod(ctx, &s.Settings.SV, s.schedulerEnabledOnThisNode, jitter, s.TestingKnobs)) {
select {
@@ -395,10 +446,11 @@
}

maxSchedules := schedulerMaxJobsPerIterationSetting.Get(&s.Settings.SV)
err := s.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return s.executeSchedules(ctx, maxSchedules, txn)
})
if err != nil {
if err := whenDisabled.withCancelOnDisabled(ctx, &s.Settings.SV, func(ctx context.Context) error {
return s.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return s.executeSchedules(ctx, maxSchedules, txn)
})
}); err != nil {
log.Errorf(ctx, "error executing schedules: %+v", err)
}

81 changes: 81 additions & 0 deletions pkg/jobs/job_scheduler_test.go
@@ -826,3 +826,84 @@ func TestSchedulerCanBeRestrictedToSingleNode(t *testing.T) {
})
}
}

type blockUntilCancelledExecutor struct {
started, done chan struct{}
}

var _ ScheduledJobExecutor = (*blockUntilCancelledExecutor)(nil)

func (e *blockUntilCancelledExecutor) ExecuteJob(
ctx context.Context,
cfg *scheduledjobs.JobExecutionConfig,
env scheduledjobs.JobSchedulerEnv,
schedule *ScheduledJob,
txn *kv.Txn,
) error {
defer close(e.done)
close(e.started)
<-ctx.Done()
return ctx.Err()
}

func (e *blockUntilCancelledExecutor) NotifyJobTermination(
ctx context.Context,
jobID jobspb.JobID,
jobStatus Status,
details jobspb.Details,
env scheduledjobs.JobSchedulerEnv,
schedule *ScheduledJob,
ex sqlutil.InternalExecutor,
txn *kv.Txn,
) error {
return nil
}

func (e *blockUntilCancelledExecutor) Metrics() metric.Struct {
return nil
}

func (e *blockUntilCancelledExecutor) GetCreateScheduleStatement(
ctx context.Context,
env scheduledjobs.JobSchedulerEnv,
txn *kv.Txn,
sj *ScheduledJob,
ex sqlutil.InternalExecutor,
) (string, error) {
return "", errors.AssertionFailedf("unexpected GetCreateScheduleStatement call")
}

func TestDisablingSchedulerCancelsSchedules(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const executorName = "block-until-cancelled-executor"
ex := &blockUntilCancelledExecutor{
started: make(chan struct{}),
done: make(chan struct{}),
}
defer registerScopedScheduledJobExecutor(executorName, ex)()

knobs := base.TestingKnobs{
JobsTestingKnobs: fastDaemonKnobs(overridePaceSetting(10 * time.Millisecond)),
}
ts, _, _ := serverutils.StartServer(t, base.TestServerArgs{Knobs: knobs})
defer ts.Stopper().Stop(context.Background())

// Create a schedule that blocks until its context is cancelled because the scheduler
// is disabled. One schedule suffices: the scheduler executes its batch of schedules
// sequentially, so creating more than one changes nothing since we block.
schedule := NewScheduledJob(scheduledjobs.ProdJobSchedulerEnv)
schedule.SetScheduleLabel("test schedule")
schedule.SetOwner(security.TestUserName())
schedule.SetNextRun(timeutil.Now())
schedule.SetExecutionDetails(executorName, jobspb.ExecutionArguments{})
require.NoError(t, schedule.Create(
context.Background(), ts.InternalExecutor().(sqlutil.InternalExecutor), nil))

<-ex.started
// Disable scheduler and verify all running schedules were cancelled.
schedulerEnabledSetting.Override(context.Background(), &ts.ClusterSettings().SV, false)
<-ex.done
}