Skip to content

Commit

Permalink
external engine: support concurrent read and split into smaller data (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Oct 11, 2023
1 parent 533c4df commit aa99c16
Show file tree
Hide file tree
Showing 13 changed files with 345 additions and 105 deletions.
13 changes: 7 additions & 6 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,13 @@ type LocalEngineConfig struct {

// ExternalEngineConfig is the configuration used for local backend external engine.
type ExternalEngineConfig struct {
StorageURI string
DataFiles []string
StatFiles []string
MinKey []byte
MaxKey []byte
SplitKeys [][]byte
StorageURI string
DataFiles []string
StatFiles []string
MinKey []byte
MaxKey []byte
SplitKeys [][]byte
RegionSplitSize int64
// TotalFileSize can be an estimated value.
TotalFileSize int64
// TotalKVCount can be an estimated value.
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
"//br/pkg/lightning/log",
"//br/pkg/membuf",
"//br/pkg/storage",
Expand Down Expand Up @@ -57,7 +58,7 @@ go_test(
],
embed = [":external"],
flaky = True,
shard_count = 39,
shard_count = 40,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
Expand Down
190 changes: 141 additions & 49 deletions br/pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,34 @@ import (
"bytes"
"context"
"encoding/hex"
"slices"
"sort"
"time"

"github.com/cockroachdb/pebble"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

// Engine stored sorted key/value pairs in an external storage.
type Engine struct {
storage storage.ExternalStorage
dataFiles []string
statsFiles []string
minKey []byte
maxKey []byte
splitKeys [][]byte
bufPool *membuf.Pool

iter *MergeKVIter
storage storage.ExternalStorage
dataFiles []string
statsFiles []string
minKey []byte
maxKey []byte
splitKeys [][]byte
regionSplitSize int64
bufPool *membuf.Pool

keyAdapter common.KeyAdapter
duplicateDetection bool
Expand All @@ -66,6 +68,7 @@ func NewExternalEngine(
minKey []byte,
maxKey []byte,
splitKeys [][]byte,
regionSplitSize int64,
keyAdapter common.KeyAdapter,
duplicateDetection bool,
duplicateDB *pebble.DB,
Expand All @@ -81,6 +84,7 @@ func NewExternalEngine(
minKey: minKey,
maxKey: maxKey,
splitKeys: splitKeys,
regionSplitSize: regionSplitSize,
bufPool: membuf.NewPool(),
keyAdapter: keyAdapter,
duplicateDetection: duplicateDetection,
Expand All @@ -94,12 +98,100 @@ func NewExternalEngine(
}
}

func split[T any](in []T, groupNum int) [][]T {
if len(in) == 0 {
return nil
}
if groupNum <= 0 {
groupNum = 1
}
ceil := (len(in) + groupNum - 1) / groupNum
ret := make([][]T, 0, groupNum)
l := len(in)
for i := 0; i < l; i += ceil {
if i+ceil > l {
ret = append(ret, in[i:])
} else {
ret = append(ret, in[i:i+ceil])
}
}
return ret
}

// LoadIngestData loads the data from the external storage to memory in [start,
// end) range, so local backend can ingest it. The used byte slice of ingest data
// are allocated from Engine.bufPool and must be released by
// MemoryIngestData.Finish(). For external.Engine, LoadIngestData must be called
// with strictly increasing start / end key.
func (e *Engine) LoadIngestData(ctx context.Context, start, end []byte) (common.IngestData, error) {
// MemoryIngestData.DecRef().
func (e *Engine) LoadIngestData(
ctx context.Context,
regionRanges []common.Range,
outCh chan<- common.DataAndRange,
) error {
// estimate we will open at most 1000 files, so if e.dataFiles is small we can
// try to concurrently process ranges.
concurrency := int(MergeSortOverlapThreshold) / len(e.dataFiles)
concurrency = min(concurrency, 8)
rangeGroups := split(regionRanges, concurrency)

eg, egCtx := errgroup.WithContext(ctx)
for _, ranges := range rangeGroups {
ranges := ranges
eg.Go(func() error {
iter, err := e.createMergeIter(egCtx, ranges[0].Start)
if err != nil {
return errors.Trace(err)
}
defer iter.Close()

if !iter.Next() {
return iter.Error()
}
for _, r := range ranges {
results, err := e.loadIngestData(egCtx, iter, r.Start, r.End)
if err != nil {
return errors.Trace(err)
}
for _, result := range results {
select {
case <-egCtx.Done():
return egCtx.Err()
case outCh <- result:
}
}
}
return nil
})
}
return eg.Wait()
}

func (e *Engine) buildIngestData(keys, values [][]byte, buf *membuf.Buffer) *MemoryIngestData {
return &MemoryIngestData{
keyAdapter: e.keyAdapter,
duplicateDetection: e.duplicateDetection,
duplicateDB: e.duplicateDB,
dupDetectOpt: e.dupDetectOpt,
keys: keys,
values: values,
ts: e.ts,
memBuf: buf,
refCnt: atomic.NewInt64(0),
importedKVSize: e.importedKVSize,
importedKVCount: e.importedKVCount,
}
}

// LargeRegionSplitDataThreshold is exposed for test.
var LargeRegionSplitDataThreshold = int(config.SplitRegionSize)

// loadIngestData loads the data from the external storage to memory in [start,
// end) range, and if the range is large enough, it will return multiple data.
// The input `iter` should be called Next() before calling this function.
func (e *Engine) loadIngestData(
ctx context.Context,
iter *MergeKVIter,
start, end []byte,
) ([]common.DataAndRange, error) {
if bytes.Equal(start, end) {
return nil, errors.Errorf("start key and end key must not be the same: %s",
hex.EncodeToString(start))
Expand All @@ -109,54 +201,59 @@ func (e *Engine) LoadIngestData(ctx context.Context, start, end []byte) (common.
keys := make([][]byte, 0, 1024)
values := make([][]byte, 0, 1024)
memBuf := e.bufPool.NewBuffer()

if e.iter == nil {
iter, err := e.createMergeIter(ctx, start)
if err != nil {
return nil, errors.Trace(err)
}
e.iter = iter
} else {
// there should be a key that just exceeds the end key in last LoadIngestData
// invocation.
k, v := e.iter.Key(), e.iter.Value()
cnt := 0
size := 0
largeRegion := e.regionSplitSize > 2*int64(config.SplitRegionSize)
ret := make([]common.DataAndRange, 0, 1)
curStart := start

// there should be a key that just exceeds the end key in last loadIngestData
// invocation.
k, v := iter.Key(), iter.Value()
if len(k) > 0 {
keys = append(keys, memBuf.AddBytes(k))
values = append(values, memBuf.AddBytes(v))
cnt++
size += len(k) + len(v)
}

cnt := 0
for e.iter.Next() {
cnt++
k, v := e.iter.Key(), e.iter.Value()
for iter.Next() {
k, v = iter.Key(), iter.Value()
if bytes.Compare(k, start) < 0 {
continue
}
if bytes.Compare(k, end) >= 0 {
break
}
if largeRegion && size > LargeRegionSplitDataThreshold {
curKey := slices.Clone(k)
ret = append(ret, common.DataAndRange{
Data: e.buildIngestData(keys, values, memBuf),
Range: common.Range{Start: curStart, End: curKey},
})
keys = make([][]byte, 0, 1024)
values = make([][]byte, 0, 1024)
size = 0
curStart = curKey
}

keys = append(keys, memBuf.AddBytes(k))
values = append(values, memBuf.AddBytes(v))
cnt++
size += len(k) + len(v)
}
if e.iter.Error() != nil {
return nil, errors.Trace(e.iter.Error())
if iter.Error() != nil {
return nil, errors.Trace(iter.Error())
}

logutil.Logger(ctx).Info("load data from external storage",
zap.Duration("cost time", time.Since(now)),
zap.Int("iterated count", cnt))
return &MemoryIngestData{
keyAdapter: e.keyAdapter,
duplicateDetection: e.duplicateDetection,
duplicateDB: e.duplicateDB,
dupDetectOpt: e.dupDetectOpt,
keys: keys,
values: values,
ts: e.ts,
memBuf: memBuf,
refCnt: atomic.NewInt64(0),
importedKVSize: e.importedKVSize,
importedKVCount: e.importedKVCount,
}, nil
ret = append(ret, common.DataAndRange{
Data: e.buildIngestData(keys, values, memBuf),
Range: common.Range{Start: curStart, End: end},
})
return ret, nil
}

func (e *Engine) createMergeIter(ctx context.Context, start kv.Key) (*MergeKVIter, error) {
Expand Down Expand Up @@ -227,13 +324,8 @@ func (e *Engine) SplitRanges(
return ranges, nil
}

// Close releases the resources of the engine.
func (e *Engine) Close() error {
if e.iter == nil {
return nil
}
return errors.Trace(e.iter.Close())
}
// Close implements common.Engine.
func (e *Engine) Close() error { return nil }

// MemoryIngestData is the in-memory implementation of IngestData.
type MemoryIngestData struct {
Expand Down
44 changes: 44 additions & 0 deletions br/pkg/lightning/backend/external/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,47 @@ func TestMemoryIngestData(t *testing.T) {
testNewIter(t, data, []byte("key6"), []byte("key9"), nil, nil)
checkDupDB(t, db, nil, nil)
}

func TestSplit(t *testing.T) {
cases := []struct {
input []int
conc int
expected [][]int
}{
{
input: []int{1, 2, 3, 4, 5},
conc: 1,
expected: [][]int{{1, 2, 3, 4, 5}},
},
{
input: []int{1, 2, 3, 4, 5},
conc: 2,
expected: [][]int{{1, 2, 3}, {4, 5}},
},
{
input: []int{1, 2, 3, 4, 5},
conc: 0,
expected: [][]int{{1, 2, 3, 4, 5}},
},
{
input: []int{1, 2, 3, 4, 5},
conc: 5,
expected: [][]int{{1}, {2}, {3}, {4}, {5}},
},
{
input: []int{},
conc: 5,
expected: nil,
},
{
input: []int{1, 2, 3, 4, 5},
conc: 100,
expected: [][]int{{1}, {2}, {3}, {4}, {5}},
},
}

for _, c := range cases {
got := split(c.input, c.conc)
require.Equal(t, c.expected, got)
}
}
5 changes: 5 additions & 0 deletions br/pkg/lightning/backend/external/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ func (r *RangeSplitter) Close() error {
return r.propIter.Close()
}

// GetRangeSplitSize returns the expected size of one range.
func (r *RangeSplitter) GetRangeSplitSize() int64 {
return r.rangeSize
}

// SplitOneRangesGroup splits one group of ranges. `endKeyOfGroup` represents the
// end key of the group, but it will be nil when the group is the last one.
// `dataFiles` and `statFiles` are all the files that have overlapping key ranges
Expand Down
15 changes: 13 additions & 2 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,8 +1046,19 @@ func (e *Engine) Finish(totalBytes, totalCount int64) {

// LoadIngestData return (local) Engine itself because Engine has implemented
// IngestData interface.
func (e *Engine) LoadIngestData(_ context.Context, _, _ []byte) (common.IngestData, error) {
return e, nil
func (e *Engine) LoadIngestData(
ctx context.Context,
regionRanges []common.Range,
outCh chan<- common.DataAndRange,
) error {
for _, r := range regionRanges {
select {
case <-ctx.Done():
return ctx.Err()
case outCh <- common.DataAndRange{Data: e, Range: r}:
}
}
return nil
}

type sstMeta struct {
Expand Down
Loading

0 comments on commit aa99c16

Please sign in to comment.