diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel index 65c7a766042d1..3d1b76b1e6eec 100644 --- a/br/pkg/lightning/backend/external/BUILD.bazel +++ b/br/pkg/lightning/backend/external/BUILD.bazel @@ -62,6 +62,7 @@ go_test( "engine_test.go", "file_test.go", "iter_test.go", + "merge_test.go", "misc_bench_test.go", "onefile_writer_test.go", "reader_test.go", diff --git a/br/pkg/lightning/backend/external/bench_test.go b/br/pkg/lightning/backend/external/bench_test.go index 82910eb72b562..0664f4e3acea5 100644 --- a/br/pkg/lightning/backend/external/bench_test.go +++ b/br/pkg/lightning/backend/external/bench_test.go @@ -505,8 +505,6 @@ func mergeStep(t *testing.T, s *mergeTestSuite) { DefaultBlockSize, DefaultMemSizeLimit, 8*1024, - 1*size.MB, - 8*1024, onClose, s.concurrency, s.mergeIterHotspot, diff --git a/br/pkg/lightning/backend/external/merge.go b/br/pkg/lightning/backend/external/merge.go index c08430a36b8c9..d371ac7050718 100644 --- a/br/pkg/lightning/backend/external/merge.go +++ b/br/pkg/lightning/backend/external/merge.go @@ -24,24 +24,11 @@ func MergeOverlappingFiles( blockSize int, memSizeLimit uint64, writeBatchCount uint64, - propSizeDist uint64, - propKeysDist uint64, onClose OnCloseFunc, concurrency int, checkHotspot bool, ) error { - var dataFilesSlice [][]string - batchCount := 1 - if len(paths) > concurrency { - batchCount = len(paths) / concurrency - } - for i := 0; i < len(paths); i += batchCount { - end := i + batchCount - if end > len(paths) { - end = len(paths) - } - dataFilesSlice = append(dataFilesSlice, paths[i:end]) - } + dataFilesSlice := splitDataFiles(paths, concurrency) logutil.Logger(ctx).Info("start to merge overlapping files", zap.Int("file-count", len(paths)), @@ -64,8 +51,6 @@ func MergeOverlappingFiles( memSizeLimit, blockSize, writeBatchCount, - propSizeDist, - propKeysDist, onClose, checkHotspot, ) @@ -74,6 +59,32 @@ func MergeOverlappingFiles( return eg.Wait() } +// split input data files into max 'concurrency' shares evenly, if there are not +// enough files, merge at least 2 files in one batch. +func splitDataFiles(paths []string, concurrency int) [][]string { + shares := concurrency + if len(paths) < 2*concurrency { + shares = max(1, len(paths)/2) + } + dataFilesSlice := make([][]string, 0, shares) + batchCount := len(paths) / shares + remainder := len(paths) % shares + start := 0 + for start < len(paths) { + end := start + batchCount + if remainder > 0 { + end++ + remainder-- + } + if end > len(paths) { + end = len(paths) + } + dataFilesSlice = append(dataFilesSlice, paths[start:end]) + start = end + } + return dataFilesSlice +} + // mergeOverlappingFilesInternal reads from given files whose key range may overlap // and writes to one new sorted, nonoverlapping files. // since some memory are taken by library, such as HTTP2, that we cannot calculate @@ -104,8 +115,6 @@ func mergeOverlappingFilesInternal( memSizeLimit uint64, blockSize int, writeBatchCount uint64, - propSizeDist uint64, - propKeysDist uint64, onClose OnCloseFunc, checkHotspot bool, ) (err error) { @@ -133,8 +142,6 @@ func mergeOverlappingFilesInternal( SetMemorySizeLimit(memSizeLimit). SetBlockSize(blockSize). SetWriterBatchCount(writeBatchCount). - SetPropKeysDistance(propKeysDist). - SetPropSizeDistance(propSizeDist). SetOnCloseFunc(onClose). BuildOneFile(store, newFilePrefix, writerID) err = writer.Init(ctx, partSize) diff --git a/br/pkg/lightning/backend/external/merge_test.go b/br/pkg/lightning/backend/external/merge_test.go new file mode 100644 index 0000000000000..f66fb2d1ec838 --- /dev/null +++ b/br/pkg/lightning/backend/external/merge_test.go @@ -0,0 +1,104 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package external + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSplitDataFiles(t *testing.T) { + allPaths := make([]string, 0, 100) + for i := 0; i < cap(allPaths); i++ { + allPaths = append(allPaths, fmt.Sprintf("%d", i)) + } + cases := []struct { + paths []string + concurrency int + result [][]string + }{ + { + paths: allPaths[:1], + concurrency: 1, + result: [][]string{allPaths[:1]}, + }, + { + paths: allPaths[:2], + concurrency: 1, + result: [][]string{allPaths[:2]}, + }, + { + paths: allPaths[:2], + concurrency: 4, + result: [][]string{allPaths[:2]}, + }, + { + paths: allPaths[:3], + concurrency: 4, + result: [][]string{allPaths[:3]}, + }, + { + paths: allPaths[:4], + concurrency: 4, + result: [][]string{allPaths[:2], allPaths[2:4]}, + }, + { + paths: allPaths[:5], + concurrency: 4, + result: [][]string{allPaths[:3], allPaths[3:5]}, + }, + { + paths: allPaths[:6], + concurrency: 4, + result: [][]string{allPaths[:2], allPaths[2:4], allPaths[4:6]}, + }, + { + paths: allPaths[:7], + concurrency: 4, + result: [][]string{allPaths[:3], allPaths[3:5], allPaths[5:7]}, + }, + { + paths: allPaths[:15], + concurrency: 4, + result: [][]string{allPaths[:4], allPaths[4:8], allPaths[8:12], allPaths[12:15]}, + }, + { + paths: allPaths[:83], + concurrency: 4, + result: [][]string{allPaths[:21], allPaths[21:42], allPaths[42:63], allPaths[63:83]}, + }, + { + paths: allPaths[:100], + concurrency: 4, + result: [][]string{allPaths[:25], allPaths[25:50], allPaths[50:75], allPaths[75:100]}, + }, + { + paths: allPaths[:100], + concurrency: 8, + result: [][]string{ + allPaths[:13], allPaths[13:26], allPaths[26:39], allPaths[39:52], + allPaths[52:64], allPaths[64:76], allPaths[76:88], allPaths[88:100], + }, + }, + } + for i, c := range cases { + t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) { + result := splitDataFiles(c.paths, c.concurrency) + require.Equal(t, c.result, result) + }) + } +} diff --git a/br/pkg/lightning/backend/external/onefile_writer_test.go b/br/pkg/lightning/backend/external/onefile_writer_test.go index 153113e207724..d55a46309c58f 100644 --- a/br/pkg/lightning/backend/external/onefile_writer_test.go +++ b/br/pkg/lightning/backend/external/onefile_writer_test.go @@ -172,6 +172,7 @@ func checkOneFileWriterStatWithDistance(t *testing.T, kvCnt int, keysDistance ui } func TestMergeOverlappingFilesInternal(t *testing.T) { + changePropDist(t, defaultPropSizeDist, 2) // 1. Write to 5 files. // 2. merge 5 files into one file. // 3. read one file and check result. @@ -179,7 +180,6 @@ func TestMergeOverlappingFilesInternal(t *testing.T) { ctx := context.Background() memStore := storage.NewMemStorage() writer := NewWriterBuilder(). - SetPropKeysDistance(2). SetMemorySizeLimit(1000). SetKeyDuplicationEncoding(true). Build(memStore, "/test", "0") @@ -206,8 +206,6 @@ func TestMergeOverlappingFilesInternal(t *testing.T) { 1000, 1000, 8*1024, - 1*size.MB, - 2, nil, true, )) @@ -256,6 +254,7 @@ func TestMergeOverlappingFilesInternal(t *testing.T) { } func TestOnefileWriterManyRows(t *testing.T) { + changePropDist(t, defaultPropSizeDist, 2) // 1. write into one file with sorted order. // 2. merge one file. // 3. read kv file and check the result. @@ -263,7 +262,6 @@ func TestOnefileWriterManyRows(t *testing.T) { ctx := context.Background() memStore := storage.NewMemStorage() writer := NewWriterBuilder(). - SetPropKeysDistance(2). SetMemorySizeLimit(1000). BuildOneFile(memStore, "/test", "0") @@ -310,8 +308,6 @@ func TestOnefileWriterManyRows(t *testing.T) { 1000, 1000, 8*1024, - 1*size.MB, - 2, onClose, true, )) diff --git a/br/pkg/lightning/backend/external/sort_test.go b/br/pkg/lightning/backend/external/sort_test.go index 849db2fa2342a..64e6b14d24432 100644 --- a/br/pkg/lightning/backend/external/sort_test.go +++ b/br/pkg/lightning/backend/external/sort_test.go @@ -33,6 +33,17 @@ import ( "golang.org/x/exp/rand" ) +func changePropDist(t *testing.T, sizeDist, keysDist uint64) { + sizeDistBak := defaultPropSizeDist + keysDistBak := defaultPropKeysDist + t.Cleanup(func() { + defaultPropSizeDist = sizeDistBak + defaultPropKeysDist = keysDistBak + }) + defaultPropSizeDist = sizeDist + defaultPropKeysDist = keysDist +} + func TestGlobalSortLocalBasic(t *testing.T) { // 1. write data step seed := time.Now().Unix() @@ -90,6 +101,7 @@ func TestGlobalSortLocalBasic(t *testing.T) { } func TestGlobalSortLocalWithMerge(t *testing.T) { + changePropDist(t, 100, 2) // 1. write data step seed := time.Now().Unix() rand.Seed(uint64(seed)) @@ -99,8 +111,6 @@ func TestGlobalSortLocalWithMerge(t *testing.T) { memSizeLimit := (rand.Intn(10) + 1) * 400 w := NewWriterBuilder(). - SetPropSizeDistance(100). - SetPropKeysDistance(2). SetMemorySizeLimit(uint64(memSizeLimit)). SetBlockSize(memSizeLimit). Build(memStore, "/test", "0") @@ -162,8 +172,6 @@ func TestGlobalSortLocalWithMerge(t *testing.T) { mergeMemSize, uint64(mergeMemSize), 8*1024, - 100, - 2, closeFn, 1, true, diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go index 2cf86b5d99ba5..5f6d0c77948be 100644 --- a/br/pkg/lightning/backend/external/writer.go +++ b/br/pkg/lightning/backend/external/writer.go @@ -41,7 +41,9 @@ import ( ) var ( - multiFileStatNum = 500 + multiFileStatNum = 500 + defaultPropSizeDist = 1 * size.MB + defaultPropKeysDist uint64 = 8 * 1024 // MergeSortOverlapThreshold is the threshold of overlap between sorted kv files. // if the overlap ratio is greater than this threshold, we will merge the files. @@ -117,8 +119,8 @@ func NewWriterBuilder() *WriterBuilder { memSizeLimit: DefaultMemSizeLimit, blockSize: DefaultBlockSize, writeBatchCount: 8 * 1024, - propSizeDist: 1 * size.MB, - propKeysDist: 8 * 1024, + propSizeDist: defaultPropSizeDist, + propKeysDist: defaultPropKeysDist, onClose: dummyOnCloseFunc, } } diff --git a/pkg/ddl/backfilling_merge_sort.go b/pkg/ddl/backfilling_merge_sort.go index 34fda5fabc93c..e44ec5c4d046d 100644 --- a/pkg/ddl/backfilling_merge_sort.go +++ b/pkg/ddl/backfilling_merge_sort.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/util/logutil" - "github.com/pingcap/tidb/pkg/util/size" ) type mergeSortExecutor struct { @@ -102,8 +101,6 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta external.DefaultBlockSize, external.DefaultMemSizeLimit, 8*1024, - 1*size.MB, - 8*1024, onClose, int(variable.GetDDLReorgWorkerCounter()), true) } diff --git a/pkg/disttask/importinto/BUILD.bazel b/pkg/disttask/importinto/BUILD.bazel index 612f8d934d478..0f75d93efe1dd 100644 --- a/pkg/disttask/importinto/BUILD.bazel +++ b/pkg/disttask/importinto/BUILD.bazel @@ -58,7 +58,6 @@ go_library( "//pkg/util/etcd", "//pkg/util/logutil", "//pkg/util/promutil", - "//pkg/util/size", "//pkg/util/sqlexec", "@com_github_docker_go_units//:go-units", "@com_github_go_sql_driver_mysql//:mysql", diff --git a/pkg/disttask/importinto/task_executor.go b/pkg/disttask/importinto/task_executor.go index 22bf187a80e72..caef4fca38f95 100644 --- a/pkg/disttask/importinto/task_executor.go +++ b/pkg/disttask/importinto/task_executor.go @@ -42,7 +42,6 @@ import ( "github.com/pingcap/tidb/pkg/meta/autoid" "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/util/logutil" - "github.com/pingcap/tidb/pkg/util/size" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -337,8 +336,6 @@ func (m *mergeSortStepExecutor) RunSubtask(ctx context.Context, subtask *proto.S getKVGroupBlockSize(sm.KVGroup), external.DefaultMemSizeLimit, 8*1024, - 1*size.MB, - 8*1024, onClose, m.taskMeta.Plan.ThreadCnt, false)