Skip to content

Commit

Permalink
client: remove unnecessary use of a blocked channel (#4079)
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
JmPotato and ti-chi-bot authored Sep 7, 2021
1 parent b86da74 commit d9c6086
Showing 1 changed file with 16 additions and 19 deletions.
35 changes: 16 additions & 19 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,12 +421,11 @@ func (c *client) createTsoStream(ctx context.Context, cancel context.CancelFunc,
return stream, err
}

func (c *client) checkAllocator(dispatcherCtx context.Context, forwardCancel context.CancelFunc, dc, forwardedHostTrim, addrTrim, url string, streamCh streamCh, changedCh chan bool) {
func (c *client) checkAllocator(dispatcherCtx context.Context, forwardCancel context.CancelFunc, dc, forwardedHostTrim, addrTrim, url string, streamCh streamCh) {
defer func() {
// cancel the forward stream
forwardCancel()
close(streamCh)
close(changedCh)
requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(0)
}()
cc, u := c.getAllocatorClientConnByDCLocation(dc)
Expand Down Expand Up @@ -455,7 +454,6 @@ func (c *client) checkAllocator(dispatcherCtx context.Context, forwardCancel con
cancel: cancel,
stream: stream,
}
<-changedCh
healthCancel()
return
}
Expand Down Expand Up @@ -504,7 +502,6 @@ func (c *client) handleDispatcher(dispatcherCtx context.Context, dc string, tsoD
requests = make([]*tsoRequest, maxMergeTSORequests+1)
needUpdate = false
streamCh streamCh
changedCh chan bool
connectionCtx connectionContext
)
defer func() {
Expand All @@ -531,7 +528,7 @@ func (c *client) handleDispatcher(dispatcherCtx context.Context, dc string, tsoD
}

connectionCtx, err = c.tryConnect(dispatcherCtx, dc)
stream, cancel, streamCh, changedCh = connectionCtx.stream, connectionCtx.cancel, connectionCtx.streamCh, connectionCtx.changeCh
stream, cancel, streamCh = connectionCtx.stream, connectionCtx.cancel, connectionCtx.streamCh

if err != nil || stream == nil {
select {
Expand Down Expand Up @@ -576,11 +573,11 @@ func (c *client) handleDispatcher(dispatcherCtx context.Context, dc string, tsoD
}
opts = extractSpanReference(requests[:pendingPlus1], opts[:0])
select {
case s, ok := <-streamCh:
// The connection should be switched to the new stream.
case newStream, ok := <-streamCh:
if ok {
stream = s.stream
changedCh <- true
cancel = s.cancel
stream = newStream.stream
cancel = newStream.cancel
log.Info("tso stream has changed back to pd or tso allocator leader")
}
default:
Expand Down Expand Up @@ -609,10 +606,11 @@ func (c *client) handleDispatcher(dispatcherCtx context.Context, dc string, tsoD
}

type connectionContext struct {
stream pdpb.PD_TsoClient
cancel context.CancelFunc
// Current stream to send gRPC requests, maybe a leader or a follower.
stream pdpb.PD_TsoClient
cancel context.CancelFunc
// streamCh is used to pass the new stream connection, e.g, the connection of a leader is recovered.
streamCh streamCh
changeCh chan bool
}

func (c *client) tryConnect(dispatcherCtx context.Context, dc string) (connectionContext, error) {
Expand All @@ -633,7 +631,7 @@ func (c *client) tryConnect(dispatcherCtx context.Context, dc string) (connectio
err = status.New(codes.Unavailable, "unavailable").Err()
})
if stream != nil && err == nil {
return connectionContext{stream, cancel, nil, nil}, nil
return connectionContext{stream, cancel, nil}, nil
}

if err != nil && c.enableForwarding {
Expand Down Expand Up @@ -668,19 +666,18 @@ func (c *client) tryConnect(dispatcherCtx context.Context, dc string) (connectio
// create the follower stream
cctx, cancel := context.WithCancel(dispatcherCtx)
cctx = grpcutil.BuildForwardContext(cctx, forwardedHost)
stream, err1 := c.createTsoStream(cctx, cancel, followerClient)
if err1 == nil {
stream, err = c.createTsoStream(cctx, cancel, followerClient)
if err == nil {
streamCh := make(streamCh)
changedCh := make(chan bool)
forwardedHostTrim := trimHTTPPrefix(forwardedHost)
addrTrim := trimHTTPPrefix(addr)
// the goroutine is used to check the network and change back to the original stream
go c.checkAllocator(dispatcherCtx, cancel, dc, forwardedHostTrim, addrTrim, url, streamCh, changedCh)
go c.checkAllocator(dispatcherCtx, cancel, dc, forwardedHostTrim, addrTrim, url, streamCh)
requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(1)
return connectionContext{stream, cancel, streamCh, changedCh}, nil
return connectionContext{stream, cancel, streamCh}, nil
}
cancel()
return connectionContext{}, err1
return connectionContext{}, err
}
}
return connectionContext{}, err
Expand Down

0 comments on commit d9c6086

Please sign in to comment.