diff --git a/pkg/jobs/BUILD.bazel b/pkg/jobs/BUILD.bazel
index b811772b1da0..1102f052152b 100644
--- a/pkg/jobs/BUILD.bazel
+++ b/pkg/jobs/BUILD.bazel
@@ -109,6 +109,7 @@ go_test(
         "//pkg/sql/tests",
         "//pkg/testutils",
         "//pkg/testutils/serverutils",
+        "//pkg/testutils/skip",
         "//pkg/testutils/sqlutils",
         "//pkg/testutils/testcluster",
         "//pkg/util/ctxgroup",
diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go
index 7f853591a867..2fbef3a9a3b1 100644
--- a/pkg/jobs/jobs_test.go
+++ b/pkg/jobs/jobs_test.go
@@ -44,6 +44,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/tests"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -282,6 +283,11 @@ func (rts *registryTestSuite) setUp(t *testing.T) {
 		},
 		FailOrCancel: func(ctx context.Context) error {
 			t.Log("Starting FailOrCancel")
+			if rts.traceRealSpan {
+				// Add a dummy recording so we actually see something in the trace.
+				span := tracing.SpanFromContext(ctx)
+				span.RecordStructured(&types.StringValue{Value: "boom"})
+			}
 			rts.mu.Lock()
 			rts.mu.a.OnFailOrCancelStart = true
 			rts.mu.Unlock()
@@ -1093,42 +1099,47 @@ func TestRegistryLifecycle(t *testing.T) {
 	})
 
 	t.Run("dump traces on cancel", func(t *testing.T) {
+		skip.UnderStress(t, "job cancellation semantics are flaky under stress")
 		rts := registryTestSuite{traceRealSpan: true}
 		rts.setUp(t)
-		defer rts.tearDown()
-
-		runJobAndFail := func(expectedNumFiles int) {
-			j, err := jobs.TestingCreateAndStartJob(context.Background(), rts.registry, rts.s.DB(), rts.mockJob)
-			if err != nil {
-				t.Fatal(err)
+		var mu syncutil.Mutex
+		blockCh := make(chan struct{})
+		shouldBlock := true
+		rts.afterJobStateMachine = func() {
+			mu.Lock()
+			defer mu.Unlock()
+			if shouldBlock {
+				shouldBlock = false
+				blockCh <- struct{}{}
 			}
-			rts.job = j
-
-			rts.mu.e.ResumeStart = true
-			rts.resumeCheckCh <- struct{}{}
-			rts.check(t, jobs.StatusRunning)
+		}
+		rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='onStop'`)
+		defer rts.tearDown()
+		j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
+		if err != nil {
+			t.Fatal(err)
+		}
+		rts.job = j
 
-			rts.sqlDB.Exec(t, "CANCEL JOB $1", j.ID())
+		rts.mu.e.ResumeStart = true
+		rts.resumeCheckCh <- struct{}{}
+		rts.check(t, jobs.StatusRunning)
 
-			// Cancellation will cause the running instance of the job to get context
-			// canceled causing it to potentially dump traces.
-			require.Error(t, rts.job.AwaitCompletion(rts.ctx))
-			checkTraceFiles(t, rts.registry, expectedNumFiles)
+		rts.sqlDB.Exec(t, "CANCEL JOB $1", j.ID())
 
-			rts.mu.e.OnFailOrCancelStart = true
-			rts.check(t, jobs.StatusReverting)
+		<-blockCh
+		checkTraceFiles(t, rts.registry, 1)
 
-			rts.failOrCancelCheckCh <- struct{}{}
-			close(rts.failOrCancelCheckCh)
-			rts.failOrCancelCh <- nil
-			close(rts.failOrCancelCh)
-			rts.mu.e.OnFailOrCancelExit = true
+		rts.mu.e.OnFailOrCancelStart = true
+		rts.check(t, jobs.StatusReverting)
 
-			rts.check(t, jobs.StatusCanceled)
-		}
+		rts.failOrCancelCheckCh <- struct{}{}
+		close(rts.failOrCancelCheckCh)
+		rts.failOrCancelCh <- nil
+		close(rts.failOrCancelCh)
+		rts.mu.e.OnFailOrCancelExit = true
 
-		rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='onStop'`)
-		runJobAndFail(1)
+		rts.check(t, jobs.StatusCanceled)
 	})
 }
diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go
index 4884394f0fc7..c9ad95212cd9 100644
--- a/pkg/sql/crdb_internal.go
+++ b/pkg/sql/crdb_internal.go
@@ -67,6 +67,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing/collector"
 	"github.com/cockroachdb/errors"
 )
@@ -1225,7 +1226,8 @@ CREATE TABLE crdb_internal.cluster_inflight_traces (
 			}
 
 			traceCollector := p.ExecCfg().TraceCollector
-			for iter := traceCollector.StartIter(ctx, traceID); iter.Valid(); iter.Next() {
+			var iter *collector.Iterator
+			for iter = traceCollector.StartIter(ctx, traceID); iter.Valid(); iter.Next() {
 				nodeID, recording := iter.Value()
 				traceString := recording.String()
 				traceJaegerJSON, err := recording.ToJaegerJSON("", "", fmt.Sprintf("node %d", nodeID))
@@ -1240,6 +1242,9 @@ CREATE TABLE crdb_internal.cluster_inflight_traces (
 					return false, err
 				}
 			}
+			if iter.Error() != nil {
+				return false, iter.Error()
+			}
 			return true, nil
 		}}},
diff --git a/pkg/util/tracing/collector/collector.go b/pkg/util/tracing/collector/collector.go
index 10753aa8edd5..e7189882cbc7 100644
--- a/pkg/util/tracing/collector/collector.go
+++ b/pkg/util/tracing/collector/collector.go
@@ -91,7 +91,7 @@ func (t *TraceCollector) StartIter(ctx context.Context, traceID uint64) *Iterato
 	tc := &Iterator{ctx: ctx, traceID: traceID, collector: t}
 	tc.liveNodes, tc.iterErr = nodesFromNodeLiveness(ctx, t.nodeliveness)
 	if tc.iterErr != nil {
-		return nil
+		return tc
 	}
 
 	// Calling Next() positions the Iterator in a valid state. It will fetch the
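Reviewer note, not part of the patch: the collector.go hunk is the crux of the fix. `StartIter` used to return nil when the node-liveness lookup failed, so the `iter.Valid()` call in crdb_internal.go would dereference a nil pointer; returning the Iterator with `iterErr` already set lets the call site loop as usual and surface the failure through a single `Error()` check after the loop. Below is a minimal standalone sketch of that error-carrying iterator pattern; the names (`iterator`, `newIterator`) are invented for illustration and are not the real collector API.

```go
package main

import (
	"errors"
	"fmt"
)

// iterator carries its own error instead of forcing the constructor to
// return nil on failure: Valid() is simply false once err is set, and
// the caller checks Error() exactly once after the loop.
type iterator struct {
	items []string
	pos   int
	err   error
}

// newIterator never returns nil. On failure it returns an iterator that
// is immediately invalid but still answers Error().
func newIterator(fail bool) *iterator {
	it := &iterator{}
	if fail {
		it.err = errors.New("could not list live nodes")
		return it // not nil, so the caller's loop header stays safe
	}
	it.items = []string{"recording from node 1", "recording from node 2"}
	return it
}

func (it *iterator) Valid() bool   { return it.err == nil && it.pos < len(it.items) }
func (it *iterator) Next()         { it.pos++ }
func (it *iterator) Value() string { return it.items[it.pos] }
func (it *iterator) Error() error  { return it.err }

func main() {
	// Mirrors the crdb_internal.go call site: the iterator is declared
	// outside the loop so it is still in scope for the error check.
	var it *iterator
	for it = newIterator(false); it.Valid(); it.Next() {
		fmt.Println(it.Value())
	}
	if err := it.Error(); err != nil {
		fmt.Println("iteration failed:", err)
	}
}
```

Keeping the error inside the iterator leaves exactly one place to check for failure, whether it happened during construction or mid-iteration, which is why the crdb_internal.go hunk hoists `iter` out of the `for` header.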
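A second pattern worth calling out, again as commentary rather than patch content: the rewritten test drops the `require.Error(t, rts.job.AwaitCompletion(rts.ctx))` check (flaky per the new `skip.UnderStress` note) in favor of an `afterJobStateMachine` hook that signals a channel exactly once, so `checkTraceFiles` runs at a deterministic point after the job's state machine has completed a pass. A rough self-contained approximation, with hypothetical names (`jobRunner`, `afterStateMachine`):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// jobRunner stands in for the job under test. Its hook fires after each
// pass through the state machine; the first firing hands control to the
// test, later firings are no-ops so the job cannot deadlock.
type jobRunner struct {
	mu          sync.Mutex
	shouldBlock bool
	blockCh     chan struct{}
}

func (j *jobRunner) afterStateMachine() {
	j.mu.Lock()
	defer j.mu.Unlock()
	if j.shouldBlock {
		j.shouldBlock = false
		j.blockCh <- struct{}{} // wakes exactly one waiting reader
	}
}

func main() {
	r := &jobRunner{shouldBlock: true, blockCh: make(chan struct{})}
	go func() {
		time.Sleep(10 * time.Millisecond) // simulated job work
		r.afterStateMachine()             // first pass: signals the test
		r.afterStateMachine()             // later passes: no-op
	}()
	<-r.blockCh // the "test" blocks here until the hook has fired
	fmt.Println("state machine finished a pass; safe to assert on trace files")
}
```

Flipping `shouldBlock` under the mutex before sending guarantees that subsequent passes through the hook never block on a channel nobody is reading, which is why the test can proceed through the reverting and canceled states without touching `blockCh` again.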