From 6e9007c51e518209fde6aa27717cac7f18af7331 Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Wed, 29 May 2019 14:02:23 +0800 Subject: [PATCH 1/3] store/tikv: remove canceled requests before sending --- store/tikv/client.go | 22 ++++++++++++++++++++++ store/tikv/client_test.go | 25 +++++++++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/store/tikv/client.go b/store/tikv/client.go index 09a34912463bf..4bb22a01b6a66 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -336,6 +336,10 @@ type batchCommandsEntry struct { err error } +func (b *batchCommandsEntry) isCanceled() bool { + return atomic.LoadInt32(&b.canceled) == 1 +} + const idleTimeout = 3 * time.Minute // fetchAllPendingRequests fetches all pending requests from the channel. @@ -476,6 +480,7 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) { bestBatchWaitSize += 1 } + length = removeCanceledRequests(&entries, &requests) maxBatchID := atomic.AddUint64(&batchCommandsClient.idAlloc, uint64(length)) for i := 0; i < length; i++ { requestID := uint64(i) + maxBatchID - uint64(length) @@ -506,6 +511,23 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) { } } +// removeCanceledRequests removes canceled requests before sending. +func removeCanceledRequests( + entries *[]*batchCommandsEntry, + requests *[]*tikvpb.BatchCommandsRequest_Request) int { + validEntries := (*entries)[:0] + validRequets := (*requests)[:0] + for _, e := range *entries { + if !e.isCanceled() { + validEntries = append(validEntries, e) + validRequets = append(validRequets, e.req) + } + } + *entries = validEntries + *requests = validRequets + return len(*entries) +} + // rpcClient is RPC client struct. // TODO: Add flow control between RPC clients in TiDB ond RPC servers in TiKV. // Since we use shared client connection to communicate to the same TiKV, it's possible diff --git a/store/tikv/client_test.go b/store/tikv/client_test.go index b673e9b674358..e52498726bef1 100644 --- a/store/tikv/client_test.go +++ b/store/tikv/client_test.go @@ -17,6 +17,7 @@ import ( "testing" . "github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pingcap/tidb/config" ) @@ -52,3 +53,27 @@ func (s *testClientSuite) TestConn(c *C) { c.Assert(err, NotNil) c.Assert(conn3, IsNil) } + +func (s *testClientSuite) TestRemoveCanceledRequests(c *C) { + req := new(tikvpb.BatchCommandsRequest_Request) + entries := []*batchCommandsEntry{ + {canceled: 1, req: req}, + {canceled: 0, req: req}, + {canceled: 1, req: req}, + {canceled: 1, req: req}, + {canceled: 0, req: req}, + } + entryPtr := &entries[0] + requests := make([]*tikvpb.BatchCommandsRequest_Request, len(entries)) + for i := range entries { + requests[i] = entries[i].req + } + length := removeCanceledRequests(&entries, &requests) + c.Assert(length, Equals, 2) + for _, e := range entries { + c.Assert(e.isCanceled(), IsFalse) + } + c.Assert(len(requests), Equals, 2) + newEntryPtr := &entries[0] + c.Assert(entryPtr, Equals, newEntryPtr) +} From c605454a0036b87cfd199dcbf8096001964f7ee8 Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Wed, 29 May 2019 14:24:35 +0800 Subject: [PATCH 2/3] continue if all are canceled. --- store/tikv/client.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/store/tikv/client.go b/store/tikv/client.go index 4bb22a01b6a66..eed1a7ed7a060 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -481,6 +481,9 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) { } length = removeCanceledRequests(&entries, &requests) + if length == 0 { + continue // All requests are canceled. + } maxBatchID := atomic.AddUint64(&batchCommandsClient.idAlloc, uint64(length)) for i := 0; i < length; i++ { requestID := uint64(i) + maxBatchID - uint64(length) From c6622506318618de618c407f135b9ca099ec39cc Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Fri, 31 May 2019 12:50:56 +0800 Subject: [PATCH 3/3] return 0 when backoff context is done. --- store/tikv/backoff.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index c34d5eb65643a..be374e3fc5145 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -110,12 +110,12 @@ func NewBackoffFn(base, cap, jitter int) func(ctx context.Context, maxSleepMs in } select { case <-time.After(time.Duration(realSleep) * time.Millisecond): + attempts++ + lastSleep = sleep + return lastSleep case <-ctx.Done(): + return 0 } - - attempts++ - lastSleep = sleep - return lastSleep } }