From 16f3da2c3482d0dfd50d41706e7067d56920ebed Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 18 Jul 2019 13:10:15 +0800 Subject: [PATCH 1/5] store/tikv: refine streaming client re-create log and use a smarter backoff strategy. --- store/tikv/client_batch.go | 47 +++++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go index f959deafa55bf..c8c7d0a780610 100644 --- a/store/tikv/client_batch.go +++ b/store/tikv/client_batch.go @@ -16,6 +16,7 @@ package tikv import ( "context" + "math" "sync" "sync/atomic" "time" @@ -209,7 +210,7 @@ func (c *batchCommandsClient) failPendingRequests(err error) { }) } -func (c *batchCommandsClient) reCreateStreamingClient(err error) bool { +func (c *batchCommandsClient) reCreateStreamingClient(err error, skipLog bool) error { // Hold the lock to forbid batchSendLoop using the old client. c.clientLock.Lock() defer c.clientLock.Unlock() @@ -224,14 +225,16 @@ func (c *batchCommandsClient) reCreateStreamingClient(err error) bool { zap.String("target", c.target), ) c.client = streamClient - return true + return err + } + if !skipLog { + logutil.BgLogger().Error( + "batchRecvLoop re-create streaming fail", + zap.String("target", c.target), + zap.Error(err), + ) } - logutil.BgLogger().Error( - "batchRecvLoop re-create streaming fail", - zap.String("target", c.target), - zap.Error(err), - ) - return false + return err } func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { @@ -249,23 +252,31 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { for { resp, err := c.recv() if err != nil { + b := NewBackoffer(context.Background(), math.MaxInt32) now := time.Now() - for { // try to re-create the streaming in the loop. + skipLog := false + for count := 0; ; count++ { // try to re-create the streaming in the loop. 
if c.isStopped() { return } - logutil.BgLogger().Error( - "batchRecvLoop error when receive", - zap.String("target", c.target), - zap.Error(err), - ) - if c.reCreateStreamingClient(err) { - break + if count <= 10 { + logutil.BgLogger().Error( + "batchRecvLoop error when receive", + zap.String("target", c.target), + zap.Error(err), + ) + if count == 10 { + logutil.BgLogger().Error("meet error for 10 times and don't show the same log any more") + skipLog = true + } } - // TODO: Use a more smart backoff strategy. - time.Sleep(time.Second) + err1 := c.reCreateStreamingClient(err, skipLog) + if err1 == nil { + break + } + b.Backoff(boTiKVRPC, err1) } metrics.TiKVBatchClientUnavailable.Observe(time.Since(now).Seconds()) continue From e872fe94f0595a32f0a7add3a02d0850274e290c Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 18 Jul 2019 13:31:13 +0800 Subject: [PATCH 2/5] make golint happy --- store/tikv/client_batch.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go index c8c7d0a780610..beecaab7c6257 100644 --- a/store/tikv/client_batch.go +++ b/store/tikv/client_batch.go @@ -276,7 +276,9 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { if err1 == nil { break } - b.Backoff(boTiKVRPC, err1) + if err := b.Backoff(boTiKVRPC, err1); err != nil { + logutil.BgLogger().Error("backoff error", zap.Error(err)) + } } metrics.TiKVBatchClientUnavailable.Observe(time.Since(now).Seconds()) continue From 5610bff4ec9ca671b89d58bc6e45e27d9009fb11 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 22 Jul 2019 10:41:35 +0800 Subject: [PATCH 3/5] address comment --- store/tikv/client_batch.go | 36 +++++++++++------------------------- 1 file changed, 11 insertions(+), 25 deletions(-) diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go index beecaab7c6257..df372e6b1d001 100644 --- a/store/tikv/client_batch.go +++ b/store/tikv/client_batch.go @@ -210,7 +210,7 @@ func (c 
*batchCommandsClient) failPendingRequests(err error) { }) } -func (c *batchCommandsClient) reCreateStreamingClient(err error, skipLog bool) error { +func (c *batchCommandsClient) reCreateStreamingClient(err error) error { // Hold the lock to forbid batchSendLoop using the old client. c.clientLock.Lock() defer c.clientLock.Unlock() @@ -227,13 +227,6 @@ func (c *batchCommandsClient) reCreateStreamingClient(err error, skipLog bool) e c.client = streamClient return err } - if !skipLog { - logutil.BgLogger().Error( - "batchRecvLoop re-create streaming fail", - zap.String("target", c.target), - zap.Error(err), - ) - } return err } @@ -252,32 +245,25 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { for { resp, err := c.recv() if err != nil { + logutil.BgLogger().Error( + "batchRecvLoop error when receive", + zap.String("target", c.target), + zap.Error(err), + ) + b := NewBackoffer(context.Background(), math.MaxInt32) now := time.Now() - skipLog := false - for count := 0; ; count++ { // try to re-create the streaming in the loop. + for { // try to re-create the streaming in the loop. 
if c.isStopped() { return } - if count <= 10 { - logutil.BgLogger().Error( - "batchRecvLoop error when receive", - zap.String("target", c.target), - zap.Error(err), - ) - if count == 10 { - logutil.BgLogger().Error("meet error for 10 times and don't show the same log any more") - skipLog = true - } - } - - err1 := c.reCreateStreamingClient(err, skipLog) + err1 := c.reCreateStreamingClient(err) if err1 == nil { break } - if err := b.Backoff(boTiKVRPC, err1); err != nil { - logutil.BgLogger().Error("backoff error", zap.Error(err)) + if err2 := b.Backoff(boTiKVRPC, err1); err2 != nil { + logutil.BgLogger().Error("backoff error", zap.Error(err2)) } } metrics.TiKVBatchClientUnavailable.Observe(time.Since(now).Seconds()) From 8c94499e9ad0187996659a89987c4091ce5fc002 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 22 Jul 2019 10:59:04 +0800 Subject: [PATCH 4/5] address comment --- store/tikv/client_batch.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go index df372e6b1d001..5c5c5ba1e9d3f 100644 --- a/store/tikv/client_batch.go +++ b/store/tikv/client_batch.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/tikvpb" + "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/store/tikv/tikvrpc" @@ -262,9 +263,10 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { if err1 == nil { break } - if err2 := b.Backoff(boTiKVRPC, err1); err2 != nil { - logutil.BgLogger().Error("backoff error", zap.Error(err2)) - } + err2 := b.Backoff(boTiKVRPC, err1) + // As timeout is set to math.MaxInt32, err2 should always be nil. + // This line is added to make the 'make errcheck' pass. 
+ terror.Log(err2) } metrics.TiKVBatchClientUnavailable.Observe(time.Since(now).Seconds()) continue From 616f7865a3031a76ef5c890e8682d01e355280e6 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 22 Jul 2019 11:07:31 +0800 Subject: [PATCH 5/5] address comment --- store/tikv/client_batch.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go index 5c5c5ba1e9d3f..467fab1827d41 100644 --- a/store/tikv/client_batch.go +++ b/store/tikv/client_batch.go @@ -226,8 +226,13 @@ func (c *batchCommandsClient) reCreateStreamingClient(err error) error { zap.String("target", c.target), ) c.client = streamClient - return err + return nil } + logutil.BgLogger().Error( + "batchRecvLoop re-create streaming fail", + zap.String("target", c.target), + zap.Error(err), + ) return err }