From 0c5af78012567e7b4291cdf438fe0801aac71b1c Mon Sep 17 00:00:00 2001 From: lysu Date: Wed, 19 Jun 2019 12:53:41 +0800 Subject: [PATCH] tikv: avoid switch peer when batchRequest be cancelled (#10822) --- store/tikv/client.go | 12 ++++++------ store/tikv/client_test.go | 16 ++++++++++++++++ 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/store/tikv/client.go b/store/tikv/client.go index d4df0be5f4100..f8c0feb9f4d3e 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -35,10 +35,8 @@ import ( "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" "google.golang.org/grpc" - gcodes "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" - gstatus "google.golang.org/grpc/status" ) // MaxSendMsgSize set max gRPC request message size sent to server. If any request message size is larger than @@ -618,8 +616,9 @@ func sendBatchRequest( select { case connArray.batchCommandsCh <- entry: case <-ctx1.Done(): - logutil.Logger(context.Background()).Warn("send request is timeout", zap.String("to", addr)) - return nil, errors.Trace(gstatus.Error(gcodes.DeadlineExceeded, "Canceled or timeout")) + logutil.Logger(context.Background()).Warn("send request is cancelled", + zap.String("to", addr), zap.String("cause", ctx1.Err().Error())) + return nil, errors.Trace(ctx1.Err()) } select { @@ -630,8 +629,9 @@ func sendBatchRequest( return tikvrpc.FromBatchCommandsResponse(res), nil case <-ctx1.Done(): atomic.StoreInt32(&entry.canceled, 1) - logutil.Logger(context.Background()).Warn("send request is canceled", zap.String("to", addr)) - return nil, errors.Trace(gstatus.Error(gcodes.DeadlineExceeded, "Canceled or timeout")) + logutil.Logger(context.Background()).Warn("wait response is cancelled", + zap.String("to", addr), zap.String("cause", ctx1.Err().Error())) + return nil, errors.Trace(ctx1.Err()) } } diff --git a/store/tikv/client_test.go b/store/tikv/client_test.go index e52498726bef1..4d4d2fa1e7d38 100644 --- a/store/tikv/client_test.go +++ b/store/tikv/client_test.go @@ -14,9 +14,12 @@ package tikv import ( + "context" "testing" + "time" . "github.com/pingcap/check" + "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pingcap/tidb/config" ) @@ -77,3 +80,16 @@ func (s *testClientSuite) TestRemoveCanceledRequests(c *C) { newEntryPtr := &entries[0] c.Assert(entryPtr, Equals, newEntryPtr) } + +func (s *testClientSuite) TestCancelTimeoutRetErr(c *C) { + req := new(tikvpb.BatchCommandsRequest_Request) + a := &connArray{batchCommandsCh: make(chan *batchCommandsEntry, 1)} + + ctx, cancel := context.WithCancel(context.TODO()) + cancel() + _, err := sendBatchRequest(ctx, "", a, req, 2*time.Second) + c.Assert(errors.Cause(err), Equals, context.Canceled) + + _, err = sendBatchRequest(context.Background(), "", a, req, 0) + c.Assert(errors.Cause(err), Equals, context.DeadlineExceeded) +}