diff --git a/client/http/client.go b/client/http/client.go index 53da340b10c..184ae2222a7 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -57,6 +57,8 @@ type clientInner struct { ctx context.Context cancel context.CancelFunc + updateMembersInfoNotifier chan struct{} + sync.RWMutex pdAddrs []string leaderAddrIdx int @@ -73,7 +75,12 @@ type clientInner struct { func newClientInner(source string) *clientInner { ctx, cancel := context.WithCancel(context.Background()) - return &clientInner{ctx: ctx, cancel: cancel, leaderAddrIdx: -1, source: source} + return &clientInner{ + ctx: ctx, + cancel: cancel, + updateMembersInfoNotifier: make(chan struct{}, 1), + leaderAddrIdx: -1, + source: source} } func (ci *clientInner) init() { @@ -97,6 +104,13 @@ func (ci *clientInner) close() { } } +func (ci *clientInner) scheduleUpdateMembersInfo() { + select { + case ci.updateMembersInfoNotifier <- struct{}{}: + default: + } +} + // getPDAddrs returns the current PD addresses and the index of the leader address. func (ci *clientInner) getPDAddrs() ([]string, int) { ci.RLock() @@ -158,6 +172,8 @@ func (ci *clientInner) requestWithRetry( if err == nil { return nil } + // Schedule the members info update if the leader request failed to get the latest leader as soon as possible. + ci.scheduleUpdateMembersInfo() log.Debug("[pd] request leader addr failed", zap.String("source", ci.source), zap.Int("leader-idx", leaderAddrIdx), zap.String("addr", addr), zap.Error(err)) } @@ -169,11 +185,13 @@ func (ci *clientInner) requestWithRetry( addr = ci.pdAddrs[idx] err = ci.doRequest(ctx, addr, reqInfo, headerOpts...) if err == nil { - break + return nil } log.Debug("[pd] request follower addr failed", zap.String("source", ci.source), zap.Int("idx", idx), zap.String("addr", addr), zap.Error(err)) } + // Schedule the members info update if all the requests failed to get the latest members as soon as possible. + ci.scheduleUpdateMembersInfo() return err } @@ -258,17 +276,17 @@ func (ci *clientInner) doRequest( } func (ci *clientInner) membersInfoUpdater(ctx context.Context) { - ci.updateMembersInfo(ctx) log.Info("[pd] http client member info updater started", zap.String("source", ci.source)) ticker := time.NewTicker(defaultMembersInfoUpdateInterval) defer ticker.Stop() for { + ci.updateMembersInfo(ctx) select { case <-ctx.Done(): log.Info("[pd] http client member info updater stopped", zap.String("source", ci.source)) return case <-ticker.C: - ci.updateMembersInfo(ctx) + case <-ci.updateMembersInfoNotifier: } } }