Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jobs: fix race which permitted concurrent execution #68374

Merged
merged 2 commits into from
Aug 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2984,7 +2984,7 @@ func TestBackupRestoreDuringUserDefinedTypeChange(t *testing.T) {
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() error {
RunBeforeEnumMemberPromotion: func(context.Context) error {
mu.Lock()
if numTypeChangesStarted < len(tc.queries) {
numTypeChangesStarted++
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ go_test(
"//pkg/server/status",
"//pkg/server/telemetry",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkv",
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3177,6 +3177,7 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
select {
case <-waitUntilClosed:
case <-done:
case <-ctx.Done():
}
default:
}
Expand Down Expand Up @@ -3286,11 +3287,10 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
// canceled.
waitForBlocked = requestBlockedScan()
sqlDB.Exec(t, `ALTER TABLE foo ADD COLUMN d INT NOT NULL DEFAULT 2`)
unblock := waitForBlocked()
_ = waitForBlocked()
waitForRecord()
sqlDB.Exec(t, `CANCEL JOB $1`, foo.(cdctest.EnterpriseTestFeed).JobID())
waitForNoRecord()
unblock()
}
}, feedTestNoTenants, withArgsFn(func(args *base.TestServerArgs) {
storeKnobs := &kvserver.StoreTestingKnobs{}
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -326,6 +327,7 @@ func startTestFullServer(
knobs := base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{Changefeed: &TestingKnobs{}},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
SpanConfig: &spanconfig.TestingKnobs{ManagerDisableJobCreation: true},
}
if options.knobsFn != nil {
options.knobsFn(&knobs)
Expand Down
13 changes: 7 additions & 6 deletions pkg/ccl/multiregionccl/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package multiregionccl_test

import (
"context"
"sort"
"strings"
"testing"
Expand Down Expand Up @@ -119,7 +120,7 @@ func TestConcurrentAddDropRegions(t *testing.T) {

knobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() error {
RunBeforeEnumMemberPromotion: func(context.Context) error {
mu.Lock()
if firstOp {
firstOp = false
Expand Down Expand Up @@ -258,7 +259,7 @@ func TestRegionAddDropEnclosingRegionalByRowOps(t *testing.T) {

knobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() error {
RunBeforeEnumMemberPromotion: func(ctx context.Context) error {
mu.Lock()
defer mu.Unlock()
close(typeChangeStarted)
Expand Down Expand Up @@ -383,7 +384,7 @@ ALTER TABLE db.public.global CONFIGURE ZONE USING

knobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() error {
RunBeforeEnumMemberPromotion: func(context.Context) error {
close(regionOpStarted)
<-placementOpFinished
return nil
Expand Down Expand Up @@ -444,7 +445,7 @@ func TestSettingPrimaryRegionAmidstDrop(t *testing.T) {
dropRegionFinished := make(chan struct{})
knobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() error {
RunBeforeEnumMemberPromotion: func(context.Context) error {
mu.Lock()
defer mu.Unlock()
if dropRegionStarted != nil {
Expand Down Expand Up @@ -845,7 +846,7 @@ func TestRegionAddDropWithConcurrentBackupOps(t *testing.T) {

backupKnobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() error {
RunBeforeEnumMemberPromotion: func(ctx context.Context) error {
mu.Lock()
defer mu.Unlock()
if waitInTypeSchemaChangerDuringBackup {
Expand Down Expand Up @@ -902,7 +903,7 @@ INSERT INTO db.rbr VALUES (1,1),(2,2),(3,3);

restoreKnobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() error {
RunBeforeEnumMemberPromotion: func(context.Context) error {
mu.Lock()
defer mu.Unlock()
if !regionAlterCmd.shouldSucceed {
Expand Down
40 changes: 25 additions & 15 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,33 +325,43 @@ func (r *Registry) resumeJob(ctx context.Context, jobID jobspb.JobID, s sqlliven
}
resumeCtx, cancel := r.makeCtx()

aj := &adoptedJob{sid: s.ID(), cancel: cancel}
r.addAdoptedJob(jobID, aj)
if alreadyAdopted := r.addAdoptedJob(jobID, s.ID(), cancel); alreadyAdopted {
return nil
}

r.metrics.ResumedJobs.Inc(1)
if err := r.stopper.RunAsyncTask(ctx, job.taskName(), func(ctx context.Context) {
// Wait for the job to finish. No need to print the error because if there
// was one it's been set in the job status already.
_ = r.runJob(resumeCtx, resumer, job, status, job.taskName())
}); err != nil {
r.removeAdoptedJob(jobID)
r.unregister(jobID)
return err
}
return nil
}

func (r *Registry) removeAdoptedJob(jobID jobspb.JobID) {
// addAdoptedJob adds the job to the set of currently running jobs. This set is
// used for introspection, and, importantly, to serve as a lock to prevent
// concurrent executions. Removal occurs in runJob or in the case that we were
// unable to launch the goroutine to call runJob. If the returned boolean is
// false, it means that the job is already registered as running and should not
// be run again.
func (r *Registry) addAdoptedJob(
jobID jobspb.JobID, sessionID sqlliveness.SessionID, cancel context.CancelFunc,
) (alreadyAdopted bool) {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.mu.adoptedJobs, jobID)
}

func (r *Registry) addAdoptedJob(jobID jobspb.JobID, aj *adoptedJob) {
// TODO(sajjad): We should check whether adoptedJobs already has jobID or not. If
// the ID exists, we should not add it again and the caller should not start
// another resumer.
r.mu.Lock()
defer r.mu.Unlock()
r.mu.adoptedJobs[jobID] = aj
if _, alreadyAdopted = r.mu.adoptedJobs[jobID]; alreadyAdopted {
return true
}

r.mu.adoptedJobs[jobID] = &adoptedJob{
sid: sessionID,
cancel: cancel,
}
return false
}

func (r *Registry) runJob(
Expand Down Expand Up @@ -455,11 +465,11 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes
statusString := *row[1].(*tree.DString)
switch Status(statusString) {
case StatusPaused:
r.unregister(id)
r.cancelRegisteredJobContext(id)
log.Infof(ctx, "job %d, session %s: paused", id, s.ID())
case StatusReverting:
if err := job.Update(ctx, txn, func(txn *kv.Txn, md JobMetadata, ju *JobUpdater) error {
r.unregister(id)
r.cancelRegisteredJobContext(id)
md.Payload.Error = errJobCanceled.Error()
encodedErr := errors.EncodeError(ctx, errJobCanceled)
md.Payload.FinalResumeError = &encodedErr
Expand Down
15 changes: 13 additions & 2 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ func (j *Job) getRunStats() (rs RunStats) {
// and nothing will be send on errCh. Clients must not start jobs more than
// once.
func (sj *StartableJob) Start(ctx context.Context) (err error) {
if starts := atomic.AddInt64(&sj.starts, 1); starts != 1 {
if alreadyStarted := sj.recordStart(); alreadyStarted {
return errors.AssertionFailedf(
"StartableJob %d cannot be started more than once", sj.ID())
}
Expand Down Expand Up @@ -960,10 +960,21 @@ func (sj *StartableJob) CleanupOnRollback(ctx context.Context) error {
// Cancel will mark the job as canceled and release its resources in the
// Registry.
func (sj *StartableJob) Cancel(ctx context.Context) error {
defer sj.registry.unregister(sj.ID())
alreadyStarted := sj.recordStart() // prevent future start attempts
defer func() {
if alreadyStarted {
sj.registry.cancelRegisteredJobContext(sj.ID())
} else {
sj.registry.unregister(sj.ID())
}
}()
return sj.registry.CancelRequested(ctx, nil, sj.ID())
}

func (sj *StartableJob) recordStart() (alreadyStarted bool) {
return atomic.AddInt64(&sj.starts, 1) != 1
}

// FormatRetriableExecutionErrorLogToStringArray extracts the events
// stored in the payload, formats them into strings and returns them as an
// array of strings. This function is intended for use with crdb_internal.jobs.
Expand Down
71 changes: 37 additions & 34 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
Expand All @@ -44,7 +45,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -222,6 +222,9 @@ func (rts *registryTestSuite) setUp(t *testing.T) {
}
}
args.Knobs.JobsTestingKnobs = knobs
args.Knobs.SpanConfig = &spanconfig.TestingKnobs{
ManagerDisableJobCreation: true,
}

if rts.traceRealSpan {
baseDir, dirCleanupFn := testutils.TempDir(t)
Expand Down Expand Up @@ -283,6 +286,11 @@ func (rts *registryTestSuite) setUp(t *testing.T) {
},
FailOrCancel: func(ctx context.Context) error {
t.Log("Starting FailOrCancel")
if rts.traceRealSpan {
// Add a dummy recording so we actually see something in the trace.
span := tracing.SpanFromContext(ctx)
span.RecordStructured(&types.StringValue{Value: "boom"})
}
rts.mu.Lock()
rts.mu.a.OnFailOrCancelStart = true
rts.mu.Unlock()
Expand Down Expand Up @@ -362,7 +370,6 @@ func (rts *registryTestSuite) check(t *testing.T, expectedStatus jobs.Status) {

func TestRegistryLifecycle(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 68315, "flaky test")
defer log.Scope(t).Close(t)

t.Run("normal success", func(t *testing.T) {
Expand Down Expand Up @@ -1095,42 +1102,40 @@ func TestRegistryLifecycle(t *testing.T) {
})

t.Run("dump traces on cancel", func(t *testing.T) {
rts := registryTestSuite{traceRealSpan: true}
completeCh := make(chan struct{})
rts := registryTestSuite{traceRealSpan: true, afterJobStateMachine: func() {
completeCh <- struct{}{}
}}
rts.setUp(t)
defer rts.tearDown()
rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='onStop'`)
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
rts.job = j

runJobAndFail := func(expectedNumFiles int) {
j, err := jobs.TestingCreateAndStartJob(context.Background(), rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
rts.job = j

rts.mu.e.ResumeStart = true
rts.resumeCheckCh <- struct{}{}
rts.check(t, jobs.StatusRunning)
rts.mu.e.ResumeStart = true
rts.resumeCheckCh <- struct{}{}
rts.check(t, jobs.StatusRunning)

rts.sqlDB.Exec(t, "CANCEL JOB $1", j.ID())
rts.sqlDB.Exec(t, "CANCEL JOB $1", j.ID())

// Cancellation will cause the running instance of the job to get context
// canceled causing it to potentially dump traces.
require.Error(t, rts.job.AwaitCompletion(rts.ctx))
checkTraceFiles(t, rts.registry, expectedNumFiles)
<-completeCh
checkTraceFiles(t, rts.registry, 1)

rts.mu.e.OnFailOrCancelStart = true
rts.check(t, jobs.StatusReverting)
rts.mu.e.OnFailOrCancelStart = true
rts.check(t, jobs.StatusReverting)

rts.failOrCancelCheckCh <- struct{}{}
close(rts.failOrCancelCheckCh)
rts.failOrCancelCh <- nil
close(rts.failOrCancelCh)
rts.mu.e.OnFailOrCancelExit = true
rts.failOrCancelCheckCh <- struct{}{}
close(rts.failOrCancelCheckCh)
rts.failOrCancelCh <- nil
close(rts.failOrCancelCh)
rts.mu.e.OnFailOrCancelExit = true

rts.check(t, jobs.StatusCanceled)
}
rts.check(t, jobs.StatusCanceled)

rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='onStop'`)
runJobAndFail(1)
<-completeCh
})
}

Expand Down Expand Up @@ -1754,7 +1759,7 @@ func TestShowJobs(t *testing.T) {
defer log.Scope(t).Close(t)

params, _ := tests.CreateTestServerParams()
params.Knobs = base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()}
params.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
s, rawSQLDB, _ := serverutils.StartServer(t, params)
sqlDB := sqlutils.MakeSQLRunner(rawSQLDB)
ctx := context.Background()
Expand Down Expand Up @@ -2583,11 +2588,9 @@ func TestStartableJob(t *testing.T) {
status, err := sj.CurrentStatus(ctx, nil /* txn */)
require.NoError(t, err)
require.Equal(t, jobs.StatusCancelRequested, status)
for _, id := range jr.CurrentlyRunningJobs() {
require.NotEqual(t, id, sj.ID())
}
// Start should fail since we have already called cancel on the job.
err = sj.Start(ctx)
require.Regexp(t, "job with status cancel-requested cannot be marked started", err)
require.Regexp(t, "cannot be started more than once", err)
})
setUpRunTest := func(t *testing.T) (
sj *jobs.StartableJob,
Expand Down
Loading