Skip to content

Commit

Permalink
mocktikv: add bloomfilterExec to mocktikv
Browse files Browse the repository at this point in the history
  • Loading branch information
time-and-fate committed Oct 29, 2019
1 parent 564ed84 commit c719d05
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 0 deletions.
18 changes: 18 additions & 0 deletions store/mockstore/mocktikv/cop_handler_dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/bloom"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
mockpkg "github.com/pingcap/tidb/util/mock"
Expand Down Expand Up @@ -157,6 +158,8 @@ func (h *rpcHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor,
currExec, err = h.buildIndexScan(ctx, curr)
case tipb.ExecType_TypeSelection:
currExec, err = h.buildSelection(ctx, curr)
case tipb.ExecType_TypeBloomFilter:
currExec, err = h.buildBloomFilterExec(ctx, curr)
case tipb.ExecType_TypeAggregation:
currExec, err = h.buildHashAgg(ctx, curr)
case tipb.ExecType_TypeStreamAgg:
Expand Down Expand Up @@ -248,6 +251,21 @@ func (h *rpcHandler) buildIndexScan(ctx *dagContext, executor *tipb.Executor) (*
return e, nil
}

// buildBloomFilterExec creates a bloomFilterExec from the BloomFilter section
// of the given executor description.
//
// The column offsets come from BloomFilter.GetColIdx and the filter itself is
// rebuilt from BloomFilter.GetBitSet via bloom.NewFilterBySlice; an error from
// that call is returned unchanged. The source executor is not set here.
func (h *rpcHandler) buildBloomFilterExec(ctx *dagContext, executor *tipb.Executor) (*bloomFilterExec, error) {
	pbFilter := executor.BloomFilter
	colOffsets := pbFilter.GetColIdx()

	bf, err := bloom.NewFilterBySlice(pbFilter.GetBitSet())
	if err != nil {
		return nil, err
	}

	exec := &bloomFilterExec{
		bf:                bf,
		relatedColOffsets: colOffsets,
		evalCtx:           ctx.evalCtx,
		execDetail:        new(execDetail),
	}
	return exec, nil
}

func (h *rpcHandler) buildSelection(ctx *dagContext, executor *tipb.Executor) (*selectionExec, error) {
var err error
var relatedColOffsets []int
Expand Down
99 changes: 99 additions & 0 deletions store/mockstore/mocktikv/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package mocktikv
import (
"bytes"
"context"
"fmt"
"sort"
"time"

Expand All @@ -28,6 +29,7 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/bloom"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tipb/go-tipb"
Expand All @@ -39,6 +41,7 @@ var (
_ executor = &selectionExec{}
_ executor = &limitExec{}
_ executor = &topNExec{}
_ executor = &bloomFilterExec{}
)

type execDetail struct {
Expand Down Expand Up @@ -720,3 +723,99 @@ func convertToExprs(sc *stmtctx.StatementContext, fieldTps []*types.FieldType, p
}
return exprs, nil
}

// bloomFilterExec is an executor that passes on only those rows of its source
// executor whose key, built from the related columns, is reported as present
// by a bloom filter.
type bloomFilterExec struct {
	// bf is the bloom filter that row keys are probed against.
	bf *bloom.Filter
	// relatedColOffsets holds the offsets, within a row, of the columns whose
	// encoded values are concatenated to form the probe key.
	relatedColOffsets []int64
	evalCtx           *evalContext
	// src is the executor the rows are read from.
	src        executor
	execDetail *execDetail
	// match counts the rows that passed the filter and total counts the rows
	// that were probed. They are float64 rather than float32: a float32 that
	// is incremented by one stops changing at 2^24 (16,777,216), so the
	// counters would silently saturate on large scans. float64 counts exactly
	// up to 2^53 and keeps match/total a floating-point ratio.
	match float64
	total float64
}

// ExecDetails returns the execution details of the source executor chain
// followed by this executor's own detail. Without a source executor the result
// holds only this executor's detail.
func (e *bloomFilterExec) ExecDetails() []*execDetail {
	if e.src == nil {
		return []*execDetail{e.execDetail}
	}
	return append(e.src.ExecDetails(), e.execDetail)
}

// SetSrcExec sets exec as the source executor this bloomFilterExec reads its
// rows from.
func (e *bloomFilterExec) SetSrcExec(exec executor) {
e.src = exec
}

// GetSrcExec returns the source executor, or nil if none has been set.
func (e *bloomFilterExec) GetSrcExec() executor {
return e.src
}

// ResetCounts forwards the call to the source executor. The match and total
// counters of this executor are not touched. The source executor must have
// been set.
func (e *bloomFilterExec) ResetCounts() {
e.src.ResetCounts()
}

// Counts returns the counts reported by the source executor, unchanged. The
// source executor must have been set.
func (e *bloomFilterExec) Counts() []int64 {
return e.src.Counts()
}

// Cursor returns the cursor reported by the source executor, unchanged. The
// source executor must have been set.
func (e *bloomFilterExec) Cursor() ([]byte, bool) {
return e.src.Cursor()
}

// Next returns the next row of the source executor that passes the bloom
// filter, or a nil row once the source is exhausted.
//
// Rows for which checkIsNull reports true are skipped without being probed or
// counted. Every other row is counted in total, probed with the key produced
// by buildKey, and counted in match and returned when the filter accepts it.
// An error from the source executor is returned wrapped by errors.Trace.
func (e *bloomFilterExec) Next(ctx context.Context) (value [][]byte, err error) {
	start := time.Now()
	// value is the named result, so the deferred update sees the row that is
	// actually handed back to the caller.
	defer func() {
		e.execDetail.update(start, value)
	}()

	for {
		if value, err = e.src.Next(ctx); err != nil {
			return nil, errors.Trace(err)
		}
		if value == nil {
			// Source exhausted: print the filter statistics to stdout.
			// NOTE(review): the match rate is NaN when no row was probed.
			fmt.Println("\033[0;32mMatched:", int(e.match), " Total:", int(e.total), " Match Rate:", e.match/e.total, "\033[0m")
			return nil, nil
		}

		// A row with a NULL in any related column is dropped without probing.
		if checkIsNull(e.evalCtx, value, e.relatedColOffsets) {
			continue
		}

		probeKey := buildKey(value, e.relatedColOffsets)
		e.total++
		if !e.bf.Probe(probeKey) {
			continue
		}
		e.match++
		return value, nil
	}
}

// checkIsNull reports whether any of the columns selected by
// relatedColOffsets is NULL in the encoded row value.
//
// A column counts as NULL when its first byte is codec.NilFlag. A column whose
// encoded value is empty has no flag byte at all; it is treated as NULL as
// well, so that such a row is skipped instead of causing an index-out-of-range
// panic. It returns false when none of the selected columns is NULL, including
// when relatedColOffsets is empty. The evalContext argument is currently
// unused.
func checkIsNull(e *evalContext, value [][]byte, relatedColOffsets []int64) bool {
	for _, offset := range relatedColOffsets {
		col := value[offset]
		if len(col) == 0 || col[0] == codec.NilFlag {
			return true
		}
	}
	return false
}

// buildKey assembles the key used to probe the bloom filter for one row: the
// raw bytes of the columns selected by relatedColOffsets, concatenated in the
// order the offsets are given. The result is nil when nothing was appended.
func buildKey(value [][]byte, relatedColOffsets []int64) []byte {
	var probeKey []byte
	for i := 0; i < len(relatedColOffsets); i++ {
		column := value[relatedColOffsets[i]]
		probeKey = append(probeKey, column...)
	}
	return probeKey
}

0 comments on commit c719d05

Please sign in to comment.