Skip to content

Commit

Permalink
colexec: propagate the set of needed columns in table reader spec
Browse files Browse the repository at this point in the history
This commit adds the propagation of the set of needed columns via the
table reader spec and that information is now used when setting up the
ColBatchScans. The row-by-row engine is not affected since it still
needs to set up the ProcOutputHelpers, but that is no longer needed in
the vectorized engine which gives us a couple of percent improvement on
KV microbenchmark.

Release note: None
  • Loading branch information
yuzefovich committed Nov 11, 2020
1 parent 2c41f6d commit 76c991e
Show file tree
Hide file tree
Showing 11 changed files with 387 additions and 336 deletions.
56 changes: 14 additions & 42 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,6 @@ func NewColOperator(

core := &spec.Core
post := &spec.Post
var procOutputHelper *execinfra.ProcOutputHelper

// resultPreSpecPlanningStateShallowCopy is a shallow copy of the result
// before any specs are planned. Used if there is a need to backtrack.
Expand Down Expand Up @@ -702,16 +701,14 @@ func NewColOperator(
if err := checkNumIn(inputs, 0); err != nil {
return r, err
}
scanOp, helper, err := colfetcher.NewColBatchScan(ctx, streamingAllocator, flowCtx, evalCtx, core.TableReader, post)
scanOp, err := colfetcher.NewColBatchScan(ctx, streamingAllocator, flowCtx, evalCtx, core.TableReader, post)
if err != nil {
return r, err
}
result.Op = scanOp
result.IOReader = scanOp
result.MetadataSources = append(result.MetadataSources, scanOp)
result.Releasables = append(result.Releasables, helper)
result.Releasables = append(result.Releasables, scanOp)
procOutputHelper = helper
// colBatchScan is wrapped with a cancel checker below, so we need to
// log its creation separately.
if log.V(1) {
Expand Down Expand Up @@ -1152,7 +1149,7 @@ func NewColOperator(
Op: result.Op,
ColumnTypes: result.ColumnTypes,
}
err = ppr.planPostProcessSpec(ctx, flowCtx, evalCtx, args, post, factory, procOutputHelper)
err = ppr.planPostProcessSpec(ctx, flowCtx, evalCtx, args, post, factory)
if err != nil {
if log.V(2) {
log.Infof(
Expand Down Expand Up @@ -1244,9 +1241,7 @@ func (r opResult) planAndMaybeWrapOnExprAsFilter(
Op: r.Op,
ColumnTypes: r.ColumnTypes,
}
if err := ppr.planFilterExpr(
ctx, flowCtx, evalCtx, onExpr, args.StreamingMemAccount, factory, args.ExprHelper, nil, /* procOutputHelper */
); err != nil {
if err := ppr.planFilterExpr(ctx, flowCtx, evalCtx, onExpr, args.StreamingMemAccount, factory, args.ExprHelper); err != nil {
// ON expression planning failed. Fall back to planning the filter
// using row execution.
if log.V(2) {
Expand Down Expand Up @@ -1292,22 +1287,17 @@ func (r opResult) wrapPostProcessSpec(
}

// planPostProcessSpec plans the post processing stage specified in post on top
// of r.Op. It takes in an optional procOutputHelper which has already been
// initialized with post meaning that it already contains well-typed expressions
// which allows us to avoid redundant deserialization.
// of r.Op.
func (r *postProcessResult) planPostProcessSpec(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
evalCtx *tree.EvalContext,
args *colexec.NewColOperatorArgs,
post *execinfrapb.PostProcessSpec,
factory coldata.ColumnFactory,
procOutputHelper *execinfra.ProcOutputHelper,
) error {
if !post.Filter.Empty() {
if err := r.planFilterExpr(
ctx, flowCtx, evalCtx, post.Filter, args.StreamingMemAccount, factory, args.ExprHelper, procOutputHelper,
); err != nil {
if err := r.planFilterExpr(ctx, flowCtx, evalCtx, post.Filter, args.StreamingMemAccount, factory, args.ExprHelper); err != nil {
return err
}
}
Expand All @@ -1320,16 +1310,10 @@ func (r *postProcessResult) planPostProcessSpec(
}
semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(evalCtx.Txn)
var renderedCols []uint32
for renderIdx, renderExpr := range post.RenderExprs {
var expr tree.TypedExpr
var err error
if procOutputHelper != nil {
expr = procOutputHelper.RenderExprs[renderIdx].Expr
} else {
expr, err = args.ExprHelper.ProcessExpr(renderExpr, semaCtx, evalCtx, r.ColumnTypes)
if err != nil {
return err
}
for _, renderExpr := range post.RenderExprs {
expr, err := args.ExprHelper.ProcessExpr(renderExpr, semaCtx, evalCtx, r.ColumnTypes)
if err != nil {
return err
}
var outputIdx int
r.Op, outputIdx, r.ColumnTypes, err = planProjectionOperators(
Expand Down Expand Up @@ -1453,10 +1437,7 @@ func (r opResult) updateWithPostProcessResult(ppr postProcessResult) {
copy(r.ColumnTypes, ppr.ColumnTypes)
}

// planFilterExpr creates all operators to implement filter expression. It takes
// in an optional procOutputHelper which has already been initialized with post
// meaning that it already contains well-typed expressions which allows us to
// avoid redundant deserialization.
// planFilterExpr creates all operators to implement filter expression.
func (r *postProcessResult) planFilterExpr(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
Expand All @@ -1465,20 +1446,11 @@ func (r *postProcessResult) planFilterExpr(
acc *mon.BoundAccount,
factory coldata.ColumnFactory,
helper *colexec.ExprHelper,
procOutputHelper *execinfra.ProcOutputHelper,
) error {
var (
expr tree.TypedExpr
err error
)
if procOutputHelper != nil {
expr = procOutputHelper.Filter.Expr
} else {
semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(evalCtx.Txn)
expr, err = helper.ProcessExpr(filter, semaCtx, evalCtx, r.ColumnTypes)
if err != nil {
return err
}
semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(evalCtx.Txn)
expr, err := helper.ProcessExpr(filter, semaCtx, evalCtx, r.ColumnTypes)
if err != nil {
return err
}
if expr == tree.DNull {
// The filter expression is tree.DNull meaning that it is always false, so
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/colexec/colbuilder/execplan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ func TestNewColOperatorExpectedTypeSchema(t *testing.T) {

desc := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "test", "t")
tr := execinfrapb.TableReaderSpec{
Table: *desc.TableDesc(),
Spans: make([]execinfrapb.TableReaderSpan, 1),
Table: *desc.TableDesc(),
Spans: make([]execinfrapb.TableReaderSpan, 1),
NeededColumns: []uint32{0},
}
var err error
tr.Spans[0].Span.Key, err = rowenc.TestingMakePrimaryIndexKey(desc, 0)
Expand Down
33 changes: 13 additions & 20 deletions pkg/sql/colfetcher/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,14 @@ func NewColBatchScan(
evalCtx *tree.EvalContext,
spec *execinfrapb.TableReaderSpec,
post *execinfrapb.PostProcessSpec,
) (*ColBatchScan, *execinfra.ProcOutputHelper, error) {
) (*ColBatchScan, error) {
// NB: we hit this with a zero NodeID (but !ok) with multi-tenancy.
if nodeID, ok := flowCtx.NodeID.OptionalNodeID(); nodeID == 0 && ok {
return nil, nil, errors.Errorf("attempting to create a ColBatchScan with uninitialized NodeID")
return nil, errors.Errorf("attempting to create a ColBatchScan with uninitialized NodeID")
}
if spec.IsCheck {
// cFetchers don't support these checks.
return nil, errors.AssertionFailedf("attempting to create a cFetcher with the IsCheck flag set")
}

limitHint := execinfra.LimitHint(spec.LimitHint, post)
Expand Down Expand Up @@ -183,32 +187,21 @@ func NewColBatchScan(
resolver := flowCtx.TypeResolverFactory.NewTypeResolver(evalCtx.Txn)
semaCtx.TypeResolver = resolver
if err := resolver.HydrateTypeSlice(ctx, typs); err != nil {
return nil, nil, err
}
helper := execinfra.NewProcOutputHelper()
if err := helper.Init(
post,
typs,
&semaCtx,
evalCtx,
nil, /* output */
); err != nil {
return nil, helper, err
return nil, err
}

neededColumns := helper.NeededColumns()
var neededColumns util.FastIntSet
for i := range spec.NeededColumns {
neededColumns.Add(int(spec.NeededColumns[i]))
}

fetcher := cFetcherPool.Get().(*cFetcher)
if spec.IsCheck {
// cFetchers don't support these checks.
return nil, helper, errors.AssertionFailedf("attempting to create a cFetcher with the IsCheck flag set")
}
if _, _, err := initCRowFetcher(
flowCtx.Codec(), allocator, fetcher, table, int(spec.IndexIdx), columnIdxMap,
spec.Reverse, neededColumns, spec.Visibility, spec.LockingStrength, spec.LockingWaitPolicy,
sysColDescs,
); err != nil {
return nil, helper, err
return nil, err
}

s := colBatchScanPool.Get().(*ColBatchScan)
Expand All @@ -227,7 +220,7 @@ func NewColBatchScan(
parallelize: spec.Parallelize && limitHint == 0,
ResultTypes: typs,
}
return s, helper, nil
return s, nil
}

// initCRowFetcher initializes a row.cFetcher. See initRowFetcher.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,10 +973,10 @@ func initTableReaderSpec(
Visibility: n.colCfg.visibility,
LockingStrength: n.lockingStrength,
LockingWaitPolicy: n.lockingWaitPolicy,

// Retain the capacity of the spans slice.
Spans: s.Spans[:0],
HasSystemColumns: n.containsSystemColumns,
NeededColumns: n.colCfg.wantedColumnsOrdinals,
}
indexIdx, err := getIndexIdx(n.index, n.desc)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func (e *distSQLSpecExecFactory) ConstructScan(
// Retain the capacity of the spans slice.
Spans: trSpec.Spans[:0],
HasSystemColumns: scanContainsSystemColumns(&colCfg),
NeededColumns: colCfg.wantedColumnsOrdinals,
}
trSpec.IndexIdx, err = getIndexIdx(indexDesc, tabDesc)
if err != nil {
Expand Down
10 changes: 7 additions & 3 deletions pkg/sql/exec_factory_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,19 @@ func makeScanColumnsConfig(table cat.Table, cols exec.TableColumnOrdinalSet) sca
// include (or not include). Note that when wantedColumns is non-empty,
// the visibility flag will never trigger the addition of more columns.
colCfg := scanColumnsConfig{
wantedColumns: make([]tree.ColumnID, 0, cols.Len()),
visibility: execinfra.ScanVisibilityPublicAndNotPublic,
wantedColumns: make([]tree.ColumnID, 0, cols.Len()),
wantedColumnsOrdinals: make([]uint32, 0, cols.Len()),
visibility: execinfra.ScanVisibilityPublicAndNotPublic,
}
for ord, ok := cols.Next(0); ok; ord, ok = cols.Next(ord + 1) {
col := table.Column(ord)
colOrd := ord
if col.Kind() == cat.VirtualInverted {
col = table.Column(col.InvertedSourceColumnOrdinal())
colOrd = col.InvertedSourceColumnOrdinal()
col = table.Column(colOrd)
}
colCfg.wantedColumns = append(colCfg.wantedColumns, tree.ColumnID(col.ColID()))
colCfg.wantedColumnsOrdinals = append(colCfg.wantedColumnsOrdinals, uint32(colOrd))
}
return colCfg
}
Expand Down
Loading

0 comments on commit 76c991e

Please sign in to comment.