Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/lightning : remove get_regions call in physical backend (#46202) #46335

Open
wants to merge 1 commit into
base: release-6.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,7 @@ func (local *local) writeAndIngestByRange(
ctx, cancel := context.WithCancel(ctxt)
defer cancel()

<<<<<<< HEAD
WriteAndIngest:
for retry := 0; retry < maxRetryTimes; {
if retry != 0 {
Expand Down Expand Up @@ -1121,6 +1122,11 @@ WriteAndIngest:
logutil.Key("endKey", end), log.ShortError(err), zap.Int("retry", retry))
continue WriteAndIngest
}
=======
err = local.SplitAndScatterRegionInBatches(ctx, initialSplitRanges, needSplit, maxBatchSplitRanges)
if err == nil || common.IsContextCanceledError(err) {
break
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
}

return err
Expand Down
23 changes: 18 additions & 5 deletions br/pkg/lightning/backend/local/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,15 @@ var (
func (local *local) SplitAndScatterRegionInBatches(
ctx context.Context,
ranges []Range,
tableInfo *checkpoints.TidbTableInfo,
needSplit bool,
regionSplitSize int64,
batchCnt int,
) error {
for i := 0; i < len(ranges); i += batchCnt {
batch := ranges[i:]
if len(batch) > batchCnt {
batch = batch[:batchCnt]
}
if err := local.SplitAndScatterRegionByRanges(ctx, batch, tableInfo, needSplit, regionSplitSize); err != nil {
if err := local.SplitAndScatterRegionByRanges(ctx, batch, needSplit); err != nil {
return errors.Trace(err)
}
}
Expand All @@ -89,10 +87,13 @@ func (local *local) SplitAndScatterRegionInBatches(
func (local *local) SplitAndScatterRegionByRanges(
ctx context.Context,
ranges []Range,
tableInfo *checkpoints.TidbTableInfo,
needSplit bool,
<<<<<<< HEAD
regionSplitSize int64,
) error {
=======
) (err error) {
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
if len(ranges) == 0 {
return nil
}
Expand All @@ -108,7 +109,6 @@ func (local *local) SplitAndScatterRegionByRanges(
scatterRegions := make([]*split.RegionInfo, 0)
var retryKeys [][]byte
waitTime := splitRegionBaseBackOffTime
skippedKeys := 0
for i := 0; i < splitRetryTimes; i++ {
log.L().Info("split and scatter region",
logutil.Key("minKey", minKey),
Expand Down Expand Up @@ -170,6 +170,7 @@ func (local *local) SplitAndScatterRegionByRanges(
return nil
}

<<<<<<< HEAD
var tableRegionStats map[uint64]int64
if tableInfo != nil {
tableRegionStats, err = fetchTableRegionSizeStats(ctx, db, tableInfo.ID)
Expand All @@ -180,6 +181,8 @@ func (local *local) SplitAndScatterRegionByRanges(
}
}

=======
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
regionMap := make(map[uint64]*split.RegionInfo)
for _, region := range regions {
regionMap[region.Region.GetId()] = region
Expand Down Expand Up @@ -289,6 +292,7 @@ func (local *local) SplitAndScatterRegionByRanges(
}
sendLoop:
for regionID, keys := range splitKeyMap {
<<<<<<< HEAD
// if region not in tableRegionStats, that means this region is newly split, so
// we can skip split it again.
regionSize, ok := tableRegionStats[regionID]
Expand All @@ -298,6 +302,8 @@ func (local *local) SplitAndScatterRegionByRanges(
if len(keys) == 1 && regionSize < regionSplitSize {
skippedKeys++
}
=======
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
select {
case ch <- &splitInfo{region: regionMap[regionID], keys: keys}:
case <-ctx.Done():
Expand Down Expand Up @@ -340,12 +346,19 @@ func (local *local) SplitAndScatterRegionByRanges(
scatterCount++
}
if scatterCount == len(scatterRegions) {
<<<<<<< HEAD
log.L().Info("waiting for scattering regions done",
zap.Int("skipped_keys", skippedKeys),
zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
} else {
log.L().Info("waiting for scattering regions timeout",
zap.Int("skipped_keys", skippedKeys),
=======
log.FromContext(ctx).Info("waiting for scattering regions done",
zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
} else {
log.FromContext(ctx).Info("waiting for scattering regions timeout",
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
zap.Int("scatterCount", scatterCount),
zap.Int("regions", len(scatterRegions)),
zap.Duration("take", time.Since(startTime)))
Expand Down
97 changes: 95 additions & 2 deletions br/pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,10 +437,15 @@ func doTestBatchSplitRegionByRanges(ctx context.Context, t *testing.T, hook clie
start = end
}

<<<<<<< HEAD
err = local.SplitAndScatterRegionByRanges(ctx, ranges, nil, true, 1000)
if len(errPat) == 0 {
require.NoError(t, err)
} else {
=======
err = local.SplitAndScatterRegionByRanges(ctx, ranges, true)
if len(errPat) != 0 {
>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
require.Error(t, err)
require.Regexp(t, errPat, err.Error())
return
Expand All @@ -465,6 +470,94 @@ func TestBatchSplitRegionByRanges(t *testing.T) {
doTestBatchSplitRegionByRanges(context.Background(), t, nil, "", nil)
}

<<<<<<< HEAD
=======
type checkScatterClient struct {
*testSplitClient

mu sync.Mutex
notFoundFirstTime map[uint64]struct{}
scatterCounter atomic.Int32
}

func newCheckScatterClient(inner *testSplitClient) *checkScatterClient {
return &checkScatterClient{
testSplitClient: inner,
notFoundFirstTime: map[uint64]struct{}{},
scatterCounter: atomic.Int32{},
}
}

func (c *checkScatterClient) ScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) error {
c.scatterCounter.Add(1)
return nil
}

func (c *checkScatterClient) GetRegionByID(ctx context.Context, regionID uint64) (*split.RegionInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.notFoundFirstTime[regionID]; !ok {
c.notFoundFirstTime[regionID] = struct{}{}
return nil, nil
}
return c.testSplitClient.GetRegionByID(ctx, regionID)
}

func TestMissingScatter(t *testing.T) {
ctx := context.Background()
splitHook := defaultHook{}
deferFunc := splitHook.setup(t)
defer deferFunc()

keys := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca"), []byte("")}
client := initTestSplitClient(keys, nil)
checkClient := newCheckScatterClient(client)
local := &Backend{
splitCli: checkClient,
logger: log.L(),
}
local.RegionSplitBatchSize = 4
local.RegionSplitConcurrency = 4

// current region ranges: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
rangeStart := codec.EncodeBytes([]byte{}, []byte("b"))
rangeEnd := codec.EncodeBytes([]byte{}, []byte("c"))
regions, err := split.PaginateScanRegion(ctx, client, rangeStart, rangeEnd, 5)
require.NoError(t, err)
// regions is: [aay, bba), [bba, bbh), [bbh, cca)
checkRegionRanges(t, regions, [][]byte{[]byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca")})

// generate: ranges [b, ba), [ba, bb), [bb, bc), ... [by, bz)
ranges := make([]Range, 0)
start := []byte{'b'}
for i := byte('a'); i <= 'z'; i++ {
end := []byte{'b', i}
ranges = append(ranges, Range{start: start, end: end})
start = end
}

err = local.SplitAndScatterRegionByRanges(ctx, ranges, true)
require.NoError(t, err)

splitHook.check(t, client)

// check split ranges
regions, err = split.PaginateScanRegion(ctx, client, rangeStart, rangeEnd, 5)
require.NoError(t, err)
result := [][]byte{
[]byte("b"), []byte("ba"), []byte("bb"), []byte("bba"), []byte("bbh"), []byte("bc"),
[]byte("bd"), []byte("be"), []byte("bf"), []byte("bg"), []byte("bh"), []byte("bi"), []byte("bj"),
[]byte("bk"), []byte("bl"), []byte("bm"), []byte("bn"), []byte("bo"), []byte("bp"), []byte("bq"),
[]byte("br"), []byte("bs"), []byte("bt"), []byte("bu"), []byte("bv"), []byte("bw"), []byte("bx"),
[]byte("by"), []byte("bz"), []byte("cca"),
}
checkRegionRanges(t, regions, result)

// the old regions will not be scattered. They are [..., bba), [bba, bbh), [..., cca)
require.Equal(t, len(result)-3, int(checkClient.scatterCounter.Load()))
}

>>>>>>> f15ba117bc2 (pkg/lightning : remove get_regions call in physical backend (#46202))
type batchSizeHook struct{}

func (h batchSizeHook) setup(t *testing.T) func() {
Expand Down Expand Up @@ -599,7 +692,7 @@ func TestSplitAndScatterRegionInBatches(t *testing.T) {
})
}

err := local.SplitAndScatterRegionInBatches(ctx, ranges, nil, true, 1000, 4)
err := local.SplitAndScatterRegionInBatches(ctx, ranges, true, 4)
require.NoError(t, err)

rangeStart := codec.EncodeBytes([]byte{}, []byte("a"))
Expand Down Expand Up @@ -695,7 +788,7 @@ func doTestBatchSplitByRangesWithClusteredIndex(t *testing.T, hook clientHook) {
start = e
}

err := local.SplitAndScatterRegionByRanges(ctx, ranges, nil, true, 1000)
err := local.SplitAndScatterRegionByRanges(ctx, ranges, true)
require.NoError(t, err)

startKey := codec.EncodeBytes([]byte{}, rangeKeys[0])
Expand Down