Skip to content

Commit

Permalink
restore: use new ScatterRegions API (pingcap#30899)
Browse files Browse the repository at this point in the history
  • Loading branch information
YuJuncen authored Dec 22, 2021
1 parent 2fb260f commit efb5330
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 17 deletions.
5 changes: 5 additions & 0 deletions br/pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ func newTestClient(
}
}

// ScatterRegions scatters regions in a batch.
func (c *testClient) ScatterRegions(ctx context.Context, regionInfo []*restore.RegionInfo) error {
return nil
}

func (c *testClient) GetAllRegions() map[uint64]*restore.RegionInfo {
c.mu.RLock()
defer c.mu.RUnlock()
Expand Down
59 changes: 50 additions & 9 deletions br/pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/tikv/pd/pkg/codec"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// Constants for split retry machinery.
Expand Down Expand Up @@ -112,6 +114,7 @@ SplitRegions:
regionMap[region.Region.GetId()] = region
}
for regionID, keys := range splitKeyMap {
log.Info("get split keys for region", zap.Int("len", len(keys)), zap.Uint64("region", regionID))
var newRegions []*RegionInfo
region := regionMap[regionID]
log.Info("split regions",
Expand Down Expand Up @@ -142,6 +145,7 @@ SplitRegions:
logutil.Keys(keys), rtree.ZapRanges(ranges))
continue SplitRegions
}
log.Info("scattered regions", zap.Int("count", len(newRegions)))
if len(newRegions) != len(keys) {
log.Warn("split key count and new region count mismatch",
zap.Int("new region count", len(newRegions)),
Expand Down Expand Up @@ -294,8 +298,6 @@ func (rs *RegionSplitter) ScatterRegionsWithBackoffer(ctx context.Context, newRe
log.Info("trying to scatter regions...", zap.Int("remain", len(newRegionSet)))
var errs error
for _, region := range newRegionSet {
// Wait for a while until the regions successfully split.
rs.waitForSplit(ctx, region.Region.Id)
err := rs.client.ScatterRegion(ctx, region)
if err == nil {
// it is safe accroding to the Go language spec.
Expand Down Expand Up @@ -328,15 +330,54 @@ func (rs *RegionSplitter) ScatterRegionsWithBackoffer(ctx context.Context, newRe

}

// isUnsupportedError checks whether we should fallback to ScatterRegion API when meeting the error.
func isUnsupportedError(err error) bool {
s, ok := status.FromError(errors.Cause(err))
if !ok {
// Not a gRPC error. Something other went wrong.
return false
}
// In two conditions, we fallback to ScatterRegion:
// (1) If the RPC endpoint returns UNIMPLEMENTED. (This is just for making test cases not be so magic.)
// (2) If the Message is "region 0 not found":
// In fact, PD reuses the gRPC endpoint `ScatterRegion` for the batch version of scattering.
// When the request contains the field `regionIDs`, it would use the batch version,
// Otherwise, it uses the old version and scatter the region with `regionID` in the request.
// When facing 4.x, BR(which uses v5.x PD clients and call `ScatterRegions`!) would set `regionIDs`
// which would be ignored by protocol buffers, and leave the `regionID` be zero.
// Then the older version of PD would try to search the region with ID 0.
// (Then it consistently fails, and returns "region 0 not found".)
return s.Code() == codes.Unimplemented ||
strings.Contains(s.Message(), "region 0 not found")
}

// ScatterRegions scatter the regions.
func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*RegionInfo) {
rs.ScatterRegionsWithBackoffer(
ctx, newRegions,
// backoff about 6s, or we give up scattering this region.
&exponentialBackoffer{
attempt: 7,
baseBackoff: 100 * time.Millisecond,
})
for _, region := range newRegions {
// Wait for a while until the regions successfully split.
rs.waitForSplit(ctx, region.Region.Id)
}

err := utils.WithRetry(ctx, func() error {
err := rs.client.ScatterRegions(ctx, newRegions)
if isUnsupportedError(err) {
log.Warn("batch scatter isn't supported, rollback to old method", logutil.ShortError(err))
rs.ScatterRegionsWithBackoffer(
ctx, newRegions,
// backoff about 6s, or we give up scattering this region.
&exponentialBackoffer{
attempt: 7,
baseBackoff: 100 * time.Millisecond,
})
return nil
}
return err
// the retry is for the temporary network errors during sending request.
}, &exponentialBackoffer{attempt: 3, baseBackoff: 500 * time.Millisecond})

if err != nil {
log.Warn("failed to batch scatter region", logutil.ShortError(err))
}
}

func CheckRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error {
Expand Down
20 changes: 20 additions & 0 deletions br/pkg/restore/split_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type SplitClient interface {
BatchSplitRegionsWithOrigin(ctx context.Context, regionInfo *RegionInfo, keys [][]byte) (*RegionInfo, []*RegionInfo, error)
// ScatterRegion scatters a specified region.
ScatterRegion(ctx context.Context, regionInfo *RegionInfo) error
// ScatterRegions scatters regions in a batch.
ScatterRegions(ctx context.Context, regionInfo []*RegionInfo) error
// GetOperator gets the status of operator of the specified region.
GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)
// ScanRegion gets a list of regions, starts from the region that contains key.
Expand Down Expand Up @@ -114,6 +116,24 @@ func (c *pdClient) needScatter(ctx context.Context) bool {
return c.needScatterVal
}

// ScatterRegions scatters regions in a batch.
func (c *pdClient) ScatterRegions(ctx context.Context, regionInfo []*RegionInfo) error {
c.mu.Lock()
defer c.mu.Unlock()
regionsID := make([]uint64, 0, len(regionInfo))
for _, v := range regionInfo {
regionsID = append(regionsID, v.Region.Id)
}
resp, err := c.client.ScatterRegions(ctx, regionsID)
if err != nil {
return err
}
if pbErr := resp.GetHeader().GetError(); pbErr.GetType() != pdpb.ErrorType_OK {
return errors.Annotatef(berrors.ErrPDInvalidResponse, "pd returns error during batch scattering: %s", pbErr)
}
return nil
}

func (c *pdClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
58 changes: 50 additions & 8 deletions br/pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,19 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule/placement"
"go.uber.org/multierr"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type TestClient struct {
mu sync.RWMutex
stores map[uint64]*metapb.Store
regions map[uint64]*restore.RegionInfo
regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions
nextRegionID uint64
injectInScatter func(*restore.RegionInfo) error
mu sync.RWMutex
stores map[uint64]*metapb.Store
regions map[uint64]*restore.RegionInfo
regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions
nextRegionID uint64
injectInScatter func(*restore.RegionInfo) error
supportBatchScatter bool

scattered map[uint64]bool
}
Expand All @@ -55,6 +57,36 @@ func NewTestClient(
}
}

func (c *TestClient) InstallBatchScatterSupport() {
c.supportBatchScatter = true
}

// ScatterRegions scatters regions in a batch.
func (c *TestClient) ScatterRegions(ctx context.Context, regionInfo []*restore.RegionInfo) error {
if !c.supportBatchScatter {
return status.Error(codes.Unimplemented, "Ah, yep")
}
regions := map[uint64]*restore.RegionInfo{}
for _, region := range regionInfo {
regions[region.Region.Id] = region
}
var err error
for i := 0; i < 3; i++ {
if len(regions) == 0 {
return nil
}
for id, region := range regions {
splitErr := c.ScatterRegion(ctx, region)
if splitErr == nil {
delete(regions, id)
}
err = multierr.Append(err, splitErr)

}
}
return nil
}

func (c *TestClient) GetAllRegions() map[uint64]*restore.RegionInfo {
c.mu.RLock()
defer c.mu.RUnlock()
Expand Down Expand Up @@ -282,7 +314,18 @@ func TestScatterFinishInTime(t *testing.T) {
// [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, )
func TestSplitAndScatter(t *testing.T) {
client := initTestClient()
t.Run("BatchScatter", func(t *testing.T) {
client := initTestClient()
client.InstallBatchScatterSupport()
runTestSplitAndScatterWith(t, client)
})
t.Run("BackwardCompatibility", func(t *testing.T) {
client := initTestClient()
runTestSplitAndScatterWith(t, client)
})
}

func runTestSplitAndScatterWith(t *testing.T, client *TestClient) {
ranges := initRanges()
rewriteRules := initRewriteRules()
regionSplitter := restore.NewRegionSplitter(client)
Expand Down Expand Up @@ -320,7 +363,6 @@ func TestSplitAndScatter(t *testing.T) {
t.Fatalf("region %d has not been scattered: %#v", key, regions[key])
}
}

}

// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
Expand Down

0 comments on commit efb5330

Please sign in to comment.