From 273dc9f8a66a80082e38efa0a4d70acace6b98ff Mon Sep 17 00:00:00 2001 From: Jordan Lewis Date: Wed, 29 Aug 2018 21:00:50 -0400 Subject: [PATCH] *: wrap ctx.Err() with a stack It's all to common to get an error like "context canceled" without any helpful information about which context was canceled and when. This commit strives to improve the situation by at least including the stack trace of when the ctx.Err() was retrieved. Release note: None --- pkg/ccl/backupccl/show.go | 3 ++- pkg/ccl/changefeedccl/buffer.go | 5 +++-- pkg/ccl/changefeedccl/changefeed_dist.go | 3 ++- pkg/ccl/changefeedccl/changefeed_stmt.go | 2 +- pkg/ccl/changefeedccl/poller.go | 8 ++++---- pkg/ccl/changefeedccl/sink.go | 4 ++-- pkg/ccl/importccl/read_import_csv.go | 2 +- pkg/ccl/importccl/read_import_proc.go | 4 ++-- pkg/ccl/importccl/sst_writer_proc.go | 2 +- pkg/ccl/workloadccl/fixture.go | 2 +- pkg/cmd/roachtest/allocator.go | 2 +- pkg/cmd/roachtest/cdc.go | 6 +++--- pkg/cmd/roachtest/chaos.go | 6 ++++-- pkg/cmd/roachtest/clearrange.go | 3 ++- pkg/cmd/roachtest/cluster_test.go | 4 ++-- pkg/cmd/roachtest/hotspotsplits.go | 2 +- pkg/cmd/roachtest/rebalance_load.go | 3 ++- pkg/cmd/roachtest/tpcc.go | 2 +- pkg/cmd/roachtest/upgrade.go | 3 ++- pkg/cmd/roachtest/version.go | 3 ++- pkg/gossip/server.go | 2 +- pkg/internal/client/lease.go | 4 ++-- pkg/internal/client/range_lookup.go | 2 +- pkg/internal/client/txn.go | 2 +- pkg/jobs/jobs.go | 4 ++-- pkg/jobs/progress.go | 3 ++- pkg/kv/dist_sender.go | 9 +++++---- pkg/kv/dist_sender_rangefeed.go | 3 ++- pkg/kv/range_cache.go | 2 +- pkg/kv/range_cache_test.go | 3 ++- pkg/kv/transport.go | 4 ++-- pkg/rpc/context.go | 4 ++-- pkg/rpc/heartbeat_test.go | 3 ++- pkg/server/admin.go | 4 ++-- pkg/server/node.go | 2 +- pkg/sql/conn_executor.go | 2 +- pkg/sql/distsql_running.go | 5 +++-- pkg/sql/pgwire/conn.go | 4 ++-- pkg/sql/planhook.go | 3 ++- pkg/storage/client_merge_test.go | 2 +- pkg/storage/closedts/transport/server.go | 5 +++-- pkg/storage/idalloc/id_alloc.go | 2 +- pkg/storage/intent_resolver.go | 2 +- pkg/storage/node_liveness.go | 10 +++++----- pkg/storage/queue_test.go | 2 +- pkg/storage/quota_pool.go | 5 +++-- pkg/storage/replica.go | 6 +++--- pkg/storage/replica_backpressure.go | 2 +- pkg/storage/replica_command.go | 6 +++--- pkg/storage/replica_range_lease.go | 4 ++-- pkg/storage/store.go | 4 ++-- pkg/storage/store_snapshot.go | 4 ++-- pkg/storage/stores_server.go | 3 ++- pkg/storage/txnwait/txnqueue.go | 9 +++++---- pkg/ts/server.go | 3 ++- pkg/util/ctxgroup/ctxgroup.go | 9 +++++---- pkg/util/stop/stopper.go | 8 ++++---- pkg/util/stop/stopper_test.go | 2 +- pkg/workload/csv.go | 2 +- pkg/workload/tpcc/worker.go | 2 +- pkg/workload/workload.go | 4 ++-- 61 files changed, 126 insertions(+), 104 deletions(-) diff --git a/pkg/ccl/backupccl/show.go b/pkg/ccl/backupccl/show.go index d89dbee79720..f14fa069e548 100644 --- a/pkg/ccl/backupccl/show.go +++ b/pkg/ccl/backupccl/show.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/pkg/errors" ) // showBackupPlanHook implements PlanHookFn. @@ -74,7 +75,7 @@ func showBackupPlanHook( for _, row := range shower.fn(desc) { select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case resultsCh <- row: } } diff --git a/pkg/ccl/changefeedccl/buffer.go b/pkg/ccl/changefeedccl/buffer.go index 272e193eed12..d085da3863be 100644 --- a/pkg/ccl/changefeedccl/buffer.go +++ b/pkg/ccl/changefeedccl/buffer.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/pkg/errors" ) type bufferEntry struct { @@ -51,7 +52,7 @@ func (b *buffer) addEntry(ctx context.Context, e bufferEntry) error { // TODO(dan): Spill to a temp rocksdb if entriesCh would block. select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case b.entriesCh <- e: return nil } @@ -62,7 +63,7 @@ func (b *buffer) addEntry(ctx context.Context, e bufferEntry) error { func (b *buffer) Get(ctx context.Context) (bufferEntry, error) { select { case <-ctx.Done(): - return bufferEntry{}, ctx.Err() + return bufferEntry{}, errors.WithStack(ctx.Err()) case e := <-b.entriesCh: return e, nil } diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index d9a4350293c3..5887bf5c7c97 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/pkg/errors" ) func init() { @@ -191,7 +192,7 @@ func (w *changefeedResultWriter) AddRow(ctx context.Context, row tree.Datums) er select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case w.rowsCh <- row: return nil } diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 5de50d3bc608..aff8fd26b8f9 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -219,7 +219,7 @@ func changefeedPlanHook( } select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case err := <-errCh: return err case <-startedCh: diff --git a/pkg/ccl/changefeedccl/poller.go b/pkg/ccl/changefeedccl/poller.go index 30f4b3da032b..8389f1b98682 100644 --- a/pkg/ccl/changefeedccl/poller.go +++ b/pkg/ccl/changefeedccl/poller.go @@ -144,7 +144,7 @@ func (p *poller) Run(ctx context.Context) error { log.VEventf(ctx, 1, `sleeping for %s`, pollDuration) select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case <-time.After(pollDuration): } } @@ -219,7 +219,7 @@ func (p *poller) Run(ctx context.Context) error { select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case exportsSem <- struct{}{}: } @@ -392,7 +392,7 @@ func (p *poller) runUsingRangefeeds(ctx context.Context) error { select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case exportsSem <- struct{}{}: } @@ -463,7 +463,7 @@ func (p *poller) runUsingRangefeeds(ctx context.Context) error { log.Fatalf(ctx, "unexpected RangeFeedEvent variant %v", t) } case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) } } }) diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index f1ab7374a09b..23ccd19c76e4 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -241,7 +241,7 @@ func (s *kafkaSink) Flush(ctx context.Context) error { } select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case <-flushCh: s.mu.Lock() flushErr := s.mu.flushErr @@ -259,7 +259,7 @@ func (s *kafkaSink) emitMessage(ctx context.Context, msg *sarama.ProducerMessage select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case s.producer.Input() <- msg: } diff --git a/pkg/ccl/importccl/read_import_csv.go b/pkg/ccl/importccl/read_import_csv.go index 36f584871757..10f6975a4422 100644 --- a/pkg/ccl/importccl/read_import_csv.go +++ b/pkg/ccl/importccl/read_import_csv.go @@ -73,7 +73,7 @@ func (c *csvInputReader) flushBatch(ctx context.Context, finished bool, progFn p if len(c.batch.r) > 0 { select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case c.recordCh <- c.batch: } } diff --git a/pkg/ccl/importccl/read_import_proc.go b/pkg/ccl/importccl/read_import_proc.go index 29ff37253aca..bcb6e1ec07e3 100644 --- a/pkg/ccl/importccl/read_import_proc.go +++ b/pkg/ccl/importccl/read_import_proc.go @@ -85,7 +85,7 @@ func readInputFiles( currentFile++ select { case <-done: - return ctx.Err() + return errors.WithStack(ctx.Err()) default: } if err := func() error { @@ -324,7 +324,7 @@ func (c *rowConverter) sendBatch(ctx context.Context) error { select { case c.kvCh <- c.kvBatch: case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) } c.kvBatch = make(kvBatch, 0, c.batchCap) return nil diff --git a/pkg/ccl/importccl/sst_writer_proc.go b/pkg/ccl/importccl/sst_writer_proc.go index 79fc43d6f48d..5e4d4fbab6d2 100644 --- a/pkg/ccl/importccl/sst_writer_proc.go +++ b/pkg/ccl/importccl/sst_writer_proc.go @@ -356,7 +356,7 @@ func makeSSTs( select { case contentCh <- sc: case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) } counts.Reset() sst.Close() diff --git a/pkg/ccl/workloadccl/fixture.go b/pkg/ccl/workloadccl/fixture.go index 0389d0440b18..ffb224933083 100644 --- a/pkg/ccl/workloadccl/fixture.go +++ b/pkg/ccl/workloadccl/fixture.go @@ -197,7 +197,7 @@ func (c *groupCSVWriter) groupWriteCSVs( if err := func() error { select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case c.sem <- struct{}{}: } defer func() { <-c.sem }() diff --git a/pkg/cmd/roachtest/allocator.go b/pkg/cmd/roachtest/allocator.go index f30b4b8a55f3..51a8197c6b33 100644 --- a/pkg/cmd/roachtest/allocator.go +++ b/pkg/cmd/roachtest/allocator.go @@ -221,7 +221,7 @@ func waitForRebalance(ctx context.Context, l *logger, db *gosql.DB, maxStdDev fl for { select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case <-statsTimer.C: statsTimer.Read = true stats, err := allocatorStats(db) diff --git a/pkg/cmd/roachtest/cdc.go b/pkg/cmd/roachtest/cdc.go index 7b2a34a9aaeb..45a0aa575985 100644 --- a/pkg/cmd/roachtest/cdc.go +++ b/pkg/cmd/roachtest/cdc.go @@ -157,7 +157,7 @@ func cdcBasicTest(ctx context.Context, t *test, c *cluster, args cdcTestArgs) { for { select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case <-tpccComplete: return nil case <-time.After(time.Second): @@ -217,7 +217,7 @@ func cdcBasicTest(ctx context.Context, t *test, c *cluster, args cdcTestArgs) { case <-tpccComplete: return nil case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case <-t.C: } @@ -227,7 +227,7 @@ func cdcBasicTest(ctx context.Context, t *test, c *cluster, args cdcTestArgs) { case <-tpccComplete: return nil case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case <-time.After(downTime): } diff --git a/pkg/cmd/roachtest/chaos.go b/pkg/cmd/roachtest/chaos.go index 2d44a2182ac6..a8cced53ec87 100644 --- a/pkg/cmd/roachtest/chaos.go +++ b/pkg/cmd/roachtest/chaos.go @@ -17,6 +17,8 @@ package main import ( "context" "time" + + "github.com/pkg/errors" ) // ChaosTimer configures a chaos schedule. @@ -66,7 +68,7 @@ func (ch *Chaos) Runner(c *cluster, m *monitor) func(context.Context) error { case <-ch.Stopper: return nil case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case <-t.C: } @@ -85,7 +87,7 @@ func (ch *Chaos) Runner(c *cluster, m *monitor) func(context.Context) error { case <-ch.Stopper: return nil case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case <-time.After(downTime): } diff --git a/pkg/cmd/roachtest/clearrange.go b/pkg/cmd/roachtest/clearrange.go index 077b865d24fa..4c95d37f5f84 100644 --- a/pkg/cmd/roachtest/clearrange.go +++ b/pkg/cmd/roachtest/clearrange.go @@ -20,6 +20,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/pkg/errors" ) func registerClearRange(r *registry) { @@ -116,7 +117,7 @@ func registerClearRange(r *registry) { select { case <-after: case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) } } // TODO(benesch): verify that every last range in the table has been diff --git a/pkg/cmd/roachtest/cluster_test.go b/pkg/cmd/roachtest/cluster_test.go index cfed3f082ad7..80909ecff21a 100644 --- a/pkg/cmd/roachtest/cluster_test.go +++ b/pkg/cmd/roachtest/cluster_test.go @@ -74,7 +74,7 @@ func TestClusterMonitor(t *testing.T) { m.Go(func(ctx context.Context) error { <-ctx.Done() fmt.Printf("worker done\n") - return ctx.Err() + return errors.WithStack(ctx.Err()) }) err := m.wait(`echo`, "1: 100\n1: dead") @@ -92,7 +92,7 @@ func TestClusterMonitor(t *testing.T) { }) m.Go(func(ctx context.Context) error { <-ctx.Done() - return ctx.Err() + return errors.WithStack(ctx.Err()) }) err := m.wait(`echo`, `1`) diff --git a/pkg/cmd/roachtest/hotspotsplits.go b/pkg/cmd/roachtest/hotspotsplits.go index 2a5b44e751a4..e33d38ad0cc7 100644 --- a/pkg/cmd/roachtest/hotspotsplits.go +++ b/pkg/cmd/roachtest/hotspotsplits.go @@ -85,7 +85,7 @@ func registerHotSpotSplits(r *registry) { select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case <-time.After(5 * time.Second): } } diff --git a/pkg/cmd/roachtest/rebalance_load.go b/pkg/cmd/roachtest/rebalance_load.go index a2611e1e0e49..5f8e51dbbee4 100644 --- a/pkg/cmd/roachtest/rebalance_load.go +++ b/pkg/cmd/roachtest/rebalance_load.go @@ -26,6 +26,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/pkg/errors" "golang.org/x/sync/errgroup" ) @@ -96,7 +97,7 @@ func registerRebalanceLoad(r *registry) { select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case <-time.After(5 * time.Second): } } diff --git a/pkg/cmd/roachtest/tpcc.go b/pkg/cmd/roachtest/tpcc.go index 520134a2b243..faab4aae6414 100644 --- a/pkg/cmd/roachtest/tpcc.go +++ b/pkg/cmd/roachtest/tpcc.go @@ -334,7 +334,7 @@ func loadTPCCBench( select { case <-time.After(rebalanceWait): case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) } _, err = db.ExecContext(ctx, `SET CLUSTER SETTING kv.snapshot_rebalance.max_rate='2MiB'`) diff --git a/pkg/cmd/roachtest/upgrade.go b/pkg/cmd/roachtest/upgrade.go index 49625ba91889..fc030685b9d0 100644 --- a/pkg/cmd/roachtest/upgrade.go +++ b/pkg/cmd/roachtest/upgrade.go @@ -22,6 +22,7 @@ import ( "time" _ "github.com/lib/pq" + "github.com/pkg/errors" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/binfetcher" @@ -65,7 +66,7 @@ func registerUpgrade(r *registry) { t.WorkerStatus("sleeping") select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case <-time.After(ts): return nil } diff --git a/pkg/cmd/roachtest/version.go b/pkg/cmd/roachtest/version.go index eb19dd2bb629..5aa4fee410b1 100644 --- a/pkg/cmd/roachtest/version.go +++ b/pkg/cmd/roachtest/version.go @@ -23,6 +23,7 @@ import ( "time" _ "github.com/lib/pq" + "github.com/pkg/errors" "github.com/cockroachdb/cockroach/pkg/util/binfetcher" ) @@ -88,7 +89,7 @@ func registerVersion(r *registry) { t.WorkerStatus("sleeping") select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case <-time.After(stageDuration): } // Make sure everyone is still running. diff --git a/pkg/gossip/server.go b/pkg/gossip/server.go index fdfd8b46b53b..74d451232d19 100644 --- a/pkg/gossip/server.go +++ b/pkg/gossip/server.go @@ -121,7 +121,7 @@ func (s *server) Gossip(stream Gossip_GossipServer) error { send := func(reply *Response) error { select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case syncChan <- struct{}{}: defer func() { <-syncChan }() diff --git a/pkg/internal/client/lease.go b/pkg/internal/client/lease.go index 395229f12f91..a7fc69d0d22b 100644 --- a/pkg/internal/client/lease.go +++ b/pkg/internal/client/lease.go @@ -142,7 +142,7 @@ func (m *LeaseManager) timeRemaining(val *LeaseVal) time.Duration { func (m *LeaseManager) ExtendLease(ctx context.Context, l *Lease) error { select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case l.val.sem <- struct{}{}: } defer func() { <-l.val.sem }() @@ -173,7 +173,7 @@ func (m *LeaseManager) ExtendLease(ctx context.Context, l *Lease) error { func (m *LeaseManager) ReleaseLease(ctx context.Context, l *Lease) error { select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case l.val.sem <- struct{}{}: } defer func() { <-l.val.sem }() diff --git a/pkg/internal/client/range_lookup.go b/pkg/internal/client/range_lookup.go index 2d4306eb92b2..313e07dd5515 100644 --- a/pkg/internal/client/range_lookup.go +++ b/pkg/internal/client/range_lookup.go @@ -251,7 +251,7 @@ func RangeLookup( key, prefetchedRanges) } - ctxErr := ctx.Err() + ctxErr := errors.WithStack(ctx.Err()) if ctxErr == nil { log.Fatalf(ctx, "retry loop broke before context expired") } diff --git a/pkg/internal/client/txn.go b/pkg/internal/client/txn.go index ebd4fc2bedef..84fab9119d1b 100644 --- a/pkg/internal/client/txn.go +++ b/pkg/internal/client/txn.go @@ -687,7 +687,7 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error) // error condition this loop isn't capable of handling. for { if err := ctx.Err(); err != nil { - return err + return errors.WithStack(err) } err = fn(ctx, txn) diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index b206fe3eae25..4d63254fbc56 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -647,7 +647,7 @@ func RunAndWaitForTerminalState( for r := retry.StartWithCtx(ctx, retry.Options{}); ; { select { case <-ctx.Done(): - return 0, "", ctx.Err() + return 0, "", errors.WithStack(ctx.Err()) case execErr = <-execErrCh: // The closure finished, try to fetch a job id one more time. Close // and nil out execErrCh so it blocks from now on. @@ -671,7 +671,7 @@ func RunAndWaitForTerminalState( for r := retry.StartWithCtx(ctx, retry.Options{}); ; { select { case <-ctx.Done(): - return jobID, "", ctx.Err() + return jobID, "", errors.WithStack(ctx.Err()) case execErr = <-execErrCh: // The closure finished, this is a nice hint to wake up, but it only // works once. Close and nil out execErrCh so it blocks from now on. diff --git a/pkg/jobs/progress.go b/pkg/jobs/progress.go index 6d7fb22fd971..52f38a1f504c 100644 --- a/pkg/jobs/progress.go +++ b/pkg/jobs/progress.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/pkg/errors" ) // For both backups and restores, we compute progress as the number of completed @@ -90,7 +91,7 @@ func (jpl *ProgressLogger) Loop(ctx context.Context, chunkCh <-chan struct{}) er return nil } case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) } } } diff --git a/pkg/kv/dist_sender.go b/pkg/kv/dist_sender.go index 04beabe6f4f3..153895aad829 100644 --- a/pkg/kv/dist_sender.go +++ b/pkg/kv/dist_sender.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/pkg/errors" ) const ( @@ -1167,7 +1168,7 @@ func (ds *DistSender) deduceRetryEarlyExitError(ctx context.Context) *roachpb.Er return roachpb.NewError(&roachpb.NodeUnavailableError{}) case <-ctx.Done(): // Happens when the client request is canceled. - return roachpb.NewError(ctx.Err()) + return roachpb.NewError(errors.WithStack(ctx.Err())) default: } return nil @@ -1381,8 +1382,8 @@ func (ds *DistSender) sendToReplicas( } // Has the caller given up? - if ctx.Err() != nil { - errMsg := fmt.Sprintf("context done during DistSender.Send: %s", ctx.Err()) + if err := ctx.Err(); err != nil { + errMsg := fmt.Sprintf("context done during DistSender.Send: %s", errors.WithStack(err)) log.Eventf(ctx, errMsg) if ambiguousError != nil { return nil, roachpb.NewAmbiguousResultError(errMsg) @@ -1391,7 +1392,7 @@ func (ds *DistSender) sendToReplicas( // were unable to reach a replica that could serve the request, and they // cause range cache evictions. Context cancellations just mean the // sender changed its mind or the request timed out. - return nil, ctx.Err() + return nil, errors.WithStack(err) } if transport.IsExhausted() { diff --git a/pkg/kv/dist_sender_rangefeed.go b/pkg/kv/dist_sender_rangefeed.go index 1ce04d5d408c..4444c304b0b8 100644 --- a/pkg/kv/dist_sender_rangefeed.go +++ b/pkg/kv/dist_sender_rangefeed.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/pkg/errors" ) // RangeFeed divides a RangeFeed request on range boundaries and establishes a @@ -221,7 +222,7 @@ func (ds *DistSender) singleRangeFeed( select { case eventCh <- event: case <-ctx.Done(): - return argsCopy.Timestamp, roachpb.NewError(ctx.Err()) + return argsCopy.Timestamp, roachpb.NewError(errors.WithStack(ctx.Err())) } } } diff --git a/pkg/kv/range_cache.go b/pkg/kv/range_cache.go index 554ed4b0e30c..3be871f69aae 100644 --- a/pkg/kv/range_cache.go +++ b/pkg/kv/range_cache.go @@ -355,7 +355,7 @@ func (rdc *RangeDescriptorCache) lookupRangeDescriptorInternal( select { case res = <-resC: case <-ctxDone: - return nil, nil, ctx.Err() + return nil, nil, errors.WithStack(ctx.Err()) } if res.Shared { diff --git a/pkg/kv/range_cache_test.go b/pkg/kv/range_cache_test.go index 45e03b763718..b1353342422d 100644 --- a/pkg/kv/range_cache_test.go +++ b/pkg/kv/range_cache_test.go @@ -24,6 +24,7 @@ import ( "testing" "github.com/biogo/store/llrb" + "github.com/pkg/errors" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -111,7 +112,7 @@ func (db *testDescriptorDB) RangeLookup( select { case <-db.pauseChan: case <-ctx.Done(): - return nil, nil, ctx.Err() + return nil, nil, errors.WithStack(ctx.Err()) } atomic.AddInt64(&db.lookupCount, 1) diff --git a/pkg/kv/transport.go b/pkg/kv/transport.go index b84df731da96..574ff2f81c41 100644 --- a/pkg/kv/transport.go +++ b/pkg/kv/transport.go @@ -188,8 +188,8 @@ func (gt *grpcTransport) sendBatch( // Bail out early if the context is already canceled. (GRPC will // detect this pretty quickly, but the first check of the context // in the local server comes pretty late) - if ctx.Err() != nil { - return nil, ctx.Err() + if err := ctx.Err(); err != nil { + return nil, errors.WithStack(err) } gt.opts.metrics.SentCount.Inc(1) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index b18352040374..fa888b501efe 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -243,7 +243,7 @@ func (c *Connection) Connect(ctx context.Context) (*grpc.ClientConn, error) { case <-c.stopper.ShouldStop(): return nil, errors.Errorf("stopped") case <-ctx.Done(): - return nil, ctx.Err() + return nil, errors.WithStack(ctx.Err()) } // If connection is invalid, return latest heartbeat error. @@ -406,7 +406,7 @@ func (a rangeFeedClientAdapter) Send(e *roachpb.RangeFeedEvent) error { case a.eventC <- e: return nil case <-a.ctx.Done(): - return a.ctx.Err() + return errors.WithStack(a.ctx.Err()) } } diff --git a/pkg/rpc/heartbeat_test.go b/pkg/rpc/heartbeat_test.go index bf1efe6f40ff..55980328885f 100644 --- a/pkg/rpc/heartbeat_test.go +++ b/pkg/rpc/heartbeat_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/pkg/errors" ) func TestRemoteOffsetString(t *testing.T) { @@ -95,7 +96,7 @@ func (mhs *ManualHeartbeatService) Ping( return nil, err } case <-ctx.Done(): - return nil, ctx.Err() + return nil, errors.WithStack(ctx.Err()) case <-mhs.stopper.ShouldStop(): } hs := HeartbeatService{ diff --git a/pkg/server/admin.go b/pkg/server/admin.go index db1803c43b18..2c2b21dd1cb0 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -697,7 +697,7 @@ func (s *adminServer) tableStatsForSpan( } case <-ctx.Done(): // Caller gave up, stop doing work. - return nil, ctx.Err() + return nil, errors.WithStack(ctx.Err()) } } @@ -1316,7 +1316,7 @@ func (s *adminServer) Drain(req *serverpb.DrainRequest, stream serverpb.Admin_Dr case <-s.server.stopper.IsStopped(): return nil case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) } } diff --git a/pkg/server/node.go b/pkg/server/node.go index 261c2cf58e96..a8e21dc84f0c 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -699,7 +699,7 @@ func (n *Node) connectGossip(ctx context.Context) error { case <-n.stopper.ShouldStop(): return errors.New("stop called before we could connect to gossip") case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case <-n.storeCfg.Gossip.Connected: } diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index f9a5b4e6fcd7..5b4c452773bb 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -1038,7 +1038,7 @@ func (ex *connExecutor) run(ctx context.Context, cancel context.CancelFunc) erro for { ex.curStmt = nil if err := ctx.Err(); err != nil { - return err + return errors.WithStack(err) } cmd, pos, err := ex.stmtBuf.curCmd() diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 37acccbddc36..7acb2f502cc8 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -20,6 +20,8 @@ import ( "fmt" + "github.com/pkg/errors" + "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -33,7 +35,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/opentracing/opentracing-go" - "github.com/pkg/errors" ) // To allow queries to send out flow RPCs in parallel, we use a pool of workers @@ -484,7 +485,7 @@ func (r *DistSQLReceiver) Push( r.resultWriter.SetError(r.txnAbortedErr.Load().(errWrap).err) } if r.resultWriter.Err() == nil && r.ctx.Err() != nil { - r.resultWriter.SetError(r.ctx.Err()) + r.resultWriter.SetError(errors.WithStack(r.ctx.Err())) } if r.resultWriter.Err() != nil { // TODO(andrei): We should drain here if we weren't canceled. diff --git a/pkg/sql/pgwire/conn.go b/pkg/sql/pgwire/conn.go index 4258260bfab4..a9cfcc91e57d 100644 --- a/pkg/sql/pgwire/conn.go +++ b/pkg/sql/pgwire/conn.go @@ -244,9 +244,9 @@ func (c *conn) serveImpl( c.conn = newReadTimeoutConn(c.conn, func() error { // If the context was closed, it's time to bail. Either a higher-level // server or the command processor have canceled us. - if ctx.Err() != nil { + if err := ctx.Err(); err != nil { ctxCanceled = true - return ctx.Err() + return errors.WithStack(err) } // If the server is draining, we'll let the processor know by pushing a // DrainRequest. This will make the processor quit whenever it finds a good diff --git a/pkg/sql/planhook.go b/pkg/sql/planhook.go index dc780f3995a5..550578c53d4f 100644 --- a/pkg/sql/planhook.go +++ b/pkg/sql/planhook.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/pkg/errors" ) // planHookFn is a function that can intercept a statement being planned and @@ -147,7 +148,7 @@ func (f *hookFnNode) startExec(params runParams) error { func (f *hookFnNode) Next(params runParams) (bool, error) { select { case <-params.ctx.Done(): - return false, params.ctx.Err() + return false, errors.WithStack(params.ctx.Err()) case err := <-f.run.errCh: return false, err case f.run.row = <-f.run.resultsCh: diff --git a/pkg/storage/client_merge_test.go b/pkg/storage/client_merge_test.go index d535cc0701da..4d002e4ff30a 100644 --- a/pkg/storage/client_merge_test.go +++ b/pkg/storage/client_merge_test.go @@ -1209,7 +1209,7 @@ func TestStoreRangeMergeConcurrentRequests(t *testing.T) { for { select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case <-doneCh: return nil default: diff --git a/pkg/storage/closedts/transport/server.go b/pkg/storage/closedts/transport/server.go index a5b8088c65a2..db6042d5730a 100644 --- a/pkg/storage/closedts/transport/server.go +++ b/pkg/storage/closedts/transport/server.go @@ -16,9 +16,10 @@ package transport import ( "context" - "errors" "time" + "github.com/pkg/errors" + "github.com/cockroachdb/cockroach/pkg/storage/closedts" "github.com/cockroachdb/cockroach/pkg/storage/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -85,7 +86,7 @@ func (s *Server) Get(client ctpb.InboundClient) error { var ok bool select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case <-s.stopper.ShouldQuiesce(): return errors.New("node is draining") case entry, ok = <-ch: diff --git a/pkg/storage/idalloc/id_alloc.go b/pkg/storage/idalloc/id_alloc.go index 7fe0254fb684..312c450ad870 100644 --- a/pkg/storage/idalloc/id_alloc.go +++ b/pkg/storage/idalloc/id_alloc.go @@ -92,7 +92,7 @@ func (ia *Allocator) Allocate(ctx context.Context) (uint32, error) { } return id, nil case <-ctx.Done(): - return 0, ctx.Err() + return 0, errors.WithStack(ctx.Err()) } } diff --git a/pkg/storage/intent_resolver.go b/pkg/storage/intent_resolver.go index 9b98f4d5c9fc..fcf5bb81e86c 100644 --- a/pkg/storage/intent_resolver.go +++ b/pkg/storage/intent_resolver.go @@ -925,7 +925,7 @@ func (ir *intentResolver) resolveIntents( } // Avoid doing any work on behalf of expired contexts. See // https://github.com/cockroachdb/cockroach/issues/15997. - if err := ctx.Err(); err != nil { + if err := errors.WithStack(ctx.Err()); err != nil { return err } log.Eventf(ctx, "resolving intents [wait=%t]", opts.Wait) diff --git a/pkg/storage/node_liveness.go b/pkg/storage/node_liveness.go index a41ad321c043..2e8da83336ca 100644 --- a/pkg/storage/node_liveness.go +++ b/pkg/storage/node_liveness.go @@ -281,7 +281,7 @@ func (nl *NodeLiveness) SetDecommissioning( select { case sem <- struct{}{}: case <-ctx.Done(): - return false, ctx.Err() + return false, errors.WithStack(ctx.Err()) } defer func() { <-sem @@ -339,7 +339,7 @@ func (nl *NodeLiveness) setDrainingInternal( select { case sem <- struct{}{}: case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) } defer func() { <-sem @@ -565,7 +565,7 @@ func (nl *NodeLiveness) heartbeatInternal( select { case sem <- struct{}{}: case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) } defer func() { <-sem @@ -727,7 +727,7 @@ func (nl *NodeLiveness) IncrementEpoch(ctx context.Context, liveness *Liveness) select { case sem <- struct{}{}: case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) } defer func() { <-sem @@ -794,7 +794,7 @@ func (nl *NodeLiveness) updateLiveness( for { // Before each attempt, ensure that the context has not expired. if err := ctx.Err(); err != nil { - return err + return errors.WithStack(err) } for _, eng := range nl.engines { // Synchronously writing to all disks before updating node liveness because diff --git a/pkg/storage/queue_test.go b/pkg/storage/queue_test.go index f0f75aa48f29..d2429a9c4796 100644 --- a/pkg/storage/queue_test.go +++ b/pkg/storage/queue_test.go @@ -785,7 +785,7 @@ func (pq *processTimeoutQueueImpl) process( ) error { <-ctx.Done() atomic.AddInt32(&pq.processed, 1) - return ctx.Err() + return errors.WithStack(ctx.Err()) } func TestBaseQueueProcessTimeout(t *testing.T) { diff --git a/pkg/storage/quota_pool.go b/pkg/storage/quota_pool.go index 376f2c632aad..a3c58f476fc4 100644 --- a/pkg/storage/quota_pool.go +++ b/pkg/storage/quota_pool.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/pkg/errors" ) type quotaPool struct { @@ -160,7 +161,7 @@ func (qp *quotaPool) acquire(ctx context.Context, v int64) error { } qp.Unlock() - return ctx.Err() + return errors.WithStack(ctx.Err()) case <-qp.done: // We don't need to 'unregister' ourselves as in the case when the // context is canceled. In fact, we want others waiters to only @@ -191,7 +192,7 @@ func (qp *quotaPool) acquire(ctx context.Context, v int64) error { qp.Lock() qp.notifyNextLocked() qp.Unlock() - return ctx.Err() + return errors.WithStack(ctx.Err()) case <-qp.done: // We don't need to release quota back as all ongoing and // subsequent acquisitions will succeed immediately. diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index d998c6557760..d497ddf15396 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -2307,7 +2307,7 @@ func (r *Replica) beginCmds( // command queue we'll need to transfer our prerequisites to all // dependent commands if we want to cancel, so it's good to bail // out early if we can. - if err := ctx.Err(); err != nil { + if err := errors.WithStack(ctx.Err()); err != nil { log.VEventf(ctx, 2, "%s before command queue: %s", err, ba.Summary()) return nil, err } @@ -2381,7 +2381,7 @@ func (r *Replica) beginCmds( // If the prereq still has pending dependencies, migrate them. newCmd.ResolvePendingPrereq() case <-ctxDone: - err := ctx.Err() + err := errors.WithStack(ctx.Err()) log.VEventf(ctx, 2, "%s while in command queue: %s", err, ba) if fn := r.store.cfg.TestingKnobs.OnCommandQueueAction; fn != nil { @@ -3510,7 +3510,7 @@ func (r *Replica) propose( if err := ctx.Err(); err != nil { errStr := fmt.Sprintf("%s before proposing: %s", err, ba.Summary()) log.Warning(ctx, errStr) - return nil, nil, 0, roachpb.NewError(err) + return nil, nil, 0, roachpb.NewError(errors.WithStack(err)) } // Only need to check that the request is in bounds at proposal time, diff --git a/pkg/storage/replica_backpressure.go b/pkg/storage/replica_backpressure.go index e53b9da38792..c8046d4efde5 100644 --- a/pkg/storage/replica_backpressure.go +++ b/pkg/storage/replica_backpressure.go @@ -151,7 +151,7 @@ func (r *Replica) maybeBackpressureWriteBatch(ctx context.Context, ba roachpb.Ba // Wait for the callback to be called. select { case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case err := <-splitC: if err != nil { return errors.Wrap(err, "split failed while applying backpressure") diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index faeefb6fe741..0c678946a678 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -1060,7 +1060,7 @@ func RelocateRange( re := retry.StartWithCtx(ctx, retry.Options{MaxBackoff: 5 * time.Second}) for len(addTargets) > 0 { if err := ctx.Err(); err != nil { - return err + return errors.WithStack(err) } target := addTargets[0] @@ -1115,7 +1115,7 @@ func RelocateRange( re.Reset() for len(removeTargets) > 0 { if err := ctx.Err(); err != nil { - return err + return errors.WithStack(err) } target := removeTargets[0] @@ -1132,7 +1132,7 @@ func RelocateRange( } removeTargets = removeTargets[1:] } - return ctx.Err() + return errors.WithStack(ctx.Err()) } // adminScatter moves replicas and leaseholders for a selection of ranges. diff --git a/pkg/storage/replica_range_lease.go b/pkg/storage/replica_range_lease.go index 27992b7d55da..3a83c7aeb6f7 100644 --- a/pkg/storage/replica_range_lease.go +++ b/pkg/storage/replica_range_lease.go @@ -618,7 +618,7 @@ func (r *Replica) AdminTransferLease(ctx context.Context, target roachpb.StoreID return pErr.GoError() case <-ctx.Done(): transfer.Cancel() - return ctx.Err() + return errors.WithStack(ctx.Err()) } } // Wait for the in-progress extension without holding the mutex. @@ -630,7 +630,7 @@ func (r *Replica) AdminTransferLease(ctx context.Context, target roachpb.StoreID continue case <-ctx.Done(): extension.Cancel() - return ctx.Err() + return errors.WithStack(ctx.Err()) } } } diff --git a/pkg/storage/store.go b/pkg/storage/store.go index ae1df3ead04f..32059e12ab48 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -3047,7 +3047,7 @@ func (s *Store) Send( for { // Exit loop if context has been canceled or timed out. if err := ctx.Err(); err != nil { - return nil, roachpb.NewError(err) + return nil, roachpb.NewError(errors.WithStack(err)) } // Get range and add command to the range for execution. @@ -3180,7 +3180,7 @@ func (s *Store) Send( case <-mergeCompleteCh: // Merge complete. Retry the command. case <-ctx.Done(): - return nil, roachpb.NewError(ctx.Err()) + return nil, roachpb.NewError(errors.WithStack(ctx.Err())) case <-s.stopper.ShouldQuiesce(): return nil, roachpb.NewError(&roachpb.NodeUnavailableError{}) } diff --git a/pkg/storage/store_snapshot.go b/pkg/storage/store_snapshot.go index 86015f19399c..45584ea2c856 100644 --- a/pkg/storage/store_snapshot.go +++ b/pkg/storage/store_snapshot.go @@ -352,7 +352,7 @@ func (s *Store) reserveSnapshot( select { case s.snapshotApplySem <- struct{}{}: case <-ctx.Done(): - return nil, "", ctx.Err() + return nil, "", errors.WithStack(ctx.Err()) case <-s.stopper.ShouldStop(): return nil, "", errors.Errorf("stopped") default: @@ -362,7 +362,7 @@ func (s *Store) reserveSnapshot( select { case s.snapshotApplySem <- struct{}{}: case <-ctx.Done(): - return nil, "", ctx.Err() + return nil, "", errors.WithStack(ctx.Err()) case <-s.stopper.ShouldStop(): return nil, "", errors.Errorf("stopped") } diff --git a/pkg/storage/stores_server.go b/pkg/storage/stores_server.go index 2c90da1d4869..359a46c8a2fb 100644 --- a/pkg/storage/stores_server.go +++ b/pkg/storage/stores_server.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/pkg/errors" ) // Server implements PerReplicaServer. @@ -105,7 +106,7 @@ func (is Server) WaitForApplication( if ctx.Err() == nil { log.Fatal(ctx, "infinite retry loop exited but context has no error") } - return ctx.Err() + return errors.WithStack(ctx.Err()) }) return resp, err } diff --git a/pkg/storage/txnwait/txnqueue.go b/pkg/storage/txnwait/txnqueue.go index 9c9f28d22dd8..0759a28c959a 100644 --- a/pkg/storage/txnwait/txnqueue.go +++ b/pkg/storage/txnwait/txnqueue.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/pkg/errors" ) const maxWaitForQueryTxn = 50 * time.Millisecond @@ -470,7 +471,7 @@ func (q *Queue) MaybeWaitForPush( case <-ctx.Done(): // Caller has given up. log.VEvent(ctx, 2, "pusher giving up due to context cancellation") - return nil, roachpb.NewError(ctx.Err()) + return nil, roachpb.NewError(errors.WithStack(ctx.Err())) case txn := <-push.pending: log.VEventf(ctx, 2, "result of pending push: %v", txn) @@ -653,7 +654,7 @@ func (q *Queue) MaybeWaitForQuery( select { case <-ctx.Done(): // Caller has given up. - return roachpb.NewError(ctx.Err()) + return roachpb.NewError(errors.WithStack(ctx.Err())) case <-maxWaitCh: return nil case <-query.pending: @@ -732,14 +733,14 @@ func (q *Queue) startQueryPusherTxn( // push waiter requires another query of the pusher txn. select { case <-ctx.Done(): - errCh <- roachpb.NewError(ctx.Err()) + errCh <- roachpb.NewError(errors.WithStack(ctx.Err())) return case <-readyCh: } // Reset the retry to query again immediately. r.Reset() } - errCh <- roachpb.NewError(ctx.Err()) + errCh <- roachpb.NewError(errors.WithStack(ctx.Err())) }); err != nil { errCh <- roachpb.NewError(err) } diff --git a/pkg/ts/server.go b/pkg/ts/server.go index 3d1fd37effc1..5626e0d0731c 100644 --- a/pkg/ts/server.go +++ b/pkg/ts/server.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -305,7 +306,7 @@ func (s *Server) Query( return nil, err } case <-ctx.Done(): - return nil, ctx.Err() + return nil, errors.WithStack(ctx.Err()) } } diff --git a/pkg/util/ctxgroup/ctxgroup.go b/pkg/util/ctxgroup/ctxgroup.go index 66e59eb9bda3..736eb526a4d8 100644 --- a/pkg/util/ctxgroup/ctxgroup.go +++ b/pkg/util/ctxgroup/ctxgroup.go @@ -37,7 +37,7 @@ are all example bugs that Cockroach has had during its use of errgroup: select { case ch <- val: case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) } } return nil @@ -71,7 +71,7 @@ that doesn't shadow the original ctx: select { case ch <- val: case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) } } return nil @@ -103,7 +103,7 @@ exit early. Contrast with using this package: select { case ch <- val: case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) } } return nil @@ -157,6 +157,7 @@ package ctxgroup import ( "context" + "github.com/pkg/errors" "golang.org/x/sync/errgroup" ) @@ -187,7 +188,7 @@ func (g Group) GoCtx(f func(ctx context.Context) error) { // Err returns the Group's ctx.Err(). func (g Group) Err() error { - return g.ctx.Err() + return errors.WithStack(g.ctx.Err()) } // GroupWorkers runs num worker go routines in an errgroup. diff --git a/pkg/util/stop/stopper.go b/pkg/util/stop/stopper.go index 3f58806325ad..1aaa3d4ef8e0 100644 --- a/pkg/util/stop/stopper.go +++ b/pkg/util/stop/stopper.go @@ -16,7 +16,6 @@ package stop import ( "context" - "errors" "fmt" "net/http" "sort" @@ -24,6 +23,7 @@ import ( "sync" opentracing "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" @@ -342,7 +342,7 @@ func (s *Stopper) RunLimitedAsyncTask( select { case sem <- struct{}{}: case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case <-s.ShouldQuiesce(): return ErrUnavailable default: @@ -354,7 +354,7 @@ func (s *Stopper) RunLimitedAsyncTask( select { case sem <- struct{}{}: case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) case <-s.ShouldQuiesce(): return ErrUnavailable } @@ -365,7 +365,7 @@ func (s *Stopper) RunLimitedAsyncTask( select { case <-ctx.Done(): <-sem - return ctx.Err() + return errors.WithStack(ctx.Err()) default: } diff --git a/pkg/util/stop/stopper_test.go b/pkg/util/stop/stopper_test.go index 95739177f6c6..9229d9196c09 100644 --- a/pkg/util/stop/stopper_test.go +++ b/pkg/util/stop/stopper_test.go @@ -625,7 +625,7 @@ func TestStopperRunLimitedAsyncTaskCancelContext(t *testing.T) { if err := s.RunAsyncTask(ctx, "test", func(ctx context.Context) { for i := 0; i < maxConcurrency*2; i++ { if err := s.RunLimitedAsyncTask(ctx, "test", sem, true, f); err != nil { - if err != context.Canceled { + if errors.Cause(err) != context.Canceled { t.Fatal(err) } atomic.AddInt32(&workersCanceled, 1) diff --git a/pkg/workload/csv.go b/pkg/workload/csv.go index 6a309c2d04f1..3c1d0e5b9413 100644 --- a/pkg/workload/csv.go +++ b/pkg/workload/csv.go @@ -48,7 +48,7 @@ func WriteCSVRows( select { case <-ctx.Done(): - return 0, ctx.Err() + return 0, errors.WithStack(ctx.Err()) default: } for _, row := range table.InitialRows.Batch(rowBatchIdx) { diff --git a/pkg/workload/tpcc/worker.go b/pkg/workload/tpcc/worker.go index 95d7718cd5f7..1c67b86fdee5 100644 --- a/pkg/workload/tpcc/worker.go +++ b/pkg/workload/tpcc/worker.go @@ -177,5 +177,5 @@ func (w *worker) run(ctx context.Context) error { } time.Sleep(time.Duration(thinkTime) * time.Second) } - return ctx.Err() + return errors.WithStack(ctx.Err()) } diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go index a0cfb54b83fb..607de804556f 100644 --- a/pkg/workload/workload.go +++ b/pkg/workload/workload.go @@ -409,7 +409,7 @@ func Split(ctx context.Context, db *gosql.DB, table Table, concurrency int) erro log.Infof(ctx, "performing split %d of %d", count, len(splitPoints)) } case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) } if p.lo < m { @@ -447,7 +447,7 @@ func Split(ctx context.Context, db *gosql.DB, table Table, concurrency int) erro log.Warningf(ctx, `%s: %s`, buf.String(), err) } case <-ctx.Done(): - return ctx.Err() + return errors.WithStack(ctx.Err()) } }