From de5c7f7122022c2237a25be1dd81c6fab778e4b4 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Thu, 26 Aug 2021 12:04:56 -0400 Subject: [PATCH 1/2] jobs: fix race which permitted concurrent execution The race was that we'd unregister a job from the `adoptedJobs` map while processing the pause or cancel requests. This is problematic because the job also unregisters itself in such cases. However, because the job was already unregistered, the entry which the now canceled job removes may in fact correspond to a new execution. We remove this hazard by centralizing the location where the entry is actually removed. Instead, we have the cancelation/pause processing only cancel the context of the relevant execution. Fixes #68593 Release note: None Release justification: bug fixes and low-risk updates to new functionality --- pkg/ccl/backupccl/backup_test.go | 2 +- pkg/ccl/changefeedccl/BUILD.bazel | 1 + pkg/ccl/changefeedccl/changefeed_test.go | 4 +- pkg/ccl/changefeedccl/helpers_test.go | 2 + pkg/ccl/multiregionccl/region_test.go | 13 +++-- pkg/jobs/adopt.go | 40 ++++++++----- pkg/jobs/jobs.go | 15 ++++- pkg/jobs/jobs_test.go | 71 ++++++++++++------------ pkg/jobs/registry.go | 26 +++++---- pkg/sql/type_change.go | 4 +- pkg/sql/type_change_test.go | 27 +++++---- 11 files changed, 118 insertions(+), 87 deletions(-) diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 8d5b82d97fca..01c7bc22466e 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -2984,7 +2984,7 @@ func TestBackupRestoreDuringUserDefinedTypeChange(t *testing.T) { ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{ - RunBeforeEnumMemberPromotion: func() error { + RunBeforeEnumMemberPromotion: func(context.Context) error { mu.Lock() if numTypeChangesStarted < len(tc.queries) { numTypeChangesStarted++ diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 4463d819b7ba..503276fe23f2 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -168,6 +168,7 @@ go_test( "//pkg/server/status", "//pkg/server/telemetry", "//pkg/settings/cluster", + "//pkg/spanconfig", "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/catalogkv", diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 752970b32128..b10359f9aefa 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -3177,6 +3177,7 @@ func TestChangefeedProtectedTimestamps(t *testing.T) { select { case <-waitUntilClosed: case <-done: + case <-ctx.Done(): } default: } @@ -3286,11 +3287,10 @@ func TestChangefeedProtectedTimestamps(t *testing.T) { // canceled. 
waitForBlocked = requestBlockedScan() sqlDB.Exec(t, `ALTER TABLE foo ADD COLUMN d INT NOT NULL DEFAULT 2`) - unblock := waitForBlocked() + _ = waitForBlocked() waitForRecord() sqlDB.Exec(t, `CANCEL JOB $1`, foo.(cdctest.EnterpriseTestFeed).JobID()) waitForNoRecord() - unblock() } }, feedTestNoTenants, withArgsFn(func(args *base.TestServerArgs) { storeKnobs := &kvserver.StoreTestingKnobs{} diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 2a7292b69d3e..c1379ffb411a 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -326,6 +327,7 @@ func startTestFullServer( knobs := base.TestingKnobs{ DistSQL: &execinfra.TestingKnobs{Changefeed: &TestingKnobs{}}, JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + SpanConfig: &spanconfig.TestingKnobs{ManagerDisableJobCreation: true}, } if options.knobsFn != nil { options.knobsFn(&knobs) diff --git a/pkg/ccl/multiregionccl/region_test.go b/pkg/ccl/multiregionccl/region_test.go index baafd5317ae9..e2b3f2d163a5 100644 --- a/pkg/ccl/multiregionccl/region_test.go +++ b/pkg/ccl/multiregionccl/region_test.go @@ -9,6 +9,7 @@ package multiregionccl_test import ( + "context" "sort" "strings" "testing" @@ -119,7 +120,7 @@ func TestConcurrentAddDropRegions(t *testing.T) { knobs := base.TestingKnobs{ SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{ - RunBeforeEnumMemberPromotion: func() error { + RunBeforeEnumMemberPromotion: func(context.Context) error { mu.Lock() if firstOp { firstOp = false @@ -258,7 +259,7 @@ func TestRegionAddDropEnclosingRegionalByRowOps(t *testing.T) { knobs := base.TestingKnobs{ SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{ - RunBeforeEnumMemberPromotion: func() error { + RunBeforeEnumMemberPromotion: func(ctx context.Context) error { mu.Lock() defer mu.Unlock() close(typeChangeStarted) @@ -383,7 +384,7 @@ ALTER TABLE db.public.global CONFIGURE ZONE USING knobs := base.TestingKnobs{ SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{ - RunBeforeEnumMemberPromotion: func() error { + RunBeforeEnumMemberPromotion: func(context.Context) error { close(regionOpStarted) <-placementOpFinished return nil @@ -444,7 +445,7 @@ func TestSettingPrimaryRegionAmidstDrop(t *testing.T) { dropRegionFinished := make(chan struct{}) knobs := base.TestingKnobs{ SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{ - RunBeforeEnumMemberPromotion: func() error { + RunBeforeEnumMemberPromotion: func(context.Context) error { mu.Lock() defer mu.Unlock() if dropRegionStarted != nil { @@ -845,7 +846,7 @@ func TestRegionAddDropWithConcurrentBackupOps(t *testing.T) { backupKnobs := base.TestingKnobs{ SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{ - RunBeforeEnumMemberPromotion: func() error { + RunBeforeEnumMemberPromotion: func(ctx context.Context) error { mu.Lock() defer mu.Unlock() if waitInTypeSchemaChangerDuringBackup { @@ -902,7 +903,7 @@ INSERT INTO db.rbr VALUES (1,1),(2,2),(3,3); restoreKnobs := base.TestingKnobs{ SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{ - RunBeforeEnumMemberPromotion: func() error { + RunBeforeEnumMemberPromotion: 
func(context.Context) error {
 					mu.Lock()
 					defer mu.Unlock()
 					if !regionAlterCmd.shouldSucceed {
diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go
index 5418def3885d..0669a72f1b58 100644
--- a/pkg/jobs/adopt.go
+++ b/pkg/jobs/adopt.go
@@ -325,33 +325,43 @@ func (r *Registry) resumeJob(ctx context.Context, jobID jobspb.JobID, s sqlliven
 	}
 	resumeCtx, cancel := r.makeCtx()
-	aj := &adoptedJob{sid: s.ID(), cancel: cancel}
-	r.addAdoptedJob(jobID, aj)
+	if alreadyAdopted := r.addAdoptedJob(jobID, s.ID(), cancel); alreadyAdopted {
+		return nil
+	}
+
 	r.metrics.ResumedJobs.Inc(1)
 	if err := r.stopper.RunAsyncTask(ctx, job.taskName(), func(ctx context.Context) {
 		// Wait for the job to finish. No need to print the error because if there
 		// was one it's been set in the job status already.
 		_ = r.runJob(resumeCtx, resumer, job, status, job.taskName())
 	}); err != nil {
-		r.removeAdoptedJob(jobID)
+		r.unregister(jobID)
 		return err
 	}
 	return nil
 }
 
-func (r *Registry) removeAdoptedJob(jobID jobspb.JobID) {
+// addAdoptedJob adds the job to the set of currently running jobs. This set is
+// used for introspection, and, importantly, to serve as a lock to prevent
+// concurrent executions. Removal occurs in runJob or in the case that we were
+// unable to launch the goroutine to call runJob. If the returned boolean is
+// true, it means that the job is already registered as running and should not
+// be run again.
+func (r *Registry) addAdoptedJob(
+	jobID jobspb.JobID, sessionID sqlliveness.SessionID, cancel context.CancelFunc,
+) (alreadyAdopted bool) {
 	r.mu.Lock()
 	defer r.mu.Unlock()
-	delete(r.mu.adoptedJobs, jobID)
-}
 
-func (r *Registry) addAdoptedJob(jobID jobspb.JobID, aj *adoptedJob) {
-	// TODO(sajjad): We should check whether adoptedJobs already has jobID or not. If
-	// the ID exists, we should not add it again and the caller should not start
-	// another resumer.
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	r.mu.adoptedJobs[jobID] = aj
+	if _, alreadyAdopted = r.mu.adoptedJobs[jobID]; alreadyAdopted {
+		return true
+	}
+
+	r.mu.adoptedJobs[jobID] = &adoptedJob{
+		sid:    sessionID,
+		cancel: cancel,
+	}
+	return false
 }
 
 func (r *Registry) runJob(
@@ -455,11 +465,11 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes
 			statusString := *row[1].(*tree.DString)
 			switch Status(statusString) {
 			case StatusPaused:
-				r.unregister(id)
+				r.cancelRegisteredJobContext(id)
 				log.Infof(ctx, "job %d, session %s: paused", id, s.ID())
 			case StatusReverting:
 				if err := job.Update(ctx, txn, func(txn *kv.Txn, md JobMetadata, ju *JobUpdater) error {
-					r.unregister(id)
+					r.cancelRegisteredJobContext(id)
 					md.Payload.Error = errJobCanceled.Error()
 					encodedErr := errors.EncodeError(ctx, errJobCanceled)
 					md.Payload.FinalResumeError = &encodedErr
diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go
index a21fefd30a1e..093310139f26 100644
--- a/pkg/jobs/jobs.go
+++ b/pkg/jobs/jobs.go
@@ -862,7 +862,7 @@ func (j *Job) getRunStats() (rs RunStats) {
 // and nothing will be send on errCh. Clients must not start jobs more than
 // once.
 func (sj *StartableJob) Start(ctx context.Context) (err error) {
-	if starts := atomic.AddInt64(&sj.starts, 1); starts != 1 {
+	if alreadyStarted := sj.recordStart(); alreadyStarted {
 		return errors.AssertionFailedf(
 			"StartableJob %d cannot be started more than once", sj.ID())
 	}
@@ -960,10 +960,21 @@ func (sj *StartableJob) CleanupOnRollback(ctx context.Context) error {
 // Cancel will mark the job as canceled and release its resources in the
 // Registry.
func (sj *StartableJob) Cancel(ctx context.Context) error { - defer sj.registry.unregister(sj.ID()) + alreadyStarted := sj.recordStart() // prevent future start attempts + defer func() { + if alreadyStarted { + sj.registry.cancelRegisteredJobContext(sj.ID()) + } else { + sj.registry.unregister(sj.ID()) + } + }() return sj.registry.CancelRequested(ctx, nil, sj.ID()) } +func (sj *StartableJob) recordStart() (alreadyStarted bool) { + return atomic.AddInt64(&sj.starts, 1) != 1 +} + // FormatRetriableExecutionErrorLogToStringArray extracts the events // stored in the payload, formats them into strings and returns them as an // array of strings. This function is intended for use with crdb_internal.jobs. diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index bda0533d2a07..c02134f67a84 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -44,7 +45,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -222,6 +222,9 @@ func (rts *registryTestSuite) setUp(t *testing.T) { } } args.Knobs.JobsTestingKnobs = knobs + args.Knobs.SpanConfig = &spanconfig.TestingKnobs{ + ManagerDisableJobCreation: true, + } if rts.traceRealSpan { baseDir, dirCleanupFn := testutils.TempDir(t) @@ -283,6 +286,11 @@ func (rts *registryTestSuite) setUp(t *testing.T) { }, FailOrCancel: func(ctx context.Context) error { t.Log("Starting FailOrCancel") + if rts.traceRealSpan { + // Add a dummy recording so we actually see something in the trace. 
+ span := tracing.SpanFromContext(ctx) + span.RecordStructured(&types.StringValue{Value: "boom"}) + } rts.mu.Lock() rts.mu.a.OnFailOrCancelStart = true rts.mu.Unlock() @@ -362,7 +370,6 @@ func (rts *registryTestSuite) check(t *testing.T, expectedStatus jobs.Status) { func TestRegistryLifecycle(t *testing.T) { defer leaktest.AfterTest(t)() - skip.WithIssue(t, 68315, "flaky test") defer log.Scope(t).Close(t) t.Run("normal success", func(t *testing.T) { @@ -1095,42 +1102,40 @@ func TestRegistryLifecycle(t *testing.T) { }) t.Run("dump traces on cancel", func(t *testing.T) { - rts := registryTestSuite{traceRealSpan: true} + completeCh := make(chan struct{}) + rts := registryTestSuite{traceRealSpan: true, afterJobStateMachine: func() { + completeCh <- struct{}{} + }} rts.setUp(t) defer rts.tearDown() + rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='onStop'`) + j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob) + if err != nil { + t.Fatal(err) + } + rts.job = j - runJobAndFail := func(expectedNumFiles int) { - j, err := jobs.TestingCreateAndStartJob(context.Background(), rts.registry, rts.s.DB(), rts.mockJob) - if err != nil { - t.Fatal(err) - } - rts.job = j - - rts.mu.e.ResumeStart = true - rts.resumeCheckCh <- struct{}{} - rts.check(t, jobs.StatusRunning) + rts.mu.e.ResumeStart = true + rts.resumeCheckCh <- struct{}{} + rts.check(t, jobs.StatusRunning) - rts.sqlDB.Exec(t, "CANCEL JOB $1", j.ID()) + rts.sqlDB.Exec(t, "CANCEL JOB $1", j.ID()) - // Cancellation will cause the running instance of the job to get context - // canceled causing it to potentially dump traces. - require.Error(t, rts.job.AwaitCompletion(rts.ctx)) - checkTraceFiles(t, rts.registry, expectedNumFiles) + <-completeCh + checkTraceFiles(t, rts.registry, 1) - rts.mu.e.OnFailOrCancelStart = true - rts.check(t, jobs.StatusReverting) + rts.mu.e.OnFailOrCancelStart = true + rts.check(t, jobs.StatusReverting) - rts.failOrCancelCheckCh <- struct{}{} - close(rts.failOrCancelCheckCh) - rts.failOrCancelCh <- nil - close(rts.failOrCancelCh) - rts.mu.e.OnFailOrCancelExit = true + rts.failOrCancelCheckCh <- struct{}{} + close(rts.failOrCancelCheckCh) + rts.failOrCancelCh <- nil + close(rts.failOrCancelCh) + rts.mu.e.OnFailOrCancelExit = true - rts.check(t, jobs.StatusCanceled) - } + rts.check(t, jobs.StatusCanceled) - rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='onStop'`) - runJobAndFail(1) + <-completeCh }) } @@ -1754,7 +1759,7 @@ func TestShowJobs(t *testing.T) { defer log.Scope(t).Close(t) params, _ := tests.CreateTestServerParams() - params.Knobs = base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()} + params.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals() s, rawSQLDB, _ := serverutils.StartServer(t, params) sqlDB := sqlutils.MakeSQLRunner(rawSQLDB) ctx := context.Background() @@ -2583,11 +2588,9 @@ func TestStartableJob(t *testing.T) { status, err := sj.CurrentStatus(ctx, nil /* txn */) require.NoError(t, err) require.Equal(t, jobs.StatusCancelRequested, status) - for _, id := range jr.CurrentlyRunningJobs() { - require.NotEqual(t, id, sj.ID()) - } + // Start should fail since we have already called cancel on the job. 
err = sj.Start(ctx) - require.Regexp(t, "job with status cancel-requested cannot be marked started", err) + require.Regexp(t, "cannot be started more than once", err) }) setUpRunTest := func(t *testing.T) ( sj *jobs.StartableJob, diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 1aecec80c39b..5ef86daa6957 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -652,12 +652,13 @@ func (r *Registry) CreateStartableJobWithTxn( // Using a new context allows for independent lifetimes and cancellation. resumerCtx, cancel = r.makeCtx() - r.mu.Lock() - defer r.mu.Unlock() - if _, alreadyRegistered := r.mu.adoptedJobs[jobID]; alreadyRegistered { - log.Fatalf(ctx, "job %d: was just created but found in registered adopted jobs", jobID) + if alreadyAdopted := r.addAdoptedJob(jobID, j.sessionID, cancel); alreadyAdopted { + log.Fatalf( + ctx, + "job %d: was just created but found in registered adopted jobs", + jobID, + ) } - r.mu.adoptedJobs[jobID] = &adoptedJob{sid: j.sessionID, cancel: cancel} execDone = make(chan struct{}) } @@ -1340,17 +1341,20 @@ func (r *Registry) cancelAllAdoptedJobs() { func (r *Registry) unregister(jobID jobspb.JobID) { r.mu.Lock() defer r.mu.Unlock() - - aj, ok := r.mu.adoptedJobs[jobID] - // It is possible for a job to be double unregistered. unregister is always - // called at the end of resume. But it can also be called during deprecatedCancelAll - // and in the adopt loop under certain circumstances. - if ok { + if aj, ok := r.mu.adoptedJobs[jobID]; ok { aj.cancel() delete(r.mu.adoptedJobs, jobID) } } +func (r *Registry) cancelRegisteredJobContext(jobID jobspb.JobID) { + r.mu.Lock() + defer r.mu.Unlock() + if aj, ok := r.mu.adoptedJobs[jobID]; ok { + aj.cancel() + } +} + // RetryInitialDelay returns the value of retryInitialDelaySetting cluster setting, // in seconds, which is the initial delay in exponential-backoff delay calculation. func (r *Registry) RetryInitialDelay() float64 { diff --git a/pkg/sql/type_change.go b/pkg/sql/type_change.go index d2f5fc707c80..b3f2b0a4b075 100644 --- a/pkg/sql/type_change.go +++ b/pkg/sql/type_change.go @@ -180,7 +180,7 @@ type TypeSchemaChangerTestingKnobs struct { RunBeforeExec func() error // RunBeforeEnumMemberPromotion runs before enum members are promoted from // readable to all permissions in the typeSchemaChanger. - RunBeforeEnumMemberPromotion func() error + RunBeforeEnumMemberPromotion func(ctx context.Context) error // RunAfterOnFailOrCancel runs after OnFailOrCancel completes, if // OnFailOrCancel is triggered. RunAfterOnFailOrCancel func() error @@ -272,7 +272,7 @@ func (t *typeSchemaChanger) exec(ctx context.Context) error { typeDesc.GetKind() == descpb.TypeDescriptor_MULTIREGION_ENUM) && len(t.transitioningMembers) != 0 { if fn := t.execCfg.TypeSchemaChangerTestingKnobs.RunBeforeEnumMemberPromotion; fn != nil { - if err := fn(); err != nil { + if err := fn(ctx); err != nil { return err } } diff --git a/pkg/sql/type_change_test.go b/pkg/sql/type_change_test.go index 7f245c3197cd..31dd5b272f16 100644 --- a/pkg/sql/type_change_test.go +++ b/pkg/sql/type_change_test.go @@ -13,7 +13,6 @@ package sql_test import ( "context" "fmt" - "sync" "testing" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -517,18 +516,18 @@ func TestTypeChangeJobCancelSemantics(t *testing.T) { params, _ := tests.CreateTestServerParams() // Wait groups for synchronizing various parts of the test. 
-			var typeSchemaChangeStarted sync.WaitGroup
-			typeSchemaChangeStarted.Add(1)
-			var blockTypeSchemaChange sync.WaitGroup
-			blockTypeSchemaChange.Add(1)
-			var finishedSchemaChange sync.WaitGroup
-			finishedSchemaChange.Add(1)
+			typeSchemaChangeStarted := make(chan struct{})
+			blockTypeSchemaChange := make(chan struct{})
+			finishedSchemaChange := make(chan struct{})
 
 			params.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
 			params.Knobs.SQLTypeSchemaChanger = &sql.TypeSchemaChangerTestingKnobs{
-				RunBeforeEnumMemberPromotion: func() error {
-					typeSchemaChangeStarted.Done()
-					blockTypeSchemaChange.Wait()
+				RunBeforeEnumMemberPromotion: func(ctx context.Context) error {
+					close(typeSchemaChangeStarted)
+					select {
+					case <-blockTypeSchemaChange:
+					case <-ctx.Done():
+					}
 					return nil
 				},
 			}
@@ -555,10 +554,10 @@ CREATE TYPE db.greetings AS ENUM ('hi', 'yo');
 				if !tc.cancelable && err != nil {
 					t.Error(err)
 				}
-				finishedSchemaChange.Done()
+				close(finishedSchemaChange)
 			}()
 
-			typeSchemaChangeStarted.Wait()
+			<-typeSchemaChangeStarted
 
 			_, err = sqlDB.Exec(`CANCEL JOB (
SELECT job_id FROM [SHOW JOBS]
@@ -594,8 +593,8 @@ WHERE
 					return nil
 				})
 			}
-			blockTypeSchemaChange.Done()
-			finishedSchemaChange.Wait()
+			close(blockTypeSchemaChange)
+			<-finishedSchemaChange
 		})
 	}
 }

From 9b0e2ef497747fba40f0edfaf9263e49bdc11c5f Mon Sep 17 00:00:00 2001
From: Aditya Maru
Date: Mon, 2 Aug 2021 22:56:47 -0400
Subject: [PATCH 2/2] tracing,job: fix NPE in TraceCollector

Previously, TraceCollector.StartIter would return a nil object if we
failed to resolve nodeliveness during initialization. This led to an
NPE. StartIter now returns the error alongside the Iterator, so that
callers can handle the failure directly instead of performing a
validity check on a nil iterator.

This change also reworks the "dump traces on cancel" test. Job
cancellation semantics under stress are somewhat nondeterministic in
terms of how many times execution of OnFailOrCancel is resumed. This
makes it hard to coordinate when to check, and how many trace files to
expect.
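To make the new contract concrete, here is a minimal, self-contained sketch of
the constructor shape this patch moves to. The types and names below are
illustrative stand-ins, not the actual collector package API; the real hunks
follow below.

package main

import (
	"context"
	"errors"
	"fmt"
)

// iterator is an illustrative stand-in for collector.Iterator.
type iterator struct {
	nodes []int
	pos   int
}

func (it *iterator) Valid() bool { return it.pos < len(it.nodes) }
func (it *iterator) Next()       { it.pos++ }
func (it *iterator) Value() int  { return it.nodes[it.pos] }

// startIter mirrors the new StartIter shape: a setup failure is surfaced as an
// error instead of a nil iterator, so callers can no longer dereference nil.
func startIter(_ context.Context, liveNodes []int, setupErr error) (*iterator, error) {
	if setupErr != nil {
		return nil, setupErr
	}
	return &iterator{nodes: liveNodes}, nil
}

func main() {
	ctx := context.Background()

	// Failure path: the error is handled up front rather than causing an NPE.
	if _, err := startIter(ctx, nil, errors.New("could not resolve node liveness")); err != nil {
		fmt.Println("setup failed:", err)
	}

	// Success path: iterate the way the updated callers do.
	iter, err := startIter(ctx, []int{1, 2, 3}, nil)
	for ; err == nil && iter.Valid(); iter.Next() {
		fmt.Println("node", iter.Value())
	}
}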
Fixes: #68315 Release note: None Release justification: low risk, high benefit changes to existing functionality --- pkg/sql/crdb_internal.go | 10 +++++++++- pkg/util/tracing/collector/collector.go | 11 ++++++----- pkg/util/tracing/collector/collector_test.go | 4 +++- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index f256deac62c9..99c29c8447c9 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -72,6 +72,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/tracing/collector" "github.com/cockroachdb/errors" ) @@ -1283,7 +1284,8 @@ CREATE TABLE crdb_internal.cluster_inflight_traces ( } traceCollector := p.ExecCfg().TraceCollector - for iter := traceCollector.StartIter(ctx, traceID); iter.Valid(); iter.Next() { + var iter *collector.Iterator + for iter, err = traceCollector.StartIter(ctx, traceID); err == nil && iter.Valid(); iter.Next() { nodeID, recording := iter.Value() traceString := recording.String() traceJaegerJSON, err := recording.ToJaegerJSON("", "", fmt.Sprintf("node %d", nodeID)) @@ -1298,6 +1300,12 @@ CREATE TABLE crdb_internal.cluster_inflight_traces ( return false, err } } + if err != nil { + return false, err + } + if iter.Error() != nil { + return false, iter.Error() + } return true, nil }}}, diff --git a/pkg/util/tracing/collector/collector.go b/pkg/util/tracing/collector/collector.go index 10753aa8edd5..a2432ff60ba9 100644 --- a/pkg/util/tracing/collector/collector.go +++ b/pkg/util/tracing/collector/collector.go @@ -87,11 +87,12 @@ type Iterator struct { // StartIter fetches the live nodes in the cluster, and configures the underlying // Iterator that is used to access recorded spans in a streaming fashion. -func (t *TraceCollector) StartIter(ctx context.Context, traceID uint64) *Iterator { +func (t *TraceCollector) StartIter(ctx context.Context, traceID uint64) (*Iterator, error) { tc := &Iterator{ctx: ctx, traceID: traceID, collector: t} - tc.liveNodes, tc.iterErr = nodesFromNodeLiveness(ctx, t.nodeliveness) - if tc.iterErr != nil { - return nil + var err error + tc.liveNodes, err = nodesFromNodeLiveness(ctx, t.nodeliveness) + if err != nil { + return nil, err } // Calling Next() positions the Iterator in a valid state. It will fetch the @@ -99,7 +100,7 @@ func (t *TraceCollector) StartIter(ctx context.Context, traceID uint64) *Iterato // nodes. tc.Next() - return tc + return tc, nil } // Valid returns whether the Iterator is in a valid state to read values from. diff --git a/pkg/util/tracing/collector/collector_test.go b/pkg/util/tracing/collector/collector_test.go index e90b332a008b..55c4af5f7171 100644 --- a/pkg/util/tracing/collector/collector_test.go +++ b/pkg/util/tracing/collector/collector_test.go @@ -133,10 +133,12 @@ func TestTracingCollectorGetSpanRecordings(t *testing.T) { res := make(map[roachpb.NodeID][]tracing.Recording) var iter *collector.Iterator - for iter = traceCollector.StartIter(ctx, traceID); iter.Valid(); iter.Next() { + var err error + for iter, err = traceCollector.StartIter(ctx, traceID); err == nil && iter.Valid(); iter.Next() { nodeID, recording := iter.Value() res[nodeID] = append(res[nodeID], recording) } + require.NoError(t, err) require.NoError(t, iter.Error()) return res }
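
Returning to the first patch: its key idea is that membership in the
adoptedJobs map doubles as an execution lock, so only the execution path
(runJob, or a failed attempt to launch it) removes the entry, while pause and
cancel processing merely cancels the running execution's context. What follows
is a minimal, self-contained sketch of that pattern; the names are illustrative
and are not the actual Registry types.

package main

import (
	"context"
	"fmt"
	"sync"
)

type jobID int64

// registry is an illustrative stand-in for the jobs Registry: the map entry
// doubles as an execution lock, and only the execution path deletes it.
type registry struct {
	mu      sync.Mutex
	running map[jobID]context.CancelFunc
}

// add registers an execution. If it returns true, the job is already running
// and the caller must not start a second resumer.
func (r *registry) add(id jobID, cancel context.CancelFunc) (alreadyAdopted bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.running[id]; ok {
		return true
	}
	r.running[id] = cancel
	return false
}

// cancelContext models what pause/cancel processing now does: it stops the
// current execution but leaves the map entry alone, so it can never clobber
// an entry that belongs to a newer adoption.
func (r *registry) cancelContext(id jobID) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if cancel, ok := r.running[id]; ok {
		cancel()
	}
}

// unregister is the one place the entry is removed, at the end of the run.
func (r *registry) unregister(id jobID) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if cancel, ok := r.running[id]; ok {
		cancel()
		delete(r.running, id)
	}
}

func main() {
	r := &registry{running: map[jobID]context.CancelFunc{}}
	ctx, cancel := context.WithCancel(context.Background())

	fmt.Println("already adopted?", r.add(1, cancel)) // false: first execution wins
	fmt.Println("already adopted?", r.add(1, cancel)) // true: concurrent attempt refused
	r.cancelContext(1)                                // pause/cancel request: context only
	fmt.Println("ctx canceled?", ctx.Err() != nil)    // true: execution observes cancellation
	r.unregister(1)                                   // removal stays centralized at end of run
}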