restore: use new ScatterRegions API #30899

Merged · 6 commits · Dec 22, 2021

Changes from 4 commits
5 changes: 5 additions & 0 deletions br/pkg/lightning/backend/local/localhelper_test.go
@@ -69,6 +69,11 @@ func newTestClient(
}
}

// ScatterRegions scatters regions in a batch.
func (c *testClient) ScatterRegions(ctx context.Context, regionInfo []*restore.RegionInfo) error {
return nil
}

func (c *testClient) GetAllRegions() map[uint64]*restore.RegionInfo {
c.mu.RLock()
defer c.mu.RUnlock()
59 changes: 50 additions & 9 deletions br/pkg/restore/split.go
@@ -24,6 +24,8 @@ import (
"github.com/tikv/pd/pkg/codec"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// Constants for split retry machinery.
@@ -112,6 +114,7 @@ SplitRegions:
regionMap[region.Region.GetId()] = region
}
for regionID, keys := range splitKeyMap {
log.Info("get split keys for region", zap.Int("len", len(keys)), zap.Uint64("region", regionID))
var newRegions []*RegionInfo
region := regionMap[regionID]
log.Info("split regions",
@@ -142,6 +145,7 @@ SplitRegions:
logutil.Keys(keys), rtree.ZapRanges(ranges))
continue SplitRegions
}
log.Info("scattered regions", zap.Int("count", len(newRegions)))
if len(newRegions) != len(keys) {
log.Warn("split key count and new region count mismatch",
zap.Int("new region count", len(newRegions)),
@@ -294,8 +298,6 @@ func (rs *RegionSplitter) ScatterRegionsWithBackoffer(ctx context.Context, newRe
log.Info("trying to scatter regions...", zap.Int("remain", len(newRegionSet)))
var errs error
for _, region := range newRegionSet {
// Wait for a while until the regions successfully split.
rs.waitForSplit(ctx, region.Region.Id)
err := rs.client.ScatterRegion(ctx, region)
if err == nil {
// it is safe according to the Go language spec.
Expand Down Expand Up @@ -328,15 +330,54 @@ func (rs *RegionSplitter) ScatterRegionsWithBackoffer(ctx context.Context, newRe

}

// isUnsupportedError checks whether we should fall back to the ScatterRegion API when meeting this error.
func isUnsupportedError(err error) bool {
s, ok := status.FromError(errors.Cause(err))
if !ok {
// Not a gRPC error. Something else went wrong.
return false
}
// In two conditions, we fall back to ScatterRegion:
// (1) If the RPC endpoint returns UNIMPLEMENTED. (This is mainly to keep the test cases from being so magical.)
// (2) If the message is "region 0 not found":
// In fact, PD reuses the gRPC endpoint `ScatterRegion` for the batch version of scattering.
// When the request contains the field `regionIDs`, PD uses the batch version;
// otherwise, it uses the old version and scatters the single region named by `regionID` in the request.
// When facing a 4.x cluster, BR (which uses the v5.x PD client and calls `ScatterRegions`!) sets `regionIDs`,
// which the older PD's protocol buffers silently ignore, leaving `regionID` zero.
// The older PD then tries to search for the region with ID 0.
// (That consistently fails, and returns "region 0 not found".)
Contributor: (heh, what a convoluted way to maintain backward compatibility)

Contributor Author: In fact, we cannot restore to 4.x clusters via 5.x BR unless the user passes --check-requirement=false.

However, by the definition of compatibility, we must stay compatible with all previous versions and up to 3 minor versions into the future... 🤔
return s.Code() == codes.Unimplemented ||
strings.Contains(s.Message(), "region 0 not found")
}
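For illustration, here is a test-style sketch of how the two conditions classify errors. It is hypothetical, not part of the PR: it assumes the snippet lives in the same package (isUnsupportedError is unexported) and reuses the status, codes, and errors packages that split.go already imports, plus fmt.

// Hypothetical example, not part of this change.
func ExampleIsUnsupportedError() {
	// (1) The endpoint reports the batch RPC as unimplemented.
	fmt.Println(isUnsupportedError(status.Error(codes.Unimplemented, "unknown method")))
	// (2) An old PD ignored `regionIDs`, searched for region 0, and failed.
	fmt.Println(isUnsupportedError(status.Error(codes.Unknown, "region 0 not found")))
	// A plain (non-gRPC) error never triggers the fallback.
	fmt.Println(isUnsupportedError(errors.New("context canceled")))
	// Output:
	// true
	// true
	// false
}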

// ScatterRegions scatters the regions.
func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*RegionInfo) {
rs.ScatterRegionsWithBackoffer(
ctx, newRegions,
// backoff for about 6s, or we give up scattering these regions.
&exponentialBackoffer{
attempt: 7,
baseBackoff: 100 * time.Millisecond,
})
for _, region := range newRegions {
// Wait for a while until the regions successfully split.
rs.waitForSplit(ctx, region.Region.Id)
}

err := utils.WithRetry(ctx, func() error {
err := rs.client.ScatterRegions(ctx, newRegions)
if isUnsupportedError(err) {
log.Warn("batch scatter isn't supported, rollback to old method", logutil.ShortError(err))
rs.ScatterRegionsWithBackoffer(
ctx, newRegions,
// backoff for about 6s, or we give up scattering these regions.
&exponentialBackoffer{
attempt: 7,
baseBackoff: 100 * time.Millisecond,
})
return nil
}
return err
// the retry is for temporary network errors while sending the request.
}, &exponentialBackoffer{attempt: 3, baseBackoff: 500 * time.Millisecond})
Comment on lines +369 to +376

Contributor: will this cause a retry of 3 × 7 = 21 times?

Contributor Author: At most 2 + 7 = 9 times, because the isUnsupportedError(err) branch always returns nil.
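To make the bound concrete, here is a standalone toy simulation (hypothetical, not in the PR) of the nested retry shape: the outer utils.WithRetry issues at most 3 batch calls, and the 7-attempt fallback backoffer runs at most once, because the unsupported branch returns nil and ends the outer loop.

package main

import "fmt"

func main() {
	batchCalls, fallbackRounds := 0, 0
	for attempt := 1; attempt <= 3; attempt++ { // stands in for utils.WithRetry
		batchCalls++
		unsupported := attempt == 3 // worst case: transient failures first
		if unsupported {
			fallbackRounds = 7 // ScatterRegionsWithBackoffer, at most once
			break              // the branch returned nil, so the retry loop stops
		}
		// transient error: back off and try the batch RPC again
	}
	fmt.Println(batchCalls, fallbackRounds) // 3 7 in the worst case, never 3 × 7 = 21
}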


if err != nil {
log.Warn("failed to batch scatter region", logutil.ShortError(err))
}
}
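A side note on the "about 6s" comment: assuming exponentialBackoffer doubles the wait after each failed attempt (its implementation is not shown in this diff), the six waits between seven attempts sum to roughly 6.3s.

package main

import (
	"fmt"
	"time"
)

func main() {
	wait := 100 * time.Millisecond // baseBackoff
	total := time.Duration(0)
	for i := 0; i < 6; i++ { // six waits separate seven attempts
		total += wait
		wait *= 2
	}
	fmt.Println(total) // 6.3s
}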

func CheckRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error {
20 changes: 20 additions & 0 deletions br/pkg/restore/split_client.go
@@ -60,6 +60,8 @@ type SplitClient interface {
BatchSplitRegionsWithOrigin(ctx context.Context, regionInfo *RegionInfo, keys [][]byte) (*RegionInfo, []*RegionInfo, error)
// ScatterRegion scatters a specified region.
ScatterRegion(ctx context.Context, regionInfo *RegionInfo) error
// ScatterRegions scatters regions in a batch.
ScatterRegions(ctx context.Context, regionInfo []*RegionInfo) error
// GetOperator gets the status of operator of the specified region.
GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)
// ScanRegion gets a list of regions, starts from the region that contains key.
@@ -114,6 +116,24 @@ func (c *pdClient) needScatter(ctx context.Context) bool {
return c.needScatterVal
}

// ScatterRegions scatters regions in a batch.
func (c *pdClient) ScatterRegions(ctx context.Context, regionInfo []*RegionInfo) error {
c.mu.Lock()
Member (@joccau, Dec 22, 2021): If the lock protects c.client.ScatterRegions(ctx, regionsID), move c.mu.Lock() to just before line 127; if it protects resp.GetHeader(), move it to just before line 131; if it isn't needed, remove it? Just a suggestion~

Contributor Author: I guess it can be removed; it seems it was a guard for c.storeCache. 🤔

Contributor Author: Thankfully, this extra mutex won't make things wrong, because c.storeCache is accessed at the very start of restoring.

defer c.mu.Unlock()
regionsID := make([]uint64, 0, len(regionInfo))
for _, v := range regionInfo {
regionsID = append(regionsID, v.Region.Id)
}
resp, err := c.client.ScatterRegions(ctx, regionsID)
if err != nil {
return err
}
if pbErr := resp.GetHeader().GetError(); pbErr.GetType() != pdpb.ErrorType_OK {
return errors.Annotatef(berrors.ErrPDInvalidResponse, "pd returns error during batch scattering: %s", pbErr)
}
return nil
}
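Picking up the lock discussion above, here is a sketch of what the variant without the mutex might look like, under the thread's assumption that c.mu only guards c.storeCache and that the PD client itself is safe for concurrent use. This is hypothetical; it is not what the PR merged.

// Hypothetical lock-free variant per the review suggestion.
func (c *pdClient) ScatterRegions(ctx context.Context, regionInfo []*RegionInfo) error {
	regionsID := make([]uint64, 0, len(regionInfo))
	for _, v := range regionInfo {
		regionsID = append(regionsID, v.Region.Id)
	}
	// No c.mu here: nothing below touches c.storeCache.
	resp, err := c.client.ScatterRegions(ctx, regionsID)
	if err != nil {
		return err
	}
	if pbErr := resp.GetHeader().GetError(); pbErr.GetType() != pdpb.ErrorType_OK {
		return errors.Annotatef(berrors.ErrPDInvalidResponse,
			"pd returns error during batch scattering: %s", pbErr)
	}
	return nil
}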

func (c *pdClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) {
c.mu.Lock()
defer c.mu.Unlock()
58 changes: 50 additions & 8 deletions br/pkg/restore/split_test.go
@@ -21,17 +21,19 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule/placement"
"go.uber.org/multierr"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type TestClient struct {
mu sync.RWMutex
stores map[uint64]*metapb.Store
regions map[uint64]*restore.RegionInfo
regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions
nextRegionID uint64
injectInScatter func(*restore.RegionInfo) error
mu sync.RWMutex
stores map[uint64]*metapb.Store
regions map[uint64]*restore.RegionInfo
regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions
nextRegionID uint64
injectInScatter func(*restore.RegionInfo) error
supportBatchScatter bool

scattered map[uint64]bool
}
@@ -55,6 +57,36 @@ func NewTestClient(
}
}

func (c *TestClient) InstallBatchScatterSupport() {
c.supportBatchScatter = true
}

// ScatterRegions scatters regions in a batch.
func (c *TestClient) ScatterRegions(ctx context.Context, regionInfo []*restore.RegionInfo) error {
if !c.supportBatchScatter {
return status.Error(codes.Unimplemented, "Ah, yep")
}
regions := map[uint64]*restore.RegionInfo{}
for _, region := range regionInfo {
regions[region.Region.Id] = region
}
var err error
for i := 0; i < 3; i++ {
if len(regions) == 0 {
return nil
}
for id, region := range regions {
splitErr := c.ScatterRegion(ctx, region)
if splitErr == nil {
delete(regions, id)
}
err = multierr.Append(err, splitErr)
}
}
// After three best-effort rounds, any accumulated error is dropped.
return nil
}

func (c *TestClient) GetAllRegions() map[uint64]*restore.RegionInfo {
c.mu.RLock()
defer c.mu.RUnlock()
@@ -282,7 +314,18 @@ func TestScatterFinishInTime(t *testing.T) {
// [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
// [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, )
func TestSplitAndScatter(t *testing.T) {
client := initTestClient()
t.Run("BatchScatter", func(t *testing.T) {
client := initTestClient()
client.InstallBatchScatterSupport()
runTestSplitAndScatterWith(t, client)
})
t.Run("BackwardCompatibility", func(t *testing.T) {
client := initTestClient()
runTestSplitAndScatterWith(t, client)
})
}

func runTestSplitAndScatterWith(t *testing.T, client *TestClient) {
ranges := initRanges()
rewriteRules := initRewriteRules()
regionSplitter := restore.NewRegionSplitter(client)
@@ -320,7 +363,6 @@ func TestSplitAndScatter(t *testing.T) {
t.Fatalf("region %d has not been scattered: %#v", key, regions[key])
}
}

}

// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )