importinto: optimize write ingest step and fix oom (pingcap#48047)
D3Hunter committed Oct 31, 2023
1 parent 5f7b697 commit 99a4f35
Showing 27 changed files with 260 additions and 113 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/backend.go
@@ -114,6 +114,7 @@ type ExternalEngineConfig struct {
TotalFileSize int64
// TotalKVCount can be an estimated value.
TotalKVCount int64
CheckHotspot bool
}

// CheckCtx contains all parameters used in CheckRequirements
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
@@ -41,6 +41,7 @@ go_library(
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
)

@@ -61,7 +62,7 @@ go_test(
],
embed = [":external"],
flaky = True,
shard_count = 41,
shard_count = 42,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
120 changes: 84 additions & 36 deletions br/pkg/lightning/backend/external/engine.go
@@ -37,6 +37,16 @@ import (
"golang.org/x/sync/errgroup"
)

// During a test against ks3 we could open roughly 8000 connections; beyond
// that we started to see "connection reset by peer" errors and read speed
// dropped sharply, for reasons still under investigation. Holding many open
// connections also consumes a lot of kernel memory, and the test ran in a
// k8s pod, so behavior on EC2 is unverified. The ks3 support team says there
// is no hard connection limit on their side. Since our primary target for
// global sort is AWS S3, this default value might not fit it well.
// TODO: adjust it according to the cloud storage provider.
const maxCloudStorageConnections = 8000

// Engine stores sorted key/value pairs in an external storage.
type Engine struct {
storage storage.ExternalStorage
Expand All @@ -47,11 +57,19 @@ type Engine struct {
splitKeys [][]byte
regionSplitSize int64
bufPool *membuf.Pool
// checkHotspot controls whether MergeKVIter checks for hotspot files.
// If a hotspot file is detected, it is read with multiple readers;
// otherwise each file is read with a single reader.
// The flag also selects the data-loading strategy, either:
// fewer load routines + concurrent hotspot check and read (used by add-index), or
// more load routines + one reader per file (used by import-into).
checkHotspot bool

keyAdapter common.KeyAdapter
duplicateDetection bool
duplicateDB *pebble.DB
dupDetectOpt common.DupDetectOpt
workerConcurrency int
ts uint64

totalKVSize int64
@@ -74,9 +92,11 @@ func NewExternalEngine(
duplicateDetection bool,
duplicateDB *pebble.DB,
dupDetectOpt common.DupDetectOpt,
workerConcurrency int,
ts uint64,
totalKVSize int64,
totalKVCount int64,
checkHotspot bool,
) common.Engine {
return &Engine{
storage: storage,
@@ -87,10 +107,12 @@ func NewExternalEngine(
splitKeys: splitKeys,
regionSplitSize: regionSplitSize,
bufPool: membuf.NewPool(),
checkHotspot: checkHotspot,
keyAdapter: keyAdapter,
duplicateDetection: duplicateDetection,
duplicateDB: duplicateDB,
dupDetectOpt: dupDetectOpt,
workerConcurrency: workerConcurrency,
ts: ts,
totalKVSize: totalKVSize,
totalKVCount: totalKVCount,
@@ -119,6 +141,17 @@ func split[T any](in []T, groupNum int) [][]T {
return ret
}

func (e *Engine) getAdjustedConcurrency() int {
if e.checkHotspot {
// We estimate that at most 1000 files will be opened, so when e.dataFiles is
// small we can process ranges concurrently.
adjusted := int(MergeSortOverlapThreshold) / len(e.dataFiles)
return min(adjusted, 8)
}
adjusted := min(e.workerConcurrency, maxCloudStorageConnections/len(e.dataFiles))
return max(adjusted, 1)
}
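
To illustrate the two strategies, here is a minimal, self-contained sketch of the adjustment logic above. The constant values are assumptions: mergeSortOverlapThreshold stands in for MergeSortOverlapThreshold (taken as 1000 here, consistent with the test in engine_test.go below) and maxConns for maxCloudStorageConnections; it is not the real API.

package main

import "fmt"

const (
	mergeSortOverlapThreshold = 1000 // assumed placeholder value
	maxConns                  = 8000 // mirrors maxCloudStorageConnections
)

// adjustedConcurrency reproduces getAdjustedConcurrency in standalone form.
func adjustedConcurrency(checkHotspot bool, workerConcurrency, dataFiles int) int {
	if checkHotspot {
		// few load routines; a hotspot file may be read by multiple readers
		return min(mergeSortOverlapThreshold/dataFiles, 8)
	}
	// one reader per file, bounded by the cloud-storage connection budget
	return max(min(workerConcurrency, maxConns/dataFiles), 1)
}

func main() {
	fmt.Println(adjustedConcurrency(true, 32, 100))    // 8
	fmt.Println(adjustedConcurrency(false, 32, 1000))  // 8
	fmt.Println(adjustedConcurrency(false, 32, 10000)) // 1
}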

// LoadIngestData loads the data from the external storage to memory in [start,
end) range so that the local backend can ingest it. The byte slices backing the ingest data
// are allocated from Engine.bufPool and must be released by
@@ -128,12 +161,16 @@ func (e *Engine) LoadIngestData(
regionRanges []common.Range,
outCh chan<- common.DataAndRange,
) error {
// estimate we will open at most 1000 files, so if e.dataFiles is small we can
// try to concurrently process ranges.
concurrency := int(MergeSortOverlapThreshold) / len(e.dataFiles)
concurrency = min(concurrency, 8)
concurrency := e.getAdjustedConcurrency()
rangeGroups := split(regionRanges, concurrency)

logutil.Logger(ctx).Info("load ingest data",
zap.Int("concurrency", concurrency),
zap.Int("ranges", len(regionRanges)),
zap.Int("range-groups", len(rangeGroups)),
zap.Int("data-files", len(e.dataFiles)),
zap.Bool("check-hotspot", e.checkHotspot),
)
eg, egCtx := errgroup.WithContext(ctx)
for _, ranges := range rangeGroups {
ranges := ranges
@@ -148,17 +185,10 @@ func (e *Engine) LoadIngestData(
return iter.Error()
}
for _, r := range ranges {
results, err := e.loadIngestData(egCtx, iter, r.Start, r.End)
err := e.loadIngestData(egCtx, iter, r.Start, r.End, outCh)
if err != nil {
return errors.Trace(err)
}
for _, result := range results {
select {
case <-egCtx.Done():
return egCtx.Err()
case outCh <- result:
}
}
}
return nil
})
@@ -192,21 +222,29 @@ func (e *Engine) loadIngestData(
ctx context.Context,
iter *MergeKVIter,
start, end []byte,
) ([]common.DataAndRange, error) {
outCh chan<- common.DataAndRange) error {
if bytes.Equal(start, end) {
return nil, errors.Errorf("start key and end key must not be the same: %s",
return errors.Errorf("start key and end key must not be the same: %s",
hex.EncodeToString(start))
}

startTs := time.Now()
readRateHist := metrics.GlobalSortReadFromCloudStorageRate.WithLabelValues("read_and_sort")
readDurHist := metrics.GlobalSortReadFromCloudStorageDuration.WithLabelValues("read_and_sort")
sendFn := func(dr common.DataAndRange) error {
select {
case <-ctx.Done():
return ctx.Err()
case outCh <- dr:
}
return nil
}

loadStartTs, batchStartTs := time.Now(), time.Now()
keys := make([][]byte, 0, 1024)
values := make([][]byte, 0, 1024)
memBuf := e.bufPool.NewBuffer()
cnt := 0
size := 0
totalSize := 0
largeRegion := e.regionSplitSize > 2*int64(config.SplitRegionSize)
ret := make([]common.DataAndRange, 0, 1)
curStart := start

// there should be a key that just exceeds the end key in last loadIngestData
@@ -217,7 +255,6 @@ func (e *Engine) loadIngestData(
values = append(values, memBuf.AddBytes(v))
cnt++
size += len(k) + len(v)
totalSize += len(k) + len(v)
}

for iter.Next() {
@@ -228,38 +265,49 @@ func (e *Engine) loadIngestData(
if bytes.Compare(k, end) >= 0 {
break
}
if largeRegion && size > LargeRegionSplitDataThreshold {
// Since we keep the KV data in memory, to avoid OOM we keep at most one
// DataAndRange per loadIngestData and regionJobWorker routine (the channel
// is unbuffered).
if size > LargeRegionSplitDataThreshold {
readRateHist.Observe(float64(size) / 1024.0 / 1024.0 / time.Since(batchStartTs).Seconds())
readDurHist.Observe(time.Since(batchStartTs).Seconds())
curKey := slices.Clone(k)
ret = append(ret, common.DataAndRange{
if err := sendFn(common.DataAndRange{
Data: e.buildIngestData(keys, values, memBuf),
Range: common.Range{Start: curStart, End: curKey},
})
}); err != nil {
return errors.Trace(err)
}
keys = make([][]byte, 0, 1024)
values = make([][]byte, 0, 1024)
size = 0
curStart = curKey
batchStartTs = time.Now()
memBuf = e.bufPool.NewBuffer()
}

keys = append(keys, memBuf.AddBytes(k))
values = append(values, memBuf.AddBytes(v))
cnt++
size += len(k) + len(v)
totalSize += len(k) + len(v)
}
if iter.Error() != nil {
return nil, errors.Trace(iter.Error())
return errors.Trace(iter.Error())
}
if len(keys) > 0 {
readRateHist.Observe(float64(size) / 1024.0 / 1024.0 / time.Since(batchStartTs).Seconds())
readDurHist.Observe(time.Since(batchStartTs).Seconds())
if err := sendFn(common.DataAndRange{
Data: e.buildIngestData(keys, values, memBuf),
Range: common.Range{Start: curStart, End: end},
}); err != nil {
return errors.Trace(err)
}
}

metrics.GlobalSortReadFromCloudStorageRate.WithLabelValues("read_and_sort").Observe(float64(totalSize) / 1024.0 / 1024.0 / time.Since(startTs).Seconds())
metrics.GlobalSortReadFromCloudStorageDuration.WithLabelValues("read_and_sort").Observe(time.Since(startTs).Seconds())
logutil.Logger(ctx).Info("load data from external storage",
zap.Duration("cost time", time.Since(startTs)),
zap.Duration("cost time", time.Since(loadStartTs)),
zap.Int("iterated count", cnt))
ret = append(ret, common.DataAndRange{
Data: e.buildIngestData(keys, values, memBuf),
Range: common.Range{Start: curStart, End: end},
})
return ret, nil
return nil
}

func (e *Engine) createMergeIter(ctx context.Context, start kv.Key) (*MergeKVIter, error) {
@@ -271,18 +319,18 @@ func (e *Engine) createMergeIter(ctx context.Context, start kv.Key) (*MergeKVIte
logger.Info("no stats files",
zap.String("startKey", hex.EncodeToString(start)))
} else {
offs, err := seekPropsOffsets(ctx, start, e.statsFiles, e.storage)
offs, err := seekPropsOffsets(ctx, start, e.statsFiles, e.storage, e.checkHotspot)
if err != nil {
return nil, errors.Trace(err)
}
offsets = offs
logger.Info("seek props offsets",
logger.Debug("seek props offsets",
zap.Uint64s("offsets", offsets),
zap.String("startKey", hex.EncodeToString(start)),
zap.Strings("dataFiles", e.dataFiles),
zap.Strings("statsFiles", e.statsFiles))
}
iter, err := NewMergeKVIter(ctx, e.dataFiles, offsets, e.storage, 64*1024)
iter, err := NewMergeKVIter(ctx, e.dataFiles, offsets, e.storage, 64*1024, e.checkHotspot)
if err != nil {
return nil, errors.Trace(err)
}
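The OOM fix in loadIngestData comes down to streaming: instead of accumulating every DataAndRange in a slice and returning it, each batch is sent to the unbuffered outCh as soon as it crosses LargeRegionSplitDataThreshold, so each producer holds at most one in-flight batch. Below is a minimal, self-contained sketch of that pattern with simplified, hypothetical types (batch and threshold stand in for common.DataAndRange and LargeRegionSplitDataThreshold; this is not the real API).

package main

import (
	"context"
	"fmt"
)

// batch stands in for common.DataAndRange.
type batch struct{ size int }

const threshold = 4 // assumed tiny value, just for the demo

// produce streams batches through an unbuffered channel instead of returning a
// slice, so the producer blocks until a consumer is ready and memory stays
// bounded at one in-flight batch per producer.
func produce(ctx context.Context, sizes []int, out chan<- batch) error {
	send := func(b batch) error {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case out <- b:
			return nil
		}
	}
	cur := 0
	for _, s := range sizes {
		cur += s
		if cur > threshold {
			if err := send(batch{size: cur}); err != nil {
				return err
			}
			cur = 0
		}
	}
	if cur > 0 {
		return send(batch{size: cur})
	}
	return nil
}

func main() {
	out := make(chan batch) // unbuffered, as in the real change
	go func() {
		defer close(out)
		_ = produce(context.Background(), []int{2, 3, 1, 4, 2}, out)
	}()
	for b := range out {
		fmt.Println("received batch of size", b.size)
	}
}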
27 changes: 27 additions & 0 deletions br/pkg/lightning/backend/external/engine_test.go
@@ -17,6 +17,7 @@ package external
import (
"bytes"
"context"
"fmt"
"path"
"slices"
"strconv"
@@ -321,3 +322,29 @@ func TestSplit(t *testing.T) {
require.Equal(t, c.expected, got)
}
}

func TestGetAdjustedConcurrency(t *testing.T) {
genFiles := func(n int) []string {
files := make([]string, 0, n)
for i := 0; i < n; i++ {
files = append(files, fmt.Sprintf("file%d", i))
}
return files
}
e := &Engine{
checkHotspot: true,
workerConcurrency: 32,
dataFiles: genFiles(100),
}
require.Equal(t, 8, e.getAdjustedConcurrency())
e.dataFiles = genFiles(1000)
require.Equal(t, 1, e.getAdjustedConcurrency())

e.checkHotspot = false
e.dataFiles = genFiles(100)
require.Equal(t, 32, e.getAdjustedConcurrency())
e.dataFiles = genFiles(1000)
require.Equal(t, 8, e.getAdjustedConcurrency())
e.dataFiles = genFiles(10000)
require.Equal(t, 1, e.getAdjustedConcurrency())
}