executor: implement disk-based sort (Part 2) (#14279)
wshwsh12 authored Feb 7, 2020
1 parent 3c59e7c commit 13bf6cc
Showing 6 changed files with 520 additions and 234 deletions.
170 changes: 140 additions & 30 deletions executor/benchmark_test.go
@@ -15,6 +15,7 @@ package executor

import (
"context"
"encoding/base64"
"fmt"
"math/rand"
"sort"
@@ -46,13 +47,12 @@ var (
)

type mockDataSourceParameters struct {
schema *expression.Schema
genDataFunc func(row int, typ *types.FieldType) interface{}
ndvs []int // number of distinct values on columns[i] and zero represents no limit
orders []bool // columns[i] should be ordered if orders[i] is true
rows int // number of rows the DataSource should output
ctx sessionctx.Context
isRawDataSmall bool // false: rawData, true: rawDataSmall
schema *expression.Schema
genDataFunc func(row int, typ *types.FieldType) interface{}
ndvs []int // number of distinct values on columns[i] and zero represents no limit
orders []bool // columns[i] should be ordered if orders[i] is true
rows int // number of rows the DataSource should output
ctx sessionctx.Context
}

type mockDataSource struct {
@@ -154,10 +154,9 @@ func (mds *mockDataSource) randDatum(typ *types.FieldType) interface{} {
case mysql.TypeDouble:
return rand.Float64()
case mysql.TypeVarString:
if mds.p.isRawDataSmall {
return rawDataSmall
}
return rawData
buff := make([]byte, 10)
rand.Read(buff)
return base64.RawURLEncoding.EncodeToString(buff)
default:
panic("not implement")
}
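
With the shared rawData/rawDataSmall globals gone, randDatum now generates an independent random string per TypeVarString row. A minimal standalone sketch of the new generator, runnable on its own (the package/main wrapper is for illustration only and is not part of the commit):

package main

import (
	"encoding/base64"
	"fmt"
	"math/rand"
)

// randVarString mirrors the TypeVarString branch above: 10 random bytes
// encoded as unpadded URL-safe base64 give a 14-character ASCII string.
func randVarString() string {
	buff := make([]byte, 10)
	rand.Read(buff) // math/rand's Read never returns an error
	return base64.RawURLEncoding.EncodeToString(buff)
}

func main() {
	fmt.Println(randVarString()) // e.g. "3q2-7wABAgMEBQ"
}
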
@@ -508,18 +507,14 @@ type windowTestCase struct {
concurrency int
dataSourceSorted bool
ctx sessionctx.Context
rawDataSmall string
}

var rawData = strings.Repeat("x", 5*1024)
var rawDataSmall = strings.Repeat("x", 16)

func (a windowTestCase) columns() []*expression.Column {
rawDataTp := new(types.FieldType)
types.DefaultTypeForValue(rawData, rawDataTp)
return []*expression.Column{
{Index: 0, RetType: types.NewFieldType(mysql.TypeDouble)},
{Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)},
{Index: 2, RetType: rawDataTp},
{Index: 2, RetType: types.NewFieldType(mysql.TypeVarString)},
{Index: 3, RetType: types.NewFieldType(mysql.TypeLonglong)},
}
}
@@ -533,7 +528,7 @@ func defaultWindowTestCase() *windowTestCase {
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
return &windowTestCase{ast.WindowFuncRowNumber, 1, nil, 1000, 10000000, 1, true, ctx}
return &windowTestCase{ast.WindowFuncRowNumber, 1, nil, 1000, 10000000, 1, true, ctx, strings.Repeat("x", 16)}
}

func benchmarkWindowExecWithCase(b *testing.B, casTest *windowTestCase) {
@@ -544,12 +539,21 @@ func benchmarkWindowExecWithCase(b *testing.B, casTest *windowTestCase) {

cols := casTest.columns()
dataSource := buildMockDataSource(mockDataSourceParameters{
schema: expression.NewSchema(cols...),
ndvs: []int{0, casTest.ndv, 0, 0},
orders: []bool{false, casTest.dataSourceSorted, false, false},
rows: casTest.rows,
ctx: casTest.ctx,
isRawDataSmall: true,
schema: expression.NewSchema(cols...),
ndvs: []int{0, casTest.ndv, 0, 0},
orders: []bool{false, casTest.dataSourceSorted, false, false},
rows: casTest.rows,
ctx: casTest.ctx,
genDataFunc: func(row int, typ *types.FieldType) interface{} {
switch typ.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
return int64(row)
case mysql.TypeVarString:
return casTest.rawDataSmall
default:
panic("not implement")
}
},
})

b.ResetTimer()
@@ -679,6 +683,7 @@ type hashJoinTestCase struct {
joinType core.JoinType
disk bool
useOuterToBuild bool
rawData string
}

func (tc hashJoinTestCase) columns() []*expression.Column {
@@ -702,7 +707,7 @@ func defaultHashJoinTestCase(cols []*types.FieldType, joinType core.JoinType, us
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(nil, -1)
ctx.GetSessionVars().IndexLookupJoinConcurrency = 4
tc := &hashJoinTestCase{rows: 100000, concurrency: 4, ctx: ctx, keyIdx: []int{0, 1}}
tc := &hashJoinTestCase{rows: 100000, concurrency: 4, ctx: ctx, keyIdx: []int{0, 1}, rawData: strings.Repeat("x", 5*1024)}
tc.cols = cols
tc.useOuterToBuild = useOuterToBuild
tc.joinType = joinType
@@ -762,7 +767,7 @@ func benchmarkHashJoinExecWithCase(b *testing.B, casTest *hashJoinTestCase) {
case mysql.TypeLong, mysql.TypeLonglong:
return int64(row)
case mysql.TypeVarString:
return rawData
return casTest.rawData
case mysql.TypeDouble:
return float64(row)
default:
@@ -915,7 +920,7 @@ func benchmarkBuildHashTableForList(b *testing.B, casTest *hashJoinTestCase) {
case mysql.TypeLong, mysql.TypeLonglong:
return int64(row)
case mysql.TypeVarString:
return rawData
return casTest.rawData
default:
panic("not implement")
}
@@ -994,6 +999,7 @@ type indexJoinTestCase struct {
innerJoinKeyIdx []int
innerIdx []int
needOuterSort bool
rawData string
}

func (tc indexJoinTestCase) columns() []*expression.Column {
@@ -1019,6 +1025,7 @@ func defaultIndexJoinTestCase() *indexJoinTestCase {
outerJoinKeyIdx: []int{0, 1},
innerJoinKeyIdx: []int{0, 1},
innerIdx: []int{0, 1},
rawData: strings.Repeat("x", 5*1024),
}
return tc
}
@@ -1039,7 +1046,7 @@ func (tc indexJoinTestCase) getMockDataSourceOptByRows(rows int) mockDataSourceP
case mysql.TypeDouble:
return float64(row)
case mysql.TypeVarString:
return rawData
return tc.rawData
default:
panic("not implement")
}
@@ -1316,6 +1323,7 @@ func newMergeJoinBenchmark(numOuterRows, numInnerDup, numInnerRedundant int) (tc
outerJoinKeyIdx: []int{0, 1},
innerJoinKeyIdx: []int{0, 1},
innerIdx: []int{0, 1},
rawData: strings.Repeat("x", 5*1024),
}
tc = &mergeJoinTestCase{*itc}
outerOpt := mockDataSourceParameters{
@@ -1329,7 +1337,7 @@
case mysql.TypeDouble:
return float64(row)
case mysql.TypeVarString:
return rawData
return tc.rawData
default:
panic("not implement")
}
@@ -1348,7 +1356,7 @@ func newMergeJoinBenchmark(numOuterRows, numInnerDup, numInnerRedundant int) (tc
case mysql.TypeDouble:
return float64(row)
case mysql.TypeVarString:
return rawData
return tc.rawData
default:
panic("not implement")
}
@@ -1443,3 +1451,105 @@ func BenchmarkMergeJoinExec(b *testing.B) {
})
}
}

type sortCase struct {
rows int
orderByIdx []int
ndvs []int
ctx sessionctx.Context
}

func (tc sortCase) columns() []*expression.Column {
return []*expression.Column{
{Index: 0, RetType: types.NewFieldType(mysql.TypeLonglong)},
{Index: 1, RetType: types.NewFieldType(mysql.TypeLonglong)},
}
}

func (tc sortCase) String() string {
return fmt.Sprintf("(rows:%v, orderBy:%v, ndvs: %v)", tc.rows, tc.orderByIdx, tc.ndvs)
}

func defaultSortTestCase() *sortCase {
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1)
tc := &sortCase{rows: 300000, orderByIdx: []int{0, 1}, ndvs: []int{0, 0}, ctx: ctx}
return tc
}

func benchmarkSortExec(b *testing.B, cas *sortCase) {
opt := mockDataSourceParameters{
schema: expression.NewSchema(cas.columns()...),
rows: cas.rows,
ctx: cas.ctx,
ndvs: cas.ndvs,
}
dataSource := buildMockDataSource(opt)
exec := &SortExec{
baseExecutor: newBaseExecutor(cas.ctx, dataSource.schema, stringutil.StringerStr("sort"), dataSource),
ByItems: make([]*core.ByItems, 0, len(cas.orderByIdx)),
schema: dataSource.schema,
}
for _, idx := range cas.orderByIdx {
exec.ByItems = append(exec.ByItems, &core.ByItems{Expr: cas.columns()[idx]})
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
tmpCtx := context.Background()
chk := newFirstChunk(exec)
dataSource.prepareChunks()

b.StartTimer()
if err := exec.Open(tmpCtx); err != nil {
b.Fatal(err)
}
for {
if err := exec.Next(tmpCtx, chk); err != nil {
b.Fatal(err)
}
if chk.NumRows() == 0 {
break
}
}

if err := exec.Close(); err != nil {
b.Fatal(err)
}
b.StopTimer()
}
}

func BenchmarkSortExec(b *testing.B) {
b.ReportAllocs()
cas := defaultSortTestCase()
// all random data
cas.ndvs = []int{0, 0}
cas.orderByIdx = []int{0, 1}
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkSortExec(b, cas)
})

ndvs := []int{1, 10000}
for _, ndv := range ndvs {
cas.ndvs = []int{ndv, 0}
cas.orderByIdx = []int{0, 1}
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkSortExec(b, cas)
})

cas.ndvs = []int{ndv, 0}
cas.orderByIdx = []int{0}
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkSortExec(b, cas)
})

cas.ndvs = []int{ndv, 0}
cas.orderByIdx = []int{1}
b.Run(fmt.Sprintf("%v", cas), func(b *testing.B) {
benchmarkSortExec(b, cas)
})
}
}
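
Taken together, BenchmarkSortExec exercises seven input shapes: one with both order-by columns fully random (an ndv of 0 means no limit on distinct values in the mock data source), plus, for each ndv in {1, 10000} on column 0, a sort by both columns, by column 0 alone, and by column 1 alone. The benchmarks run with the standard Go tooling, e.g. go test -run XX -bench BenchmarkSortExec ./executor (the -run XX filter is just a conventional way to skip the unit tests).
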
91 changes: 91 additions & 0 deletions executor/executor_pkg_test.go
@@ -20,12 +20,17 @@ import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/stringutil"
)

var _ = Suite(&testExecSuite{})
@@ -234,3 +239,89 @@ func assertEqualStrings(c *C, got []field, expect []string) {
c.Assert(string(got[i].str), Equals, expect[i])
}
}

func (s *testExecSuite) TestSortSpillDisk(c *C) {
originCfg := config.GetGlobalConfig()
newConf := *originCfg
newConf.OOMUseTmpStorage = true
newConf.MemQuotaQuery = 1
config.StoreGlobalConfig(&newConf)
defer config.StoreGlobalConfig(originCfg)

ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, -1)
cas := &sortCase{rows: 2048, orderByIdx: []int{0, 1}, ndvs: []int{0, 0}, ctx: ctx}
opt := mockDataSourceParameters{
schema: expression.NewSchema(cas.columns()...),
rows: cas.rows,
ctx: cas.ctx,
ndvs: cas.ndvs,
}
dataSource := buildMockDataSource(opt)
exec := &SortExec{
baseExecutor: newBaseExecutor(cas.ctx, dataSource.schema, stringutil.StringerStr("sort"), dataSource),
ByItems: make([]*core.ByItems, 0, len(cas.orderByIdx)),
schema: dataSource.schema,
}
for _, idx := range cas.orderByIdx {
exec.ByItems = append(exec.ByItems, &core.ByItems{Expr: cas.columns()[idx]})
}
tmpCtx := context.Background()
chk := newFirstChunk(exec)
dataSource.prepareChunks()
err := exec.Open(tmpCtx)
c.Assert(err, IsNil)
for {
err = exec.Next(tmpCtx, chk)
c.Assert(err, IsNil)
if chk.NumRows() == 0 {
break
}
}
// Test only 1 partition and all data in memory.
c.Assert(len(exec.partitionList), Equals, 1)
c.Assert(exec.partitionList[0].AlreadySpilled(), Equals, false)
c.Assert(exec.partitionList[0].NumRow(), Equals, 2048)
err = exec.Close()
c.Assert(err, IsNil)

ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, 1)
dataSource.prepareChunks()
err = exec.Open(tmpCtx)
c.Assert(err, IsNil)
for {
err = exec.Next(tmpCtx, chk)
c.Assert(err, IsNil)
if chk.NumRows() == 0 {
break
}
}
// Test 2 partitions and all data in disk.
c.Assert(len(exec.partitionList), Equals, 2)
c.Assert(exec.partitionList[0].AlreadySpilled(), Equals, true)
c.Assert(exec.partitionList[1].AlreadySpilled(), Equals, true)
c.Assert(exec.partitionList[0].NumRow(), Equals, 1024)
c.Assert(exec.partitionList[1].NumRow(), Equals, 1024)
err = exec.Close()
c.Assert(err, IsNil)

ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(nil, 24000)
dataSource.prepareChunks()
err = exec.Open(tmpCtx)
c.Assert(err, IsNil)
for {
err = exec.Next(tmpCtx, chk)
c.Assert(err, IsNil)
if chk.NumRows() == 0 {
break
}
}
// Test only 1 partition but spill disk.
c.Assert(len(exec.partitionList), Equals, 1)
c.Assert(exec.partitionList[0].AlreadySpilled(), Equals, true)
c.Assert(exec.partitionList[0].NumRow(), Equals, 2048)
err = exec.Close()
c.Assert(err, IsNil)
}
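
The three scenarios above pin down the spill behavior: input arrives in 1024-row chunks (MaxChunkSize), and whenever the memory tracker exceeds its limit, everything buffered so far is sorted, written to disk as one partition, and a fresh partition is opened for later input. A conceptual sketch of that bookkeeping (diskBackedSort, addChunk, and the partition struct here are assumptions for illustration; only partitionList, AlreadySpilled, and NumRow come from the test itself):

package sketch

// partition stands in for one sorted run; conceptual only, not TiDB's sort.go.
type partition struct {
	spilled bool
	rows    int
}

func (p *partition) AlreadySpilled() bool { return p.spilled }
func (p *partition) NumRow() int          { return p.rows }

type diskBackedSort struct {
	quota         int64 // tracker limit in bytes; negative means unlimited
	memUsed       int64
	current       *partition
	partitionList []*partition
}

// addChunk accounts for one fetched chunk; exceeding the quota spills all
// buffered rows as one sorted partition and opens a new one for later input.
func (s *diskBackedSort) addChunk(rows int, bytes int64) {
	if s.current == nil {
		s.current = &partition{}
		s.partitionList = append(s.partitionList, s.current)
	}
	s.current.rows += rows
	s.memUsed += bytes
	if s.quota >= 0 && s.memUsed > s.quota {
		s.current.spilled = true // the sorted rows go to disk here
		s.current = nil
		s.memUsed = 0
	}
}

Under this model the asserted numbers fall out directly: two longlong columns cost roughly 16 KB per 1024-row chunk, so the unlimited tracker never spills (one in-memory partition of 2048 rows), a 1-byte limit spills after each of the two chunks (two spilled partitions of 1024 rows each), and a 24000-byte limit holds the first chunk but spills once the second arrives (one spilled partition of 2048 rows).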