Skip to content

Commit

Permalink
tracing,job: fix NPE in TraceCollector
Browse files Browse the repository at this point in the history
Previously, TraceCollector.StartIter would return a nil object
if we failed to resolve nodeliveness during initialization. This
led to an NPE.

This change now returns a TraceCollector instance with the error field
set to the appropriate error, so that the validity check on the iterator
can correctly handle this scenario.

This change also reworks the dump trace on job cancellation test.
Job cancellation semantics under stress are slightly nondeterministic in
terms of how many times execution of OnFailOrCancel is resumed. This makes
it hard to coordinate when to check and how many trace files to expect.

Fixes: #68315

Release note: None
  • Loading branch information
adityamaru committed Aug 3, 2021
1 parent 2317cc8 commit 8a25365
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 29 deletions.
1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ go_test(
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/ctxgroup",
Expand Down
65 changes: 38 additions & 27 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -282,6 +283,11 @@ func (rts *registryTestSuite) setUp(t *testing.T) {
},
FailOrCancel: func(ctx context.Context) error {
t.Log("Starting FailOrCancel")
if rts.traceRealSpan {
// Add a dummy recording so we actually see something in the trace.
span := tracing.SpanFromContext(ctx)
span.RecordStructured(&types.StringValue{Value: "boom"})
}
rts.mu.Lock()
rts.mu.a.OnFailOrCancelStart = true
rts.mu.Unlock()
Expand Down Expand Up @@ -1093,42 +1099,47 @@ func TestRegistryLifecycle(t *testing.T) {
})

t.Run("dump traces on cancel", func(t *testing.T) {
skip.UnderStress(t, "job cancellation semantics is flaky under stress")
rts := registryTestSuite{traceRealSpan: true}
rts.setUp(t)
defer rts.tearDown()

runJobAndFail := func(expectedNumFiles int) {
j, err := jobs.TestingCreateAndStartJob(context.Background(), rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
var mu syncutil.Mutex
blockCh := make(chan struct{})
shouldBlock := true
rts.afterJobStateMachine = func() {
mu.Lock()
defer mu.Unlock()
if shouldBlock {
shouldBlock = false
blockCh <- struct{}{}
}
rts.job = j

rts.mu.e.ResumeStart = true
rts.resumeCheckCh <- struct{}{}
rts.check(t, jobs.StatusRunning)
}
rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='onStop'`)
defer rts.tearDown()
j, err := jobs.TestingCreateAndStartJob(rts.ctx, rts.registry, rts.s.DB(), rts.mockJob)
if err != nil {
t.Fatal(err)
}
rts.job = j

rts.sqlDB.Exec(t, "CANCEL JOB $1", j.ID())
rts.mu.e.ResumeStart = true
rts.resumeCheckCh <- struct{}{}
rts.check(t, jobs.StatusRunning)

// Cancellation will cause the running instance of the job to get context
// canceled causing it to potentially dump traces.
require.Error(t, rts.job.AwaitCompletion(rts.ctx))
checkTraceFiles(t, rts.registry, expectedNumFiles)
rts.sqlDB.Exec(t, "CANCEL JOB $1", j.ID())

rts.mu.e.OnFailOrCancelStart = true
rts.check(t, jobs.StatusReverting)
<-blockCh
checkTraceFiles(t, rts.registry, 1)

rts.failOrCancelCheckCh <- struct{}{}
close(rts.failOrCancelCheckCh)
rts.failOrCancelCh <- nil
close(rts.failOrCancelCh)
rts.mu.e.OnFailOrCancelExit = true
rts.mu.e.OnFailOrCancelStart = true
rts.check(t, jobs.StatusReverting)

rts.check(t, jobs.StatusCanceled)
}
rts.failOrCancelCheckCh <- struct{}{}
close(rts.failOrCancelCheckCh)
rts.failOrCancelCh <- nil
close(rts.failOrCancelCh)
rts.mu.e.OnFailOrCancelExit = true

rts.sqlDB.Exec(t, `SET CLUSTER SETTING jobs.trace.force_dump_mode='onStop'`)
runJobAndFail(1)
rts.check(t, jobs.StatusCanceled)
})
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/collector"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -1225,7 +1226,8 @@ CREATE TABLE crdb_internal.cluster_inflight_traces (
}

traceCollector := p.ExecCfg().TraceCollector
for iter := traceCollector.StartIter(ctx, traceID); iter.Valid(); iter.Next() {
var iter *collector.Iterator
for iter = traceCollector.StartIter(ctx, traceID); iter.Valid(); iter.Next() {
nodeID, recording := iter.Value()
traceString := recording.String()
traceJaegerJSON, err := recording.ToJaegerJSON("", "", fmt.Sprintf("node %d", nodeID))
Expand All @@ -1240,6 +1242,9 @@ CREATE TABLE crdb_internal.cluster_inflight_traces (
return false, err
}
}
if iter.Error() != nil {
return false, iter.Error()
}

return true, nil
}}},
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/tracing/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (t *TraceCollector) StartIter(ctx context.Context, traceID uint64) *Iterato
tc := &Iterator{ctx: ctx, traceID: traceID, collector: t}
tc.liveNodes, tc.iterErr = nodesFromNodeLiveness(ctx, t.nodeliveness)
if tc.iterErr != nil {
return nil
return tc
}

// Calling Next() positions the Iterator in a valid state. It will fetch the
Expand Down

0 comments on commit 8a25365

Please sign in to comment.