Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

external engine: support concurrent read and split into smaller data #47510

Merged
merged 4 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,13 @@ type LocalEngineConfig struct {

// ExternalEngineConfig is the configuration used for local backend external engine.
type ExternalEngineConfig struct {
StorageURI string
DataFiles []string
StatFiles []string
MinKey []byte
MaxKey []byte
SplitKeys [][]byte
StorageURI string
DataFiles []string
StatFiles []string
MinKey []byte
MaxKey []byte
SplitKeys [][]byte
RegionSplitSize int64
// TotalFileSize can be an estimated value.
TotalFileSize int64
// TotalKVCount can be an estimated value.
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
"//br/pkg/lightning/log",
"//br/pkg/membuf",
"//br/pkg/storage",
Expand Down Expand Up @@ -57,7 +58,7 @@ go_test(
],
embed = [":external"],
flaky = True,
shard_count = 39,
shard_count = 40,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
Expand Down
190 changes: 141 additions & 49 deletions br/pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,34 @@ import (
"bytes"
"context"
"encoding/hex"
"slices"
"sort"
"time"

"github.com/cockroachdb/pebble"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

// Engine stores sorted key/value pairs in an external storage.
type Engine struct {
storage storage.ExternalStorage
dataFiles []string
statsFiles []string
minKey []byte
maxKey []byte
splitKeys [][]byte
bufPool *membuf.Pool

iter *MergeKVIter
storage storage.ExternalStorage
dataFiles []string
statsFiles []string
minKey []byte
maxKey []byte
splitKeys [][]byte
regionSplitSize int64
bufPool *membuf.Pool

keyAdapter common.KeyAdapter
duplicateDetection bool
Expand All @@ -66,6 +68,7 @@ func NewExternalEngine(
minKey []byte,
maxKey []byte,
splitKeys [][]byte,
regionSplitSize int64,
keyAdapter common.KeyAdapter,
duplicateDetection bool,
duplicateDB *pebble.DB,
Expand All @@ -81,6 +84,7 @@ func NewExternalEngine(
minKey: minKey,
maxKey: maxKey,
splitKeys: splitKeys,
regionSplitSize: regionSplitSize,
bufPool: membuf.NewPool(),
keyAdapter: keyAdapter,
duplicateDetection: duplicateDetection,
Expand All @@ -94,12 +98,100 @@ func NewExternalEngine(
}
}

// split partitions in into contiguous, order-preserving groups. At most
// groupNum groups are produced; every group except possibly the last holds
// ceil(len(in)/groupNum) elements. A non-positive groupNum is treated as 1, and
// an empty input yields nil. The returned groups are sub-slices of in, not
// copies.
func split[T any](in []T, groupNum int) [][]T {
	total := len(in)
	if total == 0 {
		return nil
	}
	if groupNum < 1 {
		groupNum = 1
	}
	// Round up so that groupNum chunks are always enough to cover the input.
	chunk := (total + groupNum - 1) / groupNum
	groups := make([][]T, 0, groupNum)
	for rest := in; len(rest) > 0; {
		n := chunk
		if n > len(rest) {
			// The tail may be shorter than a full chunk.
			n = len(rest)
		}
		groups = append(groups, rest[:n])
		rest = rest[n:]
	}
	return groups
}

// LoadIngestData loads the data from the external storage to memory in [start,
// end) range, so local backend can ingest it. The used byte slice of ingest data
// are allocated from Engine.bufPool and must be released by
// MemoryIngestData.Finish(). For external.Engine, LoadIngestData must be called
// with strictly increasing start / end key.
func (e *Engine) LoadIngestData(ctx context.Context, start, end []byte) (common.IngestData, error) {
// MemoryIngestData.DecRef().
func (e *Engine) LoadIngestData(
ctx context.Context,
regionRanges []common.Range,
outCh chan<- common.DataAndRange,
) error {
// estimate we will open at most 1000 files, so if e.dataFiles is small we can
// try to concurrently process ranges.
concurrency := int(MergeSortOverlapThreshold) / len(e.dataFiles)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set a lower/upper limit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

concurrency = min(concurrency, 8)
rangeGroups := split(regionRanges, concurrency)

eg, egCtx := errgroup.WithContext(ctx)
for _, ranges := range rangeGroups {
ranges := ranges
eg.Go(func() error {
iter, err := e.createMergeIter(egCtx, ranges[0].Start)
if err != nil {
return errors.Trace(err)
}
defer iter.Close()

if !iter.Next() {
return iter.Error()
}
for _, r := range ranges {
results, err := e.loadIngestData(egCtx, iter, r.Start, r.End)
if err != nil {
return errors.Trace(err)
}
for _, result := range results {
select {
case <-egCtx.Done():
return egCtx.Err()
case outCh <- result:
}
}
}
return nil
})
}
return eg.Wait()
}

// buildIngestData assembles a MemoryIngestData from the given keys, values and
// backing buffer, copying the duplicate-detection settings, timestamp and
// import counters from the engine. The reference count starts at zero.
func (e *Engine) buildIngestData(keys, values [][]byte, buf *membuf.Buffer) *MemoryIngestData {
	data := new(MemoryIngestData)

	// Payload supplied by the caller.
	data.keys = keys
	data.values = values
	data.memBuf = buf
	data.refCnt = atomic.NewInt64(0)

	// Settings and counters taken from the engine.
	data.keyAdapter = e.keyAdapter
	data.duplicateDetection = e.duplicateDetection
	data.duplicateDB = e.duplicateDB
	data.dupDetectOpt = e.dupDetectOpt
	data.ts = e.ts
	data.importedKVSize = e.importedKVSize
	data.importedKVCount = e.importedKVCount

	return data
}

// LargeRegionSplitDataThreshold is exposed for test. It is initialized from
// config.SplitRegionSize converted to int. loadIngestData compares the
// accumulated key+value byte size against it when deciding whether to cut the
// current range into a separate DataAndRange.
var LargeRegionSplitDataThreshold = int(config.SplitRegionSize)

// loadIngestData loads the data from the external storage to memory in [start,
// end) range, and if the range is large enough, it will return multiple data.
// The input `iter` should be called Next() before calling this function.
func (e *Engine) loadIngestData(
ctx context.Context,
iter *MergeKVIter,
start, end []byte,
) ([]common.DataAndRange, error) {
if bytes.Equal(start, end) {
return nil, errors.Errorf("start key and end key must not be the same: %s",
hex.EncodeToString(start))
Expand All @@ -109,54 +201,59 @@ func (e *Engine) LoadIngestData(ctx context.Context, start, end []byte) (common.
keys := make([][]byte, 0, 1024)
values := make([][]byte, 0, 1024)
memBuf := e.bufPool.NewBuffer()

if e.iter == nil {
iter, err := e.createMergeIter(ctx, start)
if err != nil {
return nil, errors.Trace(err)
}
e.iter = iter
} else {
// there should be a key that just exceeds the end key in last LoadIngestData
// invocation.
k, v := e.iter.Key(), e.iter.Value()
cnt := 0
size := 0
largeRegion := e.regionSplitSize > 2*int64(config.SplitRegionSize)
ret := make([]common.DataAndRange, 0, 1)
curStart := start

// there should be a key that just exceeds the end key in last loadIngestData
// invocation.
k, v := iter.Key(), iter.Value()
if len(k) > 0 {
keys = append(keys, memBuf.AddBytes(k))
values = append(values, memBuf.AddBytes(v))
cnt++
size += len(k) + len(v)
}

cnt := 0
for e.iter.Next() {
cnt++
k, v := e.iter.Key(), e.iter.Value()
for iter.Next() {
k, v = iter.Key(), iter.Value()
if bytes.Compare(k, start) < 0 {
continue
}
if bytes.Compare(k, end) >= 0 {
break
}
if largeRegion && size > LargeRegionSplitDataThreshold {
curKey := slices.Clone(k)
ret = append(ret, common.DataAndRange{
Data: e.buildIngestData(keys, values, memBuf),
Range: common.Range{Start: curStart, End: curKey},
})
keys = make([][]byte, 0, 1024)
values = make([][]byte, 0, 1024)
size = 0
curStart = curKey
}

keys = append(keys, memBuf.AddBytes(k))
values = append(values, memBuf.AddBytes(v))
cnt++
size += len(k) + len(v)
}
if e.iter.Error() != nil {
return nil, errors.Trace(e.iter.Error())
if iter.Error() != nil {
return nil, errors.Trace(iter.Error())
}

logutil.Logger(ctx).Info("load data from external storage",
zap.Duration("cost time", time.Since(now)),
zap.Int("iterated count", cnt))
return &MemoryIngestData{
keyAdapter: e.keyAdapter,
duplicateDetection: e.duplicateDetection,
duplicateDB: e.duplicateDB,
dupDetectOpt: e.dupDetectOpt,
keys: keys,
values: values,
ts: e.ts,
memBuf: memBuf,
refCnt: atomic.NewInt64(0),
importedKVSize: e.importedKVSize,
importedKVCount: e.importedKVCount,
}, nil
ret = append(ret, common.DataAndRange{
Data: e.buildIngestData(keys, values, memBuf),
Range: common.Range{Start: curStart, End: end},
})
return ret, nil
}

func (e *Engine) createMergeIter(ctx context.Context, start kv.Key) (*MergeKVIter, error) {
Expand Down Expand Up @@ -227,13 +324,8 @@ func (e *Engine) SplitRanges(
return ranges, nil
}

// Close releases the resources of the engine.
func (e *Engine) Close() error {
if e.iter == nil {
return nil
}
return errors.Trace(e.iter.Close())
}
// Close implements common.Engine. It performs no work and always reports
// success.
func (e *Engine) Close() error {
	return nil
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double-checking: the external engine's Close does not need to release any resources, right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rest LGTM.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The local engine needs to close its pebble DB. Before this PR, the external engine held an iterator member that needed to be closed, but in this PR the iterator becomes a local variable that is closed on line 144.


// MemoryIngestData is the in-memory implementation of IngestData.
type MemoryIngestData struct {
Expand Down
44 changes: 44 additions & 0 deletions br/pkg/lightning/backend/external/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,47 @@ func TestMemoryIngestData(t *testing.T) {
testNewIter(t, data, []byte("key6"), []byte("key9"), nil, nil)
checkDupDB(t, db, nil, nil)
}

func TestSplit(t *testing.T) {
cases := []struct {
input []int
conc int
expected [][]int
}{
{
input: []int{1, 2, 3, 4, 5},
conc: 1,
expected: [][]int{{1, 2, 3, 4, 5}},
},
{
input: []int{1, 2, 3, 4, 5},
conc: 2,
expected: [][]int{{1, 2, 3}, {4, 5}},
},
{
input: []int{1, 2, 3, 4, 5},
conc: 0,
expected: [][]int{{1, 2, 3, 4, 5}},
},
{
input: []int{1, 2, 3, 4, 5},
conc: 5,
expected: [][]int{{1}, {2}, {3}, {4}, {5}},
},
{
input: []int{},
conc: 5,
expected: nil,
},
{
input: []int{1, 2, 3, 4, 5},
conc: 100,
expected: [][]int{{1}, {2}, {3}, {4}, {5}},
},
}

for _, c := range cases {
got := split(c.input, c.conc)
require.Equal(t, c.expected, got)
}
}
5 changes: 5 additions & 0 deletions br/pkg/lightning/backend/external/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ func (r *RangeSplitter) Close() error {
return r.propIter.Close()
}

// GetRangeSplitSize reports the expected size of a single range, as stored in
// the splitter's rangeSize field.
func (r *RangeSplitter) GetRangeSplitSize() int64 {
	size := r.rangeSize
	return size
}

// SplitOneRangesGroup splits one group of ranges. `endKeyOfGroup` represents the
// end key of the group, but it will be nil when the group is the last one.
// `dataFiles` and `statFiles` are all the files that have overlapping key ranges
Expand Down
15 changes: 13 additions & 2 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,8 +1046,19 @@ func (e *Engine) Finish(totalBytes, totalCount int64) {

// LoadIngestData returns the (local) Engine itself because Engine implements
// the IngestData interface.
func (e *Engine) LoadIngestData(_ context.Context, _, _ []byte) (common.IngestData, error) {
return e, nil
func (e *Engine) LoadIngestData(
	ctx context.Context,
	regionRanges []common.Range,
	outCh chan<- common.DataAndRange,
) error {
	// The engine itself is sent as the data for every range. Ranges are sent
	// in input order, one DataAndRange per range.
	cancelled := ctx.Done()
	for i := range regionRanges {
		item := common.DataAndRange{Data: e, Range: regionRanges[i]}
		select {
		case outCh <- item:
		case <-cancelled:
			// Stop early when the context ends while the send is blocked.
			return ctx.Err()
		}
	}
	return nil
}

type sstMeta struct {
Expand Down
Loading
Loading