diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go index f959deafa55bf..467fab1827d41 100644 --- a/store/tikv/client_batch.go +++ b/store/tikv/client_batch.go @@ -16,6 +16,7 @@ package tikv import ( "context" + "math" "sync" "sync/atomic" "time" @@ -23,6 +24,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/tikvpb" + "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/store/tikv/tikvrpc" @@ -209,7 +211,7 @@ func (c *batchCommandsClient) failPendingRequests(err error) { }) } -func (c *batchCommandsClient) reCreateStreamingClient(err error) bool { +func (c *batchCommandsClient) reCreateStreamingClient(err error) error { // Hold the lock to forbid batchSendLoop using the old client. c.clientLock.Lock() defer c.clientLock.Unlock() @@ -224,14 +226,14 @@ func (c *batchCommandsClient) reCreateStreamingClient(err error) bool { zap.String("target", c.target), ) c.client = streamClient - return true + return nil } logutil.BgLogger().Error( "batchRecvLoop re-create streaming fail", zap.String("target", c.target), zap.Error(err), ) - return false + return err } func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { @@ -249,23 +251,27 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { for { resp, err := c.recv() if err != nil { + logutil.BgLogger().Error( + "batchRecvLoop error when receive", + zap.String("target", c.target), + zap.Error(err), + ) + + b := NewBackoffer(context.Background(), math.MaxInt32) now := time.Now() for { // try to re-create the streaming in the loop. if c.isStopped() { return } - logutil.BgLogger().Error( - "batchRecvLoop error when receive", - zap.String("target", c.target), - zap.Error(err), - ) - if c.reCreateStreamingClient(err) { + err1 := c.reCreateStreamingClient(err) + if err1 == nil { break } - - // TODO: Use a more smart backoff strategy. 
- time.Sleep(time.Second) + err2 := b.Backoff(boTiKVRPC, err1) + // As timeout is set to math.MaxInt32, err2 should always be nil. + // This line is added to make the 'make errcheck' pass. + terror.Log(err2) } metrics.TiKVBatchClientUnavailable.Observe(time.Since(now).Seconds()) continue