From bc96c37381a5fe867e805e510dca733a682dfc38 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 9 Aug 2021 10:44:18 -0700 Subject: [PATCH] colrpc: enhance warnings from the outbox This commit marks several string constants as "safe" from the redactability perspective so that the warnings logged by the outboxes are more helpful. Additionally, several minor nits around error formatting are addressed. Release note: None --- pkg/sql/colexec/BUILD.bazel | 1 + pkg/sql/colexec/colbuilder/execplan.go | 8 ++++---- pkg/sql/colexec/hash_based_partitioner.go | 7 ++++--- pkg/sql/colflow/colrpc/BUILD.bazel | 1 + pkg/sql/colflow/colrpc/outbox.go | 20 +++++++++++++++----- pkg/sql/colflow/vectorized_flow.go | 2 +- pkg/sql/distsql/server.go | 2 +- 7 files changed, 27 insertions(+), 14 deletions(-) diff --git a/pkg/sql/colexec/BUILD.bazel b/pkg/sql/colexec/BUILD.bazel index 632467720955..ca2d05d59e4c 100644 --- a/pkg/sql/colexec/BUILD.bazel +++ b/pkg/sql/colexec/BUILD.bazel @@ -78,6 +78,7 @@ go_library( "//pkg/util/tracing", "@com_github_cockroachdb_apd_v2//:apd", # keep "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_redact//:redact", "@com_github_marusama_semaphore//:semaphore", ], ) diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 74ace5e1a1eb..a3fa0290d563 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -559,9 +559,9 @@ func (r opResult) createAndWrapRowSource( if args.ProcessorConstructor == nil { return errors.New("processorConstructor is nil") } - log.VEventf(ctx, 1, "planning a row-execution processor in the vectorized flow because %v", causeToWrap) + log.VEventf(ctx, 1, "planning a row-execution processor in the vectorized flow: %v", causeToWrap) if err := canWrap(flowCtx.EvalCtx.SessionData.VectorizeMode, spec); err != nil { - log.VEventf(ctx, 1, "planning a wrapped processor failed because %v", err) + log.VEventf(ctx, 1, "planning a wrapped processor failed: %v", err) // Return the original error for why we don't support this spec // natively since it is more interesting. return causeToWrap @@ -671,7 +671,7 @@ func NewColOperator( } result.OpMonitors = result.OpMonitors[:0] if returnedErr != nil { - log.VEventf(ctx, 1, "vectorized planning failed with %v", returnedErr) + log.VEventf(ctx, 1, "vectorized planning failed: %v", returnedErr) } } if panicErr != nil { @@ -1365,7 +1365,7 @@ func NewColOperator( streamingAllocator, r.Op, i, castedIdx, actual, expected, ) if err != nil { - return r, errors.AssertionFailedf("unexpectedly couldn't plan a cast although IsCastSupported returned true: %v", err) + return r, errors.NewAssertionErrorWithWrappedErrf(err, "unexpectedly couldn't plan a cast although IsCastSupported returned true") } projection[i] = uint32(castedIdx) typesWithCasts = append(typesWithCasts, expected) diff --git a/pkg/sql/colexec/hash_based_partitioner.go b/pkg/sql/colexec/hash_based_partitioner.go index 03de91e60673..b1be93eb08fa 100644 --- a/pkg/sql/colexec/hash_based_partitioner.go +++ b/pkg/sql/colexec/hash_based_partitioner.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" "github.com/marusama/semaphore" ) @@ -119,7 +120,7 @@ type hashBasedPartitioner struct { colexecop.CloserHelper unlimitedAllocator *colmem.Allocator - name string + name redact.SafeString state hashBasedPartitionerState inputs []colexecop.Operator inputTypes [][]*types.T @@ -208,7 +209,7 @@ func newHashBasedPartitioner( unlimitedAllocator *colmem.Allocator, flowCtx *execinfra.FlowCtx, args *colexecargs.NewColOperatorArgs, - name string, + name redact.SafeString, inputs []colexecop.Operator, inputTypes [][]*types.T, hashCols [][]uint32, @@ -533,7 +534,7 @@ StateChanged: if partitionInfo.memSize <= op.maxPartitionSizeToProcessUsingMain { log.VEventf(ctx, 2, `%s processes partition with idx %d of size %s using the "main" strategy`, - op.name, partitionIdx, humanizeutil.IBytes(partitionInfo.memSize), + op.name, partitionIdx, redact.SafeString(humanizeutil.IBytes(partitionInfo.memSize)), ) for i := range op.partitionedInputs { op.partitionedInputs[i].partitionIdx = partitionIdx diff --git a/pkg/sql/colflow/colrpc/BUILD.bazel b/pkg/sql/colflow/colrpc/BUILD.bazel index 211b407fda34..80d08ac5a502 100644 --- a/pkg/sql/colflow/colrpc/BUILD.bazel +++ b/pkg/sql/colflow/colrpc/BUILD.bazel @@ -29,6 +29,7 @@ go_library( "@com_github_apache_arrow_go_arrow//array", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", + "@com_github_cockroachdb_redact//:redact", ], ) diff --git a/pkg/sql/colflow/colrpc/outbox.go b/pkg/sql/colflow/colrpc/outbox.go index 6e187c919f17..67d13652cc33 100644 --- a/pkg/sql/colflow/colrpc/outbox.go +++ b/pkg/sql/colflow/colrpc/outbox.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" + "github.com/cockroachdb/redact" ) // flowStreamClient is a utility interface used to mock out the RPC layer. @@ -220,7 +221,10 @@ func (o *Outbox) Run( // called, for all other errors flowCtxCancel is. The given error is logged with // the associated opName. func handleStreamErr( - ctx context.Context, opName string, err error, flowCtxCancel, outboxCtxCancel context.CancelFunc, + ctx context.Context, + opName redact.SafeString, + err error, + flowCtxCancel, outboxCtxCancel context.CancelFunc, ) { if err == io.EOF { if log.V(1) { @@ -233,9 +237,9 @@ func handleStreamErr( } } -func (o *Outbox) moveToDraining(ctx context.Context) { +func (o *Outbox) moveToDraining(ctx context.Context, reason redact.RedactableString) { if atomic.CompareAndSwapUint32(&o.draining, 0, 1) { - log.VEvent(ctx, 2, "Outbox moved to draining") + log.VEventf(ctx, 2, "Outbox moved to draining (%s)", reason) } } @@ -390,7 +394,7 @@ func (o *Outbox) runWithStream( log.VEventf(ctx, 2, "Outbox received handshake: %v", msg.Handshake) case msg.DrainRequest != nil: log.VEventf(ctx, 2, "Outbox received drain request") - o.moveToDraining(ctx) + o.moveToDraining(ctx, "consumer requested draining" /* reason */) } } close(waitCh) @@ -398,7 +402,13 @@ func (o *Outbox) runWithStream( terminatedGracefully, errToSend := o.sendBatches(ctx, stream, flowCtxCancel, outboxCtxCancel) if terminatedGracefully || errToSend != nil { - o.moveToDraining(ctx) + var reason redact.RedactableString + if errToSend != nil { + reason = redact.Sprintf("encountered error when sending batches: %v", errToSend) + } else { + reason = redact.Sprint(redact.SafeString("terminated gracefully")) + } + o.moveToDraining(ctx, reason) if err := o.sendMetadata(ctx, stream, errToSend); err != nil { handleStreamErr(ctx, "Send (metadata)", err, flowCtxCancel, outboxCtxCancel) } else { diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index d59a6fb91ee8..1f8b574848b4 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -241,7 +241,7 @@ func (f *vectorizedFlow) GetPath(ctx context.Context) string { f.tempStorage.path = filepath.Join(f.Cfg.TempStoragePath, tempDirName) log.VEventf(ctx, 1, "flow %s spilled to disk, stack trace: %s", f.ID, util.GetSmallTrace(2)) if err := f.Cfg.TempFS.MkdirAll(f.tempStorage.path); err != nil { - colexecerror.InternalError(errors.Wrapf(err, "unable to create temporary storage directory")) + colexecerror.InternalError(errors.Wrap(err, "unable to create temporary storage directory")) } return f.tempStorage.path } diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 3a1c520551a4..4946ed18e7d1 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -142,7 +142,7 @@ func (ds *ServerImpl) Drain( ctx context.Context, flowDrainWait time.Duration, reporter func(int, redact.SafeString), ) { if err := ds.setDraining(true); err != nil { - log.Warningf(ctx, "unable to gossip distsql draining state: %s", err) + log.Warningf(ctx, "unable to gossip distsql draining state: %v", err) } flowWait := flowDrainWait