Skip to content

Commit

Permalink
jobs: fix race which permitted concurrent execution
Browse files Browse the repository at this point in the history
The race was that we'd unregister a job from the `adoptedJobs` map while
processing the pause or cancel requests. This is problematic because the
job also unregisters itself in such cases. Because the entry was already
removed by the request processing, the entry which the now-canceled job
later removes may in fact correspond to a new execution. We remove this
hazard by centralizing the location where the entry is actually removed:
the cancellation/pause processing now only cancels the context of the
relevant execution and leaves removal of the entry to the job itself.

Fixes #68593

Release note: None

Release justification: bug fixes and low-risk updates to new functionality
  • Loading branch information
ajwerner committed Aug 27, 2021
1 parent 2e949fa commit 288e504
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 86 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2984,7 +2984,7 @@ func TestBackupRestoreDuringUserDefinedTypeChange(t *testing.T) {
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() error {
RunBeforeEnumMemberPromotion: func(context.Context) error {
mu.Lock()
if numTypeChangesStarted < len(tc.queries) {
numTypeChangesStarted++
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ go_test(
"//pkg/server/status",
"//pkg/server/telemetry",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkv",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3166,8 +3166,8 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
unblock := waitForBlocked()
waitForRecord()
sqlDB.Exec(t, `CANCEL JOB $1`, foo.(cdctest.EnterpriseTestFeed).JobID())
waitForNoRecord()
unblock()
waitForNoRecord()
}
}, feedTestNoTenants, withArgsFn(func(args *base.TestServerArgs) {
storeKnobs := &kvserver.StoreTestingKnobs{}
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -326,6 +327,7 @@ func startTestFullServer(
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{Changefeed: &TestingKnobs{}},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
SpanConfig: &spanconfig.TestingKnobs{ManagerDisableJobCreation: true},
}
if options.knobsFn != nil {
options.knobsFn(&knobs)
Expand Down
13 changes: 7 additions & 6 deletions pkg/ccl/multiregionccl/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package multiregionccl_test

import (
"context"
"sort"
"strings"
"testing"
Expand Down Expand Up @@ -119,7 +120,7 @@ func TestConcurrentAddDropRegions(t *testing.T) {

knobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() error {
RunBeforeEnumMemberPromotion: func(context.Context) error {
mu.Lock()
if firstOp {
firstOp = false
Expand Down Expand Up @@ -258,7 +259,7 @@ func TestRegionAddDropEnclosingRegionalByRowOps(t *testing.T) {

knobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() error {
RunBeforeEnumMemberPromotion: func(ctx context.Context) error {
mu.Lock()
defer mu.Unlock()
close(typeChangeStarted)
Expand Down Expand Up @@ -383,7 +384,7 @@ ALTER TABLE db.public.global CONFIGURE ZONE USING

knobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() error {
RunBeforeEnumMemberPromotion: func(context.Context) error {
close(regionOpStarted)
<-placementOpFinished
return nil
Expand Down Expand Up @@ -444,7 +445,7 @@ func TestSettingPrimaryRegionAmidstDrop(t *testing.T) {
dropRegionFinished := make(chan struct{})
knobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() error {
RunBeforeEnumMemberPromotion: func(context.Context) error {
mu.Lock()
defer mu.Unlock()
if dropRegionStarted != nil {
Expand Down Expand Up @@ -845,7 +846,7 @@ func TestRegionAddDropWithConcurrentBackupOps(t *testing.T) {

backupKnobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() error {
RunBeforeEnumMemberPromotion: func(ctx context.Context) error {
mu.Lock()
defer mu.Unlock()
if waitInTypeSchemaChangerDuringBackup {
Expand Down Expand Up @@ -902,7 +903,7 @@ INSERT INTO db.rbr VALUES (1,1),(2,2),(3,3);

restoreKnobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() error {
RunBeforeEnumMemberPromotion: func(context.Context) error {
mu.Lock()
defer mu.Unlock()
if !regionAlterCmd.shouldSucceed {
Expand Down
40 changes: 25 additions & 15 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,33 +325,43 @@ func (r *Registry) resumeJob(ctx context.Context, jobID jobspb.JobID, s sqlliven
}
resumeCtx, cancel := r.makeCtx()

aj := &adoptedJob{sid: s.ID(), cancel: cancel}
r.addAdoptedJob(jobID, aj)
if alreadyAdopted := r.addAdoptedJob(jobID, s.ID(), cancel); alreadyAdopted {
return nil
}

r.metrics.ResumedJobs.Inc(1)
if err := r.stopper.RunAsyncTask(ctx, job.taskName(), func(ctx context.Context) {
// Wait for the job to finish. No need to print the error because if there
// was one it's been set in the job status already.
_ = r.runJob(resumeCtx, resumer, job, status, job.taskName())
}); err != nil {
r.removeAdoptedJob(jobID)
r.unregister(jobID)
return err
}
return nil
}

func (r *Registry) removeAdoptedJob(jobID jobspb.JobID) {
// addAdoptedJob adds the job to the set of currently running jobs. This set is
// used for introspection, and, importantly, to serve as a lock to prevent
// concurrent executions. Removal occurs in runJob or in the case that we were
// unable to launch the goroutine to call runJob. If the returned boolean is
// true, it means that the job is already registered as running and should not
// be run again.
func (r *Registry) addAdoptedJob(
jobID jobspb.JobID, sessionID sqlliveness.SessionID, cancel context.CancelFunc,
) (alreadyAdopted bool) {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.mu.adoptedJobs, jobID)
}

func (r *Registry) addAdoptedJob(jobID jobspb.JobID, aj *adoptedJob) {
// TODO(sajjad): We should check whether adoptedJobs already has jobID or not. If
// the ID exists, we should not add it again and the caller should not start
// another resumer.
r.mu.Lock()
defer r.mu.Unlock()
r.mu.adoptedJobs[jobID] = aj
if _, alreadyAdopted = r.mu.adoptedJobs[jobID]; alreadyAdopted {
return true
}

r.mu.adoptedJobs[jobID] = &adoptedJob{
sid: sessionID,
cancel: cancel,
}
return false
}

func (r *Registry) runJob(
Expand Down Expand Up @@ -454,11 +464,11 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes
statusString := *row[1].(*tree.DString)
switch Status(statusString) {
case StatusPaused:
r.unregister(id)
r.cancelRegisteredJobContext(id)
log.Infof(ctx, "job %d, session %s: paused", id, s.ID())
case StatusReverting:
if err := job.Update(ctx, txn, func(txn *kv.Txn, md JobMetadata, ju *JobUpdater) error {
r.unregister(id)
r.cancelRegisteredJobContext(id)
md.Payload.Error = errJobCanceled.Error()
encodedErr := errors.EncodeError(ctx, errJobCanceled)
md.Payload.FinalResumeError = &encodedErr
Expand Down
15 changes: 13 additions & 2 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,7 @@ func (j *Job) CurrentStatus(ctx context.Context, txn *kv.Txn) (Status, error) {
// and nothing will be sent on errCh. Clients must not start jobs more than
// once.
func (sj *StartableJob) Start(ctx context.Context) (err error) {
if starts := atomic.AddInt64(&sj.starts, 1); starts != 1 {
if alreadyStarted := sj.recordStart(); alreadyStarted {
return errors.AssertionFailedf(
"StartableJob %d cannot be started more than once", sj.ID())
}
Expand Down Expand Up @@ -972,6 +972,17 @@ func (sj *StartableJob) CleanupOnRollback(ctx context.Context) error {
// Cancel will mark the job as canceled and release its resources in the
// Registry.
func (sj *StartableJob) Cancel(ctx context.Context) error {
defer sj.registry.unregister(sj.ID())
alreadyStarted := sj.recordStart() // prevent future start attempts
defer func() {
if alreadyStarted {
sj.registry.cancelRegisteredJobContext(sj.ID())
} else {
sj.registry.unregister(sj.ID())
}
}()
return sj.registry.CancelRequested(ctx, nil, sj.ID())
}

// recordStart atomically increments the job's start counter and reports
// whether a start had already been recorded before this call. Both Start and
// Cancel go through it, so whichever of them runs first claims the job and
// every later caller observes alreadyStarted == true.
func (sj *StartableJob) recordStart() (alreadyStarted bool) {
	count := atomic.AddInt64(&sj.starts, 1)
	alreadyStarted = count != 1
	return alreadyStarted
}
71 changes: 37 additions & 34 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
Expand All @@ -44,7 +45,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -222,6 +222,9 @@ func (rts *registryTestSuite) setUp(t *testing.T) {
}
}
args.Knobs.JobsTestingKnobs = knobs
args.Knobs.SpanConfig = &spanconfig.TestingKnobs{
ManagerDisableJobCreation: true,
}

if rts.traceRealSpan {
baseDir, dirCleanupFn := testutils.TempDir(t)
Expand Down Expand Up @@ -283,6 +286,11 @@ func (rts *registryTestSuite) setUp(t *testing.T) {
},
FailOrCancel: func(ctx context.Context) error {
t.Log("Starting FailOrCancel")
if rts.traceRealSpan {
// Add a dummy recording so we actually see something in the trace.
span := tracing.SpanFromContext(ctx)
span.RecordStructured(&types.StringValue{Value: "boom"})
}
rts.mu.Lock()
rts.mu.a.OnFailOrCancelStart = true
rts.mu.Unlock()
Expand Down Expand Up @@ -362,7 +370,6 @@ func (rts *registryTestSuite) check(t *testing.T, expectedStatus jobs.Status) {

func TestRegistryLifecycle(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 68315, "flaky test")
defer log.Scope(t).Close(t)

t.Run("normal success", func(t *testing.T) {
Expand Down Expand Up @@ -1095,42 +1102,40 @@ func TestRegistryLifecycle(t *testing.T) {
})

t.Run("dump traces on cancel", func(t *testing.T) {
rts := registryTestSuite{traceRealSpan: true}
completeCh := make(chan struct{})
rts := registryTestSuite{traceRealSpan: true, afterJobStateMachine: func() {
completeCh <- struct{}{}
}}
rts.setUp(t)
defer rts.tearDown()
rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='onStop'`)
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
rts.job = j

runJobAndFail := func(expectedNumFiles int) {
j, err := jobs.TestingCreateAndStartJob(context.Background(), rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
rts.job = j

rts.mu.e.ResumeStart = true
rts.resumeCheckCh <- struct{}{}
rts.check(t, jobs.StatusRunning)
rts.mu.e.ResumeStart = true
rts.resumeCheckCh <- struct{}{}
rts.check(t, jobs.StatusRunning)

rts.sqlDB.Exec(t, "CANCEL JOB $1", j.ID())
rts.sqlDB.Exec(t, "CANCEL JOB $1", j.ID())

// Cancellation will cause the running instance of the job to get context
// canceled causing it to potentially dump traces.
require.Error(t, rts.job.AwaitCompletion(rts.ctx))
checkTraceFiles(t, rts.registry, expectedNumFiles)
<-completeCh
checkTraceFiles(t, rts.registry, 1)

rts.mu.e.OnFailOrCancelStart = true
rts.check(t, jobs.StatusReverting)
rts.mu.e.OnFailOrCancelStart = true
rts.check(t, jobs.StatusReverting)

rts.failOrCancelCheckCh <- struct{}{}
close(rts.failOrCancelCheckCh)
rts.failOrCancelCh <- nil
close(rts.failOrCancelCh)
rts.mu.e.OnFailOrCancelExit = true
rts.failOrCancelCheckCh <- struct{}{}
close(rts.failOrCancelCheckCh)
rts.failOrCancelCh <- nil
close(rts.failOrCancelCh)
rts.mu.e.OnFailOrCancelExit = true

rts.check(t, jobs.StatusCanceled)
}
rts.check(t, jobs.StatusCanceled)

rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='onStop'`)
runJobAndFail(1)
<-completeCh
})
}

Expand Down Expand Up @@ -1754,7 +1759,7 @@ func TestShowJobs(t *testing.T) {
defer log.Scope(t).Close(t)

params, _ := tests.CreateTestServerParams()
params.Knobs = base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()}
params.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
s, rawSQLDB, _ := serverutils.StartServer(t, params)
sqlDB := sqlutils.MakeSQLRunner(rawSQLDB)
ctx := context.Background()
Expand Down Expand Up @@ -2583,11 +2588,9 @@ func TestStartableJob(t *testing.T) {
status, err := sj.CurrentStatus(ctx, nil /* txn */)
require.NoError(t, err)
require.Equal(t, jobs.StatusCancelRequested, status)
for _, id := range jr.CurrentlyRunningJobs() {
require.NotEqual(t, id, sj.ID())
}
// Start should fail since we have already called cancel on the job.
err = sj.Start(ctx)
require.Regexp(t, "job with status cancel-requested cannot be marked started", err)
require.Regexp(t, "cannot be started more than once", err)
})
setUpRunTest := func(t *testing.T) (
sj *jobs.StartableJob,
Expand Down
26 changes: 15 additions & 11 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,12 +652,13 @@ func (r *Registry) CreateStartableJobWithTxn(
// Using a new context allows for independent lifetimes and cancellation.
resumerCtx, cancel = r.makeCtx()

r.mu.Lock()
defer r.mu.Unlock()
if _, alreadyRegistered := r.mu.adoptedJobs[jobID]; alreadyRegistered {
log.Fatalf(ctx, "job %d: was just created but found in registered adopted jobs", jobID)
if alreadyAdopted := r.addAdoptedJob(jobID, j.sessionID, cancel); alreadyAdopted {
log.Fatalf(
ctx,
"job %d: was just created but found in registered adopted jobs",
jobID,
)
}
r.mu.adoptedJobs[jobID] = &adoptedJob{sid: j.sessionID, cancel: cancel}
execDone = make(chan struct{})
}

Expand Down Expand Up @@ -1362,17 +1363,20 @@ func (r *Registry) cancelAllAdoptedJobs() {
func (r *Registry) unregister(jobID jobspb.JobID) {
r.mu.Lock()
defer r.mu.Unlock()

aj, ok := r.mu.adoptedJobs[jobID]
// It is possible for a job to be double unregistered. unregister is always
// called at the end of resume. But it can also be called during deprecatedCancelAll
// and in the adopt loop under certain circumstances.
if ok {
if aj, ok := r.mu.adoptedJobs[jobID]; ok {
aj.cancel()
delete(r.mu.adoptedJobs, jobID)
}
}

// cancelRegisteredJobContext cancels the context of the execution currently
// registered for jobID in adoptedJobs, if there is one. Unlike unregister, it
// leaves the map entry in place; it is a no-op when no entry exists. The
// lookup and the cancel call both happen while holding r.mu.
func (r *Registry) cancelRegisteredJobContext(jobID jobspb.JobID) {
	r.mu.Lock()
	defer r.mu.Unlock()

	adopted, registered := r.mu.adoptedJobs[jobID]
	if !registered {
		return
	}
	adopted.cancel()
}

// RetryInitialDelay returns the value of retryInitialDelaySetting cluster setting,
// in seconds, which is the initial delay in exponential-backoff delay calculation.
func (r *Registry) RetryInitialDelay() float64 {
Expand Down
Loading

0 comments on commit 288e504

Please sign in to comment.