From 540dd060e49f8c694806c13fc95054ae37bdafa0 Mon Sep 17 00:00:00 2001 From: qupeng Date: Fri, 2 Aug 2019 15:28:20 +0800 Subject: [PATCH] tikv: forbid to try to get a connection forever (#11391) (#11531) --- store/tikv/client_batch.go | 19 +++++++++++++++---- store/tikv/client_fail_test.go | 1 + store/tikv/client_test.go | 23 +++++++++++++++++++++++ 3 files changed, 39 insertions(+), 4 deletions(-) diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go index 937f8221e2bef..36318dd9b8e1f 100644 --- a/store/tikv/client_batch.go +++ b/store/tikv/client_batch.go @@ -408,15 +408,26 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []*tikvpb.BatchCommandsRequest_Request, requestIDs []uint64) { // Choose a connection by round-robbin. - var cli *batchCommandsClient - for { + var cli *batchCommandsClient = nil + var target string = "" + for i := 0; i < len(a.batchCommandsClients); i++ { a.index = (a.index + 1) % uint32(len(a.batchCommandsClients)) - cli = a.batchCommandsClients[a.index] + target = a.batchCommandsClients[a.index].target // The lock protects the batchCommandsClient from been closed while it's inuse. - if cli.tryLockForSend() { + if a.batchCommandsClients[a.index].tryLockForSend() { + cli = a.batchCommandsClients[a.index] break } } + if cli == nil { + logutil.Logger(context.Background()).Warn("no available connections", zap.String("target", target)) + for _, entry := range entries { + // Please ensure the error is handled in region cache correctly. + entry.err = errors.New("no available connections") + close(entry.res) + } + return + } defer cli.unlockForSend() maxBatchID := atomic.AddUint64(&cli.idAlloc, uint64(len(requests))) diff --git a/store/tikv/client_fail_test.go b/store/tikv/client_fail_test.go index 96f64bd571629..ad49b5040da1b 100644 --- a/store/tikv/client_fail_test.go +++ b/store/tikv/client_fail_test.go @@ -61,6 +61,7 @@ func (s *testClientFailSuite) TestPanicInRecvLoop(c *C) { time.Sleep(time.Second) c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gotErrorInRecvLoop"), IsNil) c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/panicInFailPendingRequests"), IsNil) + time.Sleep(time.Second) req := &tikvrpc.Request{ Type: tikvrpc.CmdEmpty, diff --git a/store/tikv/client_test.go b/store/tikv/client_test.go index c7e197bbe351a..5dd9567bcc721 100644 --- a/store/tikv/client_test.go +++ b/store/tikv/client_test.go @@ -15,6 +15,7 @@ package tikv import ( "context" + "fmt" "testing" "time" @@ -22,6 +23,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/store/tikv/tikvrpc" ) func TestT(t *testing.T) { @@ -97,3 +99,24 @@ func (s *testClientSuite) TestCancelTimeoutRetErr(c *C) { _, err = sendBatchRequest(context.Background(), "", a, req, 0) c.Assert(errors.Cause(err), Equals, context.DeadlineExceeded) } + +func (s *testClientSuite) TestSendWhenReconnect(c *C) { + server, port := startMockTikvService() + c.Assert(port > 0, IsTrue) + + rpcClient := newRPCClient(config.Security{}) + addr := fmt.Sprintf("%s:%d", "127.0.0.1", port) + conn, err := rpcClient.getConnArray(addr) + c.Assert(err, IsNil) + + // Suppose all connections are re-establishing. + for _, client := range conn.batchConn.batchCommandsClients { + client.lockForRecreate() + } + + req := &tikvrpc.Request{Type: tikvrpc.CmdEmpty, Empty: &tikvpb.BatchCommandsEmptyRequest{}} + _, err = rpcClient.SendRequest(context.Background(), addr, req, 100*time.Second) + c.Assert(err.Error() == "no available connections", IsTrue) + conn.Close() + server.Stop() +}