Skip to content

Commit bb43e2d

Browse files
authored
external: increase a unit of read chunk to avoid a whole reload (pingcap#50965)
ref pingcap#50752
1 parent 253340a commit bb43e2d

File tree

4 files changed

+139
-10
lines changed

4 files changed

+139
-10
lines changed

br/pkg/lightning/backend/external/bench_test.go

+76-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package external
1616

1717
import (
1818
"context"
19+
"encoding/hex"
1920
"flag"
2021
"fmt"
2122
"io"
@@ -27,6 +28,7 @@ import (
2728

2829
"github.com/docker/go-units"
2930
"github.com/felixge/fgprof"
31+
"github.com/pingcap/tidb/br/pkg/membuf"
3032
"github.com/pingcap/tidb/br/pkg/storage"
3133
"github.com/pingcap/tidb/pkg/kv"
3234
"github.com/pingcap/tidb/pkg/util/intest"
@@ -43,11 +45,17 @@ type writeTestSuite struct {
4345
memoryLimit int
4446
beforeCreateWriter func()
4547
afterWriterClose func()
48+
49+
optionalFilePath string
50+
onClose OnCloseFunc
4651
}
4752

4853
func writePlainFile(s *writeTestSuite) {
4954
ctx := context.Background()
5055
filePath := "/test/writer"
56+
if s.optionalFilePath != "" {
57+
filePath = s.optionalFilePath
58+
}
5159
_ = s.store.DeleteFile(ctx, filePath)
5260
buf := make([]byte, s.memoryLimit)
5361
offset := 0
@@ -92,9 +100,13 @@ func cleanOldFiles(ctx context.Context, store storage.ExternalStorage, subDir st
92100
func writeExternalFile(s *writeTestSuite) {
93101
ctx := context.Background()
94102
filePath := "/test/writer"
103+
if s.optionalFilePath != "" {
104+
filePath = s.optionalFilePath
105+
}
95106
cleanOldFiles(ctx, s.store, filePath)
96107
builder := NewWriterBuilder().
97-
SetMemorySizeLimit(uint64(s.memoryLimit))
108+
SetMemorySizeLimit(uint64(s.memoryLimit)).
109+
SetOnCloseFunc(s.onClose)
98110

99111
if s.beforeCreateWriter != nil {
100112
s.beforeCreateWriter()
@@ -116,6 +128,9 @@ func writeExternalFile(s *writeTestSuite) {
116128
func writeExternalOneFile(s *writeTestSuite) {
117129
ctx := context.Background()
118130
filePath := "/test/writer"
131+
if s.optionalFilePath != "" {
132+
filePath = s.optionalFilePath
133+
}
119134
cleanOldFiles(ctx, s.store, filePath)
120135
builder := NewWriterBuilder().
121136
SetMemorySizeLimit(uint64(s.memoryLimit))
@@ -126,13 +141,21 @@ func writeExternalOneFile(s *writeTestSuite) {
126141
writer := builder.BuildOneFile(
127142
s.store, filePath, "writerID")
128143
intest.AssertNoError(writer.Init(ctx, 20*1024*1024))
144+
var minKey, maxKey []byte
145+
129146
key, val, _ := s.source.next()
147+
minKey = key
130148
for key != nil {
149+
maxKey = key
131150
err := writer.WriteRow(ctx, key, val)
132151
intest.AssertNoError(err)
133152
key, val, _ = s.source.next()
134153
}
135154
intest.AssertNoError(writer.Close(ctx))
155+
s.onClose(&WriterSummary{
156+
Min: minKey,
157+
Max: maxKey,
158+
})
136159
if s.afterWriterClose != nil {
137160
s.afterWriterClose()
138161
}
@@ -674,3 +697,55 @@ func TestMergeBench(t *testing.T) {
674697
testCompareMergeWithContent(t, 8, createAscendingFiles, newMergeStep)
675698
testCompareMergeWithContent(t, 8, createEvenlyDistributedFiles, newMergeStep)
676699
}
700+
701+
func TestReadAllDataLargeFiles(t *testing.T) {
702+
ctx := context.Background()
703+
store := openTestingStorage(t)
704+
705+
// ~ 100B * 20M = 2GB
706+
source := newAscendingKeyAsyncSource(20*1024*1024, 10, 90, nil)
707+
// ~ 1KB * 2M = 2GB
708+
source2 := newAscendingKeyAsyncSource(2*1024*1024, 10, 990, nil)
709+
var minKey, maxKey kv.Key
710+
recordMinMax := func(s *WriterSummary) {
711+
minKey = s.Min
712+
maxKey = s.Max
713+
}
714+
suite := &writeTestSuite{
715+
store: store,
716+
source: source,
717+
memoryLimit: 256 * 1024 * 1024,
718+
optionalFilePath: "/test/file",
719+
onClose: recordMinMax,
720+
}
721+
suite2 := &writeTestSuite{
722+
store: store,
723+
source: source2,
724+
memoryLimit: 256 * 1024 * 1024,
725+
optionalFilePath: "/test/file2",
726+
onClose: recordMinMax,
727+
}
728+
writeExternalOneFile(suite)
729+
t.Logf("minKey: %s, maxKey: %s", minKey, maxKey)
730+
writeExternalOneFile(suite2)
731+
t.Logf("minKey: %s, maxKey: %s", minKey, maxKey)
732+
733+
dataFiles, statFiles, err := GetAllFileNames(ctx, store, "")
734+
intest.AssertNoError(err)
735+
intest.Assert(len(dataFiles) == 2)
736+
737+
// choose the two keys so that expected concurrency is 579 and 19
738+
startKey, err := hex.DecodeString("00000001000000000000")
739+
intest.AssertNoError(err)
740+
endKey, err := hex.DecodeString("00a00000000000000000")
741+
intest.AssertNoError(err)
742+
bufPool := membuf.NewPool(
743+
membuf.WithBlockNum(0),
744+
membuf.WithBlockSize(ConcurrentReaderBufferSizePerConc),
745+
)
746+
output := &memKVsAndBuffers{}
747+
now := time.Now()
748+
err = readAllData(ctx, store, dataFiles, statFiles, startKey, endKey, bufPool, output)
749+
t.Logf("read all data cost: %s", time.Since(now))
750+
intest.AssertNoError(err)
751+
}

br/pkg/lightning/backend/external/byte_reader.go

+7
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ package external
1616

1717
import (
1818
"context"
19+
"fmt"
1920
"io"
2021

2122
"github.com/pingcap/errors"
23+
"github.com/pingcap/failpoint"
2224
"github.com/pingcap/tidb/br/pkg/membuf"
2325
"github.com/pingcap/tidb/br/pkg/storage"
2426
"github.com/pingcap/tidb/pkg/util/logutil"
@@ -325,6 +327,11 @@ func (r *byteReader) closeConcurrentReader() (reloadCnt, offsetInOldBuffer int)
325327
zap.Int("dropBytes", r.concurrentReader.bufSizePerConc*(len(r.curBuf)-r.curBufIdx)-r.curBufOffset),
326328
zap.Int("curBufIdx", r.curBufIdx),
327329
)
330+
failpoint.Inject("assertReloadAtMostOnce", func() {
331+
if r.concurrentReader.reloadCnt > 1 {
332+
panic(fmt.Sprintf("reloadCnt is %d", r.concurrentReader.reloadCnt))
333+
}
334+
})
328335
r.concurrentReader.largeBufferPool.Destroy()
329336
r.concurrentReader.largeBuf = nil
330337
r.concurrentReader.now = false

br/pkg/lightning/backend/external/engine.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -223,15 +223,15 @@ func getFilesReadConcurrency(
223223
startOffs, endOffs := offsets[0], offsets[1]
224224
for i := range statsFiles {
225225
result[i] = (endOffs[i] - startOffs[i]) / uint64(ConcurrentReaderBufferSizePerConc)
226-
result[i] = max(result[i], 1)
227-
if result[i] > 1 {
228-
logutil.Logger(ctx).Info("found hotspot file in getFilesReadConcurrency",
229-
zap.String("filename", statsFiles[i]),
230-
zap.Uint64("startOffset", startOffs[i]),
231-
zap.Uint64("endOffset", endOffs[i]),
232-
zap.Uint64("expected concurrency", result[i]),
233-
)
234-
}
226+
// let the stat internals cover the [startKey, endKey) since seekPropsOffsets
227+
// always return an offset that is less than or equal to the key.
228+
result[i] += 1
229+
logutil.Logger(ctx).Info("found hotspot file in getFilesReadConcurrency",
230+
zap.String("filename", statsFiles[i]),
231+
zap.Uint64("startOffset", startOffs[i]),
232+
zap.Uint64("endOffset", endOffs[i]),
233+
zap.Uint64("expected concurrency", result[i]),
234+
)
235235
}
236236
return result, startOffs, nil
237237
}

br/pkg/lightning/backend/external/reader_test.go

+47
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ import (
2222
"testing"
2323
"time"
2424

25+
"github.com/pingcap/failpoint"
2526
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
2627
"github.com/pingcap/tidb/br/pkg/lightning/common"
28+
"github.com/pingcap/tidb/br/pkg/membuf"
2729
"github.com/pingcap/tidb/br/pkg/storage"
2830
"github.com/pingcap/tidb/pkg/util/size"
2931
"github.com/stretchr/testify/require"
@@ -106,3 +108,48 @@ func TestReadAllOneFile(t *testing.T) {
106108

107109
testReadAndCompare(ctx, t, kvs, memStore, datas, stats, kvs[0].Key, memSizeLimit)
108110
}
111+
112+
func TestReadLargeFile(t *testing.T) {
113+
ctx := context.Background()
114+
memStore := storage.NewMemStorage()
115+
backup := ConcurrentReaderBufferSizePerConc
116+
t.Cleanup(func() {
117+
ConcurrentReaderBufferSizePerConc = backup
118+
})
119+
ConcurrentReaderBufferSizePerConc = 512 * 1024
120+
121+
w := NewWriterBuilder().
122+
SetPropSizeDistance(128*1024).
123+
SetPropKeysDistance(1000).
124+
BuildOneFile(memStore, "/test", "0")
125+
126+
require.NoError(t, w.Init(ctx, int64(5*size.MB)))
127+
128+
val := make([]byte, 10000)
129+
for i := 0; i < 10000; i++ {
130+
key := []byte(fmt.Sprintf("key%06d", i))
131+
require.NoError(t, w.WriteRow(ctx, key, val))
132+
}
133+
require.NoError(t, w.Close(ctx))
134+
135+
datas, stats, err := GetAllFileNames(ctx, memStore, "")
136+
require.NoError(t, err)
137+
require.Len(t, datas, 1)
138+
139+
failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/backend/external/assertReloadAtMostOnce", "return()")
140+
defer failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/backend/external/assertReloadAtMostOnce")
141+
142+
bufPool := membuf.NewPool(
143+
membuf.WithBlockNum(0),
144+
membuf.WithBlockSize(ConcurrentReaderBufferSizePerConc),
145+
)
146+
output := &memKVsAndBuffers{}
147+
startKey := []byte("key000000")
148+
maxKey := []byte("key004998")
149+
endKey := []byte("key004999")
150+
err = readAllData(ctx, memStore, datas, stats, startKey, endKey, bufPool, output)
151+
require.NoError(t, err)
152+
output.build(ctx)
153+
require.Equal(t, startKey, output.keys[0])
154+
require.Equal(t, maxKey, output.keys[len(output.keys)-1])
155+
}

0 commit comments

Comments (0)