From 12aa6db13ca16f71ebc4a3817bf53f7349c1d6ae Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Tue, 21 Dec 2021 14:10:07 +0800 Subject: [PATCH 1/4] restore: use new ScatterRegions API Signed-off-by: Yu Juncen --- br/pkg/restore/split.go | 33 +++++++++++++------ br/pkg/restore/split_client.go | 20 ++++++++++++ br/pkg/restore/split_test.go | 58 +++++++++++++++++++++++++++++----- 3 files changed, 94 insertions(+), 17 deletions(-) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index c962a2109aac6..5792e39cbd32f 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -24,6 +24,8 @@ import ( "github.com/tikv/pd/pkg/codec" "go.uber.org/multierr" "go.uber.org/zap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // Constants for split retry machinery. @@ -112,6 +114,7 @@ SplitRegions: regionMap[region.Region.GetId()] = region } for regionID, keys := range splitKeyMap { + log.Info("get split keys for region", zap.Int("len", len(keys)), zap.Uint64("region", regionID)) var newRegions []*RegionInfo region := regionMap[regionID] log.Info("split regions", @@ -142,6 +145,7 @@ SplitRegions: logutil.Keys(keys), rtree.ZapRanges(ranges)) continue SplitRegions } + log.Info("scattered regions", zap.Int("count", len(newRegions))) if len(newRegions) != len(keys) { log.Warn("split key count and new region count mismatch", zap.Int("new region count", len(newRegions)), @@ -294,8 +298,6 @@ func (rs *RegionSplitter) ScatterRegionsWithBackoffer(ctx context.Context, newRe log.Info("trying to scatter regions...", zap.Int("remain", len(newRegionSet))) var errs error for _, region := range newRegionSet { - // Wait for a while until the regions successfully split. - rs.waitForSplit(ctx, region.Region.Id) err := rs.client.ScatterRegion(ctx, region) if err == nil { // it is safe accroding to the Go language spec. @@ -330,13 +332,26 @@ func (rs *RegionSplitter) ScatterRegionsWithBackoffer(ctx context.Context, newRe // ScatterRegions scatter the regions. 
func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*RegionInfo) { - rs.ScatterRegionsWithBackoffer( - ctx, newRegions, - // backoff about 6s, or we give up scattering this region. - &exponentialBackoffer{ - attempt: 7, - baseBackoff: 100 * time.Millisecond, - }) + for _, region := range newRegions { + // Wait for a while until the regions successfully split. + rs.waitForSplit(ctx, region.Region.Id) + } + + err := rs.client.ScatterRegions(ctx, newRegions) + if status.Code(err) == codes.Unimplemented { + log.Warn("batch scatter isn't supported, rollback to old method", logutil.ShortError(err)) + rs.ScatterRegionsWithBackoffer( + ctx, newRegions, + // backoff about 6s, or we give up scattering this region. + &exponentialBackoffer{ + attempt: 7, + baseBackoff: 100 * time.Millisecond, + }) + return + } + if err != nil { + log.Warn("failed to batch scatter region", logutil.ShortError(err)) + } } func CheckRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error { diff --git a/br/pkg/restore/split_client.go b/br/pkg/restore/split_client.go index 10a9913d8e683..ed24fc3984a52 100755 --- a/br/pkg/restore/split_client.go +++ b/br/pkg/restore/split_client.go @@ -60,6 +60,8 @@ type SplitClient interface { BatchSplitRegionsWithOrigin(ctx context.Context, regionInfo *RegionInfo, keys [][]byte) (*RegionInfo, []*RegionInfo, error) // ScatterRegion scatters a specified region. ScatterRegion(ctx context.Context, regionInfo *RegionInfo) error + // ScatterRegions scatters regions in a batch. + ScatterRegions(ctx context.Context, regionInfo []*RegionInfo) error // GetOperator gets the status of operator of the specified region. GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) // ScanRegion gets a list of regions, starts from the region that contains key. @@ -114,6 +116,24 @@ func (c *pdClient) needScatter(ctx context.Context) bool { return c.needScatterVal } +// ScatterRegions scatters regions in a batch. 
+func (c *pdClient) ScatterRegions(ctx context.Context, regionInfo []*RegionInfo) error { + c.mu.Lock() + defer c.mu.Unlock() + regionsID := make([]uint64, 0, len(regionInfo)) + for _, v := range regionInfo { + regionsID = append(regionsID, v.Region.Id) + } + resp, err := c.client.ScatterRegions(ctx, regionsID) + if err != nil { + return err + } + if pbErr := resp.GetHeader().GetError(); pbErr.GetType() != pdpb.ErrorType_OK { + return errors.Annotatef(berrors.ErrPDInvalidResponse, "pd returns error during batch scattering: %s", pbErr) + } + return nil +} + func (c *pdClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) { c.mu.Lock() defer c.mu.Unlock() diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index 5e43d3378e579..fdfbba8df54d0 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -21,17 +21,19 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/schedule/placement" + "go.uber.org/multierr" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) type TestClient struct { - mu sync.RWMutex - stores map[uint64]*metapb.Store - regions map[uint64]*restore.RegionInfo - regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions - nextRegionID uint64 - injectInScatter func(*restore.RegionInfo) error + mu sync.RWMutex + stores map[uint64]*metapb.Store + regions map[uint64]*restore.RegionInfo + regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions + nextRegionID uint64 + injectInScatter func(*restore.RegionInfo) error + supportBatchScatter bool scattered map[uint64]bool } @@ -55,6 +57,36 @@ func NewTestClient( } } +func (c *TestClient) InstallBatchScatterSupport() { + c.supportBatchScatter = true +} + +// ScatterRegions scatters regions in a batch. 
+func (c *TestClient) ScatterRegions(ctx context.Context, regionInfo []*restore.RegionInfo) error { + if !c.supportBatchScatter { + return status.Error(codes.Unimplemented, "Ah, yep") + } + regions := map[uint64]*restore.RegionInfo{} + for _, region := range regionInfo { + regions[region.Region.Id] = region + } + var err error + for i := 0; i < 3; i++ { + if len(regions) == 0 { + return nil + } + for id, region := range regions { + splitErr := c.ScatterRegion(ctx, region) + if splitErr == nil { + delete(regions, id) + } + err = multierr.Append(err, splitErr) + + } + } + return nil +} + func (c *TestClient) GetAllRegions() map[uint64]*restore.RegionInfo { c.mu.RLock() defer c.mu.RUnlock() @@ -282,7 +314,18 @@ func TestScatterFinishInTime(t *testing.T) { // [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj), // [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, ) func TestSplitAndScatter(t *testing.T) { - client := initTestClient() + t.Run("BatchScatter", func(t *testing.T) { + client := initTestClient() + client.InstallBatchScatterSupport() + runTestSplitAndScatterWith(t, client) + }) + t.Run("BackwardCompatibility", func(t *testing.T) { + client := initTestClient() + runTestSplitAndScatterWith(t, client) + }) +} + +func runTestSplitAndScatterWith(t *testing.T, client *TestClient) { ranges := initRanges() rewriteRules := initRewriteRules() regionSplitter := restore.NewRegionSplitter(client) @@ -320,7 +363,6 @@ func TestSplitAndScatter(t *testing.T) { t.Fatalf("region %d has not been scattered: %#v", key, regions[key]) } } - } // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) From bb9116b1ecb68aff5925a109d887fed0485935c3 Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Tue, 21 Dec 2021 16:10:12 +0800 Subject: [PATCH 2/4] restore: fallback to old version properly Signed-off-by: Yu Juncen --- .../backend/local/localhelper_test.go | 5 ++++ br/pkg/restore/split.go | 23 ++++++++++++++++++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git 
a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go index d901b3c2711e6..52a9b71286087 100644 --- a/br/pkg/lightning/backend/local/localhelper_test.go +++ b/br/pkg/lightning/backend/local/localhelper_test.go @@ -69,6 +69,11 @@ func newTestClient( } } +// ScatterRegions scatters regions in a batch. +func (c *testClient) ScatterRegions(ctx context.Context, regionInfo []*restore.RegionInfo) error { + return nil +} + func (c *testClient) GetAllRegions() map[uint64]*restore.RegionInfo { c.mu.RLock() defer c.mu.RUnlock() diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index 5792e39cbd32f..478078001d488 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -330,6 +330,27 @@ func (rs *RegionSplitter) ScatterRegionsWithBackoffer(ctx context.Context, newRe } +// isUnsupportedError checks whether we should fall back to the ScatterRegion API when meeting the error. +func isUnsupportedError(err error) bool { + s, ok := status.FromError(errors.Cause(err)) + if !ok { + // Not a gRPC error. Something else went wrong. + return false + } + // We fall back to ScatterRegion in two conditions: + // (1) If the RPC endpoint returns UNIMPLEMENTED. (This only exists to make the test cases less magical.) + // (2) If the Message is "region 0 not found": + // In fact, PD reuses the gRPC endpoint `ScatterRegion` for the batch version of scattering. + // When the request contains the field `regionIDs`, it uses the batch version; + // otherwise, it uses the old version and scatters the region with the `regionID` in the request. + // When facing 4.x, BR (which uses v5.x PD clients and calls `ScatterRegions`!) would set `regionIDs`, + // which would be ignored by protocol buffers, leaving `regionID` as zero. + // Then the older version of PD would try to search for the region with ID 0. + // (Then it consistently fails, and returns "region 0 not found".) 
+ return s.Code() == codes.Unimplemented || + strings.Contains(s.Message(), "region 0 not found") +} + // ScatterRegions scatter the regions. func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*RegionInfo) { for _, region := range newRegions { @@ -338,7 +359,7 @@ func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*Regi } err := rs.client.ScatterRegions(ctx, newRegions) - if status.Code(err) == codes.Unimplemented { + if isUnsupportedError(err) { log.Warn("batch scatter isn't supported, rollback to old method", logutil.ShortError(err)) rs.ScatterRegionsWithBackoffer( ctx, newRegions, From c2c22f081349690ca634f94128a6ccca1eef3a9d Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Wed, 22 Dec 2021 10:30:07 +0800 Subject: [PATCH 3/4] restore: added a comment about retry Signed-off-by: Yu Juncen --- br/pkg/restore/split.go | 1 + 1 file changed, 1 insertion(+) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index 478078001d488..35ec7cc3ce866 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -358,6 +358,7 @@ func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*Regi rs.waitForSplit(ctx, region.Region.Id) } + // No retry needed, the internal implementation of `ScatterRegions` involves retry already. err := rs.client.ScatterRegions(ctx, newRegions) if isUnsupportedError(err) { log.Warn("batch scatter isn't supported, rollback to old method", logutil.ShortError(err)) From 582a9fd148094b5ed19b725c73e7fff385e888d5 Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Wed, 22 Dec 2021 13:14:35 +0800 Subject: [PATCH 4/4] restore: add retry... 
Signed-off-by: Yu Juncen --- br/pkg/restore/split.go | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index 35ec7cc3ce866..ada8662522c21 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -358,19 +358,23 @@ func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*Regi rs.waitForSplit(ctx, region.Region.Id) } - // No retry needed, the internal implementation of `ScatterRegions` involves retry already. - err := rs.client.ScatterRegions(ctx, newRegions) - if isUnsupportedError(err) { - log.Warn("batch scatter isn't supported, rollback to old method", logutil.ShortError(err)) - rs.ScatterRegionsWithBackoffer( - ctx, newRegions, - // backoff about 6s, or we give up scattering this region. - &exponentialBackoffer{ - attempt: 7, - baseBackoff: 100 * time.Millisecond, - }) - return - } + err := utils.WithRetry(ctx, func() error { + err := rs.client.ScatterRegions(ctx, newRegions) + if isUnsupportedError(err) { + log.Warn("batch scatter isn't supported, rollback to old method", logutil.ShortError(err)) + rs.ScatterRegionsWithBackoffer( + ctx, newRegions, + // backoff about 6s, or we give up scattering this region. + &exponentialBackoffer{ + attempt: 7, + baseBackoff: 100 * time.Millisecond, + }) + return nil + } + return err + // The retry covers temporary network errors while sending the request. + }, &exponentialBackoffer{attempt: 3, baseBackoff: 500 * time.Millisecond}) + if err != nil { log.Warn("failed to batch scatter region", logutil.ShortError(err)) }