Skip to content

Commit

Permalink
internal/client: remove the lock for idle connection recycle (tikv#278)
Browse files Browse the repository at this point in the history
Signed-off-by: tiancaiamao <tiancaiamao@gmail.com>
  • Loading branch information
tiancaiamao committed Aug 31, 2021
1 parent daddf73 commit 9c105c5
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
17 changes: 9 additions & 8 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,7 @@ type RPCClient struct {
security config.Security

idleNotify uint32
// recycleMu protect the conns from being modified during a connArray is taken out and used.
// That means recycleIdleConnArray() will wait until nobody doing sendBatchRequest()
recycleMu sync.RWMutex

// Periodically check whether there is any connection that is idle and then close and remove these connections.
// Implement background cleanup.
isClosed bool
Expand Down Expand Up @@ -286,6 +284,13 @@ func (c *RPCClient) getConnArray(addr string, enableBatch bool, opt ...func(cfg
return nil, err
}
}

// An idle connArray will not change to active again, this avoid the race condition
// that recycling idle connection close an active connection unexpectedly (idle -> active).
if array.batchConn != nil && array.isIdle() {
return nil, errors.Errorf("rpcClient is idle")
}

return array, nil
}

Expand Down Expand Up @@ -352,7 +357,6 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
ctx = opentracing.ContextWithSpan(ctx, span1)
}

start := time.Now()
if atomic.CompareAndSwapUint32(&c.idleNotify, 1, 0) {
c.recycleMu.Lock()
c.recycleIdleConnArray()
Expand All @@ -361,15 +365,12 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R

// TiDB will not send batch commands to TiFlash, to resolve the conflict with Batch Cop Request.
enableBatch := req.StoreTp != tikvrpc.TiDB && req.StoreTp != tikvrpc.TiFlash
c.recycleMu.RLock()
defer c.recycleMu.RUnlock()
connArray, err := c.getConnArray(addr, enableBatch)
if err != nil {
return nil, errors.Trace(err)
}
metrics.TiKVBatchClientRecycle.Observe(time.Since(start).Seconds())

start = time.Now()
start := time.Now()
defer func() {
stmtExec := ctx.Value(util.ExecDetailsKey)
if stmtExec != nil {
Expand Down
4 changes: 4 additions & 0 deletions internal/client/client_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,7 @@ func sendBatchRequest(
}

func (c *RPCClient) recycleIdleConnArray() {
start := time.Now()
var addrs []string
c.RLock()
for _, conn := range c.conns {
Expand All @@ -811,8 +812,11 @@ func (c *RPCClient) recycleIdleConnArray() {
zap.String("target", addr))
}
c.Unlock()

if conn != nil {
conn.Close()
}
}

metrics.TiKVBatchClientRecycle.Observe(time.Since(start).Seconds())
}

0 comments on commit 9c105c5

Please sign in to comment.