Skip to content

Commit c3c067e

Browse files
authored
ddl: split range group for each index separately for global sort import (pingcap#50658)
close pingcap#50657
1 parent 4e202a4 commit c3c067e

8 files changed

+184
-129
lines changed

br/pkg/lightning/backend/external/writer.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,9 @@ func (rc *rangePropertiesCollector) encode() []byte {
8080

8181
// WriterSummary is the summary of a writer.
8282
type WriterSummary struct {
83-
WriterID string
84-
Seq int
83+
WriterID string
84+
GroupOffset int
85+
Seq int
8586
// Min and Max are the min and max key written by this writer, both are
8687
// inclusive, i.e. [Min, Max].
8788
// will be empty if no key is written.
@@ -100,6 +101,7 @@ func dummyOnCloseFunc(*WriterSummary) {}
100101

101102
// WriterBuilder builds a new Writer.
102103
type WriterBuilder struct {
104+
groupOffset int
103105
memSizeLimit uint64
104106
blockSize int
105107
writeBatchCount uint64
@@ -169,6 +171,15 @@ func (b *WriterBuilder) SetBlockSize(blockSize int) *WriterBuilder {
169171
return b
170172
}
171173

174+
// SetGroupOffset sets the group offset of a writer and returns the builder.
175+
// The offset can be used to group the summaries from different writers.
176+
// For example, when adding multiple indexes with multi-schema-change,
177+
// it is used to distinguish the summaries from different indexes.
178+
func (b *WriterBuilder) SetGroupOffset(offset int) *WriterBuilder {
179+
b.groupOffset = offset
180+
return b
181+
}
182+
172183
// Build builds a new Writer. The files writer will create are under the prefix
173184
// of "{prefix}/{writerID}".
174185
func (b *WriterBuilder) Build(
@@ -199,6 +210,7 @@ func (b *WriterBuilder) Build(
199210
filenamePrefix: filenamePrefix,
200211
keyAdapter: keyAdapter,
201212
writerID: writerID,
213+
groupOffset: b.groupOffset,
202214
onClose: b.onClose,
203215
closed: false,
204216
multiFileStats: make([]MultipleFilesStat, 1),
@@ -314,6 +326,7 @@ func GetMaxOverlappingTotal(stats []MultipleFilesStat) int64 {
314326
type Writer struct {
315327
store storage.ExternalStorage
316328
writerID string
329+
groupOffset int
317330
currentSeq int
318331
filenamePrefix string
319332
keyAdapter common.KeyAdapter
@@ -407,6 +420,7 @@ func (w *Writer) Close(ctx context.Context) error {
407420
w.kvLocations = nil
408421
w.onClose(&WriterSummary{
409422
WriterID: w.writerID,
423+
GroupOffset: w.groupOffset,
410424
Seq: w.currentSeq,
411425
Min: w.minKey,
412426
Max: w.maxKey,

pkg/ddl/backfilling_dist_executor.go

+31-3
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,40 @@ type BackfillTaskMeta struct {
5353
type BackfillSubTaskMeta struct {
5454
PhysicalTableID int64 `json:"physical_table_id"`
5555

56-
RangeSplitKeys [][]byte `json:"range_split_keys"`
57-
DataFiles []string `json:"data-files"`
58-
StatFiles []string `json:"stat-files"`
56+
// RowStart and RowEnd are used by the read index step.
57+
RowStart []byte `json:"row_start"`
58+
RowEnd []byte `json:"row_end"`
59+
60+
// The fields below are used by the global sort write & ingest step.
61+
RangeSplitKeys [][]byte `json:"range_split_keys,omitempty"`
62+
DataFiles []string `json:"data-files,omitempty"`
63+
StatFiles []string `json:"stat-files,omitempty"`
64+
// Each element of MetaGroups is the sorted KV meta of a different index.
65+
MetaGroups []*external.SortedKVMeta `json:"meta_groups,omitempty"`
66+
// The embedded SortedKVMeta is only used for adding one single index.
67+
// It is kept for compatibility with v7.5.
5968
external.SortedKVMeta `json:",inline"`
6069
}
6170

71+
// decodeBackfillSubTaskMeta unmarshals raw JSON into a BackfillSubTaskMeta.
// When RowStart or MetaGroups is empty after decoding, it is filled from the
// embedded SortedKVMeta so that callers can rely on RowStart/RowEnd and
// MetaGroups only. It returns a traced error if raw is not valid JSON.
func decodeBackfillSubTaskMeta(raw []byte) (*BackfillSubTaskMeta, error) {
72+
var subtask BackfillSubTaskMeta
73+
err := json.Unmarshal(raw, &subtask)
74+
if err != nil {
75+
return nil, errors.Trace(err)
76+
}
77+
78+
// For compatibility with old versions of TiDB, which kept the row range
// and the single index's KV meta in the embedded SortedKVMeta.
79+
if len(subtask.RowStart) == 0 {
80+
subtask.RowStart = subtask.SortedKVMeta.StartKey
81+
subtask.RowEnd = subtask.SortedKVMeta.EndKey
82+
}
83+
if len(subtask.MetaGroups) == 0 {
84+
// m is a (shallow) copy of the embedded meta; it becomes the only group.
m := subtask.SortedKVMeta
85+
subtask.MetaGroups = []*external.SortedKVMeta{&m}
86+
}
87+
return &subtask, nil
88+
}
89+
6290
// NewBackfillSubtaskExecutor creates a new backfill subtask executor.
6391
func NewBackfillSubtaskExecutor(taskMeta []byte, d *ddl,
6492
bc ingest.BackendCtx, stage proto.Step) (execute.StepExecutor, error) {

pkg/ddl/backfilling_dist_scheduler.go

+90-64
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func (sch *BackfillingSchedulerExt) OnNextSubtasksBatch(
118118

119119
failpoint.Inject("mockWriteIngest", func() {
120120
m := &BackfillSubTaskMeta{
121-
SortedKVMeta: external.SortedKVMeta{},
121+
MetaGroups: []*external.SortedKVMeta{},
122122
}
123123
metaBytes, _ := json.Marshal(m)
124124
metaArr := make([][]byte, 0, 16)
@@ -297,16 +297,14 @@ func generateNonPartitionPlan(
297297
}
298298
batch := recordRegionMetas[i:end]
299299
subTaskMeta := &BackfillSubTaskMeta{
300-
SortedKVMeta: external.SortedKVMeta{
301-
StartKey: batch[0].StartKey(),
302-
EndKey: batch[len(batch)-1].EndKey(),
303-
},
300+
RowStart: batch[0].StartKey(),
301+
RowEnd: batch[len(batch)-1].EndKey(),
304302
}
305303
if i == 0 {
306-
subTaskMeta.StartKey = startKey
304+
subTaskMeta.RowStart = startKey
307305
}
308306
if end == len(recordRegionMetas) {
309-
subTaskMeta.EndKey = endKey
307+
subTaskMeta.RowEnd = endKey
310308
}
311309
metaBytes, err := json.Marshal(subTaskMeta)
312310
if err != nil {
@@ -339,20 +337,55 @@ func generateGlobalSortIngestPlan(
339337
step proto.Step,
340338
logger *zap.Logger,
341339
) ([][]byte, error) {
342-
startKeyFromSumm, endKeyFromSumm, totalSize, multiFileStat, err := getSummaryFromLastStep(taskHandle, task.ID, step)
340+
var kvMetaGroups []*external.SortedKVMeta
341+
err := forEachBackfillSubtaskMeta(taskHandle, task.ID, step, func(subtask *BackfillSubTaskMeta) {
342+
if kvMetaGroups == nil {
343+
kvMetaGroups = make([]*external.SortedKVMeta, len(subtask.MetaGroups))
344+
}
345+
for i, cur := range subtask.MetaGroups {
346+
if kvMetaGroups[i] == nil {
347+
kvMetaGroups[i] = &external.SortedKVMeta{}
348+
}
349+
kvMetaGroups[i].Merge(cur)
350+
}
351+
})
343352
if err != nil {
344353
return nil, err
345354
}
346-
if len(startKeyFromSumm) == 0 && len(endKeyFromSumm) == 0 {
347-
// Skip global sort for empty table.
348-
return nil, nil
349-
}
350355
instanceIDs, err := scheduler.GetLiveExecIDs(ctx)
351356
if err != nil {
352357
return nil, err
353358
}
359+
metaArr := make([][]byte, 0, 16)
360+
for i, g := range kvMetaGroups {
361+
if g == nil {
362+
logger.Error("meet empty kv group when getting subtask summary",
363+
zap.Int64("taskID", task.ID))
364+
return nil, errors.Errorf("subtask kv group %d is empty", i)
365+
}
366+
newMeta, err := splitSubtaskMetaForOneKVMetaGroup(ctx, store, g, cloudStorageURI, int64(len(instanceIDs)), logger)
367+
if err != nil {
368+
return nil, errors.Trace(err)
369+
}
370+
metaArr = append(metaArr, newMeta...)
371+
}
372+
return metaArr, nil
373+
}
374+
375+
func splitSubtaskMetaForOneKVMetaGroup(
376+
ctx context.Context,
377+
store kv.StorageWithPD,
378+
kvMeta *external.SortedKVMeta,
379+
cloudStorageURI string,
380+
instanceCnt int64,
381+
logger *zap.Logger,
382+
) (metaArr [][]byte, err error) {
383+
if len(kvMeta.StartKey) == 0 && len(kvMeta.EndKey) == 0 {
384+
// Skip global sort for empty table.
385+
return nil, nil
386+
}
354387
splitter, err := getRangeSplitter(
355-
ctx, store, cloudStorageURI, int64(totalSize), int64(len(instanceIDs)), multiFileStat, logger)
388+
ctx, store, cloudStorageURI, int64(kvMeta.TotalKVSize), instanceCnt, kvMeta.MultipleFilesStats, logger)
356389
if err != nil {
357390
return nil, err
358391
}
@@ -363,33 +396,32 @@ func generateGlobalSortIngestPlan(
363396
}
364397
}()
365398

366-
metaArr := make([][]byte, 0, 16)
367-
startKey := startKeyFromSumm
399+
startKey := kvMeta.StartKey
368400
var endKey kv.Key
369401
for {
370402
endKeyOfGroup, dataFiles, statFiles, rangeSplitKeys, err := splitter.SplitOneRangesGroup()
371403
if err != nil {
372404
return nil, err
373405
}
374406
if len(endKeyOfGroup) == 0 {
375-
endKey = endKeyFromSumm
407+
endKey = kvMeta.EndKey
376408
} else {
377409
endKey = kv.Key(endKeyOfGroup).Clone()
378410
}
379411
logger.Info("split subtask range",
380412
zap.String("startKey", hex.EncodeToString(startKey)),
381413
zap.String("endKey", hex.EncodeToString(endKey)))
382414

383-
if startKey.Cmp(endKey) >= 0 {
415+
if bytes.Compare(startKey, endKey) >= 0 {
384416
return nil, errors.Errorf("invalid range, startKey: %s, endKey: %s",
385417
hex.EncodeToString(startKey), hex.EncodeToString(endKey))
386418
}
387419
m := &BackfillSubTaskMeta{
388-
SortedKVMeta: external.SortedKVMeta{
420+
MetaGroups: []*external.SortedKVMeta{{
389421
StartKey: startKey,
390422
EndKey: endKey,
391-
TotalKVSize: totalSize / uint64(len(instanceIDs)),
392-
},
423+
TotalKVSize: kvMeta.TotalKVSize / uint64(instanceCnt),
424+
}},
393425
DataFiles: dataFiles,
394426
StatFiles: statFiles,
395427
RangeSplitKeys: rangeSplitKeys,
@@ -400,10 +432,11 @@ func generateGlobalSortIngestPlan(
400432
}
401433
metaArr = append(metaArr, metaBytes)
402434
if len(endKeyOfGroup) == 0 {
403-
return metaArr, nil
435+
break
404436
}
405437
startKey = endKey
406438
}
439+
return metaArr, nil
407440
}
408441

409442
func generateMergePlan(
@@ -413,33 +446,41 @@ func generateMergePlan(
413446
) ([][]byte, error) {
414447
// check data files overlaps,
415448
// if data files overlaps too much, we need a merge step.
416-
subTaskMetas, err := taskHandle.GetPreviousSubtaskMetas(task.ID, proto.BackfillStepReadIndex)
449+
multiStats := make([]external.MultipleFilesStat, 0, 100)
450+
var kvMetaGroups []*external.SortedKVMeta
451+
err := forEachBackfillSubtaskMeta(taskHandle, task.ID, proto.BackfillStepReadIndex,
452+
func(subtask *BackfillSubTaskMeta) {
453+
if kvMetaGroups == nil {
454+
kvMetaGroups = make([]*external.SortedKVMeta, len(subtask.MetaGroups))
455+
}
456+
for i, g := range subtask.MetaGroups {
457+
if kvMetaGroups[i] == nil {
458+
kvMetaGroups[i] = &external.SortedKVMeta{}
459+
}
460+
kvMetaGroups[i].Merge(g)
461+
multiStats = append(multiStats, g.MultipleFilesStats...)
462+
}
463+
})
417464
if err != nil {
418465
return nil, err
419466
}
420-
multiStats := make([]external.MultipleFilesStat, 0, 100)
421-
for _, bs := range subTaskMetas {
422-
var subtask BackfillSubTaskMeta
423-
err = json.Unmarshal(bs, &subtask)
424-
if err != nil {
425-
return nil, err
426-
}
427-
multiStats = append(multiStats, subtask.MultipleFilesStats...)
428-
}
429467
if skipMergeSort(multiStats) {
430468
logger.Info("skip merge sort")
431469
return nil, nil
432470
}
433471

434472
// generate merge sort plan.
435-
_, _, _, multiFileStat, err := getSummaryFromLastStep(taskHandle, task.ID, proto.BackfillStepReadIndex)
436-
if err != nil {
437-
return nil, err
438-
}
439473
dataFiles := make([]string, 0, 1000)
440-
for _, m := range multiFileStat {
441-
for _, filePair := range m.Filenames {
442-
dataFiles = append(dataFiles, filePair[0])
474+
for i, g := range kvMetaGroups {
475+
if g == nil {
476+
logger.Error("meet empty kv group when getting subtask summary",
477+
zap.Int64("taskID", task.ID))
478+
return nil, errors.Errorf("subtask kv group %d is empty", i)
479+
}
480+
for _, m := range g.MultipleFilesStats {
481+
for _, filePair := range m.Filenames {
482+
dataFiles = append(dataFiles, filePair[0])
483+
}
443484
}
444485
}
445486

@@ -508,40 +549,25 @@ func getRangeSplitter(
508549
rangeGroupSize, rangeGroupKeys, maxSizePerRange, maxKeysPerRange, true)
509550
}
510551

511-
func getSummaryFromLastStep(
552+
// forEachBackfillSubtaskMeta fetches the subtask metas of the given task and
// step via taskHandle.GetPreviousSubtaskMetas, decodes each one with
// decodeBackfillSubTaskMeta and passes the result to fn. It returns the first
// fetch or decode error; decode errors are also logged before returning.
func forEachBackfillSubtaskMeta(
512553
taskHandle diststorage.TaskHandle,
513554
gTaskID int64,
514555
step proto.Step,
515-
) (startKey, endKey kv.Key, totalKVSize uint64, multiFileStat []external.MultipleFilesStat, err error) {
556+
fn func(subtask *BackfillSubTaskMeta),
557+
) error {
516558
subTaskMetas, err := taskHandle.GetPreviousSubtaskMetas(gTaskID, step)
517559
if err != nil {
518-
return nil, nil, 0, nil, errors.Trace(err)
560+
return errors.Trace(err)
519561
}
520562
for _, subTaskMeta := range subTaskMetas {
521-
var subtask BackfillSubTaskMeta
522-
err := json.Unmarshal(subTaskMeta, &subtask)
563+
subtask, err := decodeBackfillSubTaskMeta(subTaskMeta)
523564
if err != nil {
524-
return nil, nil, 0, nil, errors.Trace(err)
525-
}
526-
// Skip empty subtask.StartKey/EndKey because it means
527-
// no records need to be written in this subtask.
528-
if subtask.StartKey == nil || subtask.EndKey == nil {
529-
continue
530-
}
531-
532-
if len(startKey) == 0 {
533-
startKey = subtask.StartKey
534-
} else {
535-
startKey = external.BytesMin(startKey, subtask.StartKey)
536-
}
537-
if len(endKey) == 0 {
538-
endKey = subtask.EndKey
539-
} else {
540-
endKey = external.BytesMax(endKey, subtask.EndKey)
565+
logutil.BgLogger().Error("unmarshal error",
566+
zap.String("category", "ddl"),
567+
zap.Error(err))
568+
return errors.Trace(err)
541569
}
542-
totalKVSize += subtask.TotalKVSize
543-
544-
multiFileStat = append(multiFileStat, subtask.MultipleFilesStats...)
570+
fn(subtask)
545571
}
546-
return startKey, endKey, totalKVSize, multiFileStat, nil
572+
return nil
547573
}

pkg/ddl/backfilling_dist_scheduler_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) {
180180

181181
// update meta, same as import into.
182182
sortStepMeta := &ddl.BackfillSubTaskMeta{
183-
SortedKVMeta: external.SortedKVMeta{
183+
MetaGroups: []*external.SortedKVMeta{{
184184
StartKey: []byte("ta"),
185185
EndKey: []byte("tc"),
186186
TotalKVSize: 12,
@@ -191,7 +191,7 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) {
191191
},
192192
},
193193
},
194-
},
194+
}},
195195
}
196196
sortStepMetaBytes, err := json.Marshal(sortStepMeta)
197197
require.NoError(t, err)
@@ -220,7 +220,7 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) {
220220
gotSubtasks, err = mgr.GetSubtasksWithHistory(ctx, taskID, task.Step)
221221
require.NoError(t, err)
222222
mergeSortStepMeta := &ddl.BackfillSubTaskMeta{
223-
SortedKVMeta: external.SortedKVMeta{
223+
MetaGroups: []*external.SortedKVMeta{{
224224
StartKey: []byte("ta"),
225225
EndKey: []byte("tc"),
226226
TotalKVSize: 12,
@@ -231,7 +231,7 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) {
231231
},
232232
},
233233
},
234-
},
234+
}},
235235
}
236236
mergeSortStepMetaBytes, err := json.Marshal(mergeSortStepMeta)
237237
require.NoError(t, err)

0 commit comments

Comments
 (0)