Skip to content

Commit

Permalink
scheduler(ticdc): fix span loss bug when split regions (#11715)
Browse files Browse the repository at this point in the history
close #11675
  • Loading branch information
wk989898 authored Nov 6, 2024
1 parent 006a827 commit 1da37a2
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 7 deletions.
35 changes: 31 additions & 4 deletions cdc/scheduler/internal/v3/keyspan/splitter_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,11 @@ func (m *writeSplitter) splitRegionsByWrittenKeysV1(
restRegions := len(regions) - i
regionCount++
spanWriteWeight += regions[i].WrittenKeys
// If the restSpans count is one, and the restWeight is less than writeLimitPerSpan,
// If the restSpans count is one, and the restWeight is less than or equal to writeLimitPerSpan,
// we will use the rest regions as the last span. If the restWeight is larger than writeLimitPerSpan,
// then we need to add more restSpans (restWeight / writeLimitPerSpan) to split the rest regions.
// then we need to add more restSpans (restWeight / writeLimitPerSpan) + 1 to split the rest regions.
if restSpans == 1 {
if restWeight < int64(writeLimitPerSpan) {
if restWeight <= int64(writeLimitPerSpan) {
spans = append(spans, tablepb.Span{
TableID: tableID,
StartKey: tablepb.Key(decodeKey(regions[spanStartIndex].StartKey)),
Expand All @@ -191,11 +191,12 @@ func (m *writeSplitter) splitRegionsByWrittenKeysV1(
regionCounts = append(regionCounts, lastSpanRegionCount)
weights = append(weights, lastSpanWriteWeight)
writeKeys = append(writeKeys, lastSpanWriteKey)
spanStartIndex = len(regions)
break
}
// If the restWeight is larger than writeLimitPerSpan,
// then we need to update the restSpans.
restSpans = int(restWeight) / int(writeLimitPerSpan)
restSpans = int(restWeight)/int(writeLimitPerSpan) + 1
}

// If the restRegions is less than or equal to restSpans,
Expand Down Expand Up @@ -237,6 +238,32 @@ func (m *writeSplitter) splitRegionsByWrittenKeysV1(
spanStartIndex = i + 1
}
}
// All regions should be processed and appended to spans.
if spanStartIndex != len(regions) {
spans = append(spans, tablepb.Span{
TableID: tableID,
StartKey: tablepb.Key(decodeKey(regions[spanStartIndex].StartKey)),
EndKey: tablepb.Key(decodeKey(regions[len(regions)-1].EndKey)),
})
lastSpanRegionCount := len(regions) - spanStartIndex
lastSpanWriteWeight := uint64(0)
lastSpanWriteKey := uint64(0)
for j := spanStartIndex; j < len(regions); j++ {
lastSpanWriteKey += regions[j].WrittenKeys
lastSpanWriteWeight += regions[j].WrittenKeys
}
regionCounts = append(regionCounts, lastSpanRegionCount)
weights = append(weights, lastSpanWriteWeight)
writeKeys = append(writeKeys, lastSpanWriteKey)
log.Warn("some regions are added to the last span, it should not appear",
zap.Int("spanStartIndex", spanStartIndex),
zap.Int("regionsLength", len(regions)),
zap.Int("restSpans", restSpans),
zap.Int64("restWeight", restWeight),
zap.Any("prevSpan", spans[len(spans)-2]),
zap.Any("lastSpan", spans[len(spans)-1]),
)
}
return &splitRegionsInfo{
RegionCounts: regionCounts,
Weights: weights,
Expand Down
45 changes: 42 additions & 3 deletions cdc/scheduler/internal/v3/keyspan/splitter_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,44 @@ func TestSplitRegionEven(t *testing.T) {
info := splitter.splitRegionsByWrittenKeysV1(tblID, regions, 5)
require.Len(t, info.RegionCounts, 5)
require.Len(t, info.Weights, 5)
count := 0
for i, w := range info.Weights {
if i == 4 {
require.Equal(t, uint64(9576), w, i)
} else {
require.Equal(t, uint64(9591), w, i)
}
count += info.RegionCounts[i]
}
require.Equal(t, count, regionCount)
}

// TestSplitLargeRegion verifies that no region is dropped when the input holds
// more regions than fit into full-sized spans. It builds spanRegionLimit*5+1000
// regions with a uniform write weight, calls splitRegionsByWrittenKeysV1 with a
// span-count argument of 5, and expects six spans back: the first five holding
// exactly spanRegionLimit regions each and the sixth holding the remaining
// 1000. The per-span region counts must add up to the number of input regions.
func TestSplitLargeRegion(t *testing.T) {
	tblID := model.TableID(1)
	regionCount := spanRegionLimit*5 + 1000
	regions := make([]pdutil.RegionInfo, regionCount)
	for i := range regions {
		// Each region uses its own index as both start and end key and
		// carries the same number of written keys.
		key := strconv.Itoa(i)
		regions[i] = pdutil.RegionInfo{
			ID:          uint64(i),
			StartKey:    key,
			EndKey:      key,
			WrittenKeys: 2,
		}
	}

	splitter := newWriteSplitter(model.ChangeFeedID4Test("test", "test"), nil, 4)
	info := splitter.splitRegionsByWrittenKeysV1(tblID, regions, 5)
	require.Len(t, info.RegionCounts, 6)
	require.Len(t, info.Weights, 6)

	total := 0
	lastIdx := len(info.RegionCounts) - 1
	for i, c := range info.RegionCounts {
		if i == lastIdx {
			// The trailing span collects the regions left over after the
			// five full spans.
			require.Equal(t, 1000, c, i)
		} else {
			require.Equal(t, spanRegionLimit, c, i)
		}
		total += c
	}
	// Expected value first, actual second, so a failure report is labelled
	// correctly.
	require.Equal(t, regionCount, total)
}

func TestSpanRegionLimitBase(t *testing.T) {
Expand All @@ -247,9 +278,12 @@ func TestSpanRegionLimitBase(t *testing.T) {
spanNum := getSpansNumber(len(regions), captureNum)
info := splitter.splitRegionsByWrittenKeysV1(0, cloneRegions(regions), spanNum)
require.Len(t, info.RegionCounts, spanNum)
count := 0
for _, c := range info.RegionCounts {
require.LessOrEqual(t, c, int(spanRegionLimit*1.1))
count += c
}
require.Equal(t, count, len(regions))
}

func TestSpanRegionLimit(t *testing.T) {
Expand All @@ -273,7 +307,7 @@ func TestSpanRegionLimit(t *testing.T) {
}

// total region number
totalRegionNumbers := spanRegionLimit * 10
totalRegionNumbers := spanRegionLimit*10 + 100

// writtenKeys over 20000 percentage
percentOver20000 := 1
Expand Down Expand Up @@ -318,7 +352,12 @@ func TestSpanRegionLimit(t *testing.T) {
spanNum := getSpansNumber(len(regions), captureNum)
info := splitter.splitRegionsByWrittenKeysV1(0, cloneRegions(regions), spanNum)
require.LessOrEqual(t, spanNum, len(info.RegionCounts))
for _, c := range info.RegionCounts {
require.LessOrEqual(t, c, int(spanRegionLimit*1.1))
count := 0
for i, c := range info.RegionCounts {
if i != len(info.RegionCounts)-1 {
require.LessOrEqual(t, c, int(spanRegionLimit*1.1))
}
count += c
}
require.Equal(t, count, totalRegionNumbers)
}

0 comments on commit 1da37a2

Please sign in to comment.