diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index c34d5eb65643a..be374e3fc5145 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -110,12 +110,12 @@ func NewBackoffFn(base, cap, jitter int) func(ctx context.Context, maxSleepMs in } select { case <-time.After(time.Duration(realSleep) * time.Millisecond): + attempts++ + lastSleep = sleep + return lastSleep case <-ctx.Done(): + return 0 } - - attempts++ - lastSleep = sleep - return lastSleep } } diff --git a/store/tikv/client.go b/store/tikv/client.go index 09a34912463bf..eed1a7ed7a060 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -336,6 +336,10 @@ type batchCommandsEntry struct { err error } +func (b *batchCommandsEntry) isCanceled() bool { + return atomic.LoadInt32(&b.canceled) == 1 +} + const idleTimeout = 3 * time.Minute // fetchAllPendingRequests fetches all pending requests from the channel. @@ -476,6 +480,10 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) { bestBatchWaitSize += 1 } + length = removeCanceledRequests(&entries, &requests) + if length == 0 { + continue // All requests are canceled. + } maxBatchID := atomic.AddUint64(&batchCommandsClient.idAlloc, uint64(length)) for i := 0; i < length; i++ { requestID := uint64(i) + maxBatchID - uint64(length) @@ -506,6 +514,23 @@ func (a *connArray) batchSendLoop(cfg config.TiKVClient) { } } +// removeCanceledRequests removes canceled requests before sending. +func removeCanceledRequests( + entries *[]*batchCommandsEntry, + requests *[]*tikvpb.BatchCommandsRequest_Request) int { + validEntries := (*entries)[:0] + validRequets := (*requests)[:0] + for _, e := range *entries { + if !e.isCanceled() { + validEntries = append(validEntries, e) + validRequets = append(validRequets, e.req) + } + } + *entries = validEntries + *requests = validRequets + return len(*entries) +} + // rpcClient is RPC client struct. // TODO: Add flow control between RPC clients in TiDB ond RPC servers in TiKV. // Since we use shared client connection to communicate to the same TiKV, it's possible diff --git a/store/tikv/client_test.go b/store/tikv/client_test.go index b673e9b674358..e52498726bef1 100644 --- a/store/tikv/client_test.go +++ b/store/tikv/client_test.go @@ -17,6 +17,7 @@ import ( "testing" . "github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pingcap/tidb/config" ) @@ -52,3 +53,27 @@ func (s *testClientSuite) TestConn(c *C) { c.Assert(err, NotNil) c.Assert(conn3, IsNil) } + +func (s *testClientSuite) TestRemoveCanceledRequests(c *C) { + req := new(tikvpb.BatchCommandsRequest_Request) + entries := []*batchCommandsEntry{ + {canceled: 1, req: req}, + {canceled: 0, req: req}, + {canceled: 1, req: req}, + {canceled: 1, req: req}, + {canceled: 0, req: req}, + } + entryPtr := &entries[0] + requests := make([]*tikvpb.BatchCommandsRequest_Request, len(entries)) + for i := range entries { + requests[i] = entries[i].req + } + length := removeCanceledRequests(&entries, &requests) + c.Assert(length, Equals, 2) + for _, e := range entries { + c.Assert(e.isCanceled(), IsFalse) + } + c.Assert(len(requests), Equals, 2) + newEntryPtr := &entries[0] + c.Assert(entryPtr, Equals, newEntryPtr) +}