This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

backup: add retry when tikv is down #508

Merged: 11 commits, Sep 22, 2020
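At a glance, the change makes BR retry instead of failing when a TiKV store drops the backup stream: SendBackup gains a resetFn callback and a bounded retry loop (backupRetryTimes), transient gRPC errors (Unavailable, Canceled) trigger a connection reset through the new Mgr.ResetBackupClient, and a failpoint plus a new case in tests/br_full/run.sh exercise the path. The snippet below is only a stripped-down sketch of that retry-with-reset pattern; its names and in-memory stand-ins are illustrative and are not BR's actual API.

package main

import (
	"errors"
	"fmt"
	"time"
)

// withRetry runs attempt up to maxRetries times. When attempt fails with a
// retryable error it rebuilds the underlying connection via reset and tries
// again; any non-retryable error or reset failure is returned immediately.
func withRetry(attempt func() error, reset func() error, retryable func(error) bool, maxRetries int) error {
	var err error
	for retry := 0; retry < maxRetries; retry++ {
		if err = attempt(); err == nil {
			return nil
		}
		if !retryable(err) {
			return err
		}
		time.Sleep(10 * time.Millisecond) // BR sleeps 3s between attempts
		if resetErr := reset(); resetErr != nil {
			return fmt.Errorf("failed to reset connection: %w", resetErr)
		}
	}
	return fmt.Errorf("retries exhausted: %w", err)
}

func main() {
	calls := 0
	err := withRetry(
		func() error { // stands in for client.Backup plus the Recv loop
			calls++
			if calls < 3 {
				return errors.New("store unavailable") // fails twice, then succeeds
			}
			return nil
		},
		func() error { return nil },          // stands in for Mgr.ResetBackupClient
		func(err error) bool { return true }, // stands in for isRetryableError
		5,
	)
	fmt.Println(calls, err) // prints: 3 <nil>
}
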
96 changes: 72 additions & 24 deletions pkg/backup/client.go
@@ -13,6 +13,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/google/btree"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
kvproto "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
@@ -31,6 +32,8 @@ import (
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/glue"
@@ -43,6 +46,7 @@ import (
// ClientMgr manages connections needed by backup.
type ClientMgr interface {
GetBackupClient(ctx context.Context, storeID uint64) (kvproto.BackupClient, error)
ResetBackupClient(ctx context.Context, storeID uint64) (kvproto.BackupClient, error)
GetPDClient() pd.Client
GetTiKV() tikv.Storage
GetLockResolver() *tikv.LockResolver
@@ -59,6 +63,7 @@ type Checksum struct {
// Maximum total sleep time(in ms) for kv/cop commands.
const (
backupFineGrainedMaxBackoff = 80000
backupRetryTimes = 5
)

// Client is a client that instructs TiKV how to do a backup.
@@ -776,6 +781,10 @@ func (bc *Client) handleFineGrained(
respCh <- response
}
return nil
},
func() (kvproto.BackupClient, error) {
log.Warn("reset the connection in handleFineGrained", zap.Uint64("storeID", storeID))
return bc.mgr.ResetBackupClient(ctx, storeID)
})
if err != nil {
return 0, err
@@ -791,34 +800,68 @@ func SendBackup(
	client kvproto.BackupClient,
	req kvproto.BackupRequest,
	respFn func(*kvproto.BackupResponse) error,
	resetFn func() (kvproto.BackupClient, error),
) error {
	var errReset error
backupLoop:
	for retry := 0; retry < backupRetryTimes; retry++ {
		log.Info("try backup",
			zap.Stringer("StartKey", utils.WrapKey(req.StartKey)),
			zap.Stringer("EndKey", utils.WrapKey(req.EndKey)),
			zap.Uint64("storeID", storeID),
			zap.Int("retry time", retry),
		)
		bcli, err := client.Backup(ctx, &req)
		failpoint.Inject("reset-retryable-error", func(val failpoint.Value) {
			if val.(bool) {
				log.Debug("failpoint reset-retryable-error injected.")
				err = status.Errorf(codes.Unavailable, "Unavailable error")
			}
		})
		if err != nil {
			if isRetryableError(err) {
				time.Sleep(3 * time.Second)
				client, errReset = resetFn()
				if errReset != nil {
					return errors.Annotatef(errReset, "failed to reset backup connection on store:%d "+
						"please check the tikv status", storeID)
				}
				continue
			}
			log.Error("fail to backup", zap.Uint64("StoreID", storeID),
				zap.Int("retry time", retry))
			return errors.Trace(err)
		}

		for {
			resp, err := bcli.Recv()
			if err != nil {
				if err == io.EOF {
					log.Info("backup streaming finish",
						zap.Uint64("StoreID", storeID),
						zap.Int("retry time", retry))
					break backupLoop
				}
				if isRetryableError(err) {
					time.Sleep(3 * time.Second)
					// current tikv is unavailable
					client, errReset = resetFn()
					if errReset != nil {
						return errors.Annotatef(errReset, "failed to reset recv connection on store:%d "+
							"please check the tikv status", storeID)
					}
					break
				}
				return errors.Annotatef(err, "failed to connect to store: %d with retry times:%d", storeID, retry)
			}
			// TODO: handle errors in the resp.
			log.Info("range backuped",
				zap.Stringer("StartKey", utils.WrapKey(resp.GetStartKey())),
				zap.Stringer("EndKey", utils.WrapKey(resp.GetEndKey())))
			err = respFn(resp)
			if err != nil {
				return err
			}
		}
	}
	return nil
@@ -922,3 +965,8 @@ func CollectChecksums(backupMeta *kvproto.BackupMeta) ([]Checksum, error) {

return checksums, nil
}

// isRetryableError checks whether we should reset the gRPC connection and retry.
func isRetryableError(err error) bool {
return status.Code(err) == codes.Unavailable || status.Code(err) == codes.Canceled
}
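
One detail worth noting: the retry decision is based purely on the gRPC status code, so errors that carry no status map to codes.Unknown and fall through to the non-retryable branch. A small self-contained illustration (this snippet is an editorial sketch, not part of the diff):

package main

import (
	"fmt"
	"io"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// Same check as isRetryableError above: only Unavailable and Canceled trigger a reset.
func isRetryableError(err error) bool {
	return status.Code(err) == codes.Unavailable || status.Code(err) == codes.Canceled
}

func main() {
	// A status error with code Unavailable (what the failpoint injects) is retried.
	fmt.Println(isRetryableError(status.Errorf(codes.Unavailable, "tikv is down"))) // true
	// status.Code reports codes.Unknown for plain errors, so they are not retried here.
	fmt.Println(isRetryableError(io.EOF)) // false
}
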
4 changes: 4 additions & 0 deletions pkg/backup/push.go
@@ -62,6 +62,10 @@ func (push *pushDown) pushBackup(
// Forward all responses (including error).
push.respCh <- resp
return nil
},
func() (backup.BackupClient, error) {
log.Warn("reset the connection in push", zap.Uint64("storeID", storeID))
return push.mgr.ResetBackupClient(ctx, storeID)
})
if err != nil {
push.errCh <- err
44 changes: 41 additions & 3 deletions pkg/conn/conn.go
@@ -36,12 +36,14 @@ import (
)

const (
dialTimeout = 30 * time.Second // was 5 * time.Second before this change
clusterVersionPrefix = "pd/api/v1/config/cluster-version"
regionCountPrefix = "pd/api/v1/stats/region"
schdulerPrefix = "pd/api/v1/schedulers"
maxMsgSize = int(128 * utils.MB) // pd.ScanRegion may return a large response
scheduleConfigPrefix = "pd/api/v1/config/schedule"

resetRetryTimes = 3
)

// Mgr manages connections to a TiDB cluster.
@@ -334,6 +336,7 @@ func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
ctx,
addr,
opt,
grpc.WithBlock(),
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(keepAlive) * time.Second,
@@ -344,8 +347,6 @@ func (mgr *Mgr) GetBackupClient(ctx context.Context, storeID uint64) (backup.BackupClient, error) {
if err != nil {
return nil, errors.WithStack(err)
}
return conn, nil
}

@@ -363,6 +364,43 @@ func (mgr *Mgr) GetBackupClient(ctx context.Context, storeID uint64) (backup.BackupClient, error) {
if err != nil {
return nil, errors.Trace(err)
}
// Cache the conn.
mgr.grpcClis.clis[storeID] = conn
return backup.NewBackupClient(conn), nil
}

// ResetBackupClient resets the connection for the backup client.
func (mgr *Mgr) ResetBackupClient(ctx context.Context, storeID uint64) (backup.BackupClient, error) {
mgr.grpcClis.mu.Lock()
defer mgr.grpcClis.mu.Unlock()

if conn, ok := mgr.grpcClis.clis[storeID]; ok {
// Find a cached backup client.
log.Info("Reset backup client", zap.Uint64("storeID", storeID))
err := conn.Close()
if err != nil {
log.Warn("close backup connection failed, ignore it", zap.Uint64("storeID", storeID))
}
delete(mgr.grpcClis.clis, storeID)
}
var (
conn *grpc.ClientConn
err error
)
for retry := 0; retry < resetRetryTimes; retry++ {
conn, err = mgr.getGrpcConnLocked(ctx, storeID)
if err != nil {
log.Warn("failed to reset grpc connection, retry it",
zap.Int("retry time", retry), zap.Error(err))
time.Sleep(time.Duration(retry+3) * time.Second)
continue
}
mgr.grpcClis.clis[storeID] = conn
break
}
if err != nil {
return nil, errors.Trace(err)
}
return backup.NewBackupClient(conn), nil
}

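A note on the conn.go hunks above: grpc.WithBlock() makes the dial wait until the connection is actually established (or the dial context times out), which is presumably why dialTimeout grows from 5s to 30s, and connection caching moves out of getGrpcConnLocked so that GetBackupClient and ResetBackupClient each cache the connection themselves after a successful dial. ResetBackupClient retries the dial up to resetRetryTimes with a growing pause of retry+3 seconds (3s, 4s, 5s).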
12 changes: 9 additions & 3 deletions tests/br_full/run.sh
@@ -28,21 +28,27 @@ for i in $(seq $DB_COUNT); do
row_count_ori[${i}]=$(run_sql "SELECT COUNT(*) FROM $DB${i}.$TABLE;" | awk '/COUNT/{print $2}')
done

# backup full and kill tikv to test reset connection
echo "backup with limit start..."
export GO_FAILPOINTS="github.com/pingcap/br/pkg/backup/reset-retryable-error=1*return(true)"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-limit" --concurrency 4
export GO_FAILPOINTS=""

# backup full
echo "backup with lz4 start..."
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --concurrency 4 --compression lz4 # --ratelimit 5 removed by this PR
size_lz4=$(du -d 0 $TEST_DIR/$DB-lz4 | awk '{print $1}')

echo "backup with zstd start..."
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-zstd" --concurrency 4 --compression zstd --compression-level 6 # --ratelimit 5 removed by this PR
size_zstd=$(du -d 0 $TEST_DIR/$DB-zstd | awk '{print $1}')

if [ "$size_lz4" -le "$size_zstd" ]; then
echo "full backup lz4 size $size_lz4 is small than backup with zstd $size_zstd"
exit -1
fi

for ct in limit lz4 zstd; do
for i in $(seq $DB_COUNT); do
run_sql "DROP DATABASE $DB${i};"
done
Expand Down