Skip to content

Commit

Permalink
Merge #47309 #48107
Browse files Browse the repository at this point in the history
47309: colexec: resolve miscellaneous TODOs and do some cleanups r=yuzefovich a=yuzefovich

**colexec: unify spacing in templates**

This commit removes the space after `{{` and before `}}` in templating
directives which unifies the code base (and brings it in line with the
usage shown in godoc's examples).

Release note: None

**colexec: minor refactor of merge joiner**

This commit does the following refactor of the merge joiner:
1. it bumps the "output batch size" for `count(*)` queries (when we
don't need to populate the output other than the number of tuples) to
max int. Previously this was limited due to `uint16` length of the
batch.
2. in the probe phase, when comparing tuples from both inputs to see
whether they match each other, previously we were using the combination
of `==` and `<` to make the decision which input to advance; now we will
be using `.Compare` equivalents. This allows us to simplify the template
generator a bit.
3. it removes extra spaces in the template directives.

Point number 2 shows some improvement (5% or so) in the benchmarks when
operating on ints, but it would matter more on Bytes and other
non-native types.

Release note: None

**colexec: miscellaneous cleanups**

This commit does the following:
1. plumbs the processor ID of the processor that we're wrapping into the
vectorized flow. This processor ID will be used for the materializer
/ columnarizer pair. I don't think it matters much, but why not.
2. resolves a minor TODO in the external sorter benchmark.
3. removes limitation of `2<<16-1` on K in the top K sorter (previously
limited by `uint16` batch length).
4. removes calling `Next` on the input of `zeroOperator` (which always
returns a zero batch). I believe previously it was needed so that
projecting operators had a chance to append their output columns to the
batch which is no longer the case.

Release note: None

48107: roachtest: fix tpchvec perf r=yuzefovich a=yuzefovich

This commit fixes a problem with "test case struct" resolution that was
introduced recently when we added `smithcmp` config to `tpchvec` test.
The root of the issue is that `testRun` function was made a part of the
`tpchVecTestCase` interface, which doesn't really make sense.

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
craig[bot] and yuzefovich committed Apr 28, 2020
3 parents da17595 + 34040dc + 4ac2159 commit c2668dc
Show file tree
Hide file tree
Showing 35 changed files with 345 additions and 355 deletions.
88 changes: 44 additions & 44 deletions pkg/cmd/roachtest/tpchvec.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ var tpchTables = []string{

type tpchVecTestCase interface {
// TODO(asubiotto): Getting the queries we want to run given a version should
// also be part of this. This can also be where we return tpch queries with
// random placeholders.
// also be part of this.
// vectorizeOptions are the vectorize options that each query will be run
// with.
vectorizeOptions() []bool
Expand All @@ -97,9 +96,6 @@ type tpchVecTestCase interface {
// preTestRunHook is called before any tpch query is run. Can be used to
// perform setup.
preTestRunHook(ctx context.Context, t *test, c *cluster, conn *gosql.DB, version crdbVersion)
// testRun is the main function of the test case that executes the desired
// test workload.
testRun(ctx context.Context, t *test, c *cluster, version crdbVersion)
// postQueryRunHook is called after each tpch query is run with the output and
// the vectorize mode it was run in.
postQueryRunHook(t *test, output []byte, vectorized bool)
Expand Down Expand Up @@ -135,36 +131,6 @@ func (b tpchVecTestCaseBase) preTestRunHook(
}
}

// testRun runs every TPC-H query under each of the test case's vectorize
// options via the workload binary and feeds each run's output to
// postQueryRunHook. It fatals the test if the workload command fails.
func (b tpchVecTestCaseBase) testRun(
	ctx context.Context, t *test, c *cluster, version crdbVersion,
) {
	firstNode := c.Node(1)
	queriesToSkip := queriesToSkipByVersion[version]
	for queryNum := 1; queryNum <= tpchVecNumQueries; queryNum++ {
		// Whether a query is skipped depends only on the query number, so
		// decide (and log) once per query rather than once per vectorize
		// option.
		if reason, skip := queriesToSkip[queryNum]; skip {
			t.Status(fmt.Sprintf("skipping q%d because of %q", queryNum, reason))
			continue
		}
		for _, vectorize := range b.vectorizeOptions() {
			vectorizeSetting := "off"
			if vectorize {
				vectorizeSetting = vectorizeOnOptionByVersion[version]
			}
			cmd := fmt.Sprintf("./workload run tpch --concurrency=1 --db=tpch "+
				"--max-ops=%d --queries=%d --vectorize=%s {pgurl:1-%d}",
				b.numRunsPerQuery(), queryNum, vectorizeSetting, tpchVecNodeCount)
			workloadOutput, err := c.RunWithBuffer(ctx, t.l, firstNode, cmd)
			t.l.Printf("\n" + string(workloadOutput))
			if err != nil {
				// Note: if you see an error like "exit status 1", it is likely caused
				// by the erroneous output of the query.
				t.Fatal(err)
			}
			b.postQueryRunHook(t, workloadOutput, vectorize)
		}
	}
}

func (b tpchVecTestCaseBase) postQueryRunHook(_ *test, _ []byte, _ bool) {}

func (b tpchVecTestCaseBase) postTestRunHook(_ *test, _ *gosql.DB, _ crdbVersion) {}
Expand Down Expand Up @@ -311,6 +277,36 @@ func (b tpchVecSmallBatchSizeTest) preTestRunHook(
setSmallBatchSize(t, conn, rng)
}

// baseTestRun is the default test body for tpchvec test cases: it runs every
// TPC-H query under each vectorize option of the given test case via the
// workload binary and passes the output to the case's postQueryRunHook. It
// fatals the test if the workload command fails.
func baseTestRun(
	ctx context.Context, t *test, c *cluster, version crdbVersion, tc tpchVecTestCase,
) {
	firstNode := c.Node(1)
	queriesToSkip := queriesToSkipByVersion[version]
	for queryNum := 1; queryNum <= tpchVecNumQueries; queryNum++ {
		// Whether a query is skipped depends only on the query number, so
		// decide (and log) once per query rather than once per vectorize
		// option.
		if reason, skip := queriesToSkip[queryNum]; skip {
			t.Status(fmt.Sprintf("skipping q%d because of %q", queryNum, reason))
			continue
		}
		for _, vectorize := range tc.vectorizeOptions() {
			vectorizeSetting := "off"
			if vectorize {
				vectorizeSetting = vectorizeOnOptionByVersion[version]
			}
			cmd := fmt.Sprintf("./workload run tpch --concurrency=1 --db=tpch "+
				"--max-ops=%d --queries=%d --vectorize=%s {pgurl:1-%d}",
				tc.numRunsPerQuery(), queryNum, vectorizeSetting, tpchVecNodeCount)
			workloadOutput, err := c.RunWithBuffer(ctx, t.l, firstNode, cmd)
			t.l.Printf("\n" + string(workloadOutput))
			if err != nil {
				// Note: if you see an error like "exit status 1", it is likely caused
				// by the erroneous output of the query.
				t.Fatal(err)
			}
			tc.postQueryRunHook(t, workloadOutput, vectorize)
		}
	}
}

type tpchVecSmithcmpTest struct {
tpchVecTestCaseBase
}
Expand Down Expand Up @@ -349,9 +345,7 @@ func (s tpchVecSmithcmpTest) preTestRunHook(
}
}

func (s tpchVecSmithcmpTest) testRun(
ctx context.Context, t *test, c *cluster, version crdbVersion,
) {
func smithcmpTestRun(ctx context.Context, t *test, c *cluster, _ crdbVersion, _ tpchVecTestCase) {
const (
configFile = `tpchvec_smithcmp.toml`
configURL = `https://raw.githubusercontent.com/cockroachdb/cockroach/master/pkg/cmd/roachtest/` + configFile
Expand All @@ -366,7 +360,13 @@ func (s tpchVecSmithcmpTest) testRun(
}
}

func runTPCHVec(ctx context.Context, t *test, c *cluster, testCase tpchVecTestCase) {
func runTPCHVec(
ctx context.Context,
t *test,
c *cluster,
testCase tpchVecTestCase,
testRun func(ctx context.Context, t *test, c *cluster, version crdbVersion, tc tpchVecTestCase),
) {
firstNode := c.Node(1)
c.Put(ctx, cockroach, "./cockroach", c.All())
c.Put(ctx, workload, "./workload", firstNode)
Expand Down Expand Up @@ -396,7 +396,7 @@ func runTPCHVec(ctx context.Context, t *test, c *cluster, testCase tpchVecTestCa
}

testCase.preTestRunHook(ctx, t, c, conn, version)
testCase.testRun(ctx, t, c, version)
testRun(ctx, t, c, version, testCase)
testCase.postTestRunHook(t, conn, version)
}

Expand All @@ -407,7 +407,7 @@ func registerTPCHVec(r *testRegistry) {
Cluster: makeClusterSpec(tpchVecNodeCount),
MinVersion: "v19.2.0",
Run: func(ctx context.Context, t *test, c *cluster) {
runTPCHVec(ctx, t, c, newTpchVecPerfTest())
runTPCHVec(ctx, t, c, newTpchVecPerfTest(), baseTestRun)
},
})

Expand All @@ -419,7 +419,7 @@ func registerTPCHVec(r *testRegistry) {
// there is no point in running this config on that version.
MinVersion: "v20.1.0",
Run: func(ctx context.Context, t *test, c *cluster) {
runTPCHVec(ctx, t, c, tpchVecDiskTest{})
runTPCHVec(ctx, t, c, tpchVecDiskTest{}, baseTestRun)
},
})

Expand All @@ -431,7 +431,7 @@ func registerTPCHVec(r *testRegistry) {
// size, so only run on versions >= 20.1.0.
MinVersion: "v20.1.0",
Run: func(ctx context.Context, t *test, c *cluster) {
runTPCHVec(ctx, t, c, tpchVecSmallBatchSizeTest{})
runTPCHVec(ctx, t, c, tpchVecSmallBatchSizeTest{}, baseTestRun)
},
})

Expand All @@ -441,7 +441,7 @@ func registerTPCHVec(r *testRegistry) {
Cluster: makeClusterSpec(tpchVecNodeCount),
MinVersion: "v20.1.0",
Run: func(ctx context.Context, t *test, c *cluster) {
runTPCHVec(ctx, t, c, tpchVecSmithcmpTest{})
runTPCHVec(ctx, t, c, tpchVecSmithcmpTest{}, smithcmpTestRun)
},
})
}
34 changes: 17 additions & 17 deletions pkg/sql/colexec/and_or_projection_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
)

// {{ range .}}
// {{range .}}

type _OP_LOWERProjOp struct {
allocator *colmem.Allocator
Expand Down Expand Up @@ -135,9 +135,9 @@ func (o *_OP_LOWERProjOp) Next(ctx context.Context) coldata.Batch {
knownResult bool
isLeftNull, isRightNull bool
)
// {{ if _IS_OR_OP }}
// {{if _IS_OR_OP}}
knownResult = true
// {{ end }}
// {{end}}
leftCol := batch.ColVec(o.leftIdx)
leftColVals := leftCol.Bool()
var curIdx int
Expand Down Expand Up @@ -226,7 +226,7 @@ func (o *_OP_LOWERProjOp) Next(ctx context.Context) coldata.Batch {
return batch
}

// {{ end }}
// {{end}}

// {{/*
// This code snippet decides whether to include the tuple with index i into
Expand Down Expand Up @@ -256,7 +256,7 @@ func _ADD_TUPLE_FOR_RIGHT(_L_HAS_NULLS bool) { // */}}
// This code snippet sets the result of applying a logical operation AND or OR
// to two boolean vectors while paying attention to null values.
func _SET_VALUES(_IS_OR_OP bool, _L_HAS_NULLS bool, _R_HAS_NULLS bool) { // */}}
// {{ define "setValues" -}}
// {{define "setValues" -}}
if sel := batch.Selection(); sel != nil {
for _, idx := range sel[:origLen] {
_SET_SINGLE_VALUE(_IS_OR_OP, _L_HAS_NULLS, _R_HAS_NULLS)
Expand All @@ -270,7 +270,7 @@ func _SET_VALUES(_IS_OR_OP bool, _L_HAS_NULLS bool, _R_HAS_NULLS bool) { // */}}
_SET_SINGLE_VALUE(_IS_OR_OP, _L_HAS_NULLS, _R_HAS_NULLS)
}
}
// {{ end }}
// {{end}}
// {{/*
}

Expand All @@ -280,23 +280,23 @@ func _SET_VALUES(_IS_OR_OP bool, _L_HAS_NULLS bool, _R_HAS_NULLS bool) { // */}}
// This code snippet sets the result of applying a logical operation AND or OR
// to two boolean values which can be null.
func _SET_SINGLE_VALUE(_IS_OR_OP bool, _L_HAS_NULLS bool, _R_HAS_NULLS bool) { // */}}
// {{ define "setSingleValue" -}}
// {{ if _L_HAS_NULLS }}
// {{define "setSingleValue" -}}
// {{if _L_HAS_NULLS}}
isLeftNull = leftNulls.NullAt(idx)
// {{ else }}
// {{else}}
isLeftNull = false
// {{ end }}
// {{end}}
leftVal := leftColVals[idx]
if !isLeftNull && leftVal == knownResult {
outputColVals[idx] = leftVal
} else {
// {{ if _R_HAS_NULLS }}
// {{if _R_HAS_NULLS}}
isRightNull = rightNulls.NullAt(idx)
// {{ else }}
// {{else}}
isRightNull = false
// {{ end }}
// {{end}}
rightVal := rightColVals[idx]
// {{ if _IS_OR_OP }}
// {{if _IS_OR_OP}}
// The rules for OR'ing two booleans are:
// 1. if at least one of the values is TRUE, then the result is also TRUE
// 2. if both values are FALSE, then the result is also FALSE
Expand All @@ -312,7 +312,7 @@ func _SET_SINGLE_VALUE(_IS_OR_OP bool, _L_HAS_NULLS bool, _R_HAS_NULLS bool) { /
// Rule 3.
outputNulls.SetNull(idx)
}
// {{ else }}
// {{else}}
// The rules for AND'ing two booleans are:
// 1. if at least one of the values is FALSE, then the result is also FALSE
// 2. if both values are TRUE, then the result is also TRUE
Expand All @@ -328,9 +328,9 @@ func _SET_SINGLE_VALUE(_IS_OR_OP bool, _L_HAS_NULLS bool, _R_HAS_NULLS bool) { /
// Rule 3.
outputNulls.SetNull(idx)
}
// {{ end }}
// {{end}}
}
// {{ end }}
// {{end}}
// {{/*
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/colexec/any_not_null_agg_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,11 @@ func _FIND_ANY_NOT_NULL(a *anyNotNull_TYPEAgg, nulls *coldata.Nulls, i int, _HAS
a.foundNonNullForCurrentGroup = false
}
var isNull bool
// {{ if .HasNulls }}
// {{if .HasNulls}}
isNull = nulls.NullAt(i)
// {{ else }}
// {{else}}
isNull = false
// {{ end }}
// {{end}}
if !a.foundNonNullForCurrentGroup && !isNull {
// If we haven't seen any non-nulls for the current group yet, and the
// current value is non-null, then we can pick the current value to be the
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/colexec/avg_agg_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,16 @@ func _ACCUMULATE_AVG(a *_AGG_TYPEAgg, nulls *coldata.Nulls, i int, _HAS_NULLS bo
// We only need to reset this flag if there are nulls. If there are no
// nulls, this will be updated unconditionally below.
// */}}
// {{ if .HasNulls }}
// {{if .HasNulls}}
a.scratch.foundNonNullForCurrentGroup = false
// {{ end }}
// {{end}}
}
var isNull bool
// {{ if .HasNulls }}
// {{if .HasNulls}}
isNull = nulls.NullAt(i)
// {{ else }}
// {{else}}
isNull = false
// {{ end }}
// {{end}}
if !isNull {
_ASSIGN_ADD(a.scratch.curSum, a.scratch.curSum, col[i])
a.scratch.curCount++
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/colexec/cast_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ func _FROM_TYPE_SLICE(col, i, j interface{}) interface{} {

func cast(fromType, toType *types.T, inputVec, outputVec coldata.Vec, n int, sel []int) {
switch typeconv.FromColumnType(fromType) {
// {{ range $typ, $overloads := . }}
// {{range $typ, $overloads := .}}
case coltypes._ALLTYPES:
switch typeconv.FromColumnType(toType) {
// {{ range $overloads }}
// {{ if isCastFuncSet . }}
// {{range $overloads}}
// {{if isCastFuncSet .}}
case coltypes._TOTYPE:
inputCol := inputVec._FROMTYPE()
outputCol := outputVec._TOTYPE()
Expand Down Expand Up @@ -159,11 +159,11 @@ func GetCastOperator(
}, nil
}
switch typeconv.FromColumnType(fromType) {
// {{ range $typ, $overloads := . }}
// {{range $typ, $overloads := .}}
case coltypes._ALLTYPES:
switch typeconv.FromColumnType(toType) {
// {{ range $overloads }}
// {{ if isCastFuncSet . }}
// {{range $overloads}}
// {{if isCastFuncSet .}}
case coltypes._TOTYPE:
return &castOp{
OneInputNode: NewOneInputNode(input),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func init() {
// this file for a list of replacements done.
// replaceManipulationFuncs rewrites every data manipulation placeholder in
// body into the corresponding template directive rooted at typeIdent and
// returns the resulting string.
func replaceManipulationFuncs(typeIdent string, body string) string {
	for i := range dataManipulationReplacementInfos {
		info := &dataManipulationReplacementInfos[i]
		directive := fmt.Sprintf("{{%s.%s}}", typeIdent, info.replaceWith)
		body = info.re.ReplaceAllString(body, directive)
	}
	return body
}
Loading

0 comments on commit c2668dc

Please sign in to comment.