Skip to content

Commit

Permalink
Merge pull request #69000 from yuzefovich/backport21.1-68567
Browse files Browse the repository at this point in the history
release-21.1: colrpc: enhance warnings from the outbox
  • Loading branch information
yuzefovich authored Aug 30, 2021
2 parents 6859359 + bc96c37 commit a2725bd
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 14 deletions.
1 change: 1 addition & 0 deletions pkg/sql/colexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ go_library(
"//pkg/util/tracing",
"@com_github_cockroachdb_apd_v2//:apd", # keep
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_marusama_semaphore//:semaphore",
],
)
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,9 +559,9 @@ func (r opResult) createAndWrapRowSource(
if args.ProcessorConstructor == nil {
return errors.New("processorConstructor is nil")
}
log.VEventf(ctx, 1, "planning a row-execution processor in the vectorized flow because %v", causeToWrap)
log.VEventf(ctx, 1, "planning a row-execution processor in the vectorized flow: %v", causeToWrap)
if err := canWrap(flowCtx.EvalCtx.SessionData.VectorizeMode, spec); err != nil {
log.VEventf(ctx, 1, "planning a wrapped processor failed because %v", err)
log.VEventf(ctx, 1, "planning a wrapped processor failed: %v", err)
// Return the original error for why we don't support this spec
// natively since it is more interesting.
return causeToWrap
Expand Down Expand Up @@ -671,7 +671,7 @@ func NewColOperator(
}
result.OpMonitors = result.OpMonitors[:0]
if returnedErr != nil {
log.VEventf(ctx, 1, "vectorized planning failed with %v", returnedErr)
log.VEventf(ctx, 1, "vectorized planning failed: %v", returnedErr)
}
}
if panicErr != nil {
Expand Down Expand Up @@ -1365,7 +1365,7 @@ func NewColOperator(
streamingAllocator, r.Op, i, castedIdx, actual, expected,
)
if err != nil {
return r, errors.AssertionFailedf("unexpectedly couldn't plan a cast although IsCastSupported returned true: %v", err)
return r, errors.NewAssertionErrorWithWrappedErrf(err, "unexpectedly couldn't plan a cast although IsCastSupported returned true")
}
projection[i] = uint32(castedIdx)
typesWithCasts = append(typesWithCasts, expected)
Expand Down
7 changes: 4 additions & 3 deletions pkg/sql/colexec/hash_based_partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/marusama/semaphore"
)

Expand Down Expand Up @@ -119,7 +120,7 @@ type hashBasedPartitioner struct {
colexecop.CloserHelper

unlimitedAllocator *colmem.Allocator
name string
name redact.SafeString
state hashBasedPartitionerState
inputs []colexecop.Operator
inputTypes [][]*types.T
Expand Down Expand Up @@ -208,7 +209,7 @@ func newHashBasedPartitioner(
unlimitedAllocator *colmem.Allocator,
flowCtx *execinfra.FlowCtx,
args *colexecargs.NewColOperatorArgs,
name string,
name redact.SafeString,
inputs []colexecop.Operator,
inputTypes [][]*types.T,
hashCols [][]uint32,
Expand Down Expand Up @@ -533,7 +534,7 @@ StateChanged:
if partitionInfo.memSize <= op.maxPartitionSizeToProcessUsingMain {
log.VEventf(ctx, 2,
`%s processes partition with idx %d of size %s using the "main" strategy`,
op.name, partitionIdx, humanizeutil.IBytes(partitionInfo.memSize),
op.name, partitionIdx, redact.SafeString(humanizeutil.IBytes(partitionInfo.memSize)),
)
for i := range op.partitionedInputs {
op.partitionedInputs[i].partitionIdx = partitionIdx
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colflow/colrpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"@com_github_apache_arrow_go_arrow//array",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
],
)

Expand Down
20 changes: 15 additions & 5 deletions pkg/sql/colflow/colrpc/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
)

// flowStreamClient is a utility interface used to mock out the RPC layer.
Expand Down Expand Up @@ -220,7 +221,10 @@ func (o *Outbox) Run(
// called, for all other errors flowCtxCancel is. The given error is logged with
// the associated opName.
func handleStreamErr(
ctx context.Context, opName string, err error, flowCtxCancel, outboxCtxCancel context.CancelFunc,
ctx context.Context,
opName redact.SafeString,
err error,
flowCtxCancel, outboxCtxCancel context.CancelFunc,
) {
if err == io.EOF {
if log.V(1) {
Expand All @@ -233,9 +237,9 @@ func handleStreamErr(
}
}

func (o *Outbox) moveToDraining(ctx context.Context) {
func (o *Outbox) moveToDraining(ctx context.Context, reason redact.RedactableString) {
if atomic.CompareAndSwapUint32(&o.draining, 0, 1) {
log.VEvent(ctx, 2, "Outbox moved to draining")
log.VEventf(ctx, 2, "Outbox moved to draining (%s)", reason)
}
}

Expand Down Expand Up @@ -390,15 +394,21 @@ func (o *Outbox) runWithStream(
log.VEventf(ctx, 2, "Outbox received handshake: %v", msg.Handshake)
case msg.DrainRequest != nil:
log.VEventf(ctx, 2, "Outbox received drain request")
o.moveToDraining(ctx)
o.moveToDraining(ctx, "consumer requested draining" /* reason */)
}
}
close(waitCh)
}()

terminatedGracefully, errToSend := o.sendBatches(ctx, stream, flowCtxCancel, outboxCtxCancel)
if terminatedGracefully || errToSend != nil {
o.moveToDraining(ctx)
var reason redact.RedactableString
if errToSend != nil {
reason = redact.Sprintf("encountered error when sending batches: %v", errToSend)
} else {
reason = redact.Sprint(redact.SafeString("terminated gracefully"))
}
o.moveToDraining(ctx, reason)
if err := o.sendMetadata(ctx, stream, errToSend); err != nil {
handleStreamErr(ctx, "Send (metadata)", err, flowCtxCancel, outboxCtxCancel)
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (f *vectorizedFlow) GetPath(ctx context.Context) string {
f.tempStorage.path = filepath.Join(f.Cfg.TempStoragePath, tempDirName)
log.VEventf(ctx, 1, "flow %s spilled to disk, stack trace: %s", f.ID, util.GetSmallTrace(2))
if err := f.Cfg.TempFS.MkdirAll(f.tempStorage.path); err != nil {
colexecerror.InternalError(errors.Wrapf(err, "unable to create temporary storage directory"))
colexecerror.InternalError(errors.Wrap(err, "unable to create temporary storage directory"))
}
return f.tempStorage.path
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (ds *ServerImpl) Drain(
ctx context.Context, flowDrainWait time.Duration, reporter func(int, redact.SafeString),
) {
if err := ds.setDraining(true); err != nil {
log.Warningf(ctx, "unable to gossip distsql draining state: %s", err)
log.Warningf(ctx, "unable to gossip distsql draining state: %v", err)
}

flowWait := flowDrainWait
Expand Down

0 comments on commit a2725bd

Please sign in to comment.