diff --git a/br/pkg/lightning/backend/backend.go b/br/pkg/lightning/backend/backend.go index fbcb9e1f00dfc..026dda56e8744 100644 --- a/br/pkg/lightning/backend/backend.go +++ b/br/pkg/lightning/backend/backend.go @@ -114,6 +114,7 @@ type ExternalEngineConfig struct { TotalFileSize int64 // TotalKVCount can be an estimated value. TotalKVCount int64 + CheckHotspot bool } // CheckCtx contains all parameters used in CheckRequirements diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel index 7bbfae8750dec..ed7f3e0c2f6d2 100644 --- a/br/pkg/lightning/backend/external/BUILD.bazel +++ b/br/pkg/lightning/backend/external/BUILD.bazel @@ -41,6 +41,7 @@ go_library( "@org_golang_x_sync//errgroup", "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", + "@org_uber_go_zap//zapcore", ], ) @@ -61,7 +62,7 @@ go_test( ], embed = [":external"], flaky = True, - shard_count = 41, + shard_count = 42, deps = [ "//br/pkg/lightning/backend/kv", "//br/pkg/lightning/common", diff --git a/br/pkg/lightning/backend/external/engine.go b/br/pkg/lightning/backend/external/engine.go index 3688ed851a630..9c11a89c658a7 100644 --- a/br/pkg/lightning/backend/external/engine.go +++ b/br/pkg/lightning/backend/external/engine.go @@ -37,6 +37,16 @@ import ( "golang.org/x/sync/errgroup" ) +// during tests on ks3 we found that we can open about 8000 connections to ks3; +// beyond that we might receive "connection reset by peer" errors and the read +// speed becomes very slow; we are still investigating the reason. +// Opening too many connections also consumes a lot of kernel memory, and since the +// test ran on a k8s pod, we are not sure how it will behave on EC2. +// However, ks3 support says there is no such limit on connections. +// Our target for global sort is AWS S3, so this default value might not fit well. +// TODO: adjust it according to the cloud storage. +const maxCloudStorageConnections = 8000 + // Engine stored sorted key/value pairs in an external storage. type Engine struct { storage storage.ExternalStorage @@ -47,11 +57,19 @@ type Engine struct { splitKeys [][]byte regionSplitSize int64 bufPool *membuf.Pool + // checkHotspot indicates whether to check for hotspot files when using MergeKVIter. + // If a hotspot file is detected, we will use multiple readers to read its data; + // if it's false, MergeKVIter will read each file using 1 reader.
+ // This flag also decides the strategy of loading data, either: + // fewer load routines + check and read hotspot files concurrently (add-index uses this one), or + // more load routines + read each file using 1 reader (import-into uses this one). + checkHotspot bool keyAdapter common.KeyAdapter duplicateDetection bool duplicateDB *pebble.DB dupDetectOpt common.DupDetectOpt + workerConcurrency int ts uint64 totalKVSize int64 @@ -74,9 +92,11 @@ func NewExternalEngine( duplicateDetection bool, duplicateDB *pebble.DB, dupDetectOpt common.DupDetectOpt, + workerConcurrency int, ts uint64, totalKVSize int64, totalKVCount int64, + checkHotspot bool, ) common.Engine { return &Engine{ storage: storage, @@ -87,10 +107,12 @@ func NewExternalEngine( splitKeys: splitKeys, regionSplitSize: regionSplitSize, bufPool: membuf.NewPool(), + checkHotspot: checkHotspot, keyAdapter: keyAdapter, duplicateDetection: duplicateDetection, duplicateDB: duplicateDB, dupDetectOpt: dupDetectOpt, + workerConcurrency: workerConcurrency, ts: ts, totalKVSize: totalKVSize, totalKVCount: totalKVCount, @@ -119,6 +141,17 @@ func split[T any](in []T, groupNum int) [][]T { return ret } +func (e *Engine) getAdjustedConcurrency() int { + if e.checkHotspot { + // we estimate that at most 1000 files will be opened, so if e.dataFiles is small we can + // try to process ranges concurrently. + adjusted := int(MergeSortOverlapThreshold) / len(e.dataFiles) + return min(adjusted, 8) + } + adjusted := min(e.workerConcurrency, maxCloudStorageConnections/len(e.dataFiles)) + return max(adjusted, 1) +} + // LoadIngestData loads the data from the external storage to memory in [start, // end) range, so local backend can ingest it. The used byte slice of ingest data // are allocated from Engine.bufPool and must be released by @@ -128,12 +161,16 @@ func (e *Engine) LoadIngestData( regionRanges []common.Range, outCh chan<- common.DataAndRange, ) error { - // estimate we will open at most 1000 files, so if e.dataFiles is small we can - // try to concurrently process ranges.
- concurrency := int(MergeSortOverlapThreshold) / len(e.dataFiles) - concurrency = min(concurrency, 8) + concurrency := e.getAdjustedConcurrency() rangeGroups := split(regionRanges, concurrency) + logutil.Logger(ctx).Info("load ingest data", + zap.Int("concurrency", concurrency), + zap.Int("ranges", len(regionRanges)), + zap.Int("range-groups", len(rangeGroups)), + zap.Int("data-files", len(e.dataFiles)), + zap.Bool("check-hotspot", e.checkHotspot), + ) eg, egCtx := errgroup.WithContext(ctx) for _, ranges := range rangeGroups { ranges := ranges @@ -148,17 +185,10 @@ func (e *Engine) LoadIngestData( return iter.Error() } for _, r := range ranges { - results, err := e.loadIngestData(egCtx, iter, r.Start, r.End) + err := e.loadIngestData(egCtx, iter, r.Start, r.End, outCh) if err != nil { return errors.Trace(err) } - for _, result := range results { - select { - case <-egCtx.Done(): - return egCtx.Err() - case outCh <- result: - } - } } return nil }) @@ -192,21 +222,29 @@ func (e *Engine) loadIngestData( ctx context.Context, iter *MergeKVIter, start, end []byte, -) ([]common.DataAndRange, error) { + outCh chan<- common.DataAndRange) error { if bytes.Equal(start, end) { - return nil, errors.Errorf("start key and end key must not be the same: %s", + return errors.Errorf("start key and end key must not be the same: %s", hex.EncodeToString(start)) } - startTs := time.Now() + readRateHist := metrics.GlobalSortReadFromCloudStorageRate.WithLabelValues("read_and_sort") + readDurHist := metrics.GlobalSortReadFromCloudStorageDuration.WithLabelValues("read_and_sort") + sendFn := func(dr common.DataAndRange) error { + select { + case <-ctx.Done(): + return ctx.Err() + case outCh <- dr: + } + return nil + } + + loadStartTs, batchStartTs := time.Now(), time.Now() keys := make([][]byte, 0, 1024) values := make([][]byte, 0, 1024) memBuf := e.bufPool.NewBuffer() cnt := 0 size := 0 - totalSize := 0 - largeRegion := e.regionSplitSize > 2*int64(config.SplitRegionSize) - ret := make([]common.DataAndRange, 0, 1) curStart := start // there should be a key that just exceeds the end key in last loadIngestData @@ -217,7 +255,6 @@ func (e *Engine) loadIngestData( values = append(values, memBuf.AddBytes(v)) cnt++ size += len(k) + len(v) - totalSize += len(k) + len(v) } for iter.Next() { @@ -228,38 +265,49 @@ func (e *Engine) loadIngestData( if bytes.Compare(k, end) >= 0 { break } - if largeRegion && size > LargeRegionSplitDataThreshold { + // as we keep KV data in memory, to avoid OOM, we only keep at most 1 + // DataAndRange for each loadIngestData and regionJobWorker routine(channel + // is unbuffered). 
+ if size > LargeRegionSplitDataThreshold { + readRateHist.Observe(float64(size) / 1024.0 / 1024.0 / time.Since(batchStartTs).Seconds()) + readDurHist.Observe(time.Since(batchStartTs).Seconds()) curKey := slices.Clone(k) - ret = append(ret, common.DataAndRange{ + if err := sendFn(common.DataAndRange{ Data: e.buildIngestData(keys, values, memBuf), Range: common.Range{Start: curStart, End: curKey}, - }) + }); err != nil { + return errors.Trace(err) + } keys = make([][]byte, 0, 1024) values = make([][]byte, 0, 1024) size = 0 curStart = curKey + batchStartTs = time.Now() + memBuf = e.bufPool.NewBuffer() } keys = append(keys, memBuf.AddBytes(k)) values = append(values, memBuf.AddBytes(v)) cnt++ size += len(k) + len(v) - totalSize += len(k) + len(v) } if iter.Error() != nil { - return nil, errors.Trace(iter.Error()) + return errors.Trace(iter.Error()) + } + if len(keys) > 0 { + readRateHist.Observe(float64(size) / 1024.0 / 1024.0 / time.Since(batchStartTs).Seconds()) + readDurHist.Observe(time.Since(batchStartTs).Seconds()) + if err := sendFn(common.DataAndRange{ + Data: e.buildIngestData(keys, values, memBuf), + Range: common.Range{Start: curStart, End: end}, + }); err != nil { + return errors.Trace(err) + } } - - metrics.GlobalSortReadFromCloudStorageRate.WithLabelValues("read_and_sort").Observe(float64(totalSize) / 1024.0 / 1024.0 / time.Since(startTs).Seconds()) - metrics.GlobalSortReadFromCloudStorageDuration.WithLabelValues("read_and_sort").Observe(time.Since(startTs).Seconds()) logutil.Logger(ctx).Info("load data from external storage", - zap.Duration("cost time", time.Since(startTs)), + zap.Duration("cost time", time.Since(loadStartTs)), zap.Int("iterated count", cnt)) - ret = append(ret, common.DataAndRange{ - Data: e.buildIngestData(keys, values, memBuf), - Range: common.Range{Start: curStart, End: end}, - }) - return ret, nil + return nil } func (e *Engine) createMergeIter(ctx context.Context, start kv.Key) (*MergeKVIter, error) { @@ -271,18 +319,18 @@ func (e *Engine) createMergeIter(ctx context.Context, start kv.Key) (*MergeKVIte logger.Info("no stats files", zap.String("startKey", hex.EncodeToString(start))) } else { - offs, err := seekPropsOffsets(ctx, start, e.statsFiles, e.storage) + offs, err := seekPropsOffsets(ctx, start, e.statsFiles, e.storage, e.checkHotspot) if err != nil { return nil, errors.Trace(err) } offsets = offs - logger.Info("seek props offsets", + logger.Debug("seek props offsets", zap.Uint64s("offsets", offsets), zap.String("startKey", hex.EncodeToString(start)), zap.Strings("dataFiles", e.dataFiles), zap.Strings("statsFiles", e.statsFiles)) } - iter, err := NewMergeKVIter(ctx, e.dataFiles, offsets, e.storage, 64*1024) + iter, err := NewMergeKVIter(ctx, e.dataFiles, offsets, e.storage, 64*1024, e.checkHotspot) if err != nil { return nil, errors.Trace(err) } diff --git a/br/pkg/lightning/backend/external/engine_test.go b/br/pkg/lightning/backend/external/engine_test.go index fd673925f50a2..7d2a6eca318ca 100644 --- a/br/pkg/lightning/backend/external/engine_test.go +++ b/br/pkg/lightning/backend/external/engine_test.go @@ -17,6 +17,7 @@ package external import ( "bytes" "context" + "fmt" "path" "slices" "strconv" @@ -321,3 +322,29 @@ func TestSplit(t *testing.T) { require.Equal(t, c.expected, got) } } + +func TestGetAdjustedConcurrency(t *testing.T) { + genFiles := func(n int) []string { + files := make([]string, 0, n) + for i := 0; i < n; i++ { + files = append(files, fmt.Sprintf("file%d", i)) + } + return files + } + e := &Engine{ + checkHotspot: true, + 
workerConcurrency: 32, + dataFiles: genFiles(100), + } + require.Equal(t, 8, e.getAdjustedConcurrency()) + e.dataFiles = genFiles(1000) + require.Equal(t, 1, e.getAdjustedConcurrency()) + + e.checkHotspot = false + e.dataFiles = genFiles(100) + require.Equal(t, 32, e.getAdjustedConcurrency()) + e.dataFiles = genFiles(1000) + require.Equal(t, 8, e.getAdjustedConcurrency()) + e.dataFiles = genFiles(10000) + require.Equal(t, 1, e.getAdjustedConcurrency()) +} diff --git a/br/pkg/lightning/backend/external/iter.go b/br/pkg/lightning/backend/external/iter.go index c3c2c1a6db370..0cbda86c6a02a 100644 --- a/br/pkg/lightning/backend/external/iter.go +++ b/br/pkg/lightning/backend/external/iter.go @@ -81,11 +81,15 @@ func (h *mergeHeap[T]) Pop() interface{} { } type mergeIter[T heapElem, R sortedReader[T]] struct { - h mergeHeap[T] - readers []*R - curr T - lastReaderIdx int - err error + h mergeHeap[T] + readers []*R + curr T + lastReaderIdx int + err error + + // determines whether to check reader hotspot, if hotspot is detected, we will + // try read this file concurrently. + checkHotspot bool hotspotMap map[int]int checkHotspotCnt int checkHotspotPeriod int @@ -103,7 +107,7 @@ type readerOpenerFn[T heapElem, R sortedReader[T]] func() (*R, error) func newMergeIter[ T heapElem, R sortedReader[T], -](ctx context.Context, readerOpeners []readerOpenerFn[T, R]) (*mergeIter[T, R], error) { +](ctx context.Context, readerOpeners []readerOpenerFn[T, R], checkHotspot bool) (*mergeIter[T, R], error) { logger := logutil.Logger(ctx) readers := make([]*R, len(readerOpeners)) closeReaders := func() { @@ -149,6 +153,7 @@ func newMergeIter[ h: make(mergeHeap[T], 0, len(readers)), readers: readers, lastReaderIdx: -1, + checkHotspot: checkHotspot, hotspotMap: make(map[int]int), logger: logger, } @@ -228,42 +233,44 @@ func (i *mergeIter[T, R]) next() bool { var zeroT T i.curr = zeroT if i.lastReaderIdx >= 0 { - i.hotspotMap[i.lastReaderIdx] = i.hotspotMap[i.lastReaderIdx] + 1 - i.checkHotspotCnt++ - - // check hotspot every checkPeriod times - if i.checkHotspotCnt == i.checkHotspotPeriod { - oldHotspotIdx := i.lastHotspotIdx - i.lastHotspotIdx = -1 - for idx, cnt := range i.hotspotMap { - // currently only one reader will become hotspot - if cnt > (i.checkHotspotPeriod / 2) { - i.lastHotspotIdx = idx - break + if i.checkHotspot { + i.hotspotMap[i.lastReaderIdx] = i.hotspotMap[i.lastReaderIdx] + 1 + i.checkHotspotCnt++ + + // check hotspot every checkPeriod times + if i.checkHotspotCnt == i.checkHotspotPeriod { + oldHotspotIdx := i.lastHotspotIdx + i.lastHotspotIdx = -1 + for idx, cnt := range i.hotspotMap { + // currently only one reader will become hotspot + if cnt > (i.checkHotspotPeriod / 2) { + i.lastHotspotIdx = idx + break + } } - } - // we are going to switch concurrent reader and free its memory. Clone - // the fields to avoid use-after-free. - if oldHotspotIdx != i.lastHotspotIdx { - if i.elemFromHotspot != nil { - (*i.elemFromHotspot).cloneInnerFields() - i.elemFromHotspot = nil + // we are going to switch concurrent reader and free its memory. Clone + // the fields to avoid use-after-free. 
+ if oldHotspotIdx != i.lastHotspotIdx { + if i.elemFromHotspot != nil { + (*i.elemFromHotspot).cloneInnerFields() + i.elemFromHotspot = nil + } } - } - for idx, rp := range i.readers { - if rp == nil { - continue - } - isHotspot := i.lastHotspotIdx == idx - err := (*rp).switchConcurrentMode(isHotspot) - if err != nil { - i.err = err - return false + for idx, rp := range i.readers { + if rp == nil { + continue + } + isHotspot := i.lastHotspotIdx == idx + err := (*rp).switchConcurrentMode(isHotspot) + if err != nil { + i.err = err + return false + } } + i.checkHotspotCnt = 0 + i.hotspotMap = make(map[int]int) } - i.checkHotspotCnt = 0 - i.hotspotMap = make(map[int]int) } rd := *i.readers[i.lastReaderIdx] @@ -271,7 +278,7 @@ func (i *mergeIter[T, R]) next() bool { switch err { case nil: - if i.lastReaderIdx == i.lastHotspotIdx { + if i.checkHotspot && i.lastReaderIdx == i.lastHotspotIdx { i.elemFromHotspot = &e } heap.Push(&i.h, mergeHeapElem[T]{elem: e, readerIdx: i.lastReaderIdx}) @@ -356,6 +363,7 @@ func NewMergeKVIter( pathsStartOffset []uint64, exStorage storage.ExternalStorage, readBufferSize int, + checkHotspot bool, ) (*MergeKVIter, error) { readerOpeners := make([]readerOpenerFn[*kvPair, kvReaderProxy], 0, len(paths)) largeBufSize := ConcurrentReaderBufferSizePerConc * ConcurrentReaderConcurrency @@ -383,7 +391,7 @@ func NewMergeKVIter( }) } - it, err := newMergeIter[*kvPair, kvReaderProxy](ctx, readerOpeners) + it, err := newMergeIter[*kvPair, kvReaderProxy](ctx, readerOpeners, checkHotspot) return &MergeKVIter{iter: it, memPool: memPool}, err } @@ -455,6 +463,7 @@ func NewMergePropIter( ctx context.Context, paths []string, exStorage storage.ExternalStorage, + checkHotSpot bool, ) (*MergePropIter, error) { readerOpeners := make([]readerOpenerFn[*rangeProperty, statReaderProxy], 0, len(paths)) for i := range paths { @@ -468,7 +477,7 @@ func NewMergePropIter( }) } - it, err := newMergeIter[*rangeProperty, statReaderProxy](ctx, readerOpeners) + it, err := newMergeIter[*rangeProperty, statReaderProxy](ctx, readerOpeners, checkHotSpot) return &MergePropIter{iter: it}, err } diff --git a/br/pkg/lightning/backend/external/iter_test.go b/br/pkg/lightning/backend/external/iter_test.go index d7ed0cb7583b3..57b69f0f764ca 100644 --- a/br/pkg/lightning/backend/external/iter_test.go +++ b/br/pkg/lightning/backend/external/iter_test.go @@ -95,7 +95,7 @@ func TestMergeKVIter(t *testing.T) { } trackStore := &trackOpenMemStorage{MemStorage: memStore} - iter, err := NewMergeKVIter(ctx, filenames, []uint64{0, 0, 0}, trackStore, 5) + iter, err := NewMergeKVIter(ctx, filenames, []uint64{0, 0, 0}, trackStore, 5, true) require.NoError(t, err) // close one empty file immediately in NewMergeKVIter require.EqualValues(t, 2, trackStore.opened.Load()) @@ -147,7 +147,7 @@ func TestOneUpstream(t *testing.T) { } trackStore := &trackOpenMemStorage{MemStorage: memStore} - iter, err := NewMergeKVIter(ctx, filenames, []uint64{0, 0, 0}, trackStore, 5) + iter, err := NewMergeKVIter(ctx, filenames, []uint64{0, 0, 0}, trackStore, 5, true) require.NoError(t, err) require.EqualValues(t, 1, trackStore.opened.Load()) @@ -184,14 +184,14 @@ func TestAllEmpty(t *testing.T) { } trackStore := &trackOpenMemStorage{MemStorage: memStore} - iter, err := NewMergeKVIter(ctx, []string{filenames[0]}, []uint64{0}, trackStore, 5) + iter, err := NewMergeKVIter(ctx, []string{filenames[0]}, []uint64{0}, trackStore, 5, false) require.NoError(t, err) require.EqualValues(t, 0, trackStore.opened.Load()) require.False(t, iter.Next()) 
require.NoError(t, iter.Error()) require.NoError(t, iter.Close()) - iter, err = NewMergeKVIter(ctx, filenames, []uint64{0, 0}, trackStore, 5) + iter, err = NewMergeKVIter(ctx, filenames, []uint64{0, 0}, trackStore, 5, false) require.NoError(t, err) require.EqualValues(t, 0, trackStore.opened.Load()) require.False(t, iter.Next()) @@ -229,7 +229,7 @@ func TestCorruptContent(t *testing.T) { } trackStore := &trackOpenMemStorage{MemStorage: memStore} - iter, err := NewMergeKVIter(ctx, filenames, []uint64{0, 0, 0}, trackStore, 5) + iter, err := NewMergeKVIter(ctx, filenames, []uint64{0, 0, 0}, trackStore, 5, true) require.NoError(t, err) require.EqualValues(t, 2, trackStore.opened.Load()) @@ -317,7 +317,7 @@ func testMergeIterSwitchMode(t *testing.T, f func([]byte, int) []byte) { offsets := make([]uint64, len(dataNames)) - iter, err := NewMergeKVIter(context.Background(), dataNames, offsets, st, 2048) + iter, err := NewMergeKVIter(context.Background(), dataNames, offsets, st, 2048, true) require.NoError(t, err) for iter.Next() { @@ -356,7 +356,7 @@ func TestHotspot(t *testing.T) { } // readerBufSize = 8+5+8+5, every KV will cause reload - iter, err := NewMergeKVIter(ctx, filenames, make([]uint64, len(filenames)), store, 26) + iter, err := NewMergeKVIter(ctx, filenames, make([]uint64, len(filenames)), store, 26, true) require.NoError(t, err) iter.iter.checkHotspotPeriod = 2 // after read key00 and key01 from reader_0, it becomes hotspot @@ -460,7 +460,7 @@ func TestMemoryUsageWhenHotspotChange(t *testing.T) { beforeMem := getMemoryInUse() - iter, err := NewMergeKVIter(ctx, filenames, make([]uint64, len(filenames)), store, 1024) + iter, err := NewMergeKVIter(ctx, filenames, make([]uint64, len(filenames)), store, 1024, true) require.NoError(t, err) iter.iter.checkHotspotPeriod = 10 i := 0 diff --git a/br/pkg/lightning/backend/external/merge.go b/br/pkg/lightning/backend/external/merge.go index 6ca5ac04127ce..1fe696dcf5601 100644 --- a/br/pkg/lightning/backend/external/merge.go +++ b/br/pkg/lightning/backend/external/merge.go @@ -21,9 +21,10 @@ func MergeOverlappingFiles( propSizeDist uint64, propKeysDist uint64, onClose OnCloseFunc, + checkHotspot bool, ) error { zeroOffsets := make([]uint64, len(paths)) - iter, err := NewMergeKVIter(ctx, paths, zeroOffsets, store, readBufferSize) + iter, err := NewMergeKVIter(ctx, paths, zeroOffsets, store, readBufferSize, checkHotspot) if err != nil { return err } diff --git a/br/pkg/lightning/backend/external/split.go b/br/pkg/lightning/backend/external/split.go index 40d713ed32259..2d24001e83621 100644 --- a/br/pkg/lightning/backend/external/split.go +++ b/br/pkg/lightning/backend/external/split.go @@ -95,8 +95,9 @@ func NewRangeSplitter( externalStorage storage.ExternalStorage, rangesGroupSize, rangesGroupKeys int64, maxRangeSize, maxRangeKeys int64, + checkHotSpot bool, ) (*RangeSplitter, error) { - propIter, err := NewMergePropIter(ctx, statFiles, externalStorage) + propIter, err := NewMergePropIter(ctx, statFiles, externalStorage, checkHotSpot) if err != nil { return nil, err } diff --git a/br/pkg/lightning/backend/external/split_test.go b/br/pkg/lightning/backend/external/split_test.go index 7259007629962..a49f697b46116 100644 --- a/br/pkg/lightning/backend/external/split_test.go +++ b/br/pkg/lightning/backend/external/split_test.go @@ -55,7 +55,7 @@ func TestGeneralProperties(t *testing.T) { dataFiles, statFiles, err := MockExternalEngine(memStore, keys, values) require.NoError(t, err) splitter, err := NewRangeSplitter( - ctx, dataFiles, statFiles, 
memStore, 1000, 30, 1000, 1, + ctx, dataFiles, statFiles, memStore, 1000, 30, 1000, 1, true, ) var lastEndKey []byte notExhausted: @@ -111,7 +111,7 @@ func TestOnlyOneGroup(t *testing.T) { require.NoError(t, err) splitter, err := NewRangeSplitter( - ctx, dataFiles, statFiles, memStore, 1000, 30, 1000, 10, + ctx, dataFiles, statFiles, memStore, 1000, 30, 1000, 10, true, ) require.NoError(t, err) endKey, dataFiles, statFiles, splitKeys, err := splitter.SplitOneRangesGroup() @@ -123,7 +123,7 @@ func TestOnlyOneGroup(t *testing.T) { require.NoError(t, splitter.Close()) splitter, err = NewRangeSplitter( - ctx, dataFiles, statFiles, memStore, 1000, 30, 1000, 1, + ctx, dataFiles, statFiles, memStore, 1000, 30, 1000, 1, true, ) require.NoError(t, err) endKey, dataFiles, statFiles, splitKeys, err = splitter.SplitOneRangesGroup() @@ -156,7 +156,7 @@ func TestSortedData(t *testing.T) { groupFileNumUpperBound := int(math.Ceil(float64(rangesGroupKV-1)/avgKVPerFile)) + 1 splitter, err := NewRangeSplitter( - ctx, dataFiles, statFiles, memStore, 1000, int64(rangesGroupKV), 1000, 10, + ctx, dataFiles, statFiles, memStore, 1000, int64(rangesGroupKV), 1000, 10, true, ) require.NoError(t, err) @@ -237,7 +237,7 @@ func TestRangeSplitterStrictCase(t *testing.T) { // group keys = 2, region keys = 1 splitter, err := NewRangeSplitter( - ctx, dataFiles123, statFiles123, memStore, 1000, 2, 1000, 1, + ctx, dataFiles123, statFiles123, memStore, 1000, 2, 1000, 1, true, ) require.NoError(t, err) @@ -319,7 +319,7 @@ func TestExactlyKeyNum(t *testing.T) { // maxRangeKeys = 3 splitter, err := NewRangeSplitter( - ctx, dataFiles, statFiles, memStore, 1000, 100, 1000, 3, + ctx, dataFiles, statFiles, memStore, 1000, 100, 1000, 3, true, ) require.NoError(t, err) endKey, splitDataFiles, splitStatFiles, splitKeys, err := splitter.SplitOneRangesGroup() @@ -331,7 +331,7 @@ func TestExactlyKeyNum(t *testing.T) { // rangesGroupKeys = 3 splitter, err = NewRangeSplitter( - ctx, dataFiles, statFiles, memStore, 1000, 3, 1000, 1, + ctx, dataFiles, statFiles, memStore, 1000, 3, 1000, 1, true, ) require.NoError(t, err) endKey, splitDataFiles, splitStatFiles, splitKeys, err = splitter.SplitOneRangesGroup() diff --git a/br/pkg/lightning/backend/external/util.go b/br/pkg/lightning/backend/external/util.go index 6af609bf6eea4..20683a3e769af 100644 --- a/br/pkg/lightning/backend/external/util.go +++ b/br/pkg/lightning/backend/external/util.go @@ -22,11 +22,13 @@ import ( "sort" "strings" + "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/util/hack" "github.com/pingcap/tidb/pkg/util/logutil" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) // seekPropsOffsets seeks the statistic files to find the largest offset of @@ -37,12 +39,15 @@ func seekPropsOffsets( start kv.Key, paths []string, exStorage storage.ExternalStorage, -) ([]uint64, error) { - iter, err := NewMergePropIter(ctx, paths, exStorage) + checkHotSpot bool, +) (_ []uint64, err error) { + logger := logutil.Logger(ctx) + task := log.BeginTask(logger, "seek props offsets") + defer func() { + // wrap in a closure so End logs the final err, not the nil captured at defer time. + task.End(zapcore.ErrorLevel, err) + }() + iter, err := NewMergePropIter(ctx, paths, exStorage, checkHotSpot) if err != nil { return nil, err } - logger := logutil.Logger(ctx) defer func() { if err := iter.Close(); err != nil { logger.Warn("failed to close merge prop iterator", zap.Error(err)) diff --git a/br/pkg/lightning/backend/external/util_test.go b/br/pkg/lightning/backend/external/util_test.go index
fd8bf2cf7434e..e9c32e5bae2a2 100644 --- a/br/pkg/lightning/backend/external/util_test.go +++ b/br/pkg/lightning/backend/external/util_test.go @@ -70,18 +70,18 @@ func TestSeekPropsOffsets(t *testing.T) { err = w2.Close(ctx) require.NoError(t, err) - got, err := seekPropsOffsets(ctx, []byte("key2.5"), []string{file1, file2}, store) + got, err := seekPropsOffsets(ctx, []byte("key2.5"), []string{file1, file2}, store, true) require.NoError(t, err) require.Equal(t, []uint64{10, 20}, got) - got, err = seekPropsOffsets(ctx, []byte("key3"), []string{file1, file2}, store) + got, err = seekPropsOffsets(ctx, []byte("key3"), []string{file1, file2}, store, true) require.NoError(t, err) require.Equal(t, []uint64{30, 20}, got) - _, err = seekPropsOffsets(ctx, []byte("key0"), []string{file1, file2}, store) + _, err = seekPropsOffsets(ctx, []byte("key0"), []string{file1, file2}, store, true) require.ErrorContains(t, err, "start key 6b657930 is too small for stat files [/test1 /test2]") - got, err = seekPropsOffsets(ctx, []byte("key1"), []string{file1, file2}, store) + got, err = seekPropsOffsets(ctx, []byte("key1"), []string{file1, file2}, store, false) require.NoError(t, err) require.Equal(t, []uint64{10, 0}, got) - got, err = seekPropsOffsets(ctx, []byte("key999"), []string{file1, file2}, store) + got, err = seekPropsOffsets(ctx, []byte("key999"), []string{file1, file2}, store, false) require.NoError(t, err) require.Equal(t, []uint64{50, 40}, got) @@ -98,7 +98,7 @@ func TestSeekPropsOffsets(t *testing.T) { require.NoError(t, err) err = w4.Close(ctx) require.NoError(t, err) - got, err = seekPropsOffsets(ctx, []byte("key3"), []string{file1, file2, file3, file4}, store) + got, err = seekPropsOffsets(ctx, []byte("key3"), []string{file1, file2, file3, file4}, store, true) require.NoError(t, err) require.Equal(t, []uint64{30, 20, 0, 30}, got) } diff --git a/br/pkg/lightning/backend/external/writer_test.go b/br/pkg/lightning/backend/external/writer_test.go index f392651473fc9..dd80793f5d610 100644 --- a/br/pkg/lightning/backend/external/writer_test.go +++ b/br/pkg/lightning/backend/external/writer_test.go @@ -188,6 +188,7 @@ func TestWriterDuplicateDetect(t *testing.T) { 1*size.MB, 2, nil, + false, ) require.NoError(t, err) @@ -385,6 +386,7 @@ func TestWriterMultiFileStat(t *testing.T) { 1*size.MB, 2, closeFn, + true, ) require.NoError(t, err) require.Equal(t, 3, len(summary.MultipleFilesStats)) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 50c0a20a5cd6b..e3bbe263711c8 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -969,9 +969,11 @@ func (local *Backend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig local.DupeDetectEnabled, local.duplicateDB, local.DuplicateDetectOpt, + local.WorkerConcurrency, ts, externalCfg.TotalFileSize, externalCfg.TotalKVCount, + externalCfg.CheckHotspot, ) local.externalEngine[engineUUID] = externalEngine return nil diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index fac02d5fc013a..3751a95b2371a 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -1788,9 +1788,11 @@ func TestSplitRangeAgain4BigRegionExternalEngine(t *testing.T) { false, nil, common.DupDetectOpt{}, + 10, 123, 456, 789, + true, ) jobCh := make(chan *regionJob, 10) diff --git a/pkg/ddl/backfilling_dispatcher.go b/pkg/ddl/backfilling_dispatcher.go index d80953b792cb4..598c8cceb3491 
100644 --- a/pkg/ddl/backfilling_dispatcher.go +++ b/pkg/ddl/backfilling_dispatcher.go @@ -499,7 +499,7 @@ func getRangeSplitter( maxKeysPerRange = max(maxKeysPerRange, int64(config.SplitRegionKeys)) return external.NewRangeSplitter(ctx, dataFiles, statFiles, extStore, - rangeGroupSize, rangeGroupKeys, maxSizePerRange, maxKeysPerRange) + rangeGroupSize, rangeGroupKeys, maxSizePerRange, maxKeysPerRange, true) } func getSummaryFromLastStep( diff --git a/pkg/ddl/backfilling_import_cloud.go b/pkg/ddl/backfilling_import_cloud.go index 82dfef011d2d8..b5f0e70749ec7 100644 --- a/pkg/ddl/backfilling_import_cloud.go +++ b/pkg/ddl/backfilling_import_cloud.go @@ -88,6 +88,7 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub SplitKeys: sm.RangeSplitKeys, TotalFileSize: int64(sm.TotalKVSize), TotalKVCount: 0, + CheckHotspot: true, }, }, engineUUID) if err != nil { diff --git a/pkg/ddl/backfilling_merge_sort.go b/pkg/ddl/backfilling_merge_sort.go index 729c09c95836e..253c504e47c6b 100644 --- a/pkg/ddl/backfilling_merge_sort.go +++ b/pkg/ddl/backfilling_merge_sort.go @@ -114,7 +114,9 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta 8*1024, 1*size.MB, 8*1024, - onClose) + onClose, + true, + ) } func (*mergeSortExecutor) Cleanup(ctx context.Context) error { diff --git a/pkg/disttask/framework/scheduler/BUILD.bazel b/pkg/disttask/framework/scheduler/BUILD.bazel index fd7b82d8371e8..b9db1e97f2546 100644 --- a/pkg/disttask/framework/scheduler/BUILD.bazel +++ b/pkg/disttask/framework/scheduler/BUILD.bazel @@ -12,6 +12,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//br/pkg/lightning/common", + "//br/pkg/lightning/log", "//pkg/config", "//pkg/disttask/framework/dispatcher", "//pkg/disttask/framework/handle", @@ -26,7 +27,9 @@ go_library( "//pkg/util", "//pkg/util/backoff", "//pkg/util/dbterror", + "//pkg/util/gctuner", "//pkg/util/logutil", + "//pkg/util/memory", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@org_uber_go_zap//:zap", diff --git a/pkg/disttask/framework/scheduler/scheduler.go b/pkg/disttask/framework/scheduler/scheduler.go index 357c39433a3d5..2af66f498c26f 100644 --- a/pkg/disttask/framework/scheduler/scheduler.go +++ b/pkg/disttask/framework/scheduler/scheduler.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/pkg/disttask/framework/dispatcher" "github.com/pingcap/tidb/pkg/disttask/framework/handle" "github.com/pingcap/tidb/pkg/disttask/framework/proto" @@ -32,7 +33,9 @@ import ( "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/util/backoff" "github.com/pingcap/tidb/pkg/util/dbterror" + "github.com/pingcap/tidb/pkg/util/gctuner" "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tidb/pkg/util/memory" "go.uber.org/zap" ) @@ -139,7 +142,7 @@ func (s *BaseScheduler) Run(ctx context.Context, task *proto.Task) (err error) { return s.updateErrorToSubtask(ctx, task.ID, err) } -func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) error { +func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) (resErr error) { if ctx.Err() != nil { s.onError(ctx.Err()) return s.getError() @@ -148,7 +151,14 @@ func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) error { defer runCancel(ErrFinishSubtask) s.registerCancelFunc(runCancel) s.resetError() - 
logutil.Logger(s.logCtx).Info("scheduler run a step", zap.Any("step", task.Step), zap.Any("concurrency", task.Concurrency)) + stepLogger := log.BeginTask(logutil.Logger(s.logCtx).With( + zap.Any("step", task.Step), + zap.Uint64("concurrency", task.Concurrency), + zap.Float64("mem-limit-percent", gctuner.GlobalMemoryLimitTuner.GetPercentage()), + zap.String("server-mem-limit", memory.ServerMemoryLimitOriginText.Load()), + ), "schedule step") + // log at info level; the subtask might be cancelled, let the caller check it. + defer func() { + // wrap in a closure so End logs the final resErr, not the nil captured at defer time. + stepLogger.End(zap.InfoLevel, resErr) + }() summary, cleanup, err := runSummaryCollectLoop(ctx, task, s.taskTable) if err != nil { diff --git a/pkg/disttask/importinto/planner.go b/pkg/disttask/importinto/planner.go index 8facd0113b4eb..6ea23054b4766 100644 --- a/pkg/disttask/importinto/planner.go +++ b/pkg/disttask/importinto/planner.go @@ -509,5 +509,6 @@ func getRangeSplitter(ctx context.Context, store storage.ExternalStorage, kvMeta int64(math.MaxInt64), regionSplitSize, regionSplitKeys, + false, ) } diff --git a/pkg/disttask/importinto/scheduler.go b/pkg/disttask/importinto/scheduler.go index 3462a5266b2ea..1b8bb0ccb6aff 100644 --- a/pkg/disttask/importinto/scheduler.go +++ b/pkg/disttask/importinto/scheduler.go @@ -319,7 +319,9 @@ func (m *mergeSortStepExecutor) RunSubtask(ctx context.Context, subtask *proto.S 8*1024, 1*size.MB, 8*1024, - onClose) + onClose, + false, + ) } func (m *mergeSortStepExecutor) OnFinished(_ context.Context, subtask *proto.Subtask) error { @@ -382,6 +384,7 @@ func (e *writeAndIngestStepExecutor) RunSubtask(ctx context.Context, subtask *pr RegionSplitSize: sm.RangeSplitSize, TotalFileSize: int64(sm.TotalKVSize), TotalKVCount: 0, + CheckHotspot: false, }, }, engineUUID) if err != nil { diff --git a/pkg/executor/importer/BUILD.bazel b/pkg/executor/importer/BUILD.bazel index 3b691ad85cbcb..b77f1ee50a81d 100644 --- a/pkg/executor/importer/BUILD.bazel +++ b/pkg/executor/importer/BUILD.bazel @@ -88,7 +88,7 @@ go_test( embed = [":importer"], flaky = True, race = "on", - shard_count = 17, + shard_count = 18, deps = [ "//br/pkg/errors", "//br/pkg/lightning/backend/encode", diff --git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go index fa1a4e19ef890..dc99f98b71ed3 100644 --- a/pkg/executor/importer/import.go +++ b/pkg/executor/importer/import.go @@ -1242,13 +1242,26 @@ func (e *LoadDataController) CreateColAssignExprs(sctx sessionctx.Context) ([]ex return res, allWarnings, nil } +func (e *LoadDataController) getBackendWorkerConcurrency() int { + // when using global sort, the write&ingest step buffers KV data in memory; + // suppose a cpu:mem ratio of 1:2 (true in most cases) and that we assign 1G per concurrency, + // then we can use 2 * ThreadCnt as the concurrency. The write&ingest step is mostly + // IO intensive, so CPU usage stays below ThreadCnt in our tests. + // The real concurrency used is adjusted in the external engine later. + // when using local sort, use the same default value as lightning.
+ if e.IsGlobalSort() { + return int(e.ThreadCnt) * 2 + } + return config.DefaultRangeConcurrency * 2 +} + func (e *LoadDataController) getLocalBackendCfg(pdAddr, dataDir string) local.BackendConfig { backendConfig := local.BackendConfig{ PDAddr: pdAddr, LocalStoreDir: dataDir, MaxConnPerStore: config.DefaultRangeConcurrency, ConnCompressType: config.CompressionNone, - WorkerConcurrency: config.DefaultRangeConcurrency * 2, + WorkerConcurrency: e.getBackendWorkerConcurrency(), KVWriteBatchSize: config.KVWriteBatchSize, RegionSplitBatchSize: config.DefaultRegionSplitBatchSize, RegionSplitConcurrency: runtime.GOMAXPROCS(0), diff --git a/pkg/executor/importer/import_test.go b/pkg/executor/importer/import_test.go index baf3c0e25178d..6875b89d666f7 100644 --- a/pkg/executor/importer/import_test.go +++ b/pkg/executor/importer/import_test.go @@ -288,3 +288,16 @@ func TestGetLocalBackendCfg(t *testing.T) { require.Greater(t, cfg.RaftKV2SwitchModeDuration, time.Duration(0)) require.Equal(t, config.DefaultSwitchTiKVModeInterval, cfg.RaftKV2SwitchModeDuration) } + +func TestGetBackendWorkerConcurrency(t *testing.T) { + c := &LoadDataController{ + Plan: &Plan{ + ThreadCnt: 3, + }, + } + require.Equal(t, 32, c.getBackendWorkerConcurrency()) + c.Plan.CloudStorageURI = "xxx" + require.Equal(t, 6, c.getBackendWorkerConcurrency()) + c.Plan.ThreadCnt = 123 + require.Equal(t, 246, c.getBackendWorkerConcurrency()) +} diff --git a/pkg/executor/show.go b/pkg/executor/show.go index 9875cfa117ced..cc9fe3831d5d1 100644 --- a/pkg/executor/show.go +++ b/pkg/executor/show.go @@ -2262,7 +2262,7 @@ func fillOneImportJobInfo(info *importer.JobInfo, result *chunk.Chunk, importedR result.AppendInt64(3, info.TableID) result.AppendString(4, info.Step) result.AppendString(5, info.Status) - result.AppendString(6, units.HumanSize(float64(info.SourceFileSize))) + result.AppendString(6, units.BytesSize(float64(info.SourceFileSize))) if info.Summary != nil { result.AppendUint64(7, info.Summary.ImportedRows) } else if importedRowCount >= 0 { diff --git a/tests/realtikvtest/importintotest/import_into_test.go b/tests/realtikvtest/importintotest/import_into_test.go index 50967db18d1a3..95519e0f81378 100644 --- a/tests/realtikvtest/importintotest/import_into_test.go +++ b/tests/realtikvtest/importintotest/import_into_test.go @@ -655,7 +655,9 @@ func (s *mockGCSSuite) TestMaxWriteSpeed() { start := time.Now() sql := fmt.Sprintf(`IMPORT INTO load_test_write_speed.t FROM 'gs://test-load/speed-test.csv?endpoint=%s'`, gcsEndpoint) - s.tk.MustQuery(sql) + result := s.tk.MustQuery(sql) + fileSize := result.Rows()[0][6].(string) + s.Equal("7.598KiB", fileSize) duration := time.Since(start).Seconds() s.tk.MustQuery("SELECT count(1) FROM load_test_write_speed.t;").Check(testkit.Rows( strconv.Itoa(lineCount), diff --git a/tests/realtikvtest/importintotest/job_test.go b/tests/realtikvtest/importintotest/job_test.go index b2a5a49dc6b55..cc9e6e796f09e 100644 --- a/tests/realtikvtest/importintotest/job_test.go +++ b/tests/realtikvtest/importintotest/job_test.go @@ -57,7 +57,7 @@ func (s *mockGCSSuite) compareJobInfoWithoutTime(jobInfo *importer.JobInfo, row s.Equal(strconv.Itoa(int(jobInfo.TableID)), row[3]) s.Equal(jobInfo.Step, row[4]) s.Equal(jobInfo.Status, row[5]) - s.Equal(units.HumanSize(float64(jobInfo.SourceFileSize)), row[6]) + s.Equal(units.BytesSize(float64(jobInfo.SourceFileSize)), row[6]) if jobInfo.Summary == nil { s.Equal("", row[7].(string)) } else {
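
For reference, the concurrency heuristic added in engine.go above can be exercised on its own. Below is a minimal standalone Go sketch (not part of the patch) that mirrors Engine.getAdjustedConcurrency; maxCloudStorageConnections = 8000 is taken from the patch, while the value 1000 for MergeSortOverlapThreshold and the helper name adjustedConcurrency are assumptions consistent with TestGetAdjustedConcurrency but not shown in this diff.

package main

import "fmt"

// Constants mirrored from the patch; mergeSortOverlapThreshold = 1000 is an
// assumed value, only known here to be consistent with the test expectations.
const (
	maxCloudStorageConnections = 8000
	mergeSortOverlapThreshold  = 1000
)

// adjustedConcurrency is a hypothetical helper reproducing the heuristic of
// Engine.getAdjustedConcurrency: with hotspot checking (the add-index path) the
// number of load routines is capped at 8; without it (the import-into path) it
// is bounded by the worker concurrency and the per-file connection budget.
func adjustedConcurrency(checkHotspot bool, workerConcurrency, dataFileCount int) int {
	if checkHotspot {
		return min(mergeSortOverlapThreshold/dataFileCount, 8)
	}
	return max(min(workerConcurrency, maxCloudStorageConnections/dataFileCount), 1)
}

func main() {
	// These match the expectations in TestGetAdjustedConcurrency.
	fmt.Println(adjustedConcurrency(true, 32, 100))    // 8
	fmt.Println(adjustedConcurrency(true, 32, 1000))   // 1
	fmt.Println(adjustedConcurrency(false, 32, 100))   // 32
	fmt.Println(adjustedConcurrency(false, 32, 1000))  // 8
	fmt.Println(adjustedConcurrency(false, 32, 10000)) // 1
}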