Merge pull request cockroachdb#45784 from yuzefovich/mj-disk-spilling
colexec: add disk spilling to merge joiner
yuzefovich authored Mar 6, 2020
2 parents 6a3417a + 928ff3c commit 6108aba
Showing 14 changed files with 615 additions and 194 deletions.
3 changes: 3 additions & 0 deletions pkg/sql/colcontainer/diskqueue.go
@@ -706,6 +706,9 @@ func (d *diskQueue) Rewind() error {
 	if err := d.closeFileDeserializer(); err != nil {
 		return err
 	}
+	if err := d.CloseRead(); err != nil {
+		return err
+	}
 	d.deserializerState.curBatch = 0
 	d.readFile = nil
 	d.readFileIdx = 0
2 changes: 2 additions & 0 deletions pkg/sql/colexec/constants.go
@@ -23,3 +23,5 @@ const DefaultVectorizeRowCountThreshold = 1000
 // that the vectorized engine can have (globally) for use of the temporary
 // storage.
 const VecMaxOpenFDsLimit = 256
+
+const defaultMemoryLimit = 64 << 20 /* 64 MiB */
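VecMaxOpenFDsLimit caps how many file descriptors the vectorized engine may hold open globally for temporary storage, and the new defaultMemoryLimit works out to 64 << 20 = 67,108,864 bytes, i.e. 64 MiB. One common way to enforce an FD cap like this is a counting semaphore that callers acquire before opening a temporary file; the sketch below is only a generic illustration under that assumption (fdSemaphore, openTemp, and closeTemp are invented names, not CockroachDB APIs).

```go
// Generic sketch of capping concurrently open temporary files with a counting
// semaphore, in the spirit of VecMaxOpenFDsLimit. Not CockroachDB's implementation.
package main

import (
	"fmt"
	"os"
)

type fdSemaphore chan struct{}

func newFDSemaphore(limit int) fdSemaphore { return make(fdSemaphore, limit) }

func (s fdSemaphore) acquire() { s <- struct{}{} } // blocks once `limit` FDs are held
func (s fdSemaphore) release() { <-s }

// openTemp acquires a slot before creating a temp file, releasing it on failure.
func (s fdSemaphore) openTemp() (*os.File, error) {
	s.acquire()
	f, err := os.CreateTemp("", "vec-temp")
	if err != nil {
		s.release()
		return nil, err
	}
	return f, nil
}

// closeTemp closes and removes the file, then returns the slot to the semaphore.
func (s fdSemaphore) closeTemp(f *os.File) error {
	defer s.release()
	defer os.Remove(f.Name())
	return f.Close()
}

func main() {
	sem := newFDSemaphore(2) // the real limit would be something like 256
	f1, _ := sem.openTemp()
	f2, _ := sem.openTemp()
	fmt.Println("two temp files open:", f1.Name(), f2.Name())
	sem.closeTemp(f1)
	sem.closeTemp(f2)
}
```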
32 changes: 14 additions & 18 deletions pkg/sql/colexec/execplan.go
@@ -762,25 +762,17 @@ func NewColOperator(
 		}
 
 		joinType := core.MergeJoiner.Type
-		mergeJoinerMemAccount := streamingMemAccount
-		if !result.IsStreaming && !useStreamingMemAccountForBuffering {
-			// If the merge joiner is buffering, create an unlimited buffering
-			// account for now.
-			// TODO(asubiotto): Once we support spilling to disk in the merge
-			// joiner, make this a limited account. Done this way so that we can
-			// still run plans that include a merge join with a low memory limit
-			// to test disk spilling of other components for the time being.
-			mergeJoinerMemAccount = result.createBufferingUnlimitedMemAccount(ctx, flowCtx, "merge-joiner")
-		}
+		// We are using an unlimited memory monitor here because merge joiner
+		// itself is responsible for making sure that we stay within the memory
+		// limit, and it will fall back to disk if necessary.
+		unlimitedAllocator := NewAllocator(
+			ctx, result.createBufferingUnlimitedMemAccount(
+				ctx, flowCtx, "merge-joiner",
+			))
 		result.Op, err = NewMergeJoinOp(
-			NewAllocator(ctx, mergeJoinerMemAccount),
-			joinType,
-			inputs[0],
-			inputs[1],
-			leftPhysTypes,
-			rightPhysTypes,
-			core.MergeJoiner.LeftOrdering.Columns,
-			core.MergeJoiner.RightOrdering.Columns,
+			unlimitedAllocator, execinfra.GetWorkMemLimit(flowCtx.Cfg), args.DiskQueueCfg, args.FDSemaphore,
+			joinType, inputs[0], inputs[1], leftPhysTypes, rightPhysTypes,
+			core.MergeJoiner.LeftOrdering.Columns, core.MergeJoiner.RightOrdering.Columns,
 		)
 		if err != nil {
 			return result, err
@@ -801,6 +793,10 @@ func NewColOperator(
 			}
 		}
 
+		// Merge joiner can run in auto mode because it falls back to disk if
+		// there is not enough memory available.
+		result.CanRunInAutoMode = true
+
 	case core.Sorter != nil:
 		if err := checkNumIn(inputs, 1); err != nil {
 			return result, err
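The new comments in execplan.go describe the design this PR introduces: the merge joiner is given an unlimited allocator together with the flow's workmem limit, the disk queue configuration, and an FD semaphore, and it is now the operator's own job to stay within that limit and fall back to disk when it cannot, which is what makes it safe to run in auto mode. A minimal, hypothetical sketch of that buffer-until-limit-then-spill pattern follows; spillingBuffer, limitBytes, and add are invented for illustration and are not the actual merge joiner code.

```go
// Toy illustration of the pattern the diff wires up: the operator tracks its own
// memory usage against a limit passed in from outside (analogous to
// execinfra.GetWorkMemLimit) and moves buffered rows to disk once it is exceeded.
package main

import (
	"bufio"
	"fmt"
	"os"
)

type spillingBuffer struct {
	limitBytes int
	usedBytes  int
	inMemory   []string // rows buffered in memory
	spillFile  *os.File // lazily created once the limit is hit
	spillW     *bufio.Writer
}

func (b *spillingBuffer) add(row string) error {
	if b.spillFile == nil && b.usedBytes+len(row) <= b.limitBytes {
		b.inMemory = append(b.inMemory, row)
		b.usedBytes += len(row)
		return nil
	}
	// Over the limit: fall back to disk, draining the in-memory rows first.
	if b.spillFile == nil {
		f, err := os.CreateTemp("", "spill")
		if err != nil {
			return err
		}
		b.spillFile = f
		b.spillW = bufio.NewWriter(f)
		for _, r := range b.inMemory {
			if _, err := fmt.Fprintln(b.spillW, r); err != nil {
				return err
			}
		}
		b.inMemory = nil
		b.usedBytes = 0
	}
	_, err := fmt.Fprintln(b.spillW, row)
	return err
}

func main() {
	b := &spillingBuffer{limitBytes: 16}
	for _, row := range []string{"a", "bb", "ccc", "dddd", "eeeee", "ffffff"} {
		if err := b.add(row); err != nil {
			panic(err)
		}
	}
	spilled := b.spillFile != nil
	if spilled {
		b.spillW.Flush()
		defer os.Remove(b.spillFile.Name())
		defer b.spillFile.Close()
	}
	fmt.Printf("in memory: %d rows, spilled to disk: %v\n", len(b.inMemory), spilled)
}
```

In the real operator the spilled tuples presumably go through the colcontainer disk queue touched in the diskqueue.go change above rather than a flat temp file.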
9 changes: 4 additions & 5 deletions pkg/sql/colexec/mergejoinbase_tmpl.go
@@ -86,7 +86,6 @@ func (o *mergeJoinBase) isBufferedGroupFinished(
 	if input == &o.right {
 		bufferedGroup = o.proberState.rBufferedGroup
 	}
-	lastBufferedTupleIdx := bufferedGroup.Length() - 1
 	tupleToLookAtIdx := rowIdx
 	sel := batch.Selection()
 	if sel != nil {
@@ -101,16 +100,16 @@
 	switch colTyp {
 	// {{ range . }}
 	case _TYPES_T:
-		// We perform this null check on every equality column of the last
+		// We perform this null check on every equality column of the first
 		// buffered tuple regardless of the join type since it is done only once
 		// per batch. In some cases (like INNER JOIN, or LEFT OUTER JOIN with the
 		// right side being an input) this check will always return false since
 		// nulls couldn't be buffered up though.
-		if bufferedGroup.ColVec(int(colIdx)).Nulls().NullAt(lastBufferedTupleIdx) {
+		if bufferedGroup.firstTuple[colIdx].Nulls().NullAt(0) {
 			return true
 		}
-		bufferedCol := bufferedGroup.ColVec(int(colIdx))._TemplateType()
-		prevVal := execgen.UNSAFEGET(bufferedCol, lastBufferedTupleIdx)
+		bufferedCol := bufferedGroup.firstTuple[colIdx]._TemplateType()
+		prevVal := execgen.UNSAFEGET(bufferedCol, 0)
 		var curVal _GOTYPE
 		if batch.ColVec(int(colIdx)).MaybeHasNulls() && batch.ColVec(int(colIdx)).Nulls().NullAt(tupleToLookAtIdx) {
 			return true
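The template change above stops indexing the last tuple of the in-memory buffered group and instead compares against a separately kept firstTuple at position 0. Because every tuple in a buffered group shares the same equality-column values, comparing the incoming row against the group's first tuple is an equivalent way to detect a group boundary, and, presumably, it keeps working once the buffered group itself can spill to disk. A minimal sketch of that comparison, with invented names (nullableInt, isGroupFinished) and the same NULL semantics as the template (a NULL on either side finishes the group):

```go
package main

import "fmt"

type nullableInt struct {
	val    int64
	isNull bool
}

// isGroupFinished reports whether `row` starts a new group relative to the group
// whose first tuple is `groupFirstTuple`. NULLs never match, so a NULL on either
// side ends the group; otherwise any differing equality-column value ends it.
func isGroupFinished(groupFirstTuple, row []nullableInt) bool {
	for col := range groupFirstTuple {
		if groupFirstTuple[col].isNull || row[col].isNull {
			return true
		}
		if groupFirstTuple[col].val != row[col].val {
			return true
		}
	}
	return false
}

func main() {
	first := []nullableInt{{val: 1}, {val: 7}}
	fmt.Println(isGroupFinished(first, []nullableInt{{val: 1}, {val: 7}}))       // false: same key
	fmt.Println(isGroupFinished(first, []nullableInt{{val: 1}, {val: 8}}))       // true: key changed
	fmt.Println(isGroupFinished(first, []nullableInt{{val: 1}, {isNull: true}})) // true: NULLs never match
}
```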