Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exec: reset internal unsafeBatch in orderedAggregator #40668

Merged
merged 1 commit into from
Sep 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions pkg/sql/colexec/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,11 @@ func (a *orderedAggregator) EstimateStaticMemoryUsage() int {
return EstimateBatchSizeBytes(a.outputTypes, coldata.BatchSize*2)
}

func (a *orderedAggregator) initWithBatchSize(inputSize, outputSize int) {
func (a *orderedAggregator) initWithOutputBatchSize(outputSize uint16) {
a.initWithInputAndOutputBatchSize(coldata.BatchSize, int(outputSize))
}

func (a *orderedAggregator) initWithInputAndOutputBatchSize(inputSize, outputSize int) {
a.input.Init()

// Twice the input batchSize is allocated to avoid having to check for
Expand All @@ -261,10 +265,11 @@ func (a *orderedAggregator) initWithBatchSize(inputSize, outputSize int) {
}

func (a *orderedAggregator) Init() {
a.initWithBatchSize(coldata.BatchSize, coldata.BatchSize)
a.initWithInputAndOutputBatchSize(coldata.BatchSize, coldata.BatchSize)
}

func (a *orderedAggregator) Next(ctx context.Context) coldata.Batch {
a.unsafeBatch.ResetInternalBatch()
if a.scratch.shouldResetInternalBatch {
a.scratch.ResetInternalBatch()
a.scratch.shouldResetInternalBatch = false
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func TestAggregatorOneFunc(t *testing.T) {
out := newOpTestOutput(a, []int{0}, tc.expected)
// Explicitly reinitialize the aggregator with the given output batch
// size.
a.(*orderedAggregator).initWithBatchSize(tc.batchSize, tc.outputBatchSize)
a.(*orderedAggregator).initWithInputAndOutputBatchSize(tc.batchSize, tc.outputBatchSize)
if err := out.VerifyAnyOrder(); err != nil {
t.Fatal(err)
}
Expand Down
9 changes: 4 additions & 5 deletions pkg/sql/colexec/mergejoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,8 @@ type mergeJoinBase struct {
right mergeJoinInput

// Output buffer definition.
output coldata.Batch
needToResetOutput bool
outputBatchSize uint16
output coldata.Batch
outputBatchSize uint16
// outputReady is a flag to indicate that merge joiner is ready to emit an
// output batch.
outputReady bool
Expand Down Expand Up @@ -402,10 +401,10 @@ func (o *mergeJoinBase) EstimateStaticMemoryUsage() int {
}

func (o *mergeJoinBase) Init() {
o.initWithBatchSize(coldata.BatchSize)
o.initWithOutputBatchSize(coldata.BatchSize)
}

func (o *mergeJoinBase) initWithBatchSize(outBatchSize uint16) {
func (o *mergeJoinBase) initWithOutputBatchSize(outBatchSize uint16) {
o.output = coldata.NewMemBatchWithSize(o.getOutColTypes(), int(outBatchSize))
o.left.source.Init()
o.right.source.Init()
Expand Down
12 changes: 4 additions & 8 deletions pkg/sql/colexec/mergejoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

type mjTestInitializer interface {
initWithBatchSize(outBatchSize uint16)
}

// TODO(yuzefovich): add unit tests for cases with ON expression.

type mjTestCase struct {
Expand Down Expand Up @@ -1528,8 +1524,8 @@ func TestMergeJoiner(t *testing.T) {
// We use a custom verifier function so that we can get the merge join op
// to use a custom output batch size per test, to exercise more cases.
var mergeJoinVerifier verifier = func(output *opTestOutput) error {
if mj, ok := output.input.(mjTestInitializer); ok {
mj.initWithBatchSize(tc.outputBatchSize)
if mj, ok := output.input.(variableOutputBatchSizeInitializer); ok {
mj.initWithOutputBatchSize(tc.outputBatchSize)
} else {
t.Fatalf("unexpectedly merge joiner doesn't implement mjTestInitializer")
}
Expand Down Expand Up @@ -1597,7 +1593,7 @@ func TestMergeJoinerMultiBatch(t *testing.T) {
t.Fatal("error in merge join op constructor", err)
}

a.(*mergeJoinInnerOp).initWithBatchSize(outBatchSize)
a.(*mergeJoinInnerOp).initWithOutputBatchSize(outBatchSize)

i := 0
count := 0
Expand Down Expand Up @@ -1729,7 +1725,7 @@ func TestMergeJoinerLongMultiBatchCount(t *testing.T) {
t.Fatal("error in merge join op constructor", err)
}

a.(*mergeJoinInnerOp).initWithBatchSize(outBatchSize)
a.(*mergeJoinInnerOp).initWithOutputBatchSize(outBatchSize)

count := 0
for b := a.Next(ctx); b.Length() != 0; b = a.Next(ctx) {
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/colexec/mergejoiner_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1238,13 +1238,10 @@ func (o *mergeJoin_JOIN_TYPE_STRING_FILTER_INFO_STRINGOp) calculateOutputCount(
}

func (o *mergeJoin_JOIN_TYPE_STRING_FILTER_INFO_STRINGOp) Next(ctx context.Context) coldata.Batch {
o.output.ResetInternalBatch()
for {
switch o.state {
case mjEntry:
if o.needToResetOutput {
o.needToResetOutput = false
o.output.ResetInternalBatch()
}
o.initProberState(ctx)

if o.nonEmptyBufferedGroup() {
Expand Down Expand Up @@ -1281,7 +1278,6 @@ func (o *mergeJoin_JOIN_TYPE_STRING_FILTER_INFO_STRINGOp) Next(ctx context.Conte
o.output.SetLength(o.builderState.outCount)
// Reset builder out count.
o.builderState.outCount = uint16(0)
o.needToResetOutput = true
o.outputReady = false
return o.output
}
Expand Down
15 changes: 14 additions & 1 deletion pkg/sql/colexec/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ func maybeHasNulls(b coldata.Batch) bool {

type testRunner func(*testing.T, []tuples, []coltypes.T, tuples, verifier, []int, func([]Operator) (Operator, error))

// variableOutputBatchSizeInitializer is implemented by operators that can be
// initialized with variable output size batches. This allows runTests to
// increase test coverage of these operators.
type variableOutputBatchSizeInitializer interface {
initWithOutputBatchSize(uint16)
}

// runTests is a helper that automatically runs your tests with varied batch
// sizes and with and without a random selection vector.
// tups is the sets of input tuples.
Expand Down Expand Up @@ -219,7 +226,13 @@ func runTestsWithoutAllNullsInjection(
if err != nil {
t.Fatal(err)
}
op.Init()
if vbsiOp, ok := op.(variableOutputBatchSizeInitializer); ok {
// initialize the operator with a very small output batch size to
// increase the likelihood that multiple batches will be output.
vbsiOp.initWithOutputBatchSize(1)
} else {
op.Init()
}
ctx := context.Background()
b := op.Next(ctx)
if round == 1 {
Expand Down