Skip to content

Commit

Permalink
Merge #56206
Browse files Browse the repository at this point in the history
56206: sql: make SupportsVectorized check light-weight r=yuzefovich a=yuzefovich

**colexec: pool allocations of some objects in the read path**

This commit pools the allocations of some objects that are created on
the simplest read path in the vectorized engine - when we have
a ColBatchScan connected with a Materializer.

Release note: None

**colbuilder: fix casting behavior for actual types mismatch**

We have recently merged a change that enforces that the colbuilder
produces an operator chain that outputs batches with the desired type
schema. This is enforced by planning casts when there is a mismatch.
Previously, we would only try planning a vectorized cast because the
assumption was that only integers of different widths will need to be
cast in some cases, but as it turns out types of string family also
might need to be cast (e.g. `string` and `"char"` aren't identical).
This is now fixed by falling back to row-execution casting when
a vectorized cast isn't supported.

Release note: None

**sql: make SupportsVectorized check light-weight**

Previously, in order to determine whether we supported the vectorization
of a flow, we would run a pretty expensive SupportsVectorized check that
performs a "fake flow setup" by actually creating all of the components
without running them. This has non-negligible performance impact on
KV-like workloads, so it has been optimized away in favor of a more
light-weight check that simply inspects the processor specs for the fact
whether the processor core can be vectorized (either natively or by
wrapping row-execution processor). All processor cores have been
audited to separate out all that we currently cannot wrap (usually
because they don't implement RowSource interface). Note that if a new
processor core is introduced and `canWrap` check is not updated, we
defensively assume that it cannot be wrapped and emit an assertion
failed error that - hopefully - should surface the fact that we need to
update the check.

Addresses: #53893.

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
craig[bot] and yuzefovich committed Nov 5, 2020
2 parents 53e7cc5 + 16fc4fb commit 23d7256
Show file tree
Hide file tree
Showing 14 changed files with 460 additions and 331 deletions.
257 changes: 178 additions & 79 deletions pkg/sql/colexec/colbuilder/execplan.go

Large diffs are not rendered by default.

82 changes: 15 additions & 67 deletions pkg/sql/colexec/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,66 +11,34 @@
package colexec

import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror"
"sync"

"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
)

// ExprHelper is a utility interface that helps with expression handling in
// the vectorized engine.
type ExprHelper interface {
// ProcessExpr processes the given expression and returns a well-typed
// expression.
ProcessExpr(execinfrapb.Expression, *tree.SemaContext, *tree.EvalContext, []*types.T) (tree.TypedExpr, error)
var exprHelperPool = sync.Pool{
New: func() interface{} {
return &ExprHelper{}
},
}

// ExprDeserialization describes how expression deserialization should be
// handled in the vectorized engine.
type ExprDeserialization int

const (
// DefaultExprDeserialization is the default way of handling expression
// deserialization in which case LocalExpr field is used if set.
DefaultExprDeserialization ExprDeserialization = iota
// ForcedExprDeserialization is the way of handling expression
// deserialization in which case LocalExpr field is completely ignored and
// the serialized representation is always deserialized.
ForcedExprDeserialization
)

// NewExprHelper returns a new ExprHelper. forceExprDeserialization determines
// whether LocalExpr field is ignored by the helper.
func NewExprHelper(exprDeserialization ExprDeserialization) ExprHelper {
switch exprDeserialization {
case DefaultExprDeserialization:
return &defaultExprHelper{}
case ForcedExprDeserialization:
return &forcedDeserializationExprHelper{}
default:
colexecerror.InternalError(errors.AssertionFailedf("unexpected ExprDeserialization %d", exprDeserialization))
// This code is unreachable, but the compiler cannot infer that.
return nil
}
// NewExprHelper returns a new ExprHelper.
func NewExprHelper() *ExprHelper {
return exprHelperPool.Get().(*ExprHelper)
}

// defaultExprHelper is an ExprHelper that takes advantage of already present
// well-typed expression in LocalExpr when set.
type defaultExprHelper struct {
// ExprHelper is a utility struct that helps with expression handling in the
// vectorized engine.
type ExprHelper struct {
helper execinfrapb.ExprHelper
}

var _ ExprHelper = &defaultExprHelper{}

// NewDefaultExprHelper returns an ExprHelper that takes advantage of an already
// well-typed expression in LocalExpr when set.
func NewDefaultExprHelper() ExprHelper {
return &defaultExprHelper{}
}

func (h *defaultExprHelper) ProcessExpr(
// ProcessExpr processes the given expression and returns a well-typed
// expression.
func (h *ExprHelper) ProcessExpr(
expr execinfrapb.Expression,
semaCtx *tree.SemaContext,
evalCtx *tree.EvalContext,
Expand All @@ -84,26 +52,6 @@ func (h *defaultExprHelper) ProcessExpr(
return execinfrapb.DeserializeExpr(expr.Expr, semaCtx, evalCtx, &tempVars)
}

// forcedDeserializationExprHelper is an ExprHelper that always deserializes
// (namely, parses, type-checks, and evaluates the constants) the provided
// expression, completely ignoring LocalExpr field if set.
type forcedDeserializationExprHelper struct {
helper execinfrapb.ExprHelper
}

var _ ExprHelper = &forcedDeserializationExprHelper{}

func (h *forcedDeserializationExprHelper) ProcessExpr(
expr execinfrapb.Expression,
semaCtx *tree.SemaContext,
evalCtx *tree.EvalContext,
typs []*types.T,
) (tree.TypedExpr, error) {
h.helper.Types = typs
tempVars := tree.MakeIndexedVarHelper(&h.helper, len(typs))
return execinfrapb.DeserializeExpr(expr.Expr, semaCtx, evalCtx, &tempVars)
}

// Remove unused warning.
var _ = findIVarsInRange

Expand Down
67 changes: 53 additions & 14 deletions pkg/sql/colexec/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package colexec

import (
"context"
"sync"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colconv"
Expand Down Expand Up @@ -49,7 +50,7 @@ type Materializer struct {
// row is the memory used for the output row.
row rowenc.EncDatumRow

// ouputRow stores the returned results of next() to be passed through an
// outputRow stores the returned results of next() to be passed through an
// adapter.
outputRow rowenc.EncDatumRow

Expand All @@ -70,17 +71,24 @@ type Materializer struct {
// trailing metadata state, which is meant only for internal metadata
// generation.
type drainHelper struct {
execinfrapb.MetadataSources
sources execinfrapb.MetadataSources
ctx context.Context
bufferedMeta []execinfrapb.ProducerMetadata
}

var _ execinfra.RowSource = &drainHelper{}
var _ execinfra.Releasable = &drainHelper{}

var drainHelperPool = sync.Pool{
New: func() interface{} {
return &drainHelper{}
},
}

func newDrainHelper(sources execinfrapb.MetadataSources) *drainHelper {
return &drainHelper{
MetadataSources: sources,
}
d := drainHelperPool.Get().(*drainHelper)
d.sources = sources
return d
}

// OutputTypes implements the RowSource interface.
Expand All @@ -99,7 +107,7 @@ func (d *drainHelper) Start(ctx context.Context) context.Context {
// Next implements the RowSource interface.
func (d *drainHelper) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
if d.bufferedMeta == nil {
d.bufferedMeta = d.DrainMeta(d.ctx)
d.bufferedMeta = d.sources.DrainMeta(d.ctx)
if d.bufferedMeta == nil {
// Still nil, avoid more calls to DrainMeta.
d.bufferedMeta = []execinfrapb.ProducerMetadata{}
Expand All @@ -119,8 +127,20 @@ func (d *drainHelper) ConsumerDone() {}
// ConsumerClosed implements the RowSource interface.
func (d *drainHelper) ConsumerClosed() {}

// Release implements the execinfra.Releasable interface.
func (d *drainHelper) Release() {
*d = drainHelper{}
drainHelperPool.Put(d)
}

const materializerProcName = "materializer"

var materializerPool = sync.Pool{
New: func() interface{} {
return &Materializer{}
},
}

// NewMaterializer creates a new Materializer processor which processes the
// columnar data coming from input to return it as rows.
// Arguments:
Expand Down Expand Up @@ -149,13 +169,15 @@ func NewMaterializer(
for i := range vecIdxsToConvert {
vecIdxsToConvert[i] = i
}
m := &Materializer{
input: input,
typs: typs,
drainHelper: newDrainHelper(metadataSourcesQueue),
converter: colconv.NewVecToDatumConverter(len(typs), vecIdxsToConvert),
row: make(rowenc.EncDatumRow, len(typs)),
closers: toClose,
m := materializerPool.Get().(*Materializer)
*m = Materializer{
ProcessorBase: m.ProcessorBase,
input: input,
typs: typs,
drainHelper: newDrainHelper(metadataSourcesQueue),
converter: colconv.NewVecToDatumConverter(len(typs), vecIdxsToConvert),
row: make(rowenc.EncDatumRow, len(typs)),
closers: toClose,
}

if err := m.ProcessorBase.Init(
Expand All @@ -169,7 +191,8 @@ func NewMaterializer(
output,
nil, /* memMonitor */
execinfra.ProcStateOpts{
InputsToDrain: []execinfra.RowSource{m.drainHelper},
// We append drainHelper to inputs to drain below in order to reuse
// the same underlying slice from the pooled materializer.
TrailingMetaCallback: func(ctx context.Context) []execinfrapb.ProducerMetadata {
m.close()
return nil
Expand All @@ -178,12 +201,15 @@ func NewMaterializer(
); err != nil {
return nil, err
}
m.AddInputToDrain(m.drainHelper)
m.FinishTrace = outputStatsToTrace
m.cancelFlow = cancelFlow
return m, nil
}

var _ execinfra.OpNode = &Materializer{}
var _ execinfra.Processor = &Materializer{}
var _ execinfra.Releasable = &Materializer{}

// ChildCount is part of the exec.OpNode interface.
func (m *Materializer) ChildCount(verbose bool) int {
Expand Down Expand Up @@ -278,3 +304,16 @@ func (m *Materializer) close() {
func (m *Materializer) ConsumerClosed() {
m.close()
}

// Release implements the execinfra.Releasable interface.
func (m *Materializer) Release() {
m.drainHelper.Release()
m.ProcessorBase.Reset()
*m = Materializer{
// We're keeping the reference to the same ProcessorBase since it
// allows us to reuse some of the slices as well as ProcOutputHelper
// struct.
ProcessorBase: m.ProcessorBase,
}
materializerPool.Put(m)
}
41 changes: 36 additions & 5 deletions pkg/sql/colexec/op_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package colexec

import (
"context"
"sync"

"github.com/cockroachdb/cockroach/pkg/sql/colcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/colexecbase"
Expand All @@ -26,7 +27,7 @@ import (
// We inject this at test time, so tests can use NewColOperator from colexec
// package.
var TestNewColOperator func(ctx context.Context, flowCtx *execinfra.FlowCtx, args *NewColOperatorArgs,
) (r NewColOperatorResult, err error)
) (r *NewColOperatorResult, err error)

// NewColOperatorArgs is a helper struct that encompasses all of the input
// arguments to NewColOperator call.
Expand All @@ -37,7 +38,7 @@ type NewColOperatorArgs struct {
ProcessorConstructor execinfra.ProcessorConstructor
DiskQueueCfg colcontainer.DiskQueueCfg
FDSemaphore semaphore.Semaphore
ExprHelper ExprHelper
ExprHelper *ExprHelper
TestingKnobs struct {
// UseStreamingMemAccountForBuffering specifies whether to use
// StreamingMemAccount when creating buffering operators and should only be
Expand Down Expand Up @@ -80,7 +81,37 @@ type NewColOperatorResult struct {
InternalMemUsage int
MetadataSources []execinfrapb.MetadataSource
// ToClose is a slice of components that need to be Closed.
ToClose []colexecbase.Closer
OpMonitors []*mon.BytesMonitor
OpAccounts []*mon.BoundAccount
ToClose []colexecbase.Closer
OpMonitors []*mon.BytesMonitor
OpAccounts []*mon.BoundAccount
Releasables []execinfra.Releasable
}

var _ execinfra.Releasable = &NewColOperatorResult{}

var newColOperatorResultPool = sync.Pool{
New: func() interface{} {
return &NewColOperatorResult{}
},
}

// GetNewColOperatorResult returns a new NewColOperatorResult.
func GetNewColOperatorResult() *NewColOperatorResult {
return newColOperatorResultPool.Get().(*NewColOperatorResult)
}

// Release implements the execinfra.Releasable interface.
func (r *NewColOperatorResult) Release() {
for _, releasable := range r.Releasables {
releasable.Release()
}
*r = NewColOperatorResult{
ColumnTypes: r.ColumnTypes[:0],
MetadataSources: r.MetadataSources[:0],
ToClose: r.ToClose[:0],
OpMonitors: r.OpMonitors[:0],
OpAccounts: r.OpAccounts[:0],
Releasables: r.Releasables[:0],
}
newColOperatorResultPool.Put(r)
}
Loading

0 comments on commit 23d7256

Please sign in to comment.