diff --git a/executor/joiner.go b/executor/joiner.go index 562afa75dc6be..870669268d064 100644 --- a/executor/joiner.go +++ b/executor/joiner.go @@ -91,7 +91,6 @@ func newJoiner(ctx sessionctx.Context, joinType plan.JoinType, colTypes := make([]*types.FieldType, 0, len(lhsColTypes)+len(rhsColTypes)) colTypes = append(colTypes, lhsColTypes...) colTypes = append(colTypes, rhsColTypes...) - base.chk = chunk.NewChunkWithCapacity(colTypes, ctx.GetSessionVars().MaxChunkSize) base.selected = make([]bool, 0, chunk.InitialCapacity) if joinType == plan.LeftOuterJoin || joinType == plan.RightOuterJoin { innerColTypes := lhsColTypes @@ -102,18 +101,25 @@ func newJoiner(ctx sessionctx.Context, joinType plan.JoinType, } switch joinType { case plan.SemiJoin: + base.shallowRow = chunk.MutRowFromTypes(colTypes) return &semiJoiner{base} case plan.AntiSemiJoin: + base.shallowRow = chunk.MutRowFromTypes(colTypes) return &antiSemiJoiner{base} case plan.LeftOuterSemiJoin: + base.shallowRow = chunk.MutRowFromTypes(colTypes) return &leftOuterSemiJoiner{base} case plan.AntiLeftOuterSemiJoin: + base.shallowRow = chunk.MutRowFromTypes(colTypes) return &antiLeftOuterSemiJoiner{base} case plan.LeftOuterJoin: + base.chk = chunk.NewChunkWithCapacity(colTypes, ctx.GetSessionVars().MaxChunkSize) return &leftOuterJoiner{base} case plan.RightOuterJoin: + base.chk = chunk.NewChunkWithCapacity(colTypes, ctx.GetSessionVars().MaxChunkSize) return &rightOuterJoiner{base} case plan.InnerJoin: + base.chk = chunk.NewChunkWithCapacity(colTypes, ctx.GetSessionVars().MaxChunkSize) return &innerJoiner{base} } panic("unsupported join type in func newJoiner()") @@ -125,6 +131,7 @@ type baseJoiner struct { defaultInner chunk.Row outerIsRight bool chk *chunk.Chunk + shallowRow chunk.MutRow selected []bool maxChunkSize int } @@ -142,6 +149,15 @@ func (j *baseJoiner) makeJoinRowToChunk(chk *chunk.Chunk, lhs, rhs chunk.Row) { chk.AppendPartialRow(lhs.Len(), rhs) } +// makeShallowJoinRow shallow copies `inner` and `outer` into `shallowRow`. +func (j *baseJoiner) makeShallowJoinRow(isRightJoin bool, inner, outer chunk.Row) { + if !isRightJoin { + inner, outer = outer, inner + } + j.shallowRow.ShallowCopyPartialRow(0, inner) + j.shallowRow.ShallowCopyPartialRow(inner.Len(), outer) +} + func (j *baseJoiner) filter(input, output *chunk.Chunk) (matched bool, err error) { j.selected, err = expression.VectorizedFilter(j.ctx, j.conditions, chunk.NewIterator4Chunk(input), j.selected) if err != nil { @@ -173,14 +189,9 @@ func (j *semiJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk *chu } for inner := inners.Current(); inner != inners.End(); inner = inners.Next() { - j.chk.Reset() - if j.outerIsRight { - j.makeJoinRowToChunk(j.chk, inner, outer) - } else { - j.makeJoinRowToChunk(j.chk, outer, inner) - } + j.makeShallowJoinRow(j.outerIsRight, inner, outer) - matched, err = expression.EvalBool(j.ctx, j.conditions, j.chk.GetRow(0)) + matched, err = expression.EvalBool(j.ctx, j.conditions, j.shallowRow.ToRow()) if err != nil { return false, errors.Trace(err) } @@ -212,14 +223,9 @@ func (j *antiSemiJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk } for inner := inners.Current(); inner != inners.End(); inner = inners.Next() { - j.chk.Reset() - if j.outerIsRight { - j.makeJoinRowToChunk(j.chk, inner, outer) - } else { - j.makeJoinRowToChunk(j.chk, outer, inner) - } + j.makeShallowJoinRow(j.outerIsRight, inner, outer) - matched, err = expression.EvalBool(j.ctx, j.conditions, j.chk.GetRow(0)) + matched, err = expression.EvalBool(j.ctx, j.conditions, j.shallowRow.ToRow()) if err != nil { return false, errors.Trace(err) } @@ -252,10 +258,9 @@ func (j *leftOuterSemiJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, } for inner := inners.Current(); inner != inners.End(); inner = inners.Next() { - j.chk.Reset() - j.makeJoinRowToChunk(j.chk, outer, inner) + j.makeShallowJoinRow(false, inner, outer) - matched, err = expression.EvalBool(j.ctx, j.conditions, j.chk.GetRow(0)) + matched, err = expression.EvalBool(j.ctx, j.conditions, j.shallowRow.ToRow()) if err != nil { return false, errors.Trace(err) } @@ -295,10 +300,9 @@ func (j *antiLeftOuterSemiJoiner) tryToMatch(outer chunk.Row, inners chunk.Itera } for inner := inners.Current(); inner != inners.End(); inner = inners.Next() { - j.chk.Reset() - j.makeJoinRowToChunk(j.chk, outer, inner) - matched, err := expression.EvalBool(j.ctx, j.conditions, j.chk.GetRow(0)) + j.makeShallowJoinRow(false, inner, outer) + matched, err := expression.EvalBool(j.ctx, j.conditions, j.shallowRow.ToRow()) if err != nil { return false, errors.Trace(err) } @@ -330,7 +334,6 @@ func (j *leftOuterJoiner) tryToMatch(outer chunk.Row, inners chunk.Iterator, chk if inners.Len() == 0 { return false, nil } - j.chk.Reset() chkForJoin := j.chk if len(j.conditions) == 0 { diff --git a/util/chunk/chunk_test.go b/util/chunk/chunk_test.go index d6d7a1344aa48..f5158ca4b4da2 100644 --- a/util/chunk/chunk_test.go +++ b/util/chunk/chunk_test.go @@ -258,6 +258,18 @@ func newChunk(elemLen ...int) *Chunk { return chk } +func newChunkWithInitCap(cap int, elemLen ...int) *Chunk { + chk := &Chunk{} + for _, l := range elemLen { + if l > 0 { + chk.columns = append(chk.columns, newFixedLenColumn(l, cap)) + } else { + chk.columns = append(chk.columns, newVarLenColumn(cap, nil)) + } + } + return chk +} + var allTypes = []*types.FieldType{ types.NewFieldType(mysql.TypeTiny), types.NewFieldType(mysql.TypeShort), diff --git a/util/chunk/mutrow.go b/util/chunk/mutrow.go index 7cf1721296475..1eba29a2f9e5c 100644 --- a/util/chunk/mutrow.go +++ b/util/chunk/mutrow.go @@ -346,3 +346,26 @@ func setMutRowJSON(col *column, j json.BinaryJSON) { copy(col.data[1:], j.Value) col.offsets[1] = int32(dataLen) } + +// ShallowCopyPartialRow shallow copies the data of `row` to MutRow. +func (mr MutRow) ShallowCopyPartialRow(colIdx int, row Row) { + for i, srcCol := range row.c.columns { + dstCol := mr.c.columns[colIdx+i] + if !srcCol.isNull(row.idx) { + // MutRow only contains one row, so we can directly set the whole byte. + dstCol.nullBitmap[0] = 1 + } else { + dstCol.nullBitmap[0] = 0 + } + + if srcCol.isFixed() { + elemLen := len(srcCol.elemBuf) + offset := row.idx * elemLen + dstCol.data = srcCol.data[offset : offset+elemLen] + } else { + start, end := srcCol.offsets[row.idx], srcCol.offsets[row.idx+1] + dstCol.data = srcCol.data[start:end] + dstCol.offsets[1] = int32(len(dstCol.data)) + } + } +} diff --git a/util/chunk/mutrow_test.go b/util/chunk/mutrow_test.go index b4264e39a5855..bf2e925c7fb41 100644 --- a/util/chunk/mutrow_test.go +++ b/util/chunk/mutrow_test.go @@ -15,6 +15,7 @@ package chunk import ( "testing" + "time" "github.com/pingcap/check" "github.com/pingcap/tidb/mysql" @@ -134,3 +135,64 @@ func BenchmarkMutRowFromValues(b *testing.B) { MutRowFromValues(values) } } + +func (s *testChunkSuite) TestMutRowShallowCopyPartialRow(c *check.C) { + colTypes := make([]*types.FieldType, 0, 3) + colTypes = append(colTypes, &types.FieldType{Tp: mysql.TypeVarString}) + colTypes = append(colTypes, &types.FieldType{Tp: mysql.TypeLonglong}) + colTypes = append(colTypes, &types.FieldType{Tp: mysql.TypeTimestamp}) + + mutRow := MutRowFromTypes(colTypes) + row := MutRowFromValues("abc", 123, types.ZeroTimestamp).ToRow() + mutRow.ShallowCopyPartialRow(0, row) + c.Assert(row.GetString(0), check.Equals, mutRow.ToRow().GetString(0)) + c.Assert(row.GetInt64(1), check.Equals, mutRow.ToRow().GetInt64(1)) + c.Assert(row.GetTime(2), check.DeepEquals, mutRow.ToRow().GetTime(2)) + + row.c.Reset() + d := types.NewStringDatum("dfg") + row.c.AppendDatum(0, &d) + d = types.NewIntDatum(567) + row.c.AppendDatum(1, &d) + d = types.NewTimeDatum(types.Time{Time: types.FromGoTime(time.Now()), Fsp: 6, Type: mysql.TypeTimestamp}) + row.c.AppendDatum(2, &d) + + c.Assert(d.GetMysqlTime(), check.DeepEquals, mutRow.ToRow().GetTime(2)) + c.Assert(row.GetString(0), check.Equals, mutRow.ToRow().GetString(0)) + c.Assert(row.GetInt64(1), check.Equals, mutRow.ToRow().GetInt64(1)) + c.Assert(row.GetTime(2), check.DeepEquals, mutRow.ToRow().GetTime(2)) +} + +var rowsNum = 1024 + +func BenchmarkMutRowShallowCopyPartialRow(b *testing.B) { + b.ReportAllocs() + colTypes := make([]*types.FieldType, 0, 8) + colTypes = append(colTypes, &types.FieldType{Tp: mysql.TypeVarString}) + colTypes = append(colTypes, &types.FieldType{Tp: mysql.TypeVarString}) + colTypes = append(colTypes, &types.FieldType{Tp: mysql.TypeLonglong}) + colTypes = append(colTypes, &types.FieldType{Tp: mysql.TypeLonglong}) + colTypes = append(colTypes, &types.FieldType{Tp: mysql.TypeDatetime}) + + mutRow := MutRowFromTypes(colTypes) + row := MutRowFromValues("abc", "abcdefg", 123, 456, types.ZeroDatetime).ToRow() + b.ResetTimer() + for i := 0; i < b.N; i++ { + for j := 0; j < rowsNum; j++ { + mutRow.ShallowCopyPartialRow(0, row) + } + } +} + +func BenchmarkChunkAppendPartialRow(b *testing.B) { + b.ReportAllocs() + chk := newChunkWithInitCap(rowsNum, 0, 0, 8, 8, 16) + row := MutRowFromValues("abc", "abcdefg", 123, 456, types.ZeroDatetime).ToRow() + b.ResetTimer() + for i := 0; i < b.N; i++ { + chk.Reset() + for j := 0; j < rowsNum; j++ { + chk.AppendPartialRow(0, row) + } + } +}