Skip to content

Commit

Permalink
tikv: forbid to try to get a connection forever (#11391)
Browse files Browse the repository at this point in the history
  • Loading branch information
hicqu authored Jul 24, 2019
1 parent 99e4dd4 commit 2225d5f
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 4 deletions.
19 changes: 15 additions & 4 deletions store/tikv/client_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,15 +408,26 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {

func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []*tikvpb.BatchCommandsRequest_Request, requestIDs []uint64) {
// Choose a connection by round-robbin.
var cli *batchCommandsClient
for {
var cli *batchCommandsClient = nil
var target string = ""
for i := 0; i < len(a.batchCommandsClients); i++ {
a.index = (a.index + 1) % uint32(len(a.batchCommandsClients))
cli = a.batchCommandsClients[a.index]
target = a.batchCommandsClients[a.index].target
// The lock protects the batchCommandsClient from been closed while it's inuse.
if cli.tryLockForSend() {
if a.batchCommandsClients[a.index].tryLockForSend() {
cli = a.batchCommandsClients[a.index]
break
}
}
if cli == nil {
logutil.BgLogger().Warn("no available connections", zap.String("target", target))
for _, entry := range entries {
// Please ensure the error is handled in region cache correctly.
entry.err = errors.New("no available connections")
close(entry.res)
}
return
}
defer cli.unlockForSend()

maxBatchID := atomic.AddUint64(&cli.idAlloc, uint64(len(requests)))
Expand Down
23 changes: 23 additions & 0 deletions store/tikv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ package tikv

import (
"context"
"fmt"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
)

func TestT(t *testing.T) {
Expand Down Expand Up @@ -98,3 +100,24 @@ func (s *testClientSuite) TestCancelTimeoutRetErr(c *C) {
_, err = sendBatchRequest(context.Background(), "", a, req, 0)
c.Assert(errors.Cause(err), Equals, context.DeadlineExceeded)
}

func (s *testClientSuite) TestSendWhenReconnect(c *C) {
server, port := startMockTikvService()
c.Assert(port > 0, IsTrue)

rpcClient := newRPCClient(config.Security{})
addr := fmt.Sprintf("%s:%d", "127.0.0.1", port)
conn, err := rpcClient.getConnArray(addr)
c.Assert(err, IsNil)

// Suppose all connections are re-establishing.
for _, client := range conn.batchConn.batchCommandsClients {
client.lockForRecreate()
}

req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
_, err = rpcClient.SendRequest(context.Background(), addr, req, 100*time.Second)
c.Assert(err.Error() == "no available connections", IsTrue)
conn.Close()
server.Stop()
}

0 comments on commit 2225d5f

Please sign in to comment.