Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: control Chunk size for Joiners #9614

Merged
merged 16 commits into from
Mar 19, 2019
14 changes: 14 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema) *execu
}
}

// MockPhysicalPlan is used to return a specified executor in when build.
// It is mainly used for testing.
type MockPhysicalPlan interface {
plannercore.PhysicalPlan
GetExecutor() Executor
}

func (b *executorBuilder) build(p plannercore.Plan) Executor {
switch v := p.(type) {
case nil:
Expand Down Expand Up @@ -172,6 +179,10 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
case *plannercore.PhysicalWindow:
return b.buildWindow(v)
default:
if mp, ok := p.(MockPhysicalPlan); ok {
return mp.GetExecutor()
}

b.err = ErrUnknownPlan.GenWithStack("Unknown Plan %T", p)
return nil
}
Expand Down Expand Up @@ -835,6 +846,7 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) Execu
leftExec.retTypes(),
rightExec.retTypes(),
),
isOuterJoin: v.JoinType.IsOuterJoin(),
}

leftKeys := v.LeftKeys
Expand Down Expand Up @@ -902,6 +914,7 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), leftExec, rightExec),
concurrency: v.Concurrency,
joinType: v.JoinType,
isOuterJoin: v.JoinType.IsOuterJoin(),
innerIdx: v.InnerChildIdx,
}

Expand Down Expand Up @@ -1541,6 +1554,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
},
workerWg: new(sync.WaitGroup),
joiner: newJoiner(b.ctx, v.JoinType, v.OuterIndex == 1, defaultValues, v.OtherConditions, leftTypes, rightTypes),
isOuterJoin: v.JoinType.IsOuterJoin(),
indexRanges: v.Ranges,
keyOff2IdxOff: v.KeyOff2IdxOff,
}
Expand Down
77 changes: 77 additions & 0 deletions executor/executor_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ func newRequiredRowsDataSource(ctx sessionctx.Context, totalRows int, expectedRo

func (r *requiredRowsDataSource) Next(ctx context.Context, req *chunk.RecordBatch) error {
defer func() {
if r.expectedRowsRet == nil {
r.numNextCalled++
return
}
rowsRet := req.NumRows()
expected := r.expectedRowsRet[r.numNextCalled]
if rowsRet != expected {
Expand Down Expand Up @@ -727,3 +731,76 @@ func (s *testExecSuite) TestHashAggParallelRequiredRows(c *C) {
}
}
}

func (s *testExecSuite) TestMergeJoinRequiredRows(c *C) {
justReturn1 := func(valType *types.FieldType) interface{} {
switch valType.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
return int64(1)
case mysql.TypeDouble:
return float64(1)
default:
panic("not support")
}
}
joinTypes := []plannercore.JoinType{plannercore.RightOuterJoin, plannercore.LeftOuterJoin,
plannercore.LeftOuterSemiJoin, plannercore.AntiLeftOuterSemiJoin}
for _, joinType := range joinTypes {
ctx := defaultCtx()
required := make([]int, 100)
for i := range required {
required[i] = rand.Int()%ctx.GetSessionVars().MaxChunkSize + 1
}
innerSrc := newRequiredRowsDataSourceWithGenerator(ctx, 1, nil, justReturn1) // just return one row: (1, 1)
outerSrc := newRequiredRowsDataSourceWithGenerator(ctx, 10000000, required, justReturn1) // always return (1, 1)
exec := buildMergeJoinExec(ctx, joinType, innerSrc, outerSrc)
c.Assert(exec.Open(context.Background()), IsNil)

chk := exec.newFirstChunk()
for i := range required {
chk.SetRequiredRows(required[i], ctx.GetSessionVars().MaxChunkSize)
c.Assert(exec.Next(context.Background(), chunk.NewRecordBatch(chk)), IsNil)
}
c.Assert(exec.Close(), IsNil)
c.Assert(outerSrc.checkNumNextCalled(), IsNil)
}
}

func buildMergeJoinExec(ctx sessionctx.Context, joinType plannercore.JoinType, innerSrc, outerSrc Executor) Executor {
if joinType == plannercore.RightOuterJoin {
innerSrc, outerSrc = outerSrc, innerSrc
}

innerCols := innerSrc.Schema().Columns
outerCols := outerSrc.Schema().Columns
j := plannercore.PhysicalMergeJoin{
JoinType: joinType,
LeftConditions: nil,
RightConditions: nil,
DefaultValues: []types.Datum{types.NewDatum(1), types.NewDatum(1)},
LeftKeys: outerCols,
RightKeys: innerCols,
}.Init(ctx, nil)

j.SetChildren(&mockPlan{exec: outerSrc}, &mockPlan{exec: innerSrc})
cols := append(append([]*expression.Column{}, outerCols...), innerCols...)
schema := expression.NewSchema(cols...)
j.SetSchema(schema)

j.CompareFuncs = make([]expression.CompareFunc, 0, len(j.LeftKeys))
for i := range j.LeftKeys {
j.CompareFuncs = append(j.CompareFuncs, expression.GetCmpFunction(j.LeftKeys[i], j.RightKeys[i]))
}

b := newExecutorBuilder(ctx, nil)
return b.build(j)
}

type mockPlan struct {
MockPhysicalPlan
exec Executor
}

func (mp *mockPlan) GetExecutor() Executor {
return mp.exec
}
22 changes: 19 additions & 3 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"runtime"
"sort"
"sync"
"sync/atomic"
"time"
"unsafe"

Expand Down Expand Up @@ -62,7 +63,10 @@ type IndexLookUpJoin struct {
joinResult *chunk.Chunk
innerIter chunk.Iterator

joiner joiner
joiner joiner
isOuterJoin bool

requiredRows int64

indexRanges []*ranger.Range
keyOff2IdxOff []int
Expand Down Expand Up @@ -103,6 +107,8 @@ type lookUpJoinTask struct {
type outerWorker struct {
outerCtx

lookup *IndexLookUpJoin
qw4990 marked this conversation as resolved.
Show resolved Hide resolved

ctx sessionctx.Context
executor Executor

Expand Down Expand Up @@ -190,6 +196,7 @@ func (e *IndexLookUpJoin) newOuterWorker(resultCh, innerCh chan *lookUpJoinTask)
batchSize: 32,
maxBatchSize: e.ctx.GetSessionVars().IndexJoinBatchSize,
parentMemTracker: e.memTracker,
lookup: e,
}
return ow
}
Expand Down Expand Up @@ -218,6 +225,9 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.RecordBatch) erro
start := time.Now()
defer func() { e.runtimeStats.Record(time.Since(start), req.NumRows()) }()
}
if e.isOuterJoin {
atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows()))
}
req.Reset()
e.joinResult.Reset()
for {
Expand Down Expand Up @@ -251,7 +261,7 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, req *chunk.RecordBatch) erro
task.hasMatch = false
task.hasNull = false
}
if req.NumRows() == e.maxChunkSize {
if req.IsFull() {
return nil
}
}
Expand Down Expand Up @@ -360,9 +370,15 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
task.memTracker.AttachTo(ow.parentMemTracker)

ow.increaseBatchSize()
if ow.lookup.isOuterJoin { // if is outerJoin, push the requiredRows down
requiredRows := int(atomic.LoadInt64(&ow.lookup.requiredRows))
task.outerResult.SetRequiredRows(requiredRows, ow.maxBatchSize)
} else {
task.outerResult.SetRequiredRows(ow.batchSize, ow.maxBatchSize)
}

task.memTracker.Consume(task.outerResult.MemoryUsage())
for task.outerResult.NumRows() < ow.batchSize {
for !task.outerResult.IsFull() {
err := ow.executor.Next(ctx, chunk.NewRecordBatch(ow.executorChk))
if err != nil {
return task, errors.Trace(err)
Expand Down
18 changes: 14 additions & 4 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ type HashJoinExec struct {
joinType plannercore.JoinType
innerIdx int

isOuterJoin bool
requiredRows int64

// We build individual joiner for each join worker when use chunk-based
// execution, to avoid the concurrency of joiner.chk and joiner.selected.
joiners []joiner
Expand Down Expand Up @@ -211,6 +214,10 @@ func (e *HashJoinExec) fetchOuterChunks(ctx context.Context) {
}
}
outerResult := outerResource.chk
if e.isOuterJoin {
required := int(atomic.LoadInt64(&e.requiredRows))
outerResult.SetRequiredRows(required, e.maxChunkSize)
}
err := e.outerExec.Next(ctx, chunk.NewRecordBatch(outerResult))
if err != nil {
e.joinResultCh <- &hashjoinWorkerResult{
Expand Down Expand Up @@ -427,7 +434,7 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R
hasMatch = hasMatch || matched
hasNull = hasNull || isNull

if joinResult.chk.NumRows() == e.maxChunkSize {
if joinResult.chk.IsFull() {
e.joinResultCh <- joinResult
ok, joinResult := e.getNewJoinResult(workerID)
if !ok {
Expand Down Expand Up @@ -471,7 +478,7 @@ func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResu
return false, joinResult
}
}
if joinResult.chk.NumRows() == e.maxChunkSize {
if joinResult.chk.IsFull() {
e.joinResultCh <- joinResult
ok, joinResult = e.getNewJoinResult(workerID)
if !ok {
Expand All @@ -497,6 +504,9 @@ func (e *HashJoinExec) Next(ctx context.Context, req *chunk.RecordBatch) (err er
e.fetchOuterAndProbeHashTable(ctx)
e.prepared = true
}
if e.isOuterJoin {
atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows()))
}
req.Reset()
if e.joinResultCh == nil {
return nil
Expand Down Expand Up @@ -649,7 +659,7 @@ func (e *NestedLoopApplyExec) fetchSelectedOuterRow(ctx context.Context, chk *ch
return &outerRow, nil
} else if e.outer {
e.joiner.onMissMatch(false, outerRow, chk)
if chk.NumRows() == e.maxChunkSize {
if chk.IsFull() {
return nil, nil
}
}
Expand Down Expand Up @@ -720,7 +730,7 @@ func (e *NestedLoopApplyExec) Next(ctx context.Context, req *chunk.RecordBatch)
e.hasMatch = e.hasMatch || matched
e.hasNull = e.hasNull || isNull

if err != nil || req.NumRows() == e.maxChunkSize {
if err != nil || req.IsFull() {
return errors.Trace(err)
}
}
Expand Down
6 changes: 3 additions & 3 deletions executor/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func (j *leftOuterJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk
chkForJoin = chk
}

numToAppend := j.maxChunkSize - chk.NumRows()
numToAppend := chk.RequiredRows() - chk.NumRows()
for ; inners.Current() != inners.End() && numToAppend > 0; numToAppend-- {
j.makeJoinRowToChunk(chkForJoin, outer, inners.Current())
inners.Next()
Expand Down Expand Up @@ -403,7 +403,7 @@ func (j *rightOuterJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, ch
chkForJoin = chk
}

numToAppend := j.maxChunkSize - chk.NumRows()
numToAppend := chk.RequiredRows() - chk.NumRows()
for ; inners.Current() != inners.End() && numToAppend > 0; numToAppend-- {
j.makeJoinRowToChunk(chkForJoin, inners.Current(), outer)
inners.Next()
Expand Down Expand Up @@ -438,7 +438,7 @@ func (j *innerJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk *ch
if len(j.conditions) == 0 {
chkForJoin = chk
}
inner, numToAppend := inners.Current(), j.maxChunkSize-chk.NumRows()
inner, numToAppend := inners.Current(), chk.RequiredRows()-chk.NumRows()
for ; inner != inners.End() && numToAppend > 0; inner, numToAppend = inners.Next(), numToAppend-1 {
if j.outerIsRight {
j.makeJoinRowToChunk(chkForJoin, inner, outer)
Expand Down
Loading