store/tikv: reduce the lock contention between sending message an… (#11372)
tiancaiamao authored and zz-jason committed Jul 23, 2019
1 parent 8dd4a27 commit 5847d3a
Showing 3 changed files with 113 additions and 70 deletions.
15 changes: 7 additions & 8 deletions store/tikv/client.go
@@ -152,16 +152,15 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
return errors.Trace(err)
}
batchClient := &batchCommandsClient{
target: a.target,
conn: conn,
client: streamClient,
batched: sync.Map{},
idAlloc: 0,
tikvTransportLayerLoad: &a.tikvTransportLayerLoad,
closed: 0,
target: a.target,
conn: conn,
client: streamClient,
batched: sync.Map{},
idAlloc: 0,
closed: 0,
}
a.batchCommandsClients = append(a.batchCommandsClients, batchClient)
go batchClient.batchRecvLoop(cfg.TiKVClient)
go batchClient.batchRecvLoop(cfg.TiKVClient, &a.tikvTransportLayerLoad)
}
}
go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout)
164 changes: 104 additions & 60 deletions store/tikv/client_batch.go
@@ -152,31 +152,57 @@ func fetchMorePendingRequests(
}
}

type tryLock struct {
sync.RWMutex
reCreating bool
}

func (l *tryLock) tryLockForSend() bool {
l.RLock()
if l.reCreating {
l.RUnlock()
return false
}
return true
}

func (l *tryLock) unlockForSend() {
l.RUnlock()
}

func (l *tryLock) lockForRecreate() {
l.Lock()
l.reCreating = true
l.Unlock()
}

func (l *tryLock) unlockForRecreate() {
l.Lock()
l.reCreating = false
l.Unlock()
}
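
This tryLock is the core of the commit: senders share the read lock and simply skip a client whose stream is being rebuilt, while the re-creator holds the write lock only long enough to flip the flag, so re-creating a stream no longer blocks message sending. A minimal standalone sketch of the same pattern (trySendLock and the main function are illustrative, not part of this commit):

package main

import (
	"fmt"
	"sync"
)

// trySendLock mirrors the tryLock above: senders take the read lock and
// bail out if a re-create is in progress; the re-creator takes the write
// lock only long enough to flip the flag, then does the slow work with
// no mutex held.
type trySendLock struct {
	sync.RWMutex
	reCreating bool
}

func (l *trySendLock) tryLockForSend() bool {
	l.RLock()
	if l.reCreating {
		l.RUnlock()
		return false
	}
	return true // still holds the read lock; caller must unlockForSend.
}

func (l *trySendLock) unlockForSend() { l.RUnlock() }

func (l *trySendLock) lockForRecreate() {
	l.Lock() // waits for in-flight senders to drain.
	l.reCreating = true
	l.Unlock()
}

func (l *trySendLock) unlockForRecreate() {
	l.Lock()
	l.reCreating = false
	l.Unlock()
}

func main() {
	var l trySendLock
	if l.tryLockForSend() {
		fmt.Println("sent")
		l.unlockForSend()
	}
	l.lockForRecreate() // a stream re-creation begins.
	if !l.tryLockForSend() {
		fmt.Println("skipped: stream is being re-created")
	}
	l.unlockForRecreate()
}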

type batchCommandsClient struct {
// The target host.
target string

conn *grpc.ClientConn
client tikvpb.Tikv_BatchCommandsClient
batched sync.Map
idAlloc uint64
tikvTransportLayerLoad *uint64
conn *grpc.ClientConn
client tikvpb.Tikv_BatchCommandsClient
batched sync.Map
idAlloc uint64

// closed indicates the batch client is closed explicitly or not.
closed int32
// clientLock protects the client while the stream is being re-created.
clientLock sync.Mutex
// tryLock protects the client while the stream is being re-created.
tryLock
}
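
tryLock is embedded without a field name, so Go promotes its methods onto batchCommandsClient and the code below can call c.tryLockForSend() directly. A tiny illustration of that promotion, with hypothetical types:

package main

import "fmt"

type greeter struct{}

func (greeter) hello() { fmt.Println("hello") }

type client struct {
	greeter // embedded: client gets hello() for free.
}

func main() {
	var c client
	c.hello() // promoted from the embedded greeter.
}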

func (c *batchCommandsClient) isStopped() bool {
return atomic.LoadInt32(&c.closed) != 0
}

func (c *batchCommandsClient) send(request *tikvpb.BatchCommandsRequest, entries []*batchCommandsEntry) {
// Use the lock to ensure the stream client won't be replaced by RecvLoop,
// and that newly added requests won't be removed by `failPendingRequests`.
c.clientLock.Lock()
defer c.clientLock.Unlock()
for i, requestID := range request.RequestIds {
c.batched.Store(requestID, entries[i])
}
@@ -211,10 +237,7 @@ func (c *batchCommandsClient) failPendingRequests(err error) {
})
}

func (c *batchCommandsClient) reCreateStreamingClient(err error) error {
// Hold the lock to forbid batchSendLoop using the old client.
c.clientLock.Lock()
defer c.clientLock.Unlock()
func (c *batchCommandsClient) reCreateStreamingClientOnce(err error) error {
c.failPendingRequests(err) // fail all pending requests.

// Re-establish an application-layer stream. The TCP layer is handled by gRPC.
@@ -226,6 +249,7 @@ func (c *batchCommandsClient) reCreateStreamingClient(err error) error {
zap.String("target", c.target),
)
c.client = streamClient

return nil
}
logutil.Logger(context.Background()).Error(
@@ -236,15 +260,15 @@ func (c *batchCommandsClient) reCreateStreamingClient(err error) error {
return err
}

func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient, tikvTransportLayerLoad *uint64) {
defer func() {
if r := recover(); r != nil {
metrics.PanicCounter.WithLabelValues(metrics.LabelBatchRecvLoop).Inc()
logutil.Logger(context.Background()).Error("batchRecvLoop",
zap.Reflect("r", r),
zap.Stack("stack"))
logutil.Logger(context.Background()).Info("restart batchRecvLoop")
go c.batchRecvLoop(cfg)
go c.batchRecvLoop(cfg, tikvTransportLayerLoad)
}
}()

@@ -257,21 +281,9 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
zap.Error(err),
)

b := NewBackoffer(context.Background(), math.MaxInt32)
now := time.Now()
for { // try to re-create the streaming in the loop.
if c.isStopped() {
return
}

err1 := c.reCreateStreamingClient(err)
if err1 == nil {
break
}
err2 := b.Backoff(boTiKVRPC, err1)
// As timeout is set to math.MaxInt32, err2 should always be nil.
// This line is added to make the 'make errcheck' pass.
terror.Log(err2)
if stopped := c.reCreateStreamingClient(err); stopped {
return
}
metrics.TiKVBatchClientUnavailable.Observe(time.Since(now).Seconds())
continue
@@ -294,14 +306,37 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) {
c.batched.Delete(requestID)
}

tikvTransportLayerLoad := resp.GetTransportLayerLoad()
if tikvTransportLayerLoad > 0.0 && cfg.MaxBatchWaitTime > 0 {
transportLayerLoad := resp.GetTransportLayerLoad()
if transportLayerLoad > 0.0 && cfg.MaxBatchWaitTime > 0 {
// We need to consider TiKV load only if batch-wait strategy is enabled.
atomic.StoreUint64(c.tikvTransportLayerLoad, tikvTransportLayerLoad)
atomic.StoreUint64(tikvTransportLayerLoad, transportLayerLoad)
}
}
}
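
The load gauge now lives on the connection array and is passed to batchRecvLoop as a plain *uint64, so the receive and send loops exchange it with atomics instead of each client caching its own pointer field. A reduced, runnable sketch of that handoff (the threshold and load values are made up for illustration):

package main

import (
	"fmt"
	"sync/atomic"
)

const overloadThreshold = 200

func main() {
	var transportLayerLoad uint64 // shared by both loops.

	// Receive side: publish the latest load reported by the server.
	atomic.StoreUint64(&transportLayerLoad, 250)

	// Send side: read it lock-free when deciding whether to wait
	// and collect a larger batch.
	if atomic.LoadUint64(&transportLayerLoad) >= overloadThreshold {
		fmt.Println("TiKV overloaded; wait a while to batch more requests")
	}
}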

func (c *batchCommandsClient) reCreateStreamingClient(err error) (stopped bool) {
// Forbid batchSendLoop from using the old client.
c.lockForRecreate()
defer c.unlockForRecreate()

b := NewBackoffer(context.Background(), math.MaxInt32)
for { // try to re-create the streaming in the loop.
if c.isStopped() {
return true
}
err1 := c.reCreateStreamingClientOnce(err)
if err1 == nil {
break
}

err2 := b.Backoff(boTiKVRPC, err1)
// As timeout is set to math.MaxInt32, err2 should always be nil.
// This line is added to make the 'make errcheck' pass.
terror.Log(err2)
}
return false
}
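
The loop above leans on TiDB's internal Backoffer, but its shape is easy to show in isolation: keep retrying until the client is stopped or one attempt succeeds, sleeping between failures. A self-contained sketch with a plain exponential sleep standing in for NewBackoffer and boTiKVRPC:

package main

import (
	"errors"
	"fmt"
	"time"
)

// recreateWithRetry reports whether the client was stopped before the
// stream could be re-created, mirroring the (stopped bool) result above.
func recreateWithRetry(stopped func() bool, recreateOnce func() error) bool {
	backoff := 10 * time.Millisecond
	for {
		if stopped() {
			return true
		}
		if err := recreateOnce(); err == nil {
			return false
		}
		time.Sleep(backoff)
		if backoff < time.Second {
			backoff *= 2 // exponential backoff, capped at 1s.
		}
	}
}

func main() {
	attempts := 0
	recreateOnce := func() error {
		attempts++
		if attempts < 3 {
			return errors.New("dial failed")
		}
		return nil
	}
	stopped := recreateWithRetry(func() bool { return false }, recreateOnce)
	fmt.Println("stopped:", stopped, "attempts:", attempts) // stopped: false attempts: 3
}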

type batchCommandsEntry struct {
req *tikvpb.BatchCommandsRequest_Request
res chan *tikvpb.BatchCommandsResponse_Response
@@ -335,10 +370,6 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {

var bestBatchWaitSize = cfg.BatchWaitSize
for {
// Choose a connection by round-robin.
next := atomic.AddUint32(&a.index, 1) % uint32(len(a.batchCommandsClients))
batchCommandsClient := a.batchCommandsClients[next]

entries = entries[:0]
requests = requests[:0]
requestIDs = requestIDs[:0]
@@ -347,9 +378,8 @@
a.fetchAllPendingRequests(int(cfg.MaxBatchSize), &entries, &requests)

if len(entries) < int(cfg.MaxBatchSize) && cfg.MaxBatchWaitTime > 0 {
tikvTransportLayerLoad := atomic.LoadUint64(batchCommandsClient.tikvTransportLayerLoad)
// If the target TiKV is overload, wait a while to collect more requests.
if uint(tikvTransportLayerLoad) >= cfg.OverloadThreshold {
if atomic.LoadUint64(&a.tikvTransportLayerLoad) >= uint64(cfg.OverloadThreshold) {
fetchMorePendingRequests(
a.batchCommandsCh, int(cfg.MaxBatchSize), int(bestBatchWaitSize),
cfg.MaxBatchWaitTime, &entries, &requests,
@@ -367,23 +397,40 @@
bestBatchWaitSize += 1
}

length = removeCanceledRequests(&entries, &requests)
if length == 0 {
entries, requests = removeCanceledRequests(entries, requests)
if len(entries) == 0 {
continue // All requests are canceled.
}
maxBatchID := atomic.AddUint64(&batchCommandsClient.idAlloc, uint64(length))
for i := 0; i < length; i++ {
requestID := uint64(i) + maxBatchID - uint64(length)
requestIDs = append(requestIDs, requestID)
}

req := &tikvpb.BatchCommandsRequest{
Requests: requests,
RequestIds: requestIDs,
a.getClientAndSend(entries, requests, requestIDs)
}
}

func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []*tikvpb.BatchCommandsRequest_Request, requestIDs []uint64) {
// Choose a connection by round-robin.
var cli *batchCommandsClient
for {
a.index = (a.index + 1) % uint32(len(a.batchCommandsClients))
cli = a.batchCommandsClients[a.index]
// The lock protects the batchCommandsClient from being closed while it's in use.
if cli.tryLockForSend() {
break
}
}
defer cli.unlockForSend()

batchCommandsClient.send(req, entries)
maxBatchID := atomic.AddUint64(&cli.idAlloc, uint64(len(requests)))
for i := 0; i < len(requests); i++ {
requestID := uint64(i) + maxBatchID - uint64(len(requests))
requestIDs = append(requestIDs, requestID)
}
req := &tikvpb.BatchCommandsRequest{
Requests: requests,
RequestIds: requestIDs,
}

cli.send(req, entries)
return
}
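
getClientAndSend is where the two halves meet: the send loop walks the clients round-robin and, via tryLockForSend, skips any client whose stream is being rebuilt instead of blocking on its lock. A simplified sketch of the selection loop, with a plain availability flag standing in for the try-lock:

package main

import "fmt"

// pickClient walks the clients round-robin, skipping unavailable ones.
// Like the loop above, it spins until some client can be locked.
func pickClient(index *uint32, available []bool) uint32 {
	n := uint32(len(available))
	for {
		*index = (*index + 1) % n
		if available[*index] {
			return *index
		}
	}
}

func main() {
	var idx uint32
	// Client 1's stream is being re-created; 0 and 2 accept sends.
	available := []bool{true, false, true}
	for i := 0; i < 4; i++ {
		fmt.Println("picked client", pickClient(&idx, available))
	}
	// Prints 2, 0, 2, 0: client 1 is never chosen while unavailable.
}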

func (a *batchConn) Close() {
@@ -399,20 +446,17 @@ }
}

// removeCanceledRequests removes canceled requests before sending.
func removeCanceledRequests(
entries *[]*batchCommandsEntry,
requests *[]*tikvpb.BatchCommandsRequest_Request) int {
validEntries := (*entries)[:0]
validRequests := (*requests)[:0]
for _, e := range *entries {
func removeCanceledRequests(entries []*batchCommandsEntry,
requests []*tikvpb.BatchCommandsRequest_Request) ([]*batchCommandsEntry, []*tikvpb.BatchCommandsRequest_Request) {
validEntries := entries[:0]
validRequests := requests[:0]
for _, e := range entries {
if !e.isCanceled() {
validEntries = append(validEntries, e)
validRequests = append(validRequests, e.req)
}
}
*entries = validEntries
*requests = validRequests
return len(*entries)
return validEntries, validRequests
}
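
The rewritten removeCanceledRequests also changes its contract: instead of mutating the slices through pointers and returning a length, it returns the filtered slices, reusing the inputs' backing arrays via the s[:0]-plus-append idiom so each batch is filtered without allocating. The same idiom in isolation:

package main

import "fmt"

// filterInPlace keeps the elements satisfying keep, reusing the backing
// array of xs just as removeCanceledRequests reuses entries and requests.
func filterInPlace(xs []int, keep func(int) bool) []int {
	out := xs[:0] // zero-length slice sharing xs's storage.
	for _, x := range xs {
		if keep(x) {
			out = append(out, x)
		}
	}
	return out
}

func main() {
	xs := []int{1, 2, 3, 4, 5}
	xs = filterInPlace(xs, func(x int) bool { return x%2 == 1 })
	fmt.Println(xs) // [1 3 5]
}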

func sendBatchRequest(
4 changes: 2 additions & 2 deletions store/tikv/client_test.go
@@ -74,8 +74,8 @@ func (s *testClientSuite) TestRemoveCanceledRequests(c *C) {
for i := range entries {
requests[i] = entries[i].req
}
length := removeCanceledRequests(&entries, &requests)
c.Assert(length, Equals, 2)
entries, requests = removeCanceledRequests(entries, requests)
c.Assert(len(entries), Equals, 2)
for _, e := range entries {
c.Assert(e.isCanceled(), IsFalse)
}
