global sort: new merge with 1 writer (pingcap#49474)
ywqzzy authored Dec 19, 2023
1 parent 210bc42 commit 971b611
Showing 12 changed files with 419 additions and 105 deletions.
4 changes: 4 additions & 0 deletions br/pkg/lightning/backend/external/BUILD.bazel
@@ -11,6 +11,7 @@ go_library(
"iter.go",
"kv_reader.go",
"merge.go",
"merge_v2.go",
"onefile_writer.go",
"reader.go",
"split.go",
@@ -73,6 +74,7 @@ go_test(
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
"//br/pkg/lightning/log",
"//br/pkg/membuf",
"//br/pkg/storage",
"//pkg/kv",
@@ -91,10 +93,12 @@ go_test(
"@com_github_johannesboyne_gofakes3//:gofakes3",
"@com_github_johannesboyne_gofakes3//backend/s3mem",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_stretchr_testify//require",
"@org_golang_x_exp//rand",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
)
5 changes: 2 additions & 3 deletions br/pkg/lightning/backend/external/bench_test.go
@@ -361,7 +361,7 @@ func writeExternalOneFile(s *writeTestSuite) {
}
writer := builder.BuildOneFile(
s.store, filePath, "writerID")
_ = writer.Init(ctx, 20*1024*1024)
intest.AssertNoError(writer.Init(ctx, 20*1024*1024))
key, val, _ := s.source.next()
for key != nil {
err := writer.WriteRow(ctx, key, val)
@@ -371,8 +371,7 @@ func writeExternalOneFile(s *writeTestSuite) {
if s.beforeWriterClose != nil {
s.beforeWriterClose()
}
err := writer.Close(ctx)
intest.AssertNoError(err)
intest.AssertNoError(writer.Close(ctx))
if s.afterWriterClose != nil {
s.afterWriterClose()
}
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/external/engine.go
@@ -193,7 +193,7 @@ func getFilesReadConcurrency(
for i := range statsFiles {
result[i] = (endOffs[i] - startOffs[i]) / uint64(ConcurrentReaderBufferSizePerConc)
result[i] = max(result[i], 1)
logutil.Logger(ctx).Info("found hotspot file in getFilesReadConcurrency",
logutil.Logger(ctx).Debug("found hotspot file in getFilesReadConcurrency",
zap.String("filename", statsFiles[i]),
zap.Uint64("startOffset", startOffs[i]),
zap.Uint64("endOffset", endOffs[i]),
66 changes: 3 additions & 63 deletions br/pkg/lightning/backend/external/merge.go
@@ -53,7 +53,7 @@ func MergeOverlappingFiles(
for _, files := range dataFilesSlice {
files := files
eg.Go(func() error {
return mergeOverlappingFilesV2(
return mergeOverlappingFilesInternal(
egCtx,
files,
store,
Expand All @@ -74,68 +74,9 @@ func MergeOverlappingFiles(
return eg.Wait()
}

// unused for now.
func mergeOverlappingFilesImpl(ctx context.Context,
paths []string,
store storage.ExternalStorage,
readBufferSize int,
newFilePrefix string,
writerID string,
memSizeLimit uint64,
blockSize int,
writeBatchCount uint64,
propSizeDist uint64,
propKeysDist uint64,
onClose OnCloseFunc,
checkHotspot bool,
) (err error) {
task := log.BeginTask(logutil.Logger(ctx).With(
zap.String("writer-id", writerID),
zap.Int("file-count", len(paths)),
), "merge overlapping files")
defer func() {
task.End(zap.ErrorLevel, err)
}()

zeroOffsets := make([]uint64, len(paths))
iter, err := NewMergeKVIter(ctx, paths, zeroOffsets, store, readBufferSize, checkHotspot, 0)
if err != nil {
return err
}
defer func() {
err := iter.Close()
if err != nil {
logutil.Logger(ctx).Warn("close iterator failed", zap.Error(err))
}
}()

writer := NewWriterBuilder().
SetMemorySizeLimit(memSizeLimit).
SetBlockSize(blockSize).
SetOnCloseFunc(onClose).
SetWriterBatchCount(writeBatchCount).
SetPropSizeDistance(propSizeDist).
SetPropKeysDistance(propKeysDist).
Build(store, newFilePrefix, writerID)

// currently use same goroutine to do read and write. The main advantage is
// there's no KV copy and iter can reuse the buffer.
for iter.Next() {
err = writer.WriteRow(ctx, iter.Key(), iter.Value(), nil)
if err != nil {
return err
}
}
err = iter.Error()
if err != nil {
return err
}
return writer.Close(ctx)
}

// mergeOverlappingFilesV2 reads from given files whose key range may overlap
// mergeOverlappingFilesInternal reads from given files whose key range may overlap
// and writes to one new sorted, nonoverlapping file.
func mergeOverlappingFilesV2(
func mergeOverlappingFilesInternal(
ctx context.Context,
paths []string,
store storage.ExternalStorage,
@@ -177,7 +118,6 @@ func mergeOverlappingFilesV2(
SetWriterBatchCount(writeBatchCount).
SetPropKeysDistance(propKeysDist).
SetPropSizeDistance(propSizeDist).
SetOnCloseFunc(onClose).
BuildOneFile(store, newFilePrefix, writerID)
err = writer.Init(ctx, partSize)
if err != nil {
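For orientation: MergeOverlappingFiles (above) fans the input files out over an errgroup, one goroutine per file group, each running mergeOverlappingFilesInternal. Below is a minimal sketch of that shape; mergeGroup and the pre-split groups are hypothetical stand-ins, and only the eg.Go call pattern appears in this diff:

func fanOutMerge(ctx context.Context, groups [][]string) error {
	eg, egCtx := errgroup.WithContext(ctx)
	for _, files := range groups {
		files := files // capture the loop variable, as the original does
		eg.Go(func() error {
			return mergeGroup(egCtx, files) // stand-in for mergeOverlappingFilesInternal
		})
	}
	return eg.Wait()
}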
190 changes: 190 additions & 0 deletions br/pkg/lightning/backend/external/merge_v2.go
@@ -0,0 +1,190 @@
package external

import (
"bytes"
"context"
"math"
"time"

"github.com/jfcg/sorty/v2"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
"go.uber.org/zap"
)

// MergeOverlappingFilesV2 reads from given files whose key ranges may overlap
// and writes to new sorted, nonoverlapping files.
// It uses one readAllData pass and one writer.
func MergeOverlappingFilesV2(
ctx context.Context,
dataFiles []string,
statFiles []string,
store storage.ExternalStorage,
startKey []byte,
endKey []byte,
partSize int64,
newFilePrefix string,
writerID string,
blockSize int,
writeBatchCount uint64,
propSizeDist uint64,
propKeysDist uint64,
onClose OnCloseFunc,
concurrency int,
checkHotspot bool,
) (err error) {
task := log.BeginTask(logutil.Logger(ctx).With(
zap.Int("data-file-count", len(dataFiles)),
zap.Int("stat-file-count", len(statFiles)),
zap.Binary("start-key", startKey),
zap.Binary("end-key", endKey),
zap.String("new-file-prefix", newFilePrefix),
zap.Int("concurrency", concurrency),
), "merge overlapping files")
defer func() {
task.End(zap.ErrorLevel, err)
}()

rangesGroupSize := 4 * size.GB
failpoint.Inject("mockRangesGroupSize", func(val failpoint.Value) {
rangesGroupSize = uint64(val.(int))
})

splitter, err := NewRangeSplitter(
ctx,
dataFiles,
statFiles,
store,
int64(rangesGroupSize),
math.MaxInt64,
int64(4*size.GB),
math.MaxInt64,
checkHotspot,
)
if err != nil {
return err
}

writer := NewWriterBuilder().
SetMemorySizeLimit(DefaultMemSizeLimit).
SetBlockSize(blockSize).
SetPropKeysDistance(propKeysDist).
SetPropSizeDistance(propSizeDist).
SetOnCloseFunc(onClose).
BuildOneFile(
store,
newFilePrefix,
writerID)
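// BuildOneFile (unlike Build) gives a writer that streams the whole merge
// into a single data file plus one stat file; see writer.dataFile and
// writer.statFile in the summary below.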
defer func() {
err = splitter.Close()
if err != nil {
logutil.Logger(ctx).Warn("close range splitter failed", zap.Error(err))
}
err = writer.Close(ctx)
if err != nil {
logutil.Logger(ctx).Warn("close writer failed", zap.Error(err))
}
}()

err = writer.Init(ctx, partSize)
if err != nil {
logutil.Logger(ctx).Warn("init writer failed", zap.Error(err))
return
}

bufPool := membuf.NewPool()
loaded := &memKVsAndBuffers{}
curStart := kv.Key(startKey).Clone()
var curEnd kv.Key
var maxKey, minKey kv.Key

for {
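// Each pass: pull the next group of file ranges from the splitter, read
// that group fully into memory, sort it in place, then append it to the
// single OneFileWriter.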
endKeyOfGroup, dataFilesOfGroup, statFilesOfGroup, _, err1 := splitter.SplitOneRangesGroup()
if err1 != nil {
logutil.Logger(ctx).Warn("split one ranges group failed", zap.Error(err1))
return
}
curEnd = kv.Key(endKeyOfGroup).Clone()
if len(endKeyOfGroup) == 0 {
curEnd = kv.Key(endKey).Clone()
}
now := time.Now()
err1 = readAllData(
ctx,
store,
dataFilesOfGroup,
statFilesOfGroup,
curStart,
curEnd,
bufPool,
loaded,
)
if err1 != nil {
logutil.Logger(ctx).Warn("read all data failed", zap.Error(err1))
return
}
readTime := time.Since(now)
now = time.Now()
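// sorty contract: lesswap(i, k, r, s) reports whether element i sorts
// strictly before element k; when r != s it must also swap elements r and
// s. Keys and values are swapped together so each pair stays aligned.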
sorty.MaxGor = uint64(concurrency)
sorty.Sort(len(loaded.keys), func(i, k, r, s int) bool {
if bytes.Compare(loaded.keys[i], loaded.keys[k]) < 0 { // strict comparator like < or >
if r != s {
loaded.keys[r], loaded.keys[s] = loaded.keys[s], loaded.keys[r]
loaded.values[r], loaded.values[s] = loaded.values[s], loaded.values[r]
}
return true
}
return false
})
sortTime := time.Since(now)
now = time.Now()
for i, key := range loaded.keys {
err1 = writer.WriteRow(ctx, key, loaded.values[i])
if err1 != nil {
logutil.Logger(ctx).Warn("write one row to writer failed", zap.Error(err1))
return
}
}
writeTime := time.Since(now)
logutil.Logger(ctx).Info("sort one group in MergeOverlappingFiles",
zap.Duration("read time", readTime),
zap.Duration("sort time", sortTime),
zap.Duration("write time", writeTime),
zap.Int("key len", len(loaded.keys)))

if len(minKey) == 0 {
minKey = kv.Key(loaded.keys[0]).Clone()
}
maxKey = kv.Key(loaded.keys[len(loaded.keys)-1]).Clone()
curStart = curEnd.Clone()
loaded.keys = nil
loaded.values = nil
loaded.memKVBuffers = nil

if len(endKeyOfGroup) == 0 {
break
}
}

var stat MultipleFilesStat
stat.Filenames = append(stat.Filenames,
[2]string{writer.dataFile, writer.statFile})

stat.build([]kv.Key{minKey}, []kv.Key{curEnd})
if onClose != nil {
onClose(&WriterSummary{
WriterID: writer.writerID,
Seq: 0,
Min: minKey,
Max: maxKey,
TotalSize: writer.totalSize,
MultipleFilesStats: []MultipleFilesStat{stat},
})
}
return
}
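For context, a minimal sketch (not part of this commit) of calling the new one-writer entry point; every argument below is an illustrative assumption matched to the signature above, not a value from the PR:

func mergeExample(ctx context.Context, store storage.ExternalStorage,
	dataFiles, statFiles []string, startKey, endKey []byte) error {
	return MergeOverlappingFilesV2(
		ctx,
		dataFiles, statFiles, store,
		startKey, endKey,
		int64(5*size.MB), // partSize for multipart uploads (assumed)
		"merged",         // newFilePrefix (assumed)
		"writer-01",      // writerID (assumed)
		16*1024,          // blockSize in bytes (assumed)
		8*1024,           // writeBatchCount (assumed)
		size.MB,          // propSizeDist (assumed)
		8*1024,           // propKeysDist (assumed)
		nil,              // onClose: summary callback omitted here
		8,                // concurrency, feeds sorty.MaxGor
		false,            // checkHotspot
	)
}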
3 changes: 1 addition & 2 deletions br/pkg/lightning/backend/external/onefile_writer.go
@@ -47,8 +47,7 @@ type OneFileWriter struct {
dataWriter storage.ExternalFileWriter
statWriter storage.ExternalFileWriter

onClose OnCloseFunc
closed bool
closed bool

logger *zap.Logger
}