backup: add retry when tikv is down (#508)
* backup: add retry when tikv is down

* fix build

* address comment && refine some log code

* refine reset code

* add cancel error to retry

* remove cancel error retry

* add integration test

* add failpoint to test

* fix build

* fix failpoint test

* address comment
3pointer authored Sep 22, 2020
1 parent 5af97a1 commit cde9f30
Showing 4 changed files with 126 additions and 30 deletions.
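At a glance, the change wraps each per-store backup call in a bounded retry loop: when TiKV answers with a retryable gRPC status, BR resets the connection through a caller-supplied resetFn and tries again instead of failing the whole backup. Below is a minimal, self-contained sketch of that pattern under assumed names (retryable, runWithReset, doWork, resetConn are illustrative, and the sleep is shortened); the real BR code follows in the diff.

package main

import (
	"fmt"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

const retryTimes = 5

// retryable mirrors the commit's isRetryableError: only the Unavailable and
// Canceled gRPC codes ask for a connection reset and another attempt.
func retryable(err error) bool {
	code := status.Code(err)
	return code == codes.Unavailable || code == codes.Canceled
}

// runWithReset retries doWork up to retryTimes, calling resetConn for a fresh
// connection whenever the failure is retryable.
func runWithReset(doWork func() error, resetConn func() error) error {
	var err error
	for retry := 0; retry < retryTimes; retry++ {
		if err = doWork(); err == nil {
			return nil
		}
		if !retryable(err) {
			return err
		}
		time.Sleep(10 * time.Millisecond) // BR waits 3 seconds between attempts
		if resetErr := resetConn(); resetErr != nil {
			return fmt.Errorf("failed to reset connection: %v", resetErr)
		}
	}
	return err
}

func main() {
	calls := 0
	err := runWithReset(
		func() error {
			calls++
			if calls < 3 { // fail twice with a retryable status, then succeed
				return status.Error(codes.Unavailable, "tikv is down")
			}
			return nil
		},
		func() error {
			fmt.Println("resetting backup connection")
			return nil
		},
	)
	fmt.Printf("finished after %d attempts, err=%v\n", calls, err)
}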
96 changes: 72 additions & 24 deletions pkg/backup/client.go
@@ -13,6 +13,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/google/btree"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
kvproto "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
@@ -31,6 +32,8 @@ import (
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/glue"
@@ -43,6 +46,7 @@ import (
// ClientMgr manages connections needed by backup.
type ClientMgr interface {
GetBackupClient(ctx context.Context, storeID uint64) (kvproto.BackupClient, error)
ResetBackupClient(ctx context.Context, storeID uint64) (kvproto.BackupClient, error)
GetPDClient() pd.Client
GetTiKV() tikv.Storage
GetLockResolver() *tikv.LockResolver
@@ -59,6 +63,7 @@ type Checksum struct {
// Maximum total sleep time (in ms) for kv/cop commands.
const (
backupFineGrainedMaxBackoff = 80000
backupRetryTimes = 5
)

// Client is a client that instructs TiKV how to do a backup.
@@ -776,6 +781,10 @@ func (bc *Client) handleFineGrained(
respCh <- response
}
return nil
},
func() (kvproto.BackupClient, error) {
log.Warn("reset the connection in handleFineGrained", zap.Uint64("storeID", storeID))
return bc.mgr.ResetBackupClient(ctx, storeID)
})
if err != nil {
return 0, err
@@ -791,34 +800,68 @@ func SendBackup(
	client kvproto.BackupClient,
	req kvproto.BackupRequest,
	respFn func(*kvproto.BackupResponse) error,
	resetFn func() (kvproto.BackupClient, error),
) error {
	var errReset error
backupLoop:
	for retry := 0; retry < backupRetryTimes; retry++ {
		log.Info("try backup",
			zap.Stringer("StartKey", utils.WrapKey(req.StartKey)),
			zap.Stringer("EndKey", utils.WrapKey(req.EndKey)),
			zap.Uint64("storeID", storeID),
			zap.Int("retry time", retry),
		)
		bcli, err := client.Backup(ctx, &req)
		failpoint.Inject("reset-retryable-error", func(val failpoint.Value) {
			if val.(bool) {
				log.Debug("failpoint reset-retryable-error injected.")
				err = status.Errorf(codes.Unavailable, "Unavailable error")
			}
		})
		if err != nil {
			if isRetryableError(err) {
				time.Sleep(3 * time.Second)
				client, errReset = resetFn()
				if errReset != nil {
					return errors.Annotatef(errReset, "failed to reset backup connection on store:%d "+
						"please check the tikv status", storeID)
				}
				continue
			}
			log.Error("fail to backup", zap.Uint64("StoreID", storeID),
				zap.Int("retry time", retry))
			return errors.Trace(err)
		}

		for {
			resp, err := bcli.Recv()
			if err != nil {
				if err == io.EOF {
					log.Info("backup streaming finish",
						zap.Uint64("StoreID", storeID),
						zap.Int("retry time", retry))
					break backupLoop
				}
				if isRetryableError(err) {
					time.Sleep(3 * time.Second)
					// current tikv is unavailable
					client, errReset = resetFn()
					if errReset != nil {
						return errors.Annotatef(errReset, "failed to reset recv connection on store:%d "+
							"please check the tikv status", storeID)
					}
					break
				}
				return errors.Annotatef(err, "failed to connect to store: %d with retry times:%d", storeID, retry)
			}
			// TODO: handle errors in the resp.
			log.Info("range backuped",
				zap.Stringer("StartKey", utils.WrapKey(resp.GetStartKey())),
				zap.Stringer("EndKey", utils.WrapKey(resp.GetEndKey())))
			err = respFn(resp)
			if err != nil {
				return err
			}
		}
	}
	return nil
@@ -922,3 +965,8 @@ func CollectChecksums(backupMeta *kvproto.BackupMeta) ([]Checksum, error) {

return checksums, nil
}

// isRetryableError reports whether the error is retryable, i.e. whether we should reset the gRPC connection and try again.
func isRetryableError(err error) bool {
return status.Code(err) == codes.Unavailable || status.Code(err) == codes.Canceled
}
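For reference, status.Code maps anything that is not a gRPC status error to codes.Unknown, so a plain io.EOF from Recv never counts as retryable; the explicit io.EOF check in SendBackup is what ends the stream cleanly. The standalone sketch below copies isRetryableError from the diff to show how the classification behaves (the expected values in the comments are my reading of the grpc status package, not output from BR):

package main

import (
	"fmt"
	"io"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isRetryableError is copied from the diff above: only Unavailable and
// Canceled trigger a connection reset.
func isRetryableError(err error) bool {
	return status.Code(err) == codes.Unavailable || status.Code(err) == codes.Canceled
}

func main() {
	fmt.Println(isRetryableError(status.Error(codes.Unavailable, "tikv is down"))) // true
	fmt.Println(isRetryableError(status.Error(codes.Canceled, "stream canceled"))) // true
	fmt.Println(isRetryableError(status.Error(codes.Internal, "internal error")))  // false
	fmt.Println(isRetryableError(io.EOF))                                          // false: non-status errors map to codes.Unknown
	fmt.Println(isRetryableError(nil))                                             // false: nil maps to codes.OK
}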
4 changes: 4 additions & 0 deletions pkg/backup/push.go
@@ -62,6 +62,10 @@ func (push *pushDown) pushBackup(
// Forward all responses (including error).
push.respCh <- resp
return nil
},
func() (backup.BackupClient, error) {
log.Warn("reset the connection in push", zap.Uint64("storeID", storeID))
return push.mgr.ResetBackupClient(ctx, storeID)
})
if err != nil {
push.errCh <- err
44 changes: 41 additions & 3 deletions pkg/conn/conn.go
@@ -36,12 +36,14 @@ import (
)

const (
dialTimeout = 30 * time.Second // raised from 5 * time.Second in this commit
clusterVersionPrefix = "pd/api/v1/config/cluster-version"
regionCountPrefix = "pd/api/v1/stats/region"
schdulerPrefix = "pd/api/v1/schedulers"
maxMsgSize = int(128 * utils.MB) // pd.ScanRegion may return a large response
scheduleConfigPrefix = "pd/api/v1/config/schedule"

resetRetryTimes = 3
)

// Mgr manages connections to a TiDB cluster.
@@ -334,6 +336,7 @@ func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
ctx,
addr,
opt,
grpc.WithBlock(),
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(keepAlive) * time.Second,
@@ -344,8 +347,6 @@ func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
if err != nil {
return nil, errors.WithStack(err)
}
return conn, nil
}

@@ -363,6 +364,43 @@ func (mgr *Mgr) GetBackupClient(ctx context.Context, storeID uint64) (backup.BackupClient, error) {
if err != nil {
return nil, errors.Trace(err)
}
// Cache the conn.
mgr.grpcClis.clis[storeID] = conn
return backup.NewBackupClient(conn), nil
}
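Note that caching moved out of getGrpcConnLocked and into its callers: GetBackupClient stores the freshly dialed connection here, while ResetBackupClient below closes and evicts the stale one, re-dials with up to resetRetryTimes attempts, and caches the replacement, presumably so the reset path fully controls when a connection enters the cache.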

// ResetBackupClient resets the connection for the backup client of the given store.
func (mgr *Mgr) ResetBackupClient(ctx context.Context, storeID uint64) (backup.BackupClient, error) {
mgr.grpcClis.mu.Lock()
defer mgr.grpcClis.mu.Unlock()

if conn, ok := mgr.grpcClis.clis[storeID]; ok {
// Found a cached connection; close it so the retry below dials a fresh one.
log.Info("Reset backup client", zap.Uint64("storeID", storeID))
err := conn.Close()
if err != nil {
log.Warn("close backup connection failed, ignore it", zap.Uint64("storeID", storeID))
}
delete(mgr.grpcClis.clis, storeID)
}
var (
conn *grpc.ClientConn
err error
)
for retry := 0; retry < resetRetryTimes; retry++ {
conn, err = mgr.getGrpcConnLocked(ctx, storeID)
if err != nil {
log.Warn("failed to reset grpc connection, retry it",
zap.Int("retry time", retry), zap.Error(err))
time.Sleep(time.Duration(retry+3) * time.Second)
continue
}
mgr.grpcClis.clis[storeID] = conn
break
}
if err != nil {
return nil, errors.Trace(err)
}
return backup.NewBackupClient(conn), nil
}
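The dial path now passes grpc.WithBlock, so grpc.DialContext waits until the connection is actually ready (or the dial context expires) instead of returning immediately and failing on the first RPC, which is also why dialTimeout grows from 5s to 30s. A minimal sketch of that blocking-dial-with-deadline idiom, assuming the dial is bounded by a context carrying dialTimeout (as the constant's name suggests) and using a placeholder address with insecure transport credentials rather than BR's real TLS/keepalive wiring:

package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
)

const dialTimeout = 30 * time.Second

// dialStore blocks until the connection to addr is ready or dialTimeout
// elapses. The address used in main is a placeholder, not a real TiKV store.
func dialStore(ctx context.Context, addr string) (*grpc.ClientConn, error) {
	dialCtx, cancel := context.WithTimeout(ctx, dialTimeout)
	defer cancel()
	return grpc.DialContext(
		dialCtx,
		addr,
		grpc.WithInsecure(), // BR chooses TLS or insecure from its config
		grpc.WithBlock(),    // surface "store is down" here instead of on the first RPC
	)
}

func main() {
	conn, err := dialStore(context.Background(), "127.0.0.1:20160")
	if err != nil {
		fmt.Println("dial failed:", err)
		return
	}
	defer conn.Close()
	fmt.Println("connected to", conn.Target())
}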

12 changes: 9 additions & 3 deletions tests/br_full/run.sh
@@ -28,21 +28,27 @@ for i in $(seq $DB_COUNT); do
row_count_ori[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}')
done

# backup full with an injected retryable error (simulating TiKV being down) to test connection reset
echo "backup with limit start..."
export GO_FAILPOINTS="github.com/pingcap/br/pkg/backup/reset-retryable-error=1*return(true)"
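# As I read failpoint's term syntax, 1*return(true) makes the injected Unavailable error
# fire exactly once, so the first Backup call exercises the reset/retry path and the
# backup as a whole still succeeds.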
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-limit" --concurrency 4
export GO_FAILPOINTS=""

# backup full
echo "backup with lz4 start..."
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --ratelimit 5 --concurrency 4 --compression lz4
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --concurrency 4 --compression lz4
size_lz4=$(du -d 0 $TEST_DIR/$DB-lz4 | awk '{print $1}')

echo "backup with zstd start..."
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-zstd" --ratelimit 5 --concurrency 4 --compression zstd --compression-level 6
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-zstd" --concurrency 4 --compression zstd --compression-level 6
size_zstd=$(du -d 0 $TEST_DIR/$DB-zstd | awk '{print $1}')

if [ "$size_lz4" -le "$size_zstd" ]; then
echo "full backup lz4 size $size_lz4 is small than backup with zstd $size_zstd"
exit -1
fi

for ct in limit lz4 zstd; do
for i in $(seq $DB_COUNT); do
run_sql "DROP DATABASE $DB${i};"
done
