Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/driver: Use BatchGet method supported in client-go. #29860

Merged
merged 9 commits into from
Nov 24, 2021
91 changes: 52 additions & 39 deletions store/driver/txn/batch_getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,54 @@ package txn

import (
"context"
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/txnkv/transaction"
)

// tikvBatchGetter is the BatchGetter struct for tikv
type tikvBatchGetter struct {
tidbBatchGetter BatchGetter
}

func (b tikvBatchGetter) BatchGet(ctx context.Context, keys [][]byte) (map[string][]byte, error) {
// toTiDBKeys
kvKeys := *(*[]kv.Key)(unsafe.Pointer(&keys))
vals, err := b.tidbBatchGetter.BatchGet(ctx, kvKeys)
return vals, err
}

// tikvBatchBufferGetter is the BatchBufferGetter struct for tikv
dwangxxx marked this conversation as resolved.
Show resolved Hide resolved
type tikvBatchBufferGetter struct {
tidbMiddleCache Getter
tidbBuffer BatchBufferGetter
}

func (b tikvBatchBufferGetter) Get(k []byte) ([]byte, error) {
dwangxxx marked this conversation as resolved.
Show resolved Hide resolved
// Get from buffer
val, err := b.tidbBuffer.Get(context.TODO(), k)
if err == nil || !kv.IsErrNotFound(err) || b.tidbMiddleCache == nil {
if kv.IsErrNotFound(err) {
err = tikverr.ErrNotExist
}
return val, err
}
// Get from middle cache
val, err = b.tidbMiddleCache.Get(context.TODO(), k)
if err == nil {
return val, err
}
// TiDB err NotExist to TiKV err NotExist
dwangxxx marked this conversation as resolved.
Show resolved Hide resolved
err = tikverr.ErrNotExist
return val, err
}

func (b tikvBatchBufferGetter) Len() int {
return b.tidbBuffer.Len()
}

// BatchBufferGetter is the interface for BatchGet.
type BatchBufferGetter interface {
Len() int
Expand All @@ -42,50 +85,20 @@ type Getter interface {

// BufferBatchGetter is the type for BatchGet with MemBuffer.
type BufferBatchGetter struct {
buffer BatchBufferGetter
middle Getter
snapshot BatchGetter
tikvBufferBatchGetter transaction.BufferBatchGetter
}

// NewBufferBatchGetter creates a new BufferBatchGetter.
func NewBufferBatchGetter(buffer BatchBufferGetter, middleCache Getter, snapshot BatchGetter) *BufferBatchGetter {
return &BufferBatchGetter{buffer: buffer, middle: middleCache, snapshot: snapshot}
tikvBuffer := tikvBatchBufferGetter{tidbMiddleCache: middleCache, tidbBuffer: buffer}
tikvSnapshot := tikvBatchGetter{snapshot}
return &BufferBatchGetter{tikvBufferBatchGetter: *transaction.NewBufferBatchGetter(tikvBuffer, tikvSnapshot)}
}

// BatchGet implements the BatchGetter interface.
func (b *BufferBatchGetter) BatchGet(ctx context.Context, keys []kv.Key) (map[string][]byte, error) {
if b.buffer.Len() == 0 {
return b.snapshot.BatchGet(ctx, keys)
}
bufferValues := make([][]byte, len(keys))
shrinkKeys := make([]kv.Key, 0, len(keys))
for i, key := range keys {
val, err := b.buffer.Get(ctx, key)
if err == nil {
bufferValues[i] = val
continue
}
if !kv.IsErrNotFound(err) {
return nil, errors.Trace(err)
}
if b.middle != nil {
val, err = b.middle.Get(ctx, key)
if err == nil {
bufferValues[i] = val
continue
}
}
shrinkKeys = append(shrinkKeys, key)
}
storageValues, err := b.snapshot.BatchGet(ctx, shrinkKeys)
if err != nil {
return nil, errors.Trace(err)
}
for i, key := range keys {
if len(bufferValues[i]) == 0 {
continue
}
storageValues[string(key)] = bufferValues[i]
}
return storageValues, nil
tikvKeys := toTiKVKeys(keys)
storageValues, err := b.tikvBufferBatchGetter.BatchGet(ctx, tikvKeys)

return storageValues, err
}