diff --git a/pkg/backup/client.go b/pkg/backup/client.go
index 566bc4c69..5a83c395d 100644
--- a/pkg/backup/client.go
+++ b/pkg/backup/client.go
@@ -13,6 +13,7 @@ import (
 	"github.com/gogo/protobuf/proto"
 	"github.com/google/btree"
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	kvproto "github.com/pingcap/kvproto/pkg/backup"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/log"
@@ -31,6 +32,8 @@ import (
 	pd "github.com/tikv/pd/client"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 
 	"github.com/pingcap/br/pkg/conn"
 	"github.com/pingcap/br/pkg/glue"
@@ -43,6 +46,7 @@ import (
 // ClientMgr manages connections needed by backup.
 type ClientMgr interface {
 	GetBackupClient(ctx context.Context, storeID uint64) (kvproto.BackupClient, error)
+	ResetBackupClient(ctx context.Context, storeID uint64) (kvproto.BackupClient, error)
 	GetPDClient() pd.Client
 	GetTiKV() tikv.Storage
 	GetLockResolver() *tikv.LockResolver
@@ -59,6 +63,7 @@ type Checksum struct {
 // Maximum total sleep time(in ms) for kv/cop commands.
 const (
 	backupFineGrainedMaxBackoff = 80000
+	backupRetryTimes            = 5
 )
 
 // Client is a client instructs TiKV how to do a backup.
@@ -776,6 +781,10 @@ func (bc *Client) handleFineGrained(
 			respCh <- response
 		}
 		return nil
+	},
+	func() (kvproto.BackupClient, error) {
+		log.Warn("reset the connection in handleFineGrained", zap.Uint64("storeID", storeID))
+		return bc.mgr.ResetBackupClient(ctx, storeID)
 	})
 	if err != nil {
 		return 0, err
@@ -791,34 +800,68 @@ func SendBackup(
 	client kvproto.BackupClient,
 	req kvproto.BackupRequest,
 	respFn func(*kvproto.BackupResponse) error,
+	resetFn func() (kvproto.BackupClient, error),
 ) error {
-	log.Info("try backup",
-		zap.Stringer("StartKey", utils.WrapKey(req.StartKey)),
-		zap.Stringer("EndKey", utils.WrapKey(req.EndKey)),
-		zap.Uint64("storeID", storeID),
-	)
-	bcli, err := client.Backup(ctx, &req)
-	if err != nil {
-		log.Error("fail to backup", zap.Uint64("StoreID", storeID))
-		return err
-	}
-	for {
-		resp, err := bcli.Recv()
+	var errReset error
+backupLoop:
+	for retry := 0; retry < backupRetryTimes; retry++ {
+		log.Info("try backup",
+			zap.Stringer("StartKey", utils.WrapKey(req.StartKey)),
+			zap.Stringer("EndKey", utils.WrapKey(req.EndKey)),
+			zap.Uint64("storeID", storeID),
+			zap.Int("retry time", retry),
+		)
+		bcli, err := client.Backup(ctx, &req)
+		failpoint.Inject("reset-retryable-error", func(val failpoint.Value) {
+			if val.(bool) {
+				log.Debug("failpoint reset-retryable-error injected.")
+				err = status.Errorf(codes.Unavailable, "Unavailable error")
+			}
+		})
 		if err != nil {
-			if err == io.EOF {
-				log.Info("backup streaming finish",
-					zap.Uint64("StoreID", storeID))
-				break
+			if isRetryableError(err) {
+				time.Sleep(3 * time.Second)
+				client, errReset = resetFn()
+				if errReset != nil {
+					return errors.Annotatef(errReset, "failed to reset backup connection on store:%d "+
+						"please check the tikv status", storeID)
+				}
+				continue
 			}
-			return errors.Annotatef(err, "Store: %d close the connection", storeID)
+			log.Error("fail to backup", zap.Uint64("StoreID", storeID),
+				zap.Int("retry time", retry))
+			return errors.Trace(err)
 		}
-		// TODO: handle errors in the resp.
-		log.Info("range backuped",
-			zap.Stringer("StartKey", utils.WrapKey(resp.GetStartKey())),
-			zap.Stringer("EndKey", utils.WrapKey(resp.GetEndKey())))
-		err = respFn(resp)
-		if err != nil {
-			return err
+
+		for {
+			resp, err := bcli.Recv()
+			if err != nil {
+				if err == io.EOF {
+					log.Info("backup streaming finish",
+						zap.Uint64("StoreID", storeID),
+						zap.Int("retry time", retry))
+					break backupLoop
+				}
+				if isRetryableError(err) {
+					time.Sleep(3 * time.Second)
+					// the current TiKV instance is unavailable
+					client, errReset = resetFn()
+					if errReset != nil {
+						return errors.Annotatef(errReset, "failed to reset recv connection on store:%d "+
							"please check the tikv status", storeID)
+					}
+					break
+				}
+				return errors.Annotatef(err, "failed to connect to store: %d with retry times:%d", storeID, retry)
+			}
+			// TODO: handle errors in the resp.
+			log.Info("range backuped",
+				zap.Stringer("StartKey", utils.WrapKey(resp.GetStartKey())),
+				zap.Stringer("EndKey", utils.WrapKey(resp.GetEndKey())))
+			err = respFn(resp)
+			if err != nil {
+				return err
+			}
 		}
 	}
 	return nil
@@ -922,3 +965,8 @@ func CollectChecksums(backupMeta *kvproto.BackupMeta) ([]Checksum, error) {
 
 	return checksums, nil
 }
+
+// isRetryableError checks whether the error warrants resetting the gRPC connection and retrying.
+func isRetryableError(err error) bool {
+	return status.Code(err) == codes.Unavailable || status.Code(err) == codes.Canceled
+}
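The heart of the client.go change is the rewritten SendBackup loop. Below is a minimal, self-contained sketch of the same retry-with-reset pattern; names such as retryWithReset, doStream, and resetConn are hypothetical, and only the Unavailable/Canceled check, the 3-second backoff, and the 5-attempt budget mirror the patch.

```go
package main

import (
	"fmt"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

const retryTimes = 5 // mirrors backupRetryTimes in the patch

// isRetryable mirrors isRetryableError from the patch: only a dead or
// torn-down connection is worth rebuilding.
func isRetryable(err error) bool {
	return status.Code(err) == codes.Unavailable || status.Code(err) == codes.Canceled
}

// retryWithReset runs doStream up to retryTimes times, rebuilding the
// connection via resetConn before each retry when the failure looks
// like a dead connection; any other error is surfaced immediately.
func retryWithReset(doStream func() error, resetConn func() error) error {
	var err error
	for retry := 0; retry < retryTimes; retry++ {
		if err = doStream(); err == nil {
			return nil
		}
		if !isRetryable(err) {
			return err // permanent error: do not waste retries on it
		}
		time.Sleep(3 * time.Second) // same fixed backoff the patch uses
		if resetErr := resetConn(); resetErr != nil {
			return resetErr // a failed reset is fatal, as in the patch
		}
	}
	return err
}

func main() {
	attempts := 0
	err := retryWithReset(
		func() error {
			attempts++
			if attempts < 3 { // fail twice with a retryable status
				return status.Errorf(codes.Unavailable, "store unavailable")
			}
			return nil
		},
		func() error { return nil }, // pretend the reconnect succeeds
	)
	fmt.Println("attempts:", attempts, "err:", err)
}
```

Keeping the reset as a callback is what lets both call sites stay independent: handleFineGrained above, and pushBackup below, each pass their own closure around ResetBackupClient.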
- log.Info("range backuped", - zap.Stringer("StartKey", utils.WrapKey(resp.GetStartKey())), - zap.Stringer("EndKey", utils.WrapKey(resp.GetEndKey()))) - err = respFn(resp) - if err != nil { - return err + + for { + resp, err := bcli.Recv() + if err != nil { + if err == io.EOF { + log.Info("backup streaming finish", + zap.Uint64("StoreID", storeID), + zap.Int("retry time", retry)) + break backupLoop + } + if isRetryableError(err) { + time.Sleep(3 * time.Second) + // current tikv is unavailable + client, errReset = resetFn() + if errReset != nil { + return errors.Annotatef(errReset, "failed to reset recv connection on store:%d "+ + "please check the tikv status", storeID) + } + break + } + return errors.Annotatef(err, "failed to connect to store: %d with retry times:%d", storeID, retry) + } + // TODO: handle errors in the resp. + log.Info("range backuped", + zap.Stringer("StartKey", utils.WrapKey(resp.GetStartKey())), + zap.Stringer("EndKey", utils.WrapKey(resp.GetEndKey()))) + err = respFn(resp) + if err != nil { + return err + } } } return nil @@ -922,3 +965,8 @@ func CollectChecksums(backupMeta *kvproto.BackupMeta) ([]Checksum, error) { return checksums, nil } + +// isRetryableError represents whether we should retry reset grpc connection. +func isRetryableError(err error) bool { + return status.Code(err) == codes.Unavailable || status.Code(err) == codes.Canceled +} diff --git a/pkg/backup/push.go b/pkg/backup/push.go index fd1903ab9..7dc7d8942 100644 --- a/pkg/backup/push.go +++ b/pkg/backup/push.go @@ -62,6 +62,10 @@ func (push *pushDown) pushBackup( // Forward all responses (including error). push.respCh <- resp return nil + }, + func() (backup.BackupClient, error) { + log.Warn("reset the connection in push", zap.Uint64("storeID", storeID)) + return push.mgr.ResetBackupClient(ctx, storeID) }) if err != nil { push.errCh <- err diff --git a/pkg/conn/conn.go b/pkg/conn/conn.go index a39dda5e4..ee8aef11e 100644 --- a/pkg/conn/conn.go +++ b/pkg/conn/conn.go @@ -36,12 +36,14 @@ import ( ) const ( - dialTimeout = 5 * time.Second + dialTimeout = 30 * time.Second clusterVersionPrefix = "pd/api/v1/config/cluster-version" regionCountPrefix = "pd/api/v1/stats/region" schdulerPrefix = "pd/api/v1/schedulers" maxMsgSize = int(128 * utils.MB) // pd.ScanRegion may return a large response scheduleConfigPrefix = "pd/api/v1/config/schedule" + + resetRetryTimes = 3 ) // Mgr manages connections to a TiDB cluster. @@ -334,6 +336,7 @@ func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.Cl ctx, addr, opt, + grpc.WithBlock(), grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: time.Duration(keepAlive) * time.Second, @@ -344,8 +347,6 @@ func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.Cl if err != nil { return nil, errors.WithStack(err) } - // Cache the conn. - mgr.grpcClis.clis[storeID] = conn return conn, nil } @@ -363,6 +364,43 @@ func (mgr *Mgr) GetBackupClient(ctx context.Context, storeID uint64) (backup.Bac if err != nil { return nil, errors.Trace(err) } + // Cache the conn. + mgr.grpcClis.clis[storeID] = conn + return backup.NewBackupClient(conn), nil +} + +// ResetBackupClient reset the connection for backup client. +func (mgr *Mgr) ResetBackupClient(ctx context.Context, storeID uint64) (backup.BackupClient, error) { + mgr.grpcClis.mu.Lock() + defer mgr.grpcClis.mu.Unlock() + + if conn, ok := mgr.grpcClis.clis[storeID]; ok { + // Find a cached backup client. 
+ log.Info("Reset backup client", zap.Uint64("storeID", storeID)) + err := conn.Close() + if err != nil { + log.Warn("close backup connection failed, ignore it", zap.Uint64("storeID", storeID)) + } + delete(mgr.grpcClis.clis, storeID) + } + var ( + conn *grpc.ClientConn + err error + ) + for retry := 0; retry < resetRetryTimes; retry++ { + conn, err = mgr.getGrpcConnLocked(ctx, storeID) + if err != nil { + log.Warn("failed to reset grpc connection, retry it", + zap.Int("retry time", retry), zap.Error(err)) + time.Sleep(time.Duration(retry+3) * time.Second) + continue + } + mgr.grpcClis.clis[storeID] = conn + break + } + if err != nil { + return nil, errors.Trace(err) + } return backup.NewBackupClient(conn), nil } diff --git a/tests/br_full/run.sh b/tests/br_full/run.sh index 6fe00a17d..b846db32d 100755 --- a/tests/br_full/run.sh +++ b/tests/br_full/run.sh @@ -28,13 +28,19 @@ for i in $(seq $DB_COUNT); do row_count_ori[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}') done +# backup full and kill tikv to test reset connection +echo "backup with limit start..." +export GO_FAILPOINTS="github.com/pingcap/br/pkg/backup/reset-retryable-error=1*return(true)" +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-limit" --concurrency 4 +export GO_FAILPOINTS="" + # backup full echo "backup with lz4 start..." -run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --ratelimit 5 --concurrency 4 --compression lz4 +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --concurrency 4 --compression lz4 size_lz4=$(du -d 0 $TEST_DIR/$DB-lz4 | awk '{print $1}') echo "backup with zstd start..." -run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-zstd" --ratelimit 5 --concurrency 4 --compression zstd --compression-level 6 +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-zstd" --concurrency 4 --compression zstd --compression-level 6 size_zstd=$(du -d 0 $TEST_DIR/$DB-zstd | awk '{print $1}') if [ "$size_lz4" -le "$size_zstd" ]; then @@ -42,7 +48,7 @@ if [ "$size_lz4" -le "$size_zstd" ]; then exit -1 fi -for ct in lz4 zstd; do +for ct in limit lz4 zstd; do for i in $(seq $DB_COUNT); do run_sql "DROP DATABASE $DB${i};" done