Skip to content

Commit

Permalink
Merge #56540
Browse files Browse the repository at this point in the history
56540: colexec: propagate the set of needed columns in table reader spec r=yuzefovich a=yuzefovich

This commit propagates the set of needed columns via the table reader
spec, and that information is now used when setting up the
ColBatchScans. The row-by-row engine is not affected since it still
needs to set up the ProcOutputHelpers, but that is no longer needed in
the vectorized engine, which gives us a couple of percent improvement on
the KV microbenchmark.

Addresses: #53893

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
craig[bot] and yuzefovich committed Nov 11, 2020
2 parents efa820a + 76c991e commit ba363c7
Show file tree
Hide file tree
Showing 11 changed files with 387 additions and 336 deletions.
56 changes: 14 additions & 42 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,6 @@ func NewColOperator(

core := &spec.Core
post := &spec.Post
var procOutputHelper *execinfra.ProcOutputHelper

// resultPreSpecPlanningStateShallowCopy is a shallow copy of the result
// before any specs are planned. Used if there is a need to backtrack.
Expand Down Expand Up @@ -702,16 +701,14 @@ func NewColOperator(
if err := checkNumIn(inputs, 0); err != nil {
return r, err
}
scanOp, helper, err := colfetcher.NewColBatchScan(ctx, streamingAllocator, flowCtx, evalCtx, core.TableReader, post)
scanOp, err := colfetcher.NewColBatchScan(ctx, streamingAllocator, flowCtx, evalCtx, core.TableReader, post)
if err != nil {
return r, err
}
result.Op = scanOp
result.IOReader = scanOp
result.MetadataSources = append(result.MetadataSources, scanOp)
result.Releasables = append(result.Releasables, helper)
result.Releasables = append(result.Releasables, scanOp)
procOutputHelper = helper
// colBatchScan is wrapped with a cancel checker below, so we need to
// log its creation separately.
if log.V(1) {
Expand Down Expand Up @@ -1152,7 +1149,7 @@ func NewColOperator(
Op: result.Op,
ColumnTypes: result.ColumnTypes,
}
err = ppr.planPostProcessSpec(ctx, flowCtx, evalCtx, args, post, factory, procOutputHelper)
err = ppr.planPostProcessSpec(ctx, flowCtx, evalCtx, args, post, factory)
if err != nil {
if log.V(2) {
log.Infof(
Expand Down Expand Up @@ -1244,9 +1241,7 @@ func (r opResult) planAndMaybeWrapOnExprAsFilter(
Op: r.Op,
ColumnTypes: r.ColumnTypes,
}
if err := ppr.planFilterExpr(
ctx, flowCtx, evalCtx, onExpr, args.StreamingMemAccount, factory, args.ExprHelper, nil, /* procOutputHelper */
); err != nil {
if err := ppr.planFilterExpr(ctx, flowCtx, evalCtx, onExpr, args.StreamingMemAccount, factory, args.ExprHelper); err != nil {
// ON expression planning failed. Fall back to planning the filter
// using row execution.
if log.V(2) {
Expand Down Expand Up @@ -1292,22 +1287,17 @@ func (r opResult) wrapPostProcessSpec(
}

// planPostProcessSpec plans the post processing stage specified in post on top
// of r.Op. It takes in an optional procOutputHelper which has already been
// initialized with post meaning that it already contains well-typed expressions
// which allows us to avoid redundant deserialization.
// of r.Op.
func (r *postProcessResult) planPostProcessSpec(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
evalCtx *tree.EvalContext,
args *colexec.NewColOperatorArgs,
post *execinfrapb.PostProcessSpec,
factory coldata.ColumnFactory,
procOutputHelper *execinfra.ProcOutputHelper,
) error {
if !post.Filter.Empty() {
if err := r.planFilterExpr(
ctx, flowCtx, evalCtx, post.Filter, args.StreamingMemAccount, factory, args.ExprHelper, procOutputHelper,
); err != nil {
if err := r.planFilterExpr(ctx, flowCtx, evalCtx, post.Filter, args.StreamingMemAccount, factory, args.ExprHelper); err != nil {
return err
}
}
Expand All @@ -1320,16 +1310,10 @@ func (r *postProcessResult) planPostProcessSpec(
}
semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(evalCtx.Txn)
var renderedCols []uint32
for renderIdx, renderExpr := range post.RenderExprs {
var expr tree.TypedExpr
var err error
if procOutputHelper != nil {
expr = procOutputHelper.RenderExprs[renderIdx].Expr
} else {
expr, err = args.ExprHelper.ProcessExpr(renderExpr, semaCtx, evalCtx, r.ColumnTypes)
if err != nil {
return err
}
for _, renderExpr := range post.RenderExprs {
expr, err := args.ExprHelper.ProcessExpr(renderExpr, semaCtx, evalCtx, r.ColumnTypes)
if err != nil {
return err
}
var outputIdx int
r.Op, outputIdx, r.ColumnTypes, err = planProjectionOperators(
Expand Down Expand Up @@ -1453,10 +1437,7 @@ func (r opResult) updateWithPostProcessResult(ppr postProcessResult) {
copy(r.ColumnTypes, ppr.ColumnTypes)
}

// planFilterExpr creates all operators to implement filter expression. It takes
// in an optional procOutputHelper which has already been initialized with post
// meaning that it already contains well-typed expressions which allows us to
// avoid redundant deserialization.
// planFilterExpr creates all operators to implement filter expression.
func (r *postProcessResult) planFilterExpr(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
Expand All @@ -1465,20 +1446,11 @@ func (r *postProcessResult) planFilterExpr(
acc *mon.BoundAccount,
factory coldata.ColumnFactory,
helper *colexec.ExprHelper,
procOutputHelper *execinfra.ProcOutputHelper,
) error {
var (
expr tree.TypedExpr
err error
)
if procOutputHelper != nil {
expr = procOutputHelper.Filter.Expr
} else {
semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(evalCtx.Txn)
expr, err = helper.ProcessExpr(filter, semaCtx, evalCtx, r.ColumnTypes)
if err != nil {
return err
}
semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(evalCtx.Txn)
expr, err := helper.ProcessExpr(filter, semaCtx, evalCtx, r.ColumnTypes)
if err != nil {
return err
}
if expr == tree.DNull {
// The filter expression is tree.DNull meaning that it is always false, so
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/colexec/colbuilder/execplan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ func TestNewColOperatorExpectedTypeSchema(t *testing.T) {

desc := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "test", "t")
tr := execinfrapb.TableReaderSpec{
Table: *desc.TableDesc(),
Spans: make([]execinfrapb.TableReaderSpan, 1),
Table: *desc.TableDesc(),
Spans: make([]execinfrapb.TableReaderSpan, 1),
NeededColumns: []uint32{0},
}
var err error
tr.Spans[0].Span.Key, err = rowenc.TestingMakePrimaryIndexKey(desc, 0)
Expand Down
33 changes: 13 additions & 20 deletions pkg/sql/colfetcher/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,14 @@ func NewColBatchScan(
evalCtx *tree.EvalContext,
spec *execinfrapb.TableReaderSpec,
post *execinfrapb.PostProcessSpec,
) (*ColBatchScan, *execinfra.ProcOutputHelper, error) {
) (*ColBatchScan, error) {
// NB: we hit this with a zero NodeID (but !ok) with multi-tenancy.
if nodeID, ok := flowCtx.NodeID.OptionalNodeID(); nodeID == 0 && ok {
return nil, nil, errors.Errorf("attempting to create a ColBatchScan with uninitialized NodeID")
return nil, errors.Errorf("attempting to create a ColBatchScan with uninitialized NodeID")
}
if spec.IsCheck {
// cFetchers don't support these checks.
return nil, errors.AssertionFailedf("attempting to create a cFetcher with the IsCheck flag set")
}

limitHint := execinfra.LimitHint(spec.LimitHint, post)
Expand Down Expand Up @@ -183,32 +187,21 @@ func NewColBatchScan(
resolver := flowCtx.TypeResolverFactory.NewTypeResolver(evalCtx.Txn)
semaCtx.TypeResolver = resolver
if err := resolver.HydrateTypeSlice(ctx, typs); err != nil {
return nil, nil, err
}
helper := execinfra.NewProcOutputHelper()
if err := helper.Init(
post,
typs,
&semaCtx,
evalCtx,
nil, /* output */
); err != nil {
return nil, helper, err
return nil, err
}

neededColumns := helper.NeededColumns()
var neededColumns util.FastIntSet
for i := range spec.NeededColumns {
neededColumns.Add(int(spec.NeededColumns[i]))
}

fetcher := cFetcherPool.Get().(*cFetcher)
if spec.IsCheck {
// cFetchers don't support these checks.
return nil, helper, errors.AssertionFailedf("attempting to create a cFetcher with the IsCheck flag set")
}
if _, _, err := initCRowFetcher(
flowCtx.Codec(), allocator, fetcher, table, int(spec.IndexIdx), columnIdxMap,
spec.Reverse, neededColumns, spec.Visibility, spec.LockingStrength, spec.LockingWaitPolicy,
sysColDescs,
); err != nil {
return nil, helper, err
return nil, err
}

s := colBatchScanPool.Get().(*ColBatchScan)
Expand All @@ -227,7 +220,7 @@ func NewColBatchScan(
parallelize: spec.Parallelize && limitHint == 0,
ResultTypes: typs,
}
return s, helper, nil
return s, nil
}

// initCRowFetcher initializes a row.cFetcher. See initRowFetcher.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,10 +973,10 @@ func initTableReaderSpec(
Visibility: n.colCfg.visibility,
LockingStrength: n.lockingStrength,
LockingWaitPolicy: n.lockingWaitPolicy,

// Retain the capacity of the spans slice.
Spans: s.Spans[:0],
HasSystemColumns: n.containsSystemColumns,
NeededColumns: n.colCfg.wantedColumnsOrdinals,
}
indexIdx, err := getIndexIdx(n.index, n.desc)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func (e *distSQLSpecExecFactory) ConstructScan(
// Retain the capacity of the spans slice.
Spans: trSpec.Spans[:0],
HasSystemColumns: scanContainsSystemColumns(&colCfg),
NeededColumns: colCfg.wantedColumnsOrdinals,
}
trSpec.IndexIdx, err = getIndexIdx(indexDesc, tabDesc)
if err != nil {
Expand Down
10 changes: 7 additions & 3 deletions pkg/sql/exec_factory_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,19 @@ func makeScanColumnsConfig(table cat.Table, cols exec.TableColumnOrdinalSet) sca
// include (or not include). Note that when wantedColumns is non-empty,
// the visibility flag will never trigger the addition of more columns.
colCfg := scanColumnsConfig{
wantedColumns: make([]tree.ColumnID, 0, cols.Len()),
visibility: execinfra.ScanVisibilityPublicAndNotPublic,
wantedColumns: make([]tree.ColumnID, 0, cols.Len()),
wantedColumnsOrdinals: make([]uint32, 0, cols.Len()),
visibility: execinfra.ScanVisibilityPublicAndNotPublic,
}
for ord, ok := cols.Next(0); ok; ord, ok = cols.Next(ord + 1) {
col := table.Column(ord)
colOrd := ord
if col.Kind() == cat.VirtualInverted {
col = table.Column(col.InvertedSourceColumnOrdinal())
colOrd = col.InvertedSourceColumnOrdinal()
col = table.Column(colOrd)
}
colCfg.wantedColumns = append(colCfg.wantedColumns, tree.ColumnID(col.ColID()))
colCfg.wantedColumnsOrdinals = append(colCfg.wantedColumnsOrdinals, uint32(colOrd))
}
return colCfg
}
Expand Down
Loading

0 comments on commit ba363c7

Please sign in to comment.