client: use ServiceClient for discovery (#7611)
ref #7576

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
CabinfeverB and ti-chi-bot[bot] authored Dec 26, 2023
1 parent 8950c3a commit 4d985b2
Showing 4 changed files with 305 additions and 204 deletions.
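
For orientation, a minimal sketch of the ServiceClient abstraction this change builds on, inferred from the calls visible in the diff below (GetServiceClient, GetClientConn, BuildGRPCTargetContext). The real interface lives in the client's service-discovery code and has more methods; the mustLeader parameter name and the trimmed-down serviceDiscovery interface here are assumptions for illustration only.

package sketch

import (
    "context"

    "google.golang.org/grpc"
)

// ServiceClient hides PD endpoint selection behind the discovery layer, so the
// client no longer health-checks the leader or picks a follower on its own.
type ServiceClient interface {
    // GetClientConn returns the gRPC connection of the selected PD endpoint.
    GetClientConn() *grpc.ClientConn
    // BuildGRPCTargetContext attaches leader-forwarding metadata to ctx when the
    // selected endpoint is a follower but the request must be served by the leader.
    BuildGRPCTargetContext(ctx context.Context, mustLeader bool) context.Context
}

// serviceDiscovery is the slice of the discovery interface used by this change
// (illustrative only; the full interface in the pd client is ServiceDiscovery).
type serviceDiscovery interface {
    GetServiceClient() ServiceClient
}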
103 changes: 6 additions & 97 deletions client/client.go
@@ -17,29 +17,22 @@ package pd
import (
    "context"
    "fmt"
    "math/rand"
    "runtime/trace"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "github.com/opentracing/opentracing-go"
    "github.com/pingcap/errors"
    "github.com/pingcap/failpoint"
    "github.com/pingcap/kvproto/pkg/metapb"
    "github.com/pingcap/kvproto/pkg/pdpb"
    "github.com/pingcap/log"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/tikv/pd/client/errs"
    "github.com/tikv/pd/client/grpcutil"
    "github.com/tikv/pd/client/tlsutil"
    "github.com/tikv/pd/client/tsoutil"
    "go.uber.org/zap"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    healthpb "google.golang.org/grpc/health/grpc_health_v1"
    "google.golang.org/grpc/status"
)

const (
@@ -217,9 +210,6 @@ func WithAllowFollowerHandle() GetRegionOption {
    return func(op *GetRegionOp) { op.allowFollowerHandle = true }
}

// LeaderHealthCheckInterval might be changed in the unit to shorten the testing time.
var LeaderHealthCheckInterval = time.Second

var (
    // errUnmatchedClusterID is returned when found a PD with a different cluster ID.
    errUnmatchedClusterID = errors.New("[pd] unmatched cluster id")
@@ -316,7 +306,6 @@ type client struct {

    // For internal usage.
    updateTokenConnectionCh chan struct{}
    leaderNetworkFailure    int32

    ctx    context.Context
    cancel context.CancelFunc
@@ -575,10 +564,6 @@ func (c *client) setup() error {

    // Create dispatchers
    c.createTokenDispatcher()

    // Start the daemons.
    c.wg.Add(1)
    go c.leaderCheckLoop()
    return nil
}

@@ -719,46 +704,6 @@ func (c *client) UpdateOption(option DynamicOption, value interface{}) error {
    return nil
}

func (c *client) leaderCheckLoop() {
    defer c.wg.Done()

    leaderCheckLoopCtx, leaderCheckLoopCancel := context.WithCancel(c.ctx)
    defer leaderCheckLoopCancel()

    ticker := time.NewTicker(LeaderHealthCheckInterval)
    defer ticker.Stop()

    for {
        select {
        case <-c.ctx.Done():
            return
        case <-ticker.C:
            c.checkLeaderHealth(leaderCheckLoopCtx)
        }
    }
}

func (c *client) checkLeaderHealth(ctx context.Context) {
    ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
    defer cancel()
    if client := c.pdSvcDiscovery.GetServingEndpointClientConn(); client != nil {
        healthCli := healthpb.NewHealthClient(client)
        resp, err := healthCli.Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
        failpoint.Inject("unreachableNetwork1", func() {
            resp = nil
            err = status.New(codes.Unavailable, "unavailable").Err()
        })
        rpcErr, ok := status.FromError(err)
        if (ok && isNetworkError(rpcErr.Code())) || resp.GetStatus() != healthpb.HealthCheckResponse_SERVING {
            atomic.StoreInt32(&(c.leaderNetworkFailure), int32(1))
        } else {
            atomic.StoreInt32(&(c.leaderNetworkFailure), int32(0))
        }
    } else {
        atomic.StoreInt32(&(c.leaderNetworkFailure), int32(1))
    }
}

func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
    start := time.Now()
    defer func() { cmdDurationGetAllMembers.Observe(time.Since(start).Seconds()) }()
@@ -778,50 +723,14 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
    return resp.GetMembers(), nil
}

// leaderClient gets the client of current PD leader.
func (c *client) leaderClient() pdpb.PDClient {
    if client := c.pdSvcDiscovery.GetServingEndpointClientConn(); client != nil {
        return pdpb.NewPDClient(client)
    }
    return nil
}

// backupClientConn gets a grpc client connection of the current reachable and healthy
// backup service endpoints randomly. Backup service endpoints are followers in a
// quorum-based cluster or secondaries in a primary/secondary configured cluster.
func (c *client) backupClientConn() (*grpc.ClientConn, string) {
    addrs := c.pdSvcDiscovery.GetBackupAddrs()
    if len(addrs) < 1 {
        return nil, ""
    }
    var (
        cc *grpc.ClientConn
        err error
    )
    for i := 0; i < len(addrs); i++ {
        addr := addrs[rand.Intn(len(addrs))]
        if cc, err = c.pdSvcDiscovery.GetOrCreateGRPCConn(addr); err != nil {
            continue
        }
        healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout)
        resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""})
        healthCancel()
        if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING {
            return cc, addr
        }
    }
    return nil, ""
}

// getClientAndContext returns the leader pd client and the original context. If leader is unhealthy, it returns
// follower pd client and the context which holds forward information.
func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, context.Context) {
-    if c.option.enableForwarding && atomic.LoadInt32(&c.leaderNetworkFailure) == 1 {
-        backupClientConn, addr := c.backupClientConn()
-        if backupClientConn != nil {
-            log.Debug("[pd] use follower client", zap.String("addr", addr))
-            return pdpb.NewPDClient(backupClientConn), grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
-        }
-    }
-    return c.leaderClient(), ctx
+    serviceClient := c.pdSvcDiscovery.GetServiceClient()
+    if serviceClient == nil {
+        return nil, ctx
+    }
+    return pdpb.NewPDClient(serviceClient.GetClientConn()), serviceClient.BuildGRPCTargetContext(ctx, true)
}

func (c *client) GetTSAsync(ctx context.Context) TSFuture {
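The rewritten getClientAndContext above hands back a PD proto client together with a context that may already carry leader-forwarding metadata chosen by the discovery layer. Below is a hedged usage sketch of how a command in client.go might route an RPC through it; getMembersExample is a hypothetical name, and c.requestHeader() is assumed from the surrounding client code.

// Hypothetical helper written against the same imports and receiver as
// client.go above; error handling is simplified for illustration.
func (c *client) getMembersExample(ctx context.Context) (*pdpb.GetMembersResponse, error) {
    // Ask the discovery layer for the proto client; if it picked a follower,
    // ctx already carries the metadata needed to forward to the leader.
    protoClient, ctx := c.getClientAndContext(ctx)
    if protoClient == nil {
        return nil, errors.New("[pd] cannot get the gRPC connection of the serving endpoint")
    }
    return protoClient.GetMembers(ctx, &pdpb.GetMembersRequest{Header: c.requestHeader()})
}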
