backup: fix retry of fine-grained backup (#43252) #43487

Merged
40 changes: 15 additions & 25 deletions br/pkg/backup/client.go
@@ -74,9 +74,11 @@ type Checksum struct {
// ProgressUnit represents the unit of progress.
type ProgressUnit string

// Maximum total sleep time (in ms) for kv/cop commands.
const (
backupFineGrainedMaxBackoff = 80000
// backupFineGrainedMaxBackoff is 1 hour.
// Given that the backup has entered the fine-grained phase, there must be some problem in the cluster,
// so we need to be more patient.
backupFineGrainedMaxBackoff = 3600000
backupRetryTimes = 5
// RangeUnit represents the progress updated counter when a range finished.
RangeUnit ProgressUnit = "range"
@@ -989,13 +991,13 @@ func (bc *Client) findTargetPeer(ctx context.Context, key []byte, isRawKv bool,
// better backoff.
region, err := bc.mgr.GetPDClient().GetRegion(ctx, key)
if err != nil || region == nil {
log.Error("find region failed", zap.Error(err), zap.Reflect("region", region))
logutil.CL(ctx).Error("find region failed", zap.Error(err), zap.Reflect("region", region))
time.Sleep(time.Millisecond * time.Duration(100*i))
continue
}
if len(targetStoreIds) == 0 {
if region.Leader != nil {
log.Info("find leader",
logutil.CL(ctx).Info("find leader",
zap.Reflect("Leader", region.Leader), logutil.Key("key", key))
return region.Leader, nil
}
@@ -1008,17 +1010,17 @@ }
}
if len(candidates) > 0 {
peer := candidates[rand.Intn(len(candidates))]
log.Info("find target peer for backup",
logutil.CL(ctx).Info("find target peer for backup",
zap.Reflect("Peer", peer), logutil.Key("key", key))
return peer, nil
}
}

log.Warn("fail to find a target peer", logutil.Key("key", key))
logutil.CL(ctx).Warn("fail to find a target peer", logutil.Key("key", key))
time.Sleep(time.Millisecond * time.Duration(1000*i))
continue
}
log.Error("can not find a valid target peer", logutil.Key("key", key))
logutil.CL(ctx).Error("can not find a valid target peer", logutil.Key("key", key))
if len(targetStoreIds) == 0 {
return nil, errors.Annotatef(berrors.ErrBackupNoLeader, "can not find a valid leader for key %s", key)
}
@@ -1053,7 +1055,7 @@ func (bc *Client) fineGrainedBackup(
}
})

bo := tikv.NewBackoffer(ctx, backupFineGrainedMaxBackoff)
bo := utils.AdaptTiKVBackoffer(ctx, backupFineGrainedMaxBackoff, berrors.ErrUnknown)
for {
// Step1, check whether there is any incomplete range
incomplete := pr.Res.GetIncompleteRange(req.StartKey, req.EndKey)
@@ -1066,14 +1068,10 @@
errCh := make(chan error, 4)
retry := make(chan rtree.Range, 4)

max := &struct {
ms int
mu sync.Mutex
}{}
wg := new(sync.WaitGroup)
for i := 0; i < 4; i++ {
wg.Add(1)
fork, _ := bo.Fork()
fork, _ := bo.Inner().Fork()
go func(boFork *tikv.Backoffer) {
defer wg.Done()
for rg := range retry {
@@ -1085,11 +1083,7 @@
return
}
if backoffMs != 0 {
max.mu.Lock()
if max.ms < backoffMs {
max.ms = backoffMs
}
max.mu.Unlock()
bo.RequestBackOff(backoffMs)
}
}
}(fork)
@@ -1146,15 +1140,11 @@ }
}

// Step3. Backoff if needed, then repeat.
max.mu.Lock()
ms := max.ms
max.mu.Unlock()
if ms != 0 {
if ms := bo.NextSleepInMS(); ms != 0 {
log.Info("handle fine grained", zap.Int("backoffMs", ms))
// TODO: fill a meaningful error.
err := bo.BackoffWithMaxSleepTxnLockFast(ms, berrors.ErrUnknown)
err := bo.BackOff()
if err != nil {
return errors.Trace(err)
return errors.Annotatef(err, "at fine-grained backup, remained ranges = %d", pr.Res.Len())
}
}
}
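Taken together, the client.go changes replace the mutex-guarded `max` struct with the shared adapter: each worker reports its desired sleep via `RequestBackOff`, and the outer loop sleeps once per round via `NextSleepInMS`/`BackOff`. Below is a compilable sketch of that flow, not the real code — `handleRange`, `remainingRanges`, and the `backoffReporter` interface are stand-ins introduced only for illustration.

```go
// Not the actual client code: a minimal sketch of the new control flow in
// fineGrainedBackup. handleRange and remainingRanges are hypothetical stand-ins
// for sending the backup request and recomputing the incomplete ranges.
package sketch

import (
	"fmt"
	"sync"
)

// backoffReporter mirrors the subset of utils.RetryWithBackoffer used here.
type backoffReporter interface {
	RequestBackOff(ms int)
	NextSleepInMS() int
	BackOff() error
}

func fineGrainedLoop(bo backoffReporter, ranges []string) error {
	for len(ranges) > 0 {
		var wg sync.WaitGroup
		for _, rg := range ranges {
			rg := rg
			wg.Add(1)
			go func() {
				defer wg.Done()
				// A non-zero result is the backoff (in ms) suggested by a region error.
				if ms := handleRange(rg); ms != 0 {
					// Workers no longer keep a mutex-guarded max themselves;
					// they only register the intent to back off.
					bo.RequestBackOff(ms)
				}
			}()
		}
		wg.Wait()

		// The outer loop sleeps once per round, for the largest requested duration.
		if ms := bo.NextSleepInMS(); ms != 0 {
			fmt.Printf("handle fine grained, backoffMs=%d\n", ms)
			if err := bo.BackOff(); err != nil {
				return err
			}
		}
		ranges = remainingRanges(ranges)
	}
	return nil
}

func handleRange(rg string) int            { _ = rg; return 0 }
func remainingRanges(rs []string) []string { _ = rs; return nil }
```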
6 changes: 5 additions & 1 deletion br/pkg/utils/BUILD.bazel
@@ -40,6 +40,7 @@ go_library(
"//util",
"//util/sqlexec",
"@com_github_cheggaaa_pb_v3//:pb",
"@com_github_cznic_mathutil//:mathutil",
"@com_github_docker_go_units//:go-units",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
@@ -48,6 +49,7 @@ go_library(
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
@@ -81,13 +83,14 @@ go_test(
"misc_test.go",
"progress_test.go",
"register_test.go",
"retry_test.go",
"safe_point_test.go",
"schema_test.go",
"sensitive_test.go",
],
embed = [":utils"],
flaky = True,
shard_count = 28,
shard_count = 29,
deps = [
"//br/pkg/errors",
"//br/pkg/metautil",
@@ -109,6 +112,7 @@ go_test(
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/encryptionpb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_tests_v3//integration",
76 changes: 76 additions & 0 deletions br/pkg/utils/retry.go
@@ -5,11 +5,14 @@ package utils
import (
"context"
"strings"
"sync"
"time"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
tmysql "github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/terror"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/multierr"
)

@@ -84,3 +87,76 @@ func FallBack2CreateTable(err error) bool {
}
return false
}

// RetryWithBackoffer is a simple context for a "mixed" retry.
// Some TiDB APIs, say, `ResolveLock`, require a `tikv.Backoffer` as an argument.
// But `tikv.Backoffer` isn't very customizable: it only ships with some predefined configurations,
// and we cannot create new ones. So we mix the flavour of `tikv.Backoffer` with our homemade
// backoff strategy. That is what `RetryWithBackoffer` does.
type RetryWithBackoffer struct {
bo *tikv.Backoffer

totalBackoff int
maxBackoff int
baseErr error

mu sync.Mutex
nextBackoff int
}

// AdaptTiKVBackoffer creates an "ad-hoc" backoffer, which wraps a `tikv.Backoffer` and provides some new functions:
// when backing off, we can manually provide it a specific sleep duration instead of passing a retry.Config,
// which is sealed in "client-go/internal".
func AdaptTiKVBackoffer(ctx context.Context, maxSleepMs int, baseErr error) *RetryWithBackoffer {
return &RetryWithBackoffer{
bo: tikv.NewBackoffer(ctx, maxSleepMs),
maxBackoff: maxSleepMs,
baseErr: baseErr,
}
}

// NextSleepInMS returns the duration (in ms) that the next call to `BackOff` will sleep.
func (r *RetryWithBackoffer) NextSleepInMS() int {
r.mu.Lock()
defer r.mu.Unlock()
return r.nextBackoff
}

// TotalSleepInMS returns the total slept time in ms.
func (r *RetryWithBackoffer) TotalSleepInMS() int {
return r.totalBackoff + r.bo.GetTotalSleep()
}

// MaxSleepInMS returns the max sleep time for the retry context in ms.
func (r *RetryWithBackoffer) MaxSleepInMS() int {
return r.maxBackoff
}

// BackOff executes the backoff: it sleeps for the precalculated backoff time.
// See `RequestBackOff` for more details.
func (r *RetryWithBackoffer) BackOff() error {
r.mu.Lock()
nextBo := r.nextBackoff
r.nextBackoff = 0
r.mu.Unlock()

if r.TotalSleepInMS() > r.maxBackoff {
return errors.Annotatef(r.baseErr, "backoff exceeds the max backoff time %s", time.Duration(r.maxBackoff)*time.Millisecond)
}
time.Sleep(time.Duration(nextBo) * time.Millisecond)
r.totalBackoff += nextBo
return nil
}

// RequestBackOff registers the intent of backing off for at least ms milliseconds.
// That intent will be fulfilled when calling `BackOff`.
func (r *RetryWithBackoffer) RequestBackOff(ms int) {
r.mu.Lock()
r.nextBackoff = mathutil.Max(r.nextBackoff, ms)
r.mu.Unlock()
}

// Inner returns a reference to the wrapped `tikv.Backoffer`.
func (r *RetryWithBackoffer) Inner() *tikv.Backoffer {
return r.bo
}
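As a usage note (not part of this diff), a consumer of the adapter would typically pass `Inner()` to client-go APIs that insist on a `*tikv.Backoffer`, while driving its own waits through `RequestBackOff` and `BackOff`. A hedged sketch follows; `resolveSomething` and `retryUntilDone` are hypothetical names introduced only for illustration.

```go
// A usage sketch under assumptions: resolveSomething stands in for a
// client-go-style call that requires a *tikv.Backoffer; it is not part of this PR.
package sketch

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/br/pkg/utils"
	"github.com/tikv/client-go/v2/tikv"
)

// resolveSomething is a hypothetical stand-in for an API sealed behind tikv.Backoffer.
func resolveSomething(bo *tikv.Backoffer) (done bool, suggestedMs int, err error) {
	_ = bo
	return true, 0, nil
}

func retryUntilDone(ctx context.Context) error {
	// 60s total budget; the wrapped error is returned once the budget is exceeded.
	bo := utils.AdaptTiKVBackoffer(ctx, 60_000, errors.New("retry budget exceeded"))
	for {
		done, suggestedMs, err := resolveSomething(bo.Inner())
		if err != nil {
			return err
		}
		if done {
			return nil
		}
		// Register our own sleep request, then sleep; BackOff fails once the
		// accumulated sleep (including Inner's) exceeds the configured maximum.
		bo.RequestBackOff(suggestedMs)
		if err := bo.BackOff(); err != nil {
			return err
		}
	}
}
```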
49 changes: 49 additions & 0 deletions br/pkg/utils/retry_test.go
@@ -0,0 +1,49 @@
// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0.

package utils_test

import (
"context"
"sync"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
)

func TestRetryAdapter(t *testing.T) {
req := require.New(t)

begin := time.Now()
bo := utils.AdaptTiKVBackoffer(context.Background(), 200, errors.New("everything is alright"))
// This should sleep for 100ms.
bo.Inner().Backoff(tikv.BoTiKVRPC(), errors.New("TiKV is in a deep dream"))
sleeped := bo.TotalSleepInMS()
req.GreaterOrEqual(sleeped, 50)
req.LessOrEqual(sleeped, 150)
requestedBackOff := [...]int{10, 20, 5, 0, 42, 48}
wg := new(sync.WaitGroup)
wg.Add(len(requestedBackOff))
for _, bms := range requestedBackOff {
bms := bms
go func() {
bo.RequestBackOff(bms)
wg.Done()
}()
}
wg.Wait()
req.Equal(bo.NextSleepInMS(), 48)
req.NoError(bo.BackOff())
req.Equal(bo.TotalSleepInMS(), sleeped+48)

bo.RequestBackOff(150)
req.NoError(bo.BackOff())

bo.RequestBackOff(150)
req.ErrorContains(bo.BackOff(), "everything is alright", "total = %d / %d", bo.TotalSleepInMS(), bo.MaxSleepInMS())

req.Greater(time.Since(begin), 200*time.Millisecond)
}