Skip to content

Commit

Permalink
*: wrap ctx.Err() with a stack
Browse files Browse the repository at this point in the history
It's all to common to get an error like "context canceled" without any
helpful information about which context was canceled and when. This
commit strives to improve the situation by at least including the stack
trace of when the ctx.Err() was retrieved.

Release note: None
  • Loading branch information
jordanlewis committed Aug 30, 2018
1 parent 9d69acb commit 273dc9f
Show file tree
Hide file tree
Showing 61 changed files with 126 additions and 104 deletions.
3 changes: 2 additions & 1 deletion pkg/ccl/backupccl/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/pkg/errors"
)

// showBackupPlanHook implements PlanHookFn.
Expand Down Expand Up @@ -74,7 +75,7 @@ func showBackupPlanHook(
for _, row := range shower.fn(desc) {
select {
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case resultsCh <- row:
}
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/changefeedccl/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/pkg/errors"
)

type bufferEntry struct {
Expand Down Expand Up @@ -51,7 +52,7 @@ func (b *buffer) addEntry(ctx context.Context, e bufferEntry) error {
// TODO(dan): Spill to a temp rocksdb if entriesCh would block.
select {
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case b.entriesCh <- e:
return nil
}
Expand All @@ -62,7 +63,7 @@ func (b *buffer) addEntry(ctx context.Context, e bufferEntry) error {
func (b *buffer) Get(ctx context.Context) (bufferEntry, error) {
select {
case <-ctx.Done():
return bufferEntry{}, ctx.Err()
return bufferEntry{}, errors.WithStack(ctx.Err())
case e := <-b.entriesCh:
return e, nil
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/pkg/errors"
)

func init() {
Expand Down Expand Up @@ -191,7 +192,7 @@ func (w *changefeedResultWriter) AddRow(ctx context.Context, row tree.Datums) er

select {
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case w.rowsCh <- row:
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func changefeedPlanHook(
}
select {
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case err := <-errCh:
return err
case <-startedCh:
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (p *poller) Run(ctx context.Context) error {
log.VEventf(ctx, 1, `sleeping for %s`, pollDuration)
select {
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case <-time.After(pollDuration):
}
}
Expand Down Expand Up @@ -219,7 +219,7 @@ func (p *poller) Run(ctx context.Context) error {

select {
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case exportsSem <- struct{}{}:
}

Expand Down Expand Up @@ -392,7 +392,7 @@ func (p *poller) runUsingRangefeeds(ctx context.Context) error {

select {
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case exportsSem <- struct{}{}:
}

Expand Down Expand Up @@ -463,7 +463,7 @@ func (p *poller) runUsingRangefeeds(ctx context.Context) error {
log.Fatalf(ctx, "unexpected RangeFeedEvent variant %v", t)
}
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
}
}
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (s *kafkaSink) Flush(ctx context.Context) error {
}
select {
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case <-flushCh:
s.mu.Lock()
flushErr := s.mu.flushErr
Expand All @@ -259,7 +259,7 @@ func (s *kafkaSink) emitMessage(ctx context.Context, msg *sarama.ProducerMessage

select {
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case s.producer.Input() <- msg:
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (c *csvInputReader) flushBatch(ctx context.Context, finished bool, progFn p
if len(c.batch.r) > 0 {
select {
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case c.recordCh <- c.batch:
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/importccl/read_import_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func readInputFiles(
currentFile++
select {
case <-done:
return ctx.Err()
return errors.WithStack(ctx.Err())
default:
}
if err := func() error {
Expand Down Expand Up @@ -324,7 +324,7 @@ func (c *rowConverter) sendBatch(ctx context.Context) error {
select {
case c.kvCh <- c.kvBatch:
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
}
c.kvBatch = make(kvBatch, 0, c.batchCap)
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/sst_writer_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func makeSSTs(
select {
case contentCh <- sc:
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
}
counts.Reset()
sst.Close()
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/workloadccl/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (c *groupCSVWriter) groupWriteCSVs(
if err := func() error {
select {
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case c.sem <- struct{}{}:
}
defer func() { <-c.sem }()
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func waitForRebalance(ctx context.Context, l *logger, db *gosql.DB, maxStdDev fl
for {
select {
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case <-statsTimer.C:
statsTimer.Read = true
stats, err := allocatorStats(db)
Expand Down
6 changes: 3 additions & 3 deletions pkg/cmd/roachtest/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func cdcBasicTest(ctx context.Context, t *test, c *cluster, args cdcTestArgs) {
for {
select {
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case <-tpccComplete:
return nil
case <-time.After(time.Second):
Expand Down Expand Up @@ -217,7 +217,7 @@ func cdcBasicTest(ctx context.Context, t *test, c *cluster, args cdcTestArgs) {
case <-tpccComplete:
return nil
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case <-t.C:
}

Expand All @@ -227,7 +227,7 @@ func cdcBasicTest(ctx context.Context, t *test, c *cluster, args cdcTestArgs) {
case <-tpccComplete:
return nil
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case <-time.After(downTime):
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/cmd/roachtest/chaos.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package main
import (
"context"
"time"

"github.com/pkg/errors"
)

// ChaosTimer configures a chaos schedule.
Expand Down Expand Up @@ -66,7 +68,7 @@ func (ch *Chaos) Runner(c *cluster, m *monitor) func(context.Context) error {
case <-ch.Stopper:
return nil
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case <-t.C:
}

Expand All @@ -85,7 +87,7 @@ func (ch *Chaos) Runner(c *cluster, m *monitor) func(context.Context) error {
case <-ch.Stopper:
return nil
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case <-time.After(downTime):
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/cmd/roachtest/clearrange.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
)

func registerClearRange(r *registry) {
Expand Down Expand Up @@ -116,7 +117,7 @@ func registerClearRange(r *registry) {
select {
case <-after:
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
}
}
// TODO(benesch): verify that every last range in the table has been
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestClusterMonitor(t *testing.T) {
m.Go(func(ctx context.Context) error {
<-ctx.Done()
fmt.Printf("worker done\n")
return ctx.Err()
return errors.WithStack(ctx.Err())
})

err := m.wait(`echo`, "1: 100\n1: dead")
Expand All @@ -92,7 +92,7 @@ func TestClusterMonitor(t *testing.T) {
})
m.Go(func(ctx context.Context) error {
<-ctx.Done()
return ctx.Err()
return errors.WithStack(ctx.Err())
})

err := m.wait(`echo`, `1`)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/hotspotsplits.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func registerHotSpotSplits(r *registry) {

select {
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case <-time.After(5 * time.Second):
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/cmd/roachtest/rebalance_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -96,7 +97,7 @@ func registerRebalanceLoad(r *registry) {

select {
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case <-time.After(5 * time.Second):
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func loadTPCCBench(
select {
case <-time.After(rebalanceWait):
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
}

_, err = db.ExecContext(ctx, `SET CLUSTER SETTING kv.snapshot_rebalance.max_rate='2MiB'`)
Expand Down
3 changes: 2 additions & 1 deletion pkg/cmd/roachtest/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

_ "github.com/lib/pq"
"github.com/pkg/errors"

"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/binfetcher"
Expand Down Expand Up @@ -65,7 +66,7 @@ func registerUpgrade(r *registry) {
t.WorkerStatus("sleeping")
select {
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case <-time.After(ts):
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/cmd/roachtest/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

_ "github.com/lib/pq"
"github.com/pkg/errors"

"github.com/cockroachdb/cockroach/pkg/util/binfetcher"
)
Expand Down Expand Up @@ -88,7 +89,7 @@ func registerVersion(r *registry) {
t.WorkerStatus("sleeping")
select {
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case <-time.After(stageDuration):
}
// Make sure everyone is still running.
Expand Down
2 changes: 1 addition & 1 deletion pkg/gossip/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (s *server) Gossip(stream Gossip_GossipServer) error {
send := func(reply *Response) error {
select {
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case syncChan <- struct{}{}:
defer func() { <-syncChan }()

Expand Down
4 changes: 2 additions & 2 deletions pkg/internal/client/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (m *LeaseManager) timeRemaining(val *LeaseVal) time.Duration {
func (m *LeaseManager) ExtendLease(ctx context.Context, l *Lease) error {
select {
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case l.val.sem <- struct{}{}:
}
defer func() { <-l.val.sem }()
Expand Down Expand Up @@ -173,7 +173,7 @@ func (m *LeaseManager) ExtendLease(ctx context.Context, l *Lease) error {
func (m *LeaseManager) ReleaseLease(ctx context.Context, l *Lease) error {
select {
case <-ctx.Done():
return ctx.Err()
return errors.WithStack(ctx.Err())
case l.val.sem <- struct{}{}:
}
defer func() { <-l.val.sem }()
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/client/range_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func RangeLookup(
key, prefetchedRanges)
}

ctxErr := ctx.Err()
ctxErr := errors.WithStack(ctx.Err())
if ctxErr == nil {
log.Fatalf(ctx, "retry loop broke before context expired")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error)
// error condition this loop isn't capable of handling.
for {
if err := ctx.Err(); err != nil {
return err
return errors.WithStack(err)
}
err = fn(ctx, txn)

Expand Down
4 changes: 2 additions & 2 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ func RunAndWaitForTerminalState(
for r := retry.StartWithCtx(ctx, retry.Options{}); ; {
select {
case <-ctx.Done():
return 0, "", ctx.Err()
return 0, "", errors.WithStack(ctx.Err())
case execErr = <-execErrCh:
// The closure finished, try to fetch a job id one more time. Close
// and nil out execErrCh so it blocks from now on.
Expand All @@ -671,7 +671,7 @@ func RunAndWaitForTerminalState(
for r := retry.StartWithCtx(ctx, retry.Options{}); ; {
select {
case <-ctx.Done():
return jobID, "", ctx.Err()
return jobID, "", errors.WithStack(ctx.Err())
case execErr = <-execErrCh:
// The closure finished, this is a nice hint to wake up, but it only
// works once. Close and nil out execErrCh so it blocks from now on.
Expand Down
Loading

0 comments on commit 273dc9f

Please sign in to comment.