diff --git a/br/pkg/lightning/backend/local/localhelper_test.go b/br/pkg/lightning/backend/local/localhelper_test.go index d901b3c2711e6..52a9b71286087 100644 --- a/br/pkg/lightning/backend/local/localhelper_test.go +++ b/br/pkg/lightning/backend/local/localhelper_test.go @@ -69,6 +69,11 @@ func newTestClient( } } +// ScatterRegions scatters regions in a batch. +func (c *testClient) ScatterRegions(ctx context.Context, regionInfo []*restore.RegionInfo) error { + return nil +} + func (c *testClient) GetAllRegions() map[uint64]*restore.RegionInfo { c.mu.RLock() defer c.mu.RUnlock() diff --git a/br/pkg/restore/split.go b/br/pkg/restore/split.go index c962a2109aac6..ada8662522c21 100644 --- a/br/pkg/restore/split.go +++ b/br/pkg/restore/split.go @@ -24,6 +24,8 @@ import ( "github.com/tikv/pd/pkg/codec" "go.uber.org/multierr" "go.uber.org/zap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // Constants for split retry machinery. @@ -112,6 +114,7 @@ SplitRegions: regionMap[region.Region.GetId()] = region } for regionID, keys := range splitKeyMap { + log.Info("get split keys for region", zap.Int("len", len(keys)), zap.Uint64("region", regionID)) var newRegions []*RegionInfo region := regionMap[regionID] log.Info("split regions", @@ -142,6 +145,7 @@ SplitRegions: logutil.Keys(keys), rtree.ZapRanges(ranges)) continue SplitRegions } + log.Info("scattered regions", zap.Int("count", len(newRegions))) if len(newRegions) != len(keys) { log.Warn("split key count and new region count mismatch", zap.Int("new region count", len(newRegions)), @@ -294,8 +298,6 @@ func (rs *RegionSplitter) ScatterRegionsWithBackoffer(ctx context.Context, newRe log.Info("trying to scatter regions...", zap.Int("remain", len(newRegionSet))) var errs error for _, region := range newRegionSet { - // Wait for a while until the regions successfully split. - rs.waitForSplit(ctx, region.Region.Id) err := rs.client.ScatterRegion(ctx, region) if err == nil { // it is safe accroding to the Go language spec. @@ -328,15 +330,54 @@ func (rs *RegionSplitter) ScatterRegionsWithBackoffer(ctx context.Context, newRe } +// isUnsupportedError checks whether we should fallback to ScatterRegion API when meeting the error. +func isUnsupportedError(err error) bool { + s, ok := status.FromError(errors.Cause(err)) + if !ok { + // Not a gRPC error. Something other went wrong. + return false + } + // In two conditions, we fallback to ScatterRegion: + // (1) If the RPC endpoint returns UNIMPLEMENTED. (This is just for making test cases not be so magic.) + // (2) If the Message is "region 0 not found": + // In fact, PD reuses the gRPC endpoint `ScatterRegion` for the batch version of scattering. + // When the request contains the field `regionIDs`, it would use the batch version, + // Otherwise, it uses the old version and scatter the region with `regionID` in the request. + // When facing 4.x, BR(which uses v5.x PD clients and call `ScatterRegions`!) would set `regionIDs` + // which would be ignored by protocol buffers, and leave the `regionID` be zero. + // Then the older version of PD would try to search the region with ID 0. + // (Then it consistently fails, and returns "region 0 not found".) + return s.Code() == codes.Unimplemented || + strings.Contains(s.Message(), "region 0 not found") +} + // ScatterRegions scatter the regions. func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*RegionInfo) { - rs.ScatterRegionsWithBackoffer( - ctx, newRegions, - // backoff about 6s, or we give up scattering this region. - &exponentialBackoffer{ - attempt: 7, - baseBackoff: 100 * time.Millisecond, - }) + for _, region := range newRegions { + // Wait for a while until the regions successfully split. + rs.waitForSplit(ctx, region.Region.Id) + } + + err := utils.WithRetry(ctx, func() error { + err := rs.client.ScatterRegions(ctx, newRegions) + if isUnsupportedError(err) { + log.Warn("batch scatter isn't supported, rollback to old method", logutil.ShortError(err)) + rs.ScatterRegionsWithBackoffer( + ctx, newRegions, + // backoff about 6s, or we give up scattering this region. + &exponentialBackoffer{ + attempt: 7, + baseBackoff: 100 * time.Millisecond, + }) + return nil + } + return err + // the retry is for the temporary network errors during sending request. + }, &exponentialBackoffer{attempt: 3, baseBackoff: 500 * time.Millisecond}) + + if err != nil { + log.Warn("failed to batch scatter region", logutil.ShortError(err)) + } } func CheckRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error { diff --git a/br/pkg/restore/split_client.go b/br/pkg/restore/split_client.go index 10a9913d8e683..ed24fc3984a52 100755 --- a/br/pkg/restore/split_client.go +++ b/br/pkg/restore/split_client.go @@ -60,6 +60,8 @@ type SplitClient interface { BatchSplitRegionsWithOrigin(ctx context.Context, regionInfo *RegionInfo, keys [][]byte) (*RegionInfo, []*RegionInfo, error) // ScatterRegion scatters a specified region. ScatterRegion(ctx context.Context, regionInfo *RegionInfo) error + // ScatterRegions scatters regions in a batch. + ScatterRegions(ctx context.Context, regionInfo []*RegionInfo) error // GetOperator gets the status of operator of the specified region. GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) // ScanRegion gets a list of regions, starts from the region that contains key. @@ -114,6 +116,24 @@ func (c *pdClient) needScatter(ctx context.Context) bool { return c.needScatterVal } +// ScatterRegions scatters regions in a batch. +func (c *pdClient) ScatterRegions(ctx context.Context, regionInfo []*RegionInfo) error { + c.mu.Lock() + defer c.mu.Unlock() + regionsID := make([]uint64, 0, len(regionInfo)) + for _, v := range regionInfo { + regionsID = append(regionsID, v.Region.Id) + } + resp, err := c.client.ScatterRegions(ctx, regionsID) + if err != nil { + return err + } + if pbErr := resp.GetHeader().GetError(); pbErr.GetType() != pdpb.ErrorType_OK { + return errors.Annotatef(berrors.ErrPDInvalidResponse, "pd returns error during batch scattering: %s", pbErr) + } + return nil +} + func (c *pdClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) { c.mu.Lock() defer c.mu.Unlock() diff --git a/br/pkg/restore/split_test.go b/br/pkg/restore/split_test.go index 5e43d3378e579..fdfbba8df54d0 100644 --- a/br/pkg/restore/split_test.go +++ b/br/pkg/restore/split_test.go @@ -21,17 +21,19 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/schedule/placement" + "go.uber.org/multierr" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) type TestClient struct { - mu sync.RWMutex - stores map[uint64]*metapb.Store - regions map[uint64]*restore.RegionInfo - regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions - nextRegionID uint64 - injectInScatter func(*restore.RegionInfo) error + mu sync.RWMutex + stores map[uint64]*metapb.Store + regions map[uint64]*restore.RegionInfo + regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions + nextRegionID uint64 + injectInScatter func(*restore.RegionInfo) error + supportBatchScatter bool scattered map[uint64]bool } @@ -55,6 +57,36 @@ func NewTestClient( } } +func (c *TestClient) InstallBatchScatterSupport() { + c.supportBatchScatter = true +} + +// ScatterRegions scatters regions in a batch. +func (c *TestClient) ScatterRegions(ctx context.Context, regionInfo []*restore.RegionInfo) error { + if !c.supportBatchScatter { + return status.Error(codes.Unimplemented, "Ah, yep") + } + regions := map[uint64]*restore.RegionInfo{} + for _, region := range regionInfo { + regions[region.Region.Id] = region + } + var err error + for i := 0; i < 3; i++ { + if len(regions) == 0 { + return nil + } + for id, region := range regions { + splitErr := c.ScatterRegion(ctx, region) + if splitErr == nil { + delete(regions, id) + } + err = multierr.Append(err, splitErr) + + } + } + return nil +} + func (c *TestClient) GetAllRegions() map[uint64]*restore.RegionInfo { c.mu.RLock() defer c.mu.RUnlock() @@ -282,7 +314,18 @@ func TestScatterFinishInTime(t *testing.T) { // [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj), // [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, ) func TestSplitAndScatter(t *testing.T) { - client := initTestClient() + t.Run("BatchScatter", func(t *testing.T) { + client := initTestClient() + client.InstallBatchScatterSupport() + runTestSplitAndScatterWith(t, client) + }) + t.Run("BackwardCompatibility", func(t *testing.T) { + client := initTestClient() + runTestSplitAndScatterWith(t, client) + }) +} + +func runTestSplitAndScatterWith(t *testing.T, client *TestClient) { ranges := initRanges() rewriteRules := initRewriteRules() regionSplitter := restore.NewRegionSplitter(client) @@ -320,7 +363,6 @@ func TestSplitAndScatter(t *testing.T) { t.Fatalf("region %d has not been scattered: %#v", key, regions[key]) } } - } // region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )