diff --git a/executor/builder.go b/executor/builder.go index 9b8465268121a..2e8ea67429ebf 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1033,6 +1033,23 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo defaultValues = make([]types.Datum, e.innerExec.Schema().Len()) } } + + if e.joinType == plannercore.InnerJoin || e.joinType == plannercore.SemiJoin { + if outerReader, ok := e.outerExec.(*TableReaderExecutor); ok { + bfSize := uint64(e.innerEstCount) / 50 + if bfSize < 10 { + bfSize = 10 + } + e.bloomFilter = make([]uint64, bfSize) + outerReader.bloomFilter = e.bloomFilter + + outerReader.joinKeyIdx = make([]int64, len(e.outerKeys)) + for i := range e.innerKeys { + outerReader.joinKeyIdx[i] = int64(e.outerKeys[i].Index) + } + } + } + e.joiners = make([]joiner, e.concurrency) for i := uint(0); i < e.concurrency; i++ { e.joiners[i] = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, diff --git a/executor/join.go b/executor/join.go index d1cf41533d227..37c3d5282cdfa 100644 --- a/executor/join.go +++ b/executor/join.go @@ -16,6 +16,7 @@ package executor import ( "context" "fmt" + "hash/fnv" "sync" "sync/atomic" @@ -24,6 +25,8 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/expression" plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/memory" @@ -67,6 +70,11 @@ type HashJoinExec struct { joinChkResourceCh []chan *chunk.Chunk joinResultCh chan *hashjoinWorkerResult + // bloomFilter will be built from inner table and sent to outer table + // data source to "pre-filter" data. + // bloomFilter is initialized in executor builder. + bloomFilter []uint64 + memTracker *memory.Tracker // track memory usage. prepared bool isOuterJoin bool @@ -128,7 +136,7 @@ func (e *HashJoinExec) Close() error { // Open implements the Executor Open interface. func (e *HashJoinExec) Open(ctx context.Context) error { - if err := e.baseExecutor.Open(ctx); err != nil { + if err := e.innerExec.Open(ctx); err != nil { return err } @@ -166,18 +174,11 @@ func (e *HashJoinExec) fetchOuterChunks(ctx context.Context) { required := int(atomic.LoadInt64(&e.requiredRows)) outerResult.SetRequiredRows(required, e.maxChunkSize) } - err := Next(ctx, e.outerExec, outerResult) - if err != nil { - e.joinResultCh <- &hashjoinWorkerResult{ - err: err, - } - return - } if !hasWaitedForInner { - if outerResult.NumRows() == 0 { - e.finished.Store(true) - return - } + // if outerResult.NumRows() == 0 { + // e.finished.Store(true) + // return + // } jobFinished, innerErr := e.wait4Inner() if innerErr != nil { e.joinResultCh <- &hashjoinWorkerResult{ @@ -189,6 +190,13 @@ func (e *HashJoinExec) fetchOuterChunks(ctx context.Context) { } hasWaitedForInner = true } + err := Next(ctx, e.outerExec, outerResult) + if err != nil { + e.joinResultCh <- &hashjoinWorkerResult{ + err: err, + } + return + } if outerResult.NumRows() == 0 { return @@ -487,6 +495,12 @@ func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) { e.innerFinished <- errors.Trace(err) close(doneCh) } + + if err := e.outerExec.Open(ctx); err != nil { + e.innerFinished <- errors.Trace(err) + close(doneCh) + } + // Wait fetchInnerRows be finished. // 1. if buildHashTableForList fails // 2. if outerResult.NumRows() == 0, fetchOutChunks will not wait for inner. @@ -520,6 +534,37 @@ func (e *HashJoinExec) buildHashTableForList(innerResultCh <-chan *chunk.Chunk) if err != nil { return err } + if e.bloomFilter != nil { + err = e.putChunk2BloomFilter(chk, innerKeyColIdx, allTypes) + if err != nil { + return err + } + } + } + return nil +} + +func (e *HashJoinExec) putChunk2BloomFilter(chk *chunk.Chunk, innerKeyColIdx []int, allTypes []*types.FieldType) error { + // We use []uint64 as bit array for bloom filter, a single uint64 is a unit + bfUnitLen := uint64(64) // a uint64 has 64 bits + bfUnitsNum := uint64(len(e.bloomFilter)) + bfBitLen := bfUnitLen * bfUnitsNum + + numRows := chk.NumRows() + for i := 0; i <= numRows-1; i++ { + row := chk.GetRow(i) + hasher := fnv.New64a() + for _, colIdx := range innerKeyColIdx { + datum := row.GetDatum(colIdx, allTypes[colIdx]) + encoded, err := tablecodec.EncodeValue(e.ctx.GetSessionVars().StmtCtx, nil, datum) + if err != nil { + return err + } + _, _ = hasher.Write(encoded) + } + hashSum := hasher.Sum64() + hashSum %= bfBitLen + e.bloomFilter[hashSum/bfUnitLen] |= 1 << (hashSum % bfUnitLen) } return nil } diff --git a/executor/table_reader.go b/executor/table_reader.go index 050388e7a63b0..73c2201448ac1 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -16,6 +16,7 @@ package executor import ( "context" "fmt" + "time" "github.com/opentracing/opentracing-go" "github.com/pingcap/parser/model" @@ -72,6 +73,10 @@ type TableReaderExecutor struct { memTracker *memory.Tracker selectResultHook // for testing + // bloomFilter is built in hash join executor before `Open` is called here. + bloomFilter []uint64 + joinKeyIdx []int64 + keepOrder bool desc bool streaming bool @@ -100,6 +105,15 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { return err } } + + if e.bloomFilter != nil { + bfExec := &tipb.BloomFilter{ + BitSet: e.bloomFilter, + ColIdx: e.joinKeyIdx, + } + e.dagPB.Executors = append(e.dagPB.Executors, &tipb.Executor{Tp: tipb.ExecType_TypeBloomFilter, BloomFilter: bfExec}) + } + if e.runtimeStats != nil { collExec := true e.dagPB.CollectExecutionSummaries = &collExec @@ -157,6 +171,7 @@ func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error e.feedback.Invalidate() return err } + time.Sleep(time.Millisecond) return nil } diff --git a/store/mockstore/mocktikv/cop_handler_dag.go b/store/mockstore/mocktikv/cop_handler_dag.go index 155bac5c346aa..adc11894345db 100644 --- a/store/mockstore/mocktikv/cop_handler_dag.go +++ b/store/mockstore/mocktikv/cop_handler_dag.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/bloom" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" mockpkg "github.com/pingcap/tidb/util/mock" @@ -157,6 +158,8 @@ func (h *rpcHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor, currExec, err = h.buildIndexScan(ctx, curr) case tipb.ExecType_TypeSelection: currExec, err = h.buildSelection(ctx, curr) + case tipb.ExecType_TypeBloomFilter: + currExec, err = h.buildBloomFilterExec(ctx, curr) case tipb.ExecType_TypeAggregation: currExec, err = h.buildHashAgg(ctx, curr) case tipb.ExecType_TypeStreamAgg: @@ -248,6 +251,18 @@ func (h *rpcHandler) buildIndexScan(ctx *dagContext, executor *tipb.Executor) (* return e, nil } +func (h *rpcHandler) buildBloomFilterExec(ctx *dagContext, executor *tipb.Executor) (*bloomFilterExec, error) { + relatedColumnOffsets := executor.BloomFilter.GetColIdx() + filter, _ := bloom.NewBloomFilterBySlice(executor.BloomFilter.GetBitSet()) + + return &bloomFilterExec{ + bf: filter, + relatedColOffsets: relatedColumnOffsets, + evalCtx: ctx.evalCtx, + execDetail: new(execDetail), + }, nil +} + func (h *rpcHandler) buildSelection(ctx *dagContext, executor *tipb.Executor) (*selectionExec, error) { var err error var relatedColOffsets []int diff --git a/store/mockstore/mocktikv/executor.go b/store/mockstore/mocktikv/executor.go index 7808e18cff554..2ce6955f7bb19 100644 --- a/store/mockstore/mocktikv/executor.go +++ b/store/mockstore/mocktikv/executor.go @@ -16,6 +16,7 @@ package mocktikv import ( "bytes" "context" + "fmt" "sort" "time" @@ -28,6 +29,7 @@ import ( "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/bloom" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tipb/go-tipb" @@ -39,6 +41,7 @@ var ( _ executor = &selectionExec{} _ executor = &limitExec{} _ executor = &topNExec{} + _ executor = &bloomFilterExec{} ) type execDetail struct { @@ -720,3 +723,99 @@ func convertToExprs(sc *stmtctx.StatementContext, fieldTps []*types.FieldType, p } return exprs, nil } + +type bloomFilterExec struct { + bf *bloom.BloomFilter + relatedColOffsets []int64 + evalCtx *evalContext + src executor + execDetail *execDetail + // row []types.Datum + match float32 + total float32 +} + +func (e *bloomFilterExec) ExecDetails() []*execDetail { + var suffix []*execDetail + if e.src != nil { + suffix = e.src.ExecDetails() + } + return append(suffix, e.execDetail) +} + +func (e *bloomFilterExec) SetSrcExec(exec executor) { + e.src = exec +} + +func (e *bloomFilterExec) GetSrcExec() executor { + return e.src +} + +func (e *bloomFilterExec) ResetCounts() { + e.src.ResetCounts() +} + +func (e *bloomFilterExec) Counts() []int64 { + return e.src.Counts() +} + +func (e *bloomFilterExec) Cursor() ([]byte, bool) { + return e.src.Cursor() +} + +func (e *bloomFilterExec) Next(ctx context.Context) (value [][]byte, err error) { + defer func(begin time.Time) { + e.execDetail.update(begin, value) + }(time.Now()) + for { + value, err = e.src.Next(ctx) + if err != nil { + return nil, errors.Trace(err) + } + if value == nil { + fmt.Println("\033[0;32mMatched:", int(e.match), " Total:", int(e.total), " Match Rate:", e.match/e.total, "\033[0m") + return nil, nil + } + + // if any of the attribute is null, continue + flag := checkIsNull(e.evalCtx, value, e.relatedColOffsets) + if flag { + continue + } + + key := buildKey(value, e.relatedColOffsets) + e.total++ + match := e.bf.Probe(key) + if match { + e.match++ + return value, nil + } + } +} + +// return true, if any one of the choice attributes value is null. +// return false on all attributes value is not null. +func checkIsNull(e *evalContext, value [][]byte, relatedColOffsets []int64) bool { + for _, offset := range relatedColOffsets { + // data, err := tablecodec.DecodeColumnValue(value[offset], e.fieldTps[offset], e.sc.TimeZone) + // if err != nil { + // return true, errors.Trace(err) + // } + // if data.IsNull() { + // return true, nil + // } + if value[offset][0] == codec.NilFlag { + return true + } + } + return false +} + +// build the key for probe the bloom filter, by concatenate related col's value. +func buildKey(value [][]byte, relatedColOffsets []int64) []byte { + var key []byte + for _, offset := range relatedColOffsets { + key = append(key, value[offset]...) + } + return key +} diff --git a/util/bloom/bloomfilter.go b/util/bloom/bloomfilter.go new file mode 100644 index 0000000000000..298e1bc2ca00a --- /dev/null +++ b/util/bloom/bloomfilter.go @@ -0,0 +1,67 @@ +package bloom + +import ( + "fmt" + "hash/fnv" +) + +// BloomFilter a simple abstraction of bloom filter +type BloomFilter struct { + bitSet []uint64 + length uint64 + unitSize uint64 +} + +func NewBloomFilter(length int) (*BloomFilter, error) { + if length <= 0 { + return nil, fmt.Errorf("length is not positive") + } + bitset := make([]uint64, length) + bits := uint64(64) + return &BloomFilter{ + bitSet: bitset, + length: bits * uint64(length), + unitSize: bits, + }, nil +} + +// NewBloomFilterBySlice create a bloom filter by the given slice +func NewBloomFilterBySlice(bs []uint64) (*BloomFilter, error) { + if len(bs) == 0 { + return nil, fmt.Errorf("len(bs) == 0") + } + + bits := uint64(64) + return &BloomFilter{ + bitSet: bs, + length: bits * uint64(len(bs)), + unitSize: bits, + }, nil +} + +// Insert a key into the filter +func (bf *BloomFilter) Insert(key []byte) { + idx, shift := bf.hash(key) + bf.bitSet[idx] |= 1 << shift +} + +// Probe check whether the given key is in the filter +func (bf *BloomFilter) Probe(key []byte) bool { + idx, shift := bf.hash(key) + + return bf.bitSet[idx]&(1<