Skip to content

Commit

Permalink
executor: control Chunk size for Joiners (pingcap#9614)
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored and kolbe committed Mar 20, 2019
1 parent 49fdd63 commit 49735ab
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 18 deletions.
14 changes: 14 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema) *execu
}
}

// MockPhysicalPlan is used to return a specified executor in when build.
// It is mainly used for testing.
type MockPhysicalPlan interface {
plannercore.PhysicalPlan
GetExecutor() Executor
}

func (b *executorBuilder) build(p plannercore.Plan) Executor {
switch v := p.(type) {
case nil:
Expand Down Expand Up @@ -172,6 +179,10 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
case *plannercore.PhysicalWindow:
return b.buildWindow(v)
default:
if mp, ok := p.(MockPhysicalPlan); ok {
return mp.GetExecutor()
}

b.err = ErrUnknownPlan.GenWithStack("Unknown Plan %T", p)
return nil
}
Expand Down Expand Up @@ -835,6 +846,7 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) Execu
leftExec.retTypes(),
rightExec.retTypes(),
),
isOuterJoin: v.JoinType.IsOuterJoin(),
}

leftKeys := v.LeftKeys
Expand Down Expand Up @@ -902,6 +914,7 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), leftExec, rightExec),
concurrency: v.Concurrency,
joinType: v.JoinType,
isOuterJoin: v.JoinType.IsOuterJoin(),
innerIdx: v.InnerChildIdx,
}

Expand Down Expand Up @@ -1541,6 +1554,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
},
workerWg: new(sync.WaitGroup),
joiner: newJoiner(b.ctx, v.JoinType, v.OuterIndex == 1, defaultValues, v.OtherConditions, leftTypes, rightTypes),
isOuterJoin: v.JoinType.IsOuterJoin(),
indexRanges: v.Ranges,
keyOff2IdxOff: v.KeyOff2IdxOff,
}
Expand Down
77 changes: 77 additions & 0 deletions executor/executor_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func newRequiredRowsDataSource(ctx sessionctx.Context, totalRows int, expectedRo

func (r *requiredRowsDataSource) Next(ctx context.Context, req *chunk.RecordBatch) error {
defer func() {
if r.expectedRowsRet == nil {
r.numNextCalled++
return
}
rowsRet := req.NumRows()
expected := r.expectedRowsRet[r.numNextCalled]
if rowsRet != expected {
Expand Down Expand Up @@ -727,3 +731,76 @@ func (s *testExecSuite) TestHashAggParallelRequiredRows(c *C) {
}
}
}

func (s *testExecSuite) TestMergeJoinRequiredRows(c *C) {
justReturn1 := func(valType *types.FieldType) interface{} {
switch valType.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
return int64(1)
case mysql.TypeDouble:
return float64(1)
default:
panic("not support")
}
}
joinTypes := []plannercore.JoinType{plannercore.RightOuterJoin, plannercore.LeftOuterJoin,
plannercore.LeftOuterSemiJoin, plannercore.AntiLeftOuterSemiJoin}
for _, joinType := range joinTypes {
ctx := defaultCtx()
required := make([]int, 100)
for i := range required {
required[i] = rand.Int()%ctx.GetSessionVars().MaxChunkSize + 1
}
innerSrc := newRequiredRowsDataSourceWithGenerator(ctx, 1, nil, justReturn1) // just return one row: (1, 1)
outerSrc := newRequiredRowsDataSourceWithGenerator(ctx, 10000000, required, justReturn1) // always return (1, 1)
exec := buildMergeJoinExec(ctx, joinType, innerSrc, outerSrc)
c.Assert(exec.Open(context.Background()), IsNil)

chk := exec.newFirstChunk()
for i := range required {
chk.SetRequiredRows(required[i], ctx.GetSessionVars().MaxChunkSize)
c.Assert(exec.Next(context.Background(), chunk.NewRecordBatch(chk)), IsNil)
}
c.Assert(exec.Close(), IsNil)
c.Assert(outerSrc.checkNumNextCalled(), IsNil)
}
}

func buildMergeJoinExec(ctx sessionctx.Context, joinType plannercore.JoinType, innerSrc, outerSrc Executor) Executor {
if joinType == plannercore.RightOuterJoin {
innerSrc, outerSrc = outerSrc, innerSrc
}

innerCols := innerSrc.Schema().Columns
outerCols := outerSrc.Schema().Columns
j := plannercore.PhysicalMergeJoin{
JoinType: joinType,
LeftConditions: nil,
RightConditions: nil,
DefaultValues: []types.Datum{types.NewDatum(1), types.NewDatum(1)},
LeftKeys: outerCols,
RightKeys: innerCols,
}.Init(ctx, nil)

j.SetChildren(&mockPlan{exec: outerSrc}, &mockPlan{exec: innerSrc})
cols := append(append([]*expression.Column{}, outerCols...), innerCols...)
schema := expression.NewSchema(cols...)
j.SetSchema(schema)

j.CompareFuncs = make([]expression.CompareFunc, 0, len(j.LeftKeys))
for i := range j.LeftKeys {
j.CompareFuncs = append(j.CompareFuncs, expression.GetCmpFunction(j.LeftKeys[i], j.RightKeys[i]))
}

b := newExecutorBuilder(ctx, nil)
return b.build(j)
}

type mockPlan struct {
MockPhysicalPlan
exec Executor
}

func (mp *mockPlan) GetExecutor() Executor {
return mp.exec
}
22 changes: 19 additions & 3 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"runtime"
"sort"
"sync"
"sync/atomic"
"time"
"unsafe"

Expand Down Expand Up @@ -62,7 +63,10 @@ type IndexLookUpJoin struct {
joinResult *chunk.Chunk
innerIter chunk.Iterator

joiner joiner
joiner joiner
isOuterJoin bool

requiredRows int64

indexRanges []*ranger.Range
keyOff2IdxOff []int
Expand Down Expand Up @@ -103,6 +107,8 @@ type lookUpJoinTask struct {
type outerWorker struct {
outerCtx

lookup *IndexLookUpJoin

ctx sessionctx.Context
executor Executor

Expand Down Expand Up @@ -190,6 +196,7 @@ func (e *IndexLookUpJoin) newOuterWorker(resultCh, innerCh chan *lookUpJoinTask)
batchSize: 32,
maxBatchSize: e.ctx.GetSessionVars().IndexJoinBatchSize,
parentMemTracker: e.memTracker,
lookup: e,
}
return ow
}
Expand Down Expand Up @@ -218,6 +225,9 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.RecordBatch) erro
start := time.Now()
defer func() { e.runtimeStats.Record(time.Since(start), req.NumRows()) }()
}
if e.isOuterJoin {
atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows()))
}
req.Reset()
e.joinResult.Reset()
for {
Expand Down Expand Up @@ -251,7 +261,7 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.RecordBatch) erro
task.hasMatch = false
task.hasNull = false
}
if req.NumRows() == e.maxChunkSize {
if req.IsFull() {
return nil
}
}
Expand Down Expand Up @@ -360,9 +370,15 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
task.memTracker.AttachTo(ow.parentMemTracker)

ow.increaseBatchSize()
if ow.lookup.isOuterJoin { // if is outerJoin, push the requiredRows down
requiredRows := int(atomic.LoadInt64(&ow.lookup.requiredRows))
task.outerResult.SetRequiredRows(requiredRows, ow.maxBatchSize)
} else {
task.outerResult.SetRequiredRows(ow.batchSize, ow.maxBatchSize)
}

task.memTracker.Consume(task.outerResult.MemoryUsage())
for task.outerResult.NumRows() < ow.batchSize {
for !task.outerResult.IsFull() {
err := ow.executor.Next(ctx, chunk.NewRecordBatch(ow.executorChk))
if err != nil {
return task, errors.Trace(err)
Expand Down
18 changes: 14 additions & 4 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ type HashJoinExec struct {
joinType plannercore.JoinType
innerIdx int

isOuterJoin bool
requiredRows int64

// We build individual joiner for each join worker when use chunk-based
// execution, to avoid the concurrency of joiner.chk and joiner.selected.
joiners []joiner
Expand Down Expand Up @@ -211,6 +214,10 @@ func (e *HashJoinExec) fetchOuterChunks(ctx context.Context) {
}
}
outerResult := outerResource.chk
if e.isOuterJoin {
required := int(atomic.LoadInt64(&e.requiredRows))
outerResult.SetRequiredRows(required, e.maxChunkSize)
}
err := e.outerExec.Next(ctx, chunk.NewRecordBatch(outerResult))
if err != nil {
e.joinResultCh <- &hashjoinWorkerResult{
Expand Down Expand Up @@ -427,7 +434,7 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R
hasMatch = hasMatch || matched
hasNull = hasNull || isNull

if joinResult.chk.NumRows() == e.maxChunkSize {
if joinResult.chk.IsFull() {
e.joinResultCh <- joinResult
ok, joinResult := e.getNewJoinResult(workerID)
if !ok {
Expand Down Expand Up @@ -471,7 +478,7 @@ func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResu
return false, joinResult
}
}
if joinResult.chk.NumRows() == e.maxChunkSize {
if joinResult.chk.IsFull() {
e.joinResultCh <- joinResult
ok, joinResult = e.getNewJoinResult(workerID)
if !ok {
Expand All @@ -497,6 +504,9 @@ func (e *HashJoinExec) Next(ctx context.Context, req *chunk.RecordBatch) (err er
e.fetchOuterAndProbeHashTable(ctx)
e.prepared = true
}
if e.isOuterJoin {
atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows()))
}
req.Reset()
if e.joinResultCh == nil {
return nil
Expand Down Expand Up @@ -649,7 +659,7 @@ func (e *NestedLoopApplyExec) fetchSelectedOuterRow(ctx context.Context, chk *ch
return &outerRow, nil
} else if e.outer {
e.joiner.onMissMatch(false, outerRow, chk)
if chk.NumRows() == e.maxChunkSize {
if chk.IsFull() {
return nil, nil
}
}
Expand Down Expand Up @@ -720,7 +730,7 @@ func (e *NestedLoopApplyExec) Next(ctx context.Context, req *chunk.RecordBatch)
e.hasMatch = e.hasMatch || matched
e.hasNull = e.hasNull || isNull

if err != nil || req.NumRows() == e.maxChunkSize {
if err != nil || req.IsFull() {
return errors.Trace(err)
}
}
Expand Down
6 changes: 3 additions & 3 deletions executor/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func (j *leftOuterJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk
chkForJoin = chk
}

numToAppend := j.maxChunkSize - chk.NumRows()
numToAppend := chk.RequiredRows() - chk.NumRows()
for ; inners.Current() != inners.End() && numToAppend > 0; numToAppend-- {
j.makeJoinRowToChunk(chkForJoin, outer, inners.Current())
inners.Next()
Expand Down Expand Up @@ -403,7 +403,7 @@ func (j *rightOuterJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, ch
chkForJoin = chk
}

numToAppend := j.maxChunkSize - chk.NumRows()
numToAppend := chk.RequiredRows() - chk.NumRows()
for ; inners.Current() != inners.End() && numToAppend > 0; numToAppend-- {
j.makeJoinRowToChunk(chkForJoin, inners.Current(), outer)
inners.Next()
Expand Down Expand Up @@ -438,7 +438,7 @@ func (j *innerJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk *ch
if len(j.conditions) == 0 {
chkForJoin = chk
}
inner, numToAppend := inners.Current(), j.maxChunkSize-chk.NumRows()
inner, numToAppend := inners.Current(), chk.RequiredRows()-chk.NumRows()
for ; inner != inners.End() && numToAppend > 0; inner, numToAppend = inners.Next(), numToAppend-1 {
if j.outerIsRight {
j.makeJoinRowToChunk(chkForJoin, inner, outer)
Expand Down
Loading

0 comments on commit 49735ab

Please sign in to comment.