Merge #62282
62282: sql: hint scan batch size by expected row count r=yuzefovich a=jordanlewis

Might close #62198.

Previously, the "dynamic batch size" strategy for the vectorized
engine's batch allocator worked the same in every situation: batches
would start at size 1, then double on each re-allocation, until they hit
their maximum size of 1024.

Now, to improve performance for scans that return somewhere between 1
and 1024 rows, we pass the optimizer's best guess of the number of rows
that the scan will produce all the way down into the TableReader. That
guess is used as the initial size of the batch if it's less than 1024.
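
To make this concrete, here is a minimal, self-contained Go sketch of the
idea (illustrative only, not the allocator's actual code; the function names
and the doubling loop are stand-ins):

```go
package main

import "fmt"

const maxBatchSize = 1024

// initialBatchSize picks the starting capacity: the optimizer's estimate
// when it is non-zero and below the maximum, otherwise the old default of 1.
func initialBatchSize(estimatedRowCount int) int {
	if estimatedRowCount > 0 && estimatedRowCount < maxBatchSize {
		return estimatedRowCount
	}
	if estimatedRowCount >= maxBatchSize {
		return maxBatchSize
	}
	return 1 // no estimate: start small, as before
}

// growBatchSize doubles the capacity on each reallocation, capped at the max.
func growBatchSize(capacity int) int {
	if capacity*2 > maxBatchSize {
		return maxBatchSize
	}
	return capacity * 2
}

func main() {
	// A 600-row scan with no hint reallocates through capacities 1, 2, 4, ...;
	// with a hint of 600, the very first batch is already big enough.
	for _, hint := range []int{0, 600} {
		batches, produced := 0, 0
		for size := initialBatchSize(hint); produced < 600; size = growBatchSize(size) {
			produced += size // each batch holds up to `size` rows
			batches++
		}
		fmt.Printf("hint=%d -> %d batches\n", hint, batches)
	}
}
```

With no hint, the 600-row scan works through roughly ten progressively larger
batches; with an accurate hint, a single right-sized batch suffices.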

Release note (performance improvement): improved the performance of the
vectorized engine when scanning fewer than 1024 rows at a time.

Co-authored-by: Jordan Lewis <jordanthelewis@gmail.com>
craig[bot] and jordanlewis committed Mar 24, 2021
2 parents af06665 + 5041b22 commit a56498f
Showing 16 changed files with 315 additions and 100 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
@@ -178,6 +178,7 @@ ALL_TESTS = [
"//pkg/sql/colexec:colexec_test",
"//pkg/sql/colexecerror:colexecerror_test",
"//pkg/sql/colexecop:colexecop_test",
"//pkg/sql/colfetcher:colfetcher_test",
"//pkg/sql/colflow/colrpc:colrpc_test",
"//pkg/sql/colflow:colflow_test",
"//pkg/sql/colmem:colmem_test",
6 changes: 5 additions & 1 deletion pkg/sql/colexec/colbuilder/execplan.go
@@ -735,7 +735,11 @@ func NewColOperator(
if err := checkNumIn(inputs, 0); err != nil {
return r, err
}
scanOp, err := colfetcher.NewColBatchScan(ctx, streamingAllocator, flowCtx, evalCtx, core.TableReader, post)

estimatedRowCount := spec.EstimatedRowCount
scanOp, err := colfetcher.NewColBatchScan(
ctx, streamingAllocator, flowCtx, evalCtx, core.TableReader, post, estimatedRowCount,
)
if err != nil {
return r, err
}
23 changes: 22 additions & 1 deletion pkg/sql/colfetcher/BUILD.bazel
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("//build:STRINGER.bzl", "stringer")

go_library(
@@ -47,3 +47,24 @@ stringer(
src = "cfetcher.go",
typ = "fetcherState",
)

go_test(
name = "colfetcher_test",
srcs = [
"main_test.go",
"vectorized_batch_size_test.go",
],
deps = [
"//pkg/base",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"@com_github_stretchr_testify//assert",
],
)
34 changes: 31 additions & 3 deletions pkg/sql/colfetcher/cfetcher.go
@@ -253,6 +253,10 @@ type cFetcher struct {
// fetcher is the underlying fetcher that provides KVs.
fetcher *row.KVFetcher

// estimatedRowCount is the optimizer-derived number of expected rows that
// this fetch will produce, if non-zero.
estimatedRowCount uint64

// machine contains fields that get updated during the run of the fetcher.
machine struct {
// state is the queue of next states of the state machine. The 0th entry
@@ -269,6 +273,10 @@ type cFetcher struct {
// seekPrefix is the prefix to seek to in stateSeekPrefix.
seekPrefix roachpb.Key

// limitHint is a hint as to the number of rows that the caller expects to
// be returned from this fetch.
limitHint int

// remainingValueColsByIdx is the set of value columns that are yet to be
// seen during the decoding of the current row.
remainingValueColsByIdx util.FastIntSet
@@ -306,12 +314,19 @@
}
}

const cFetcherBatchMinCapacity = 1

func (rf *cFetcher) resetBatch(timestampOutputIdx, tableOidOutputIdx int) {
var reallocated bool
var estimatedRowCount int
// We need to transform our rf.estimatedRowCount, which is a uint64, into
// an int. We have to be careful: if we just cast it directly, a giant
// estimate will wrap around and become negative.
if rf.estimatedRowCount > uint64(coldata.BatchSize()) {
estimatedRowCount = coldata.BatchSize()
} else {
estimatedRowCount = int(rf.estimatedRowCount)
}
rf.machine.batch, reallocated = rf.allocator.ResetMaybeReallocate(
rf.typs, rf.machine.batch, cFetcherBatchMinCapacity, rf.memoryLimit,
rf.typs, rf.machine.batch, estimatedRowCount, rf.memoryLimit,
)
if reallocated {
rf.machine.colvecs = rf.machine.batch.ColVecs()
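
As an aside, the guarded uint64-to-int conversion above matters because a
direct cast of a very large estimate wraps around. A minimal standalone
illustration of the hazard (hypothetical values, not CockroachDB code):

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	var estimate uint64 = math.MaxUint64

	// A direct cast wraps around on a 64-bit platform and becomes negative,
	// which would be nonsense as a batch capacity.
	fmt.Println(int(estimate)) // -1

	// Clamping against the maximum batch size first, as resetBatch does,
	// keeps the result in a sane range.
	const batchSize = 1024
	capacity := batchSize
	if estimate <= uint64(batchSize) {
		capacity = int(estimate)
	}
	fmt.Println(capacity) // 1024
}
```
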
@@ -649,6 +664,7 @@ func (rf *cFetcher) StartScan(
}
rf.fetcher = f
rf.machine.lastRowPrefix = nil
rf.machine.limitHint = int(limitHint)
rf.machine.state[0] = stateInitFetch
return nil
}
@@ -1038,7 +1054,19 @@ func (rf *cFetcher) nextBatch(ctx context.Context) (coldata.Batch, error) {
}
rf.machine.rowIdx++
rf.shiftState()

var emitBatch bool
if rf.machine.rowIdx >= rf.machine.batch.Capacity() {
// We have no more room in our batch, so output it immediately.
emitBatch = true
} else if rf.machine.limitHint > 0 && rf.machine.rowIdx >= rf.machine.limitHint {
// If we made it to our limit hint, output our batch early to make sure
// that we don't bother filling in extra data if we don't need to.
emitBatch = true
rf.machine.limitHint = 0
}

if emitBatch {
rf.pushState(stateResetBatch)
rf.machine.batch.SetLength(rf.machine.rowIdx)
rf.machine.rowIdx = 0
2 changes: 2 additions & 0 deletions pkg/sql/colfetcher/colbatch_scan.go
@@ -216,6 +216,7 @@ func NewColBatchScan(
evalCtx *tree.EvalContext,
spec *execinfrapb.TableReaderSpec,
post *execinfrapb.PostProcessSpec,
estimatedRowCount uint64,
) (*ColBatchScan, error) {
// NB: we hit this with a zero NodeID (but !ok) with multi-tenancy.
if nodeID, ok := flowCtx.NodeID.OptionalNodeID(); nodeID == 0 && ok {
@@ -264,6 +265,7 @@
}

fetcher := cFetcherPool.Get().(*cFetcher)
fetcher.estimatedRowCount = estimatedRowCount
if _, _, err := initCRowFetcher(
flowCtx.Codec(), allocator, execinfra.GetWorkMemLimit(flowCtx.Cfg),
fetcher, table, columnIdxMap, neededColumns, spec, spec.HasSystemColumns,
33 changes: 33 additions & 0 deletions pkg/sql/colfetcher/main_test.go
@@ -0,0 +1,33 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colfetcher_test

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

//go:generate ../../util/leaktest/add-leaktest.sh *_test.go

func TestMain(m *testing.M) {
security.SetAssetLoader(securitytest.EmbeddedAssets)
randutil.SeedForTests()
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
os.Exit(m.Run())
}
74 changes: 74 additions & 0 deletions pkg/sql/colfetcher/vectorized_batch_size_test.go
@@ -0,0 +1,74 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colfetcher_test

import (
"context"
"regexp"
"strconv"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/assert"
)

// TestScanBatchSize tests that the cFetcher's dynamic batch size algorithm
// uses the optimizer's estimated row count for its initial batch size. This
// test sets up a scan against a table with a known row count, and makes sure
// that the optimizer uses its statistics to produce an estimated row count that
// is equal to the number of rows in the table, allowing the fetcher to create
// a single batch for the scan.
func TestScanBatchSize(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderMetamorphic(t, "This test doesn't work with metamorphic batch sizes.")

testClusterArgs := base.TestClusterArgs{
ReplicationMode: base.ReplicationAuto,
}
tc := testcluster.StartTestCluster(t, 1, testClusterArgs)
ctx := context.Background()
defer tc.Stopper().Stop(ctx)

conn := tc.Conns[0]

_, err := conn.ExecContext(ctx, `CREATE TABLE t (a PRIMARY KEY) AS SELECT generate_series(1, 511)`)
assert.NoError(t, err)

rows, err := conn.QueryContext(ctx, `EXPLAIN ANALYZE (VERBOSE, DISTSQL) SELECT * FROM t`)
assert.NoError(t, err)
batchCountRegex := regexp.MustCompile(`vectorized batch count: (\d+)`)
var found bool
var sb strings.Builder
for rows.Next() {
var res string
assert.NoError(t, rows.Scan(&res))
sb.WriteString(res)
sb.WriteByte('\n')
matches := batchCountRegex.FindStringSubmatch(res)
if len(matches) == 0 {
continue
}
foundBatches, err := strconv.Atoi(matches[1])
assert.NoError(t, err)
assert.Equal(t, 1, foundBatches, "should use just 1 batch to scan 511 rows")
found = true
break
}
if !found {
t.Fatalf("expected to find a vectorized batch count; found nothing. text:\n%s", sb.String())
}
}
5 changes: 2 additions & 3 deletions pkg/sql/colmem/allocator.go
@@ -163,10 +163,9 @@ func (a *Allocator) NewMemBatchNoCols(typs []*types.T, capacity int) coldata.Bat
func (a *Allocator) ResetMaybeReallocate(
typs []*types.T, oldBatch coldata.Batch, minCapacity int, maxBatchMemSize int64,
) (newBatch coldata.Batch, reallocated bool) {
if minCapacity < 1 {
if minCapacity < 0 {
colexecerror.InternalError(errors.AssertionFailedf("invalid minCapacity %d", minCapacity))
}
if minCapacity > coldata.BatchSize() {
} else if minCapacity == 0 || minCapacity > coldata.BatchSize() {
minCapacity = coldata.BatchSize()
}
reallocated = true
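
Restated outside the diff, the updated handling of the minCapacity argument
behaves like the following sketch (simplified; the real ResetMaybeReallocate
goes on to reuse or grow the batch, which is omitted here, and batchSize
stands in for coldata.BatchSize()):

```go
package main

import "fmt"

// normalizeMinCapacity mirrors the updated argument check: negative values
// are still rejected, while zero (meaning "no estimate") and over-large
// values fall back to the maximum batch size.
func normalizeMinCapacity(minCapacity, batchSize int) (int, error) {
	if minCapacity < 0 {
		return 0, fmt.Errorf("invalid minCapacity %d", minCapacity)
	}
	if minCapacity == 0 || minCapacity > batchSize {
		return batchSize, nil
	}
	return minCapacity, nil
}

func main() {
	for _, m := range []int{-1, 0, 600, 4096} {
		capacity, err := normalizeMinCapacity(m, 1024)
		fmt.Println(m, "->", capacity, err)
	}
}
```
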
1 change: 1 addition & 0 deletions pkg/sql/distsql_physical_planner.go
@@ -1327,6 +1327,7 @@ func (dsp *DistSQLPlanner) planTableReaders(
p.TotalEstimatedScannedRows += info.estimatedRowCount

corePlacement[i].NodeID = sp.Node
corePlacement[i].EstimatedRowCount = info.estimatedRowCount
corePlacement[i].Core.TableReader = tr
}
