Skip to content

Commit

Permalink
executor, mocktikv, util: add lookahead information passing for hash …
Browse files Browse the repository at this point in the history
…join
  • Loading branch information
time-and-fate committed Oct 27, 2019
1 parent 076503a commit fe62059
Show file tree
Hide file tree
Showing 7 changed files with 398 additions and 12 deletions.
17 changes: 17 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,23 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
defaultValues = make([]types.Datum, e.innerExec.Schema().Len())
}
}

if e.joinType == plannercore.InnerJoin || e.joinType == plannercore.SemiJoin {
if outerReader, ok := e.outerExec.(*TableReaderExecutor); ok {
bfSize := uint64(e.innerEstCount) / 50
if bfSize < 10 {
bfSize = 10
}
e.bloomFilter = make([]uint64, bfSize)
outerReader.bloomFilter = e.bloomFilter

outerReader.joinKeyIdx = make([]int64, len(e.outerKeys))
for i := range e.innerKeys {
outerReader.joinKeyIdx[i] = int64(e.outerKeys[i].Index)
}
}
}

e.joiners = make([]joiner, e.concurrency)
for i := uint(0); i < e.concurrency; i++ {
e.joiners[i] = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues,
Expand Down
69 changes: 57 additions & 12 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"context"
"fmt"
"hash/fnv"
"sync"
"sync/atomic"

Expand All @@ -24,6 +25,8 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/expression"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/memory"
Expand Down Expand Up @@ -67,6 +70,11 @@ type HashJoinExec struct {
joinChkResourceCh []chan *chunk.Chunk
joinResultCh chan *hashjoinWorkerResult

// bloomFilter will be built from inner table and sent to outer table
// data source to "pre-filter" data.
// bloomFilter is initialized in executor builder.
bloomFilter []uint64

memTracker *memory.Tracker // track memory usage.
prepared bool
isOuterJoin bool
Expand Down Expand Up @@ -128,7 +136,7 @@ func (e *HashJoinExec) Close() error {

// Open implements the Executor Open interface.
func (e *HashJoinExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
if err := e.innerExec.Open(ctx); err != nil {
return err
}

Expand Down Expand Up @@ -166,18 +174,11 @@ func (e *HashJoinExec) fetchOuterChunks(ctx context.Context) {
required := int(atomic.LoadInt64(&e.requiredRows))
outerResult.SetRequiredRows(required, e.maxChunkSize)
}
err := Next(ctx, e.outerExec, outerResult)
if err != nil {
e.joinResultCh <- &hashjoinWorkerResult{
err: err,
}
return
}
if !hasWaitedForInner {
if outerResult.NumRows() == 0 {
e.finished.Store(true)
return
}
// if outerResult.NumRows() == 0 {
// e.finished.Store(true)
// return
// }
jobFinished, innerErr := e.wait4Inner()
if innerErr != nil {
e.joinResultCh <- &hashjoinWorkerResult{
Expand All @@ -189,6 +190,13 @@ func (e *HashJoinExec) fetchOuterChunks(ctx context.Context) {
}
hasWaitedForInner = true
}
err := Next(ctx, e.outerExec, outerResult)
if err != nil {
e.joinResultCh <- &hashjoinWorkerResult{
err: err,
}
return
}

if outerResult.NumRows() == 0 {
return
Expand Down Expand Up @@ -487,6 +495,12 @@ func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) {
e.innerFinished <- errors.Trace(err)
close(doneCh)
}

if err := e.outerExec.Open(ctx); err != nil {
e.innerFinished <- errors.Trace(err)
close(doneCh)
}

// Wait for fetchInnerRows to finish.
// 1. if buildHashTableForList fails
// 2. if outerResult.NumRows() == 0, fetchOutChunks will not wait for inner.
Expand Down Expand Up @@ -520,6 +534,37 @@ func (e *HashJoinExec) buildHashTableForList(innerResultCh <-chan *chunk.Chunk)
if err != nil {
return err
}
if e.bloomFilter != nil {
err = e.putChunk2BloomFilter(chk, innerKeyColIdx, allTypes)
if err != nil {
return err
}
}
}
return nil
}

// putChunk2BloomFilter records every row of chk in e.bloomFilter.
//
// The filter is a bit array stored as []uint64, so it holds
// 64*len(e.bloomFilter) bits. For each row, the columns listed in
// innerKeyColIdx are encoded with tablecodec.EncodeValue and fed, in order,
// into one FNV-1a 64-bit hash; the hash modulo the bit count selects the bit
// that is set. allTypes is indexed by column offset and supplies the field
// type used to read each datum out of the row.
//
// The first encoding error aborts the call and is returned unchanged; bits
// set for earlier rows stay set.
func (e *HashJoinExec) putChunk2BloomFilter(chk *chunk.Chunk, innerKeyColIdx []int, allTypes []*types.FieldType) error {
	const bitsPerWord = uint64(64)
	totalBits := bitsPerWord * uint64(len(e.bloomFilter))

	for rowIdx, rowCnt := 0, chk.NumRows(); rowIdx < rowCnt; rowIdx++ {
		curRow := chk.GetRow(rowIdx)
		h := fnv.New64a()
		for _, keyCol := range innerKeyColIdx {
			d := curRow.GetDatum(keyCol, allTypes[keyCol])
			buf, encErr := tablecodec.EncodeValue(e.ctx.GetSessionVars().StmtCtx, nil, d)
			if encErr != nil {
				return encErr
			}
			// hash.Hash documents that Write never returns an error.
			_, _ = h.Write(buf)
		}
		bitPos := h.Sum64() % totalBits
		e.bloomFilter[bitPos/bitsPerWord] |= 1 << (bitPos % bitsPerWord)
	}
	return nil
}
Expand Down
15 changes: 15 additions & 0 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"context"
"fmt"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -72,6 +73,10 @@ type TableReaderExecutor struct {
memTracker *memory.Tracker
selectResultHook // for testing

// bloomFilter is built in hash join executor before `Open` is called here.
bloomFilter []uint64
joinKeyIdx []int64

keepOrder bool
desc bool
streaming bool
Expand Down Expand Up @@ -100,6 +105,15 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
return err
}
}

if e.bloomFilter != nil {
bfExec := &tipb.BloomFilter{
BitSet: e.bloomFilter,
ColIdx: e.joinKeyIdx,
}
e.dagPB.Executors = append(e.dagPB.Executors, &tipb.Executor{Tp: tipb.ExecType_TypeBloomFilter, BloomFilter: bfExec})
}

if e.runtimeStats != nil {
collExec := true
e.dagPB.CollectExecutionSummaries = &collExec
Expand Down Expand Up @@ -157,6 +171,7 @@ func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error
e.feedback.Invalidate()
return err
}
time.Sleep(time.Millisecond)
return nil
}

Expand Down
15 changes: 15 additions & 0 deletions store/mockstore/mocktikv/cop_handler_dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/bloom"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
mockpkg "github.com/pingcap/tidb/util/mock"
Expand Down Expand Up @@ -157,6 +158,8 @@ func (h *rpcHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor,
currExec, err = h.buildIndexScan(ctx, curr)
case tipb.ExecType_TypeSelection:
currExec, err = h.buildSelection(ctx, curr)
case tipb.ExecType_TypeBloomFilter:
currExec, err = h.buildBloomFilterExec(ctx, curr)
case tipb.ExecType_TypeAggregation:
currExec, err = h.buildHashAgg(ctx, curr)
case tipb.ExecType_TypeStreamAgg:
Expand Down Expand Up @@ -248,6 +251,18 @@ func (h *rpcHandler) buildIndexScan(ctx *dagContext, executor *tipb.Executor) (*
return e, nil
}

// buildBloomFilterExec builds the mock executor for a
// tipb.ExecType_TypeBloomFilter node.
//
// The bit set and the join-key column offsets are read from
// executor.BloomFilter through its getters. An error reported by
// bloom.NewBloomFilterBySlice is returned to the caller instead of being
// discarded, so a bit set the bloom package rejects is surfaced when the
// executor is built rather than when the filter is first probed.
func (h *rpcHandler) buildBloomFilterExec(ctx *dagContext, executor *tipb.Executor) (*bloomFilterExec, error) {
	pb := executor.BloomFilter
	filter, err := bloom.NewBloomFilterBySlice(pb.GetBitSet())
	if err != nil {
		return nil, err
	}

	return &bloomFilterExec{
		bf:                filter,
		relatedColOffsets: pb.GetColIdx(),
		evalCtx:           ctx.evalCtx,
		execDetail:        new(execDetail),
	}, nil
}

func (h *rpcHandler) buildSelection(ctx *dagContext, executor *tipb.Executor) (*selectionExec, error) {
var err error
var relatedColOffsets []int
Expand Down
99 changes: 99 additions & 0 deletions store/mockstore/mocktikv/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package mocktikv
import (
"bytes"
"context"
"fmt"
"sort"
"time"

Expand All @@ -28,6 +29,7 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/bloom"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tipb/go-tipb"
Expand All @@ -39,6 +41,7 @@ var (
_ executor = &selectionExec{}
_ executor = &limitExec{}
_ executor = &topNExec{}
_ executor = &bloomFilterExec{}
)

type execDetail struct {
Expand Down Expand Up @@ -720,3 +723,99 @@ func convertToExprs(sc *stmtctx.StatementContext, fieldTps []*types.FieldType, p
}
return exprs, nil
}

// bloomFilterExec is a mocktikv executor that forwards only those rows of src
// whose join-key columns pass a bloom filter probe.
type bloomFilterExec struct {
// bf is the bloom filter probed by Next.
bf *bloom.BloomFilter
// relatedColOffsets holds the offsets, within a row returned by src, of the
// columns that are null-checked and concatenated into the probe key.
relatedColOffsets []int64
// evalCtx is handed to checkIsNull by Next.
evalCtx *evalContext
// src is the child executor Next pulls rows from.
src executor
// execDetail is updated by Next on every call.
execDetail *execDetail
// row []types.Datum
// match counts rows that passed the probe; total counts rows that were
// probed. Both are printed by Next once src is exhausted.
// NOTE(review): float32 cannot represent every integer above 2^24, so these
// counters stop advancing on very large scans; an integer type looks safer.
match float32
total float32
}

// ExecDetails returns the execution details reported by the source executor,
// followed by this executor's own detail. When no source has been set, only
// this executor's detail is returned.
func (e *bloomFilterExec) ExecDetails() []*execDetail {
	if e.src == nil {
		return []*execDetail{e.execDetail}
	}
	return append(e.src.ExecDetails(), e.execDetail)
}

// SetSrcExec sets the child executor that Next reads rows from.
func (e *bloomFilterExec) SetSrcExec(exec executor) {
e.src = exec
}

// GetSrcExec returns the child executor, or nil if none has been set.
func (e *bloomFilterExec) GetSrcExec() executor {
return e.src
}

// ResetCounts forwards the call to the source executor. It does not touch
// e.match or e.total, and it does not guard against a nil src.
func (e *bloomFilterExec) ResetCounts() {
e.src.ResetCounts()
}

// Counts returns the counts reported by the source executor unchanged; rows
// dropped by the bloom filter are not subtracted here. src must be non-nil.
func (e *bloomFilterExec) Counts() []int64 {
return e.src.Counts()
}

// Cursor returns whatever the source executor's Cursor returns. src must be
// non-nil.
func (e *bloomFilterExec) Cursor() ([]byte, bool) {
return e.src.Cursor()
}

// Next pulls rows from the source executor until one passes the bloom filter
// and returns that row. It returns (nil, nil) once the source is exhausted and
// a traced error if the source fails.
//
// Rows for which checkIsNull reports a NULL join-key column are dropped
// without being probed or counted. Every probed row increments e.total and
// every accepted row increments e.match; both counters and their ratio are
// printed to stdout when the source runs dry.
//
// The named results are required: the deferred call passes the start time and
// the row actually returned to e.execDetail.update.
func (e *bloomFilterExec) Next(ctx context.Context) (value [][]byte, err error) {
	start := time.Now()
	defer func() {
		e.execDetail.update(start, value)
	}()

	for {
		if value, err = e.src.Next(ctx); err != nil {
			return nil, errors.Trace(err)
		}
		if value == nil {
			// Source exhausted: report the filter's hit statistics.
			fmt.Println("\033[0;32mMatched:", int(e.match), " Total:", int(e.total), " Match Rate:", e.match/e.total, "\033[0m")
			return nil, nil
		}

		// A NULL join key can never match, so skip the row without probing.
		if checkIsNull(e.evalCtx, value, e.relatedColOffsets) {
			continue
		}

		probeKey := buildKey(value, e.relatedColOffsets)
		e.total++
		if !e.bf.Probe(probeKey) {
			continue
		}
		e.match++
		return value, nil
	}
}

// checkIsNull reports whether any of the columns selected by
// relatedColOffsets holds a NULL value in the encoded row.
//
// Only the leading flag byte of each selected column is compared against
// codec.NilFlag; the column is not decoded. The evalContext argument is
// currently unused.
func checkIsNull(e *evalContext, value [][]byte, relatedColOffsets []int64) bool {
	for i := range relatedColOffsets {
		if col := value[relatedColOffsets[i]]; col[0] == codec.NilFlag {
			return true
		}
	}
	return false
}

// buildKey builds the bloom filter probe key for a row by concatenating the
// encoded values of the columns selected by relatedColOffsets, in the order
// the offsets are given. It returns nil when no offsets are supplied.
func buildKey(value [][]byte, relatedColOffsets []int64) []byte {
	var joined []byte
	for i := 0; i < len(relatedColOffsets); i++ {
		col := value[relatedColOffsets[i]]
		joined = append(joined, col...)
	}
	return joined
}
Loading

0 comments on commit fe62059

Please sign in to comment.