From 4d985b2a24a1431900997db9b699d6031a26676a Mon Sep 17 00:00:00 2001
From: Yongbo Jiang
Date: Tue, 26 Dec 2023 16:13:57 +0800
Subject: [PATCH] client: use ServiceClient for discovery (#7611)

ref tikv/pd#7576

Signed-off-by: Cabinfever_B
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
---
 client/client.go                         | 103 +---------
 client/pd_service_discovery.go           | 251 +++++++++++++++++++----
 client/tso_service_discovery.go          |   5 +
 tests/integrations/client/client_test.go | 150 +++++++-------
 4 files changed, 305 insertions(+), 204 deletions(-)

diff --git a/client/client.go b/client/client.go
index 0ae362d06a8..4e03e5e3507 100644
--- a/client/client.go
+++ b/client/client.go
@@ -17,29 +17,22 @@ package pd
 import (
 	"context"
 	"fmt"
-	"math/rand"
 	"runtime/trace"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/opentracing/opentracing-go"
 	"github.com/pingcap/errors"
-	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/log"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/tikv/pd/client/errs"
-	"github.com/tikv/pd/client/grpcutil"
 	"github.com/tikv/pd/client/tlsutil"
 	"github.com/tikv/pd/client/tsoutil"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/codes"
-	healthpb "google.golang.org/grpc/health/grpc_health_v1"
-	"google.golang.org/grpc/status"
 )
 
 const (
@@ -217,9 +210,6 @@ func WithAllowFollowerHandle() GetRegionOption {
 	return func(op *GetRegionOp) { op.allowFollowerHandle = true }
 }
 
-// LeaderHealthCheckInterval might be changed in the unit to shorten the testing time.
-var LeaderHealthCheckInterval = time.Second
-
 var (
 	// errUnmatchedClusterID is returned when found a PD with a different cluster ID.
 	errUnmatchedClusterID = errors.New("[pd] unmatched cluster id")
@@ -316,7 +306,6 @@ type client struct {
 
 	// For internal usage.
 	updateTokenConnectionCh chan struct{}
-	leaderNetworkFailure    int32
 
 	ctx    context.Context
 	cancel context.CancelFunc
@@ -575,10 +564,6 @@ func (c *client) setup() error {
 
 	// Create dispatchers
 	c.createTokenDispatcher()
-
-	// Start the daemons.
-	c.wg.Add(1)
-	go c.leaderCheckLoop()
 
 	return nil
 }
@@ -719,46 +704,6 @@ func (c *client) UpdateOption(option DynamicOption, value interface{}) error {
 	return nil
 }
 
-func (c *client) leaderCheckLoop() {
-	defer c.wg.Done()
-
-	leaderCheckLoopCtx, leaderCheckLoopCancel := context.WithCancel(c.ctx)
-	defer leaderCheckLoopCancel()
-
-	ticker := time.NewTicker(LeaderHealthCheckInterval)
-	defer ticker.Stop()
-
-	for {
-		select {
-		case <-c.ctx.Done():
-			return
-		case <-ticker.C:
-			c.checkLeaderHealth(leaderCheckLoopCtx)
-		}
-	}
-}
-
-func (c *client) checkLeaderHealth(ctx context.Context) {
-	ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
-	defer cancel()
-	if client := c.pdSvcDiscovery.GetServingEndpointClientConn(); client != nil {
-		healthCli := healthpb.NewHealthClient(client)
-		resp, err := healthCli.Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
-		failpoint.Inject("unreachableNetwork1", func() {
-			resp = nil
-			err = status.New(codes.Unavailable, "unavailable").Err()
-		})
-		rpcErr, ok := status.FromError(err)
-		if (ok && isNetworkError(rpcErr.Code())) || resp.GetStatus() != healthpb.HealthCheckResponse_SERVING {
-			atomic.StoreInt32(&(c.leaderNetworkFailure), int32(1))
-		} else {
-			atomic.StoreInt32(&(c.leaderNetworkFailure), int32(0))
-		}
-	} else {
-		atomic.StoreInt32(&(c.leaderNetworkFailure), int32(1))
-	}
-}
-
 func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
 	start := time.Now()
 	defer func() { cmdDurationGetAllMembers.Observe(time.Since(start).Seconds()) }()
@@ -778,50 +723,14 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
 	return resp.GetMembers(), nil
 }
 
-// leaderClient gets the client of current PD leader.
-func (c *client) leaderClient() pdpb.PDClient {
-	if client := c.pdSvcDiscovery.GetServingEndpointClientConn(); client != nil {
-		return pdpb.NewPDClient(client)
-	}
-	return nil
-}
-
-// backupClientConn gets a grpc client connection of the current reachable and healthy
-// backup service endpoints randomly. Backup service endpoints are followers in a
-// quorum-based cluster or secondaries in a primary/secondary configured cluster.
-func (c *client) backupClientConn() (*grpc.ClientConn, string) {
-	addrs := c.pdSvcDiscovery.GetBackupAddrs()
-	if len(addrs) < 1 {
-		return nil, ""
-	}
-	var (
-		cc  *grpc.ClientConn
-		err error
-	)
-	for i := 0; i < len(addrs); i++ {
-		addr := addrs[rand.Intn(len(addrs))]
-		if cc, err = c.pdSvcDiscovery.GetOrCreateGRPCConn(addr); err != nil {
-			continue
-		}
-		healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout)
-		resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""})
-		healthCancel()
-		if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING {
-			return cc, addr
-		}
-	}
-	return nil, ""
-}
-
+// getClientAndContext returns the leader pd client and the original context. If the leader is unhealthy, it returns
+// a follower pd client and a context which holds the forwarding information.
 func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, context.Context) {
-	if c.option.enableForwarding && atomic.LoadInt32(&c.leaderNetworkFailure) == 1 {
-		backupClientConn, addr := c.backupClientConn()
-		if backupClientConn != nil {
-			log.Debug("[pd] use follower client", zap.String("addr", addr))
-			return pdpb.NewPDClient(backupClientConn), grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
-		}
+	serviceClient := c.pdSvcDiscovery.GetServiceClient()
+	if serviceClient == nil {
+		return nil, ctx
 	}
-	return c.leaderClient(), ctx
+	return pdpb.NewPDClient(serviceClient.GetClientConn()), serviceClient.BuildGRPCTargetContext(ctx, true)
 }
 
 func (c *client) GetTSAsync(ctx context.Context) TSFuture {
diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go
index 33dda1ad282..f9bdf888b9f 100644
--- a/client/pd_service_discovery.go
+++ b/client/pd_service_discovery.go
@@ -46,6 +46,16 @@ const (
 	updateMemberBackOffBaseTime = 100 * time.Millisecond
 )
 
+// MemberHealthCheckInterval might be changed in unit tests to shorten the testing time.
+var MemberHealthCheckInterval = time.Second
+
+type apiKind int
+
+const (
+	forwardAPIKind apiKind = iota
+	apiKindCount
+)
+
 type serviceType int
 
 const (
@@ -81,6 +91,10 @@ type ServiceDiscovery interface {
 	// endpoints. Backup service endpoints are followers in a quorum-based cluster or
 	// secondaries in a primary/secondary configured cluster.
 	GetBackupAddrs() []string
	// GetServiceClient tries to get the leader/primary ServiceClient.
+	// If the leader ServiceClient has network problems,
+	// it returns a follower/secondary ServiceClient which can forward the request to the leader.
+	GetServiceClient() ServiceClient
 	// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given addr
 	GetOrCreateGRPCConn(addr string) (*grpc.ClientConn, error)
 	// ScheduleCheckMemberChanged is used to trigger a check to see if there is any membership change
@@ -134,12 +148,16 @@ type pdServiceClient struct {
 }
 
 func newPDServiceClient(addr, leaderAddr string, conn *grpc.ClientConn, isLeader bool) ServiceClient {
-	return &pdServiceClient{
+	cli := &pdServiceClient{
 		addr:       addr,
 		conn:       conn,
 		isLeader:   isLeader,
 		leaderAddr: leaderAddr,
 	}
+	if conn == nil {
+		cli.networkFailure.Store(true)
+	}
+	return cli
 }
 
 // GetAddress implements ServiceClient.
@@ -150,7 +168,7 @@ func (c *pdServiceClient) GetAddress() string {
 	return c.addr
 }
 
-// BuildGRPCContext implements ServiceClient.
+// BuildGRPCTargetContext implements ServiceClient.
 func (c *pdServiceClient) BuildGRPCTargetContext(ctx context.Context, toLeader bool) context.Context {
 	if c == nil || c.isLeader {
 		return ctx
@@ -169,7 +187,7 @@ func (c *pdServiceClient) IsConnectedToLeader() bool {
 	return c.isLeader
 }
 
-// NetworkAvailable implements ServiceClient.
+// Available implements ServiceClient.
 func (c *pdServiceClient) Available() bool {
 	if c == nil {
 		return false
@@ -183,9 +201,11 @@ func (c *pdServiceClient) checkNetworkAvailable(ctx context.Context) {
 	}
 	healthCli := healthpb.NewHealthClient(c.conn)
 	resp, err := healthCli.Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
-	failpoint.Inject("unreachableNetwork1", func() {
-		resp = nil
-		err = status.New(codes.Unavailable, "unavailable").Err()
+	failpoint.Inject("unreachableNetwork1", func(val failpoint.Value) {
+		if val, ok := val.(string); (ok && val == c.GetAddress()) || !ok {
+			resp = nil
+			err = status.New(codes.Unavailable, "unavailable").Err()
+		}
 	})
 	rpcErr, ok := status.FromError(err)
 	if (ok && isNetworkError(rpcErr.Code())) || resp.GetStatus() != healthpb.HealthCheckResponse_SERVING {
@@ -217,6 +237,10 @@ func (c *pdServiceClient) NeedRetry(pdErr *pdpb.Error, err error) bool {
 
 type errFn func(pdErr *pdpb.Error) bool
 
+func emptyErrorFn(pdErr *pdpb.Error) bool {
+	return false
+}
+
 func regionAPIErrorFn(pdErr *pdpb.Error) bool {
 	return pdErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND
 }
@@ -243,6 +267,7 @@ func (c *pdServiceAPIClient) Available() bool {
 	return c.ServiceClient.Available() && !c.unavailable.Load()
 }
 
+// markAsAvailable is used to try to mark the client as available if the unavailable status has expired.
 func (c *pdServiceAPIClient) markAsAvailable() {
 	if !c.unavailable.Load() {
 		return
@@ -273,7 +298,7 @@ func (c *pdServiceAPIClient) NeedRetry(pdErr *pdpb.Error, err error) bool {
 
 // pdServiceBalancerNode is a balancer node for PD service.
 // It extends the pdServiceClient and adds additional fields for the next polling client in the chain.
 type pdServiceBalancerNode struct {
-	ServiceClient
+	*pdServiceAPIClient
 	next *pdServiceBalancerNode
 }
 
@@ -283,8 +308,14 @@ type pdServiceBalancer struct {
 	mu        sync.Mutex
 	now       *pdServiceBalancerNode
 	totalNode int
+	errFn     errFn
 }
 
+func newPDServiceBalancer(fn errFn) *pdServiceBalancer {
+	return &pdServiceBalancer{
+		errFn: fn,
+	}
+}
 func (c *pdServiceBalancer) set(clients []ServiceClient) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
@@ -293,14 +324,14 @@ func (c *pdServiceBalancer) set(clients []ServiceClient) {
 	}
 	c.totalNode = len(clients)
 	head := &pdServiceBalancerNode{
-		ServiceClient: clients[0],
+		pdServiceAPIClient: newPDServiceAPIClient(clients[0], c.errFn).(*pdServiceAPIClient),
 	}
 	head.next = head
 	last := head
 	for i := 1; i < c.totalNode; i++ {
 		next := &pdServiceBalancerNode{
-			ServiceClient: clients[i],
-			next:          head,
+			pdServiceAPIClient: newPDServiceAPIClient(clients[i], c.errFn).(*pdServiceAPIClient),
+			next:               head,
 		}
 		head = next
 		last.next = head
@@ -308,6 +339,15 @@ func (c *pdServiceBalancer) set(clients []ServiceClient) {
 	c.now = head
 }
 
+func (c *pdServiceBalancer) check() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	for i := 0; i < c.totalNode; i++ {
+		c.now.markAsAvailable()
+		c.next()
+	}
+}
+
 func (c *pdServiceBalancer) next() {
 	c.now = c.now.next
 }
@@ -352,9 +392,12 @@ type pdServiceDiscovery struct {
 	urls atomic.Value // Store as []string
 
 	// PD leader URL
-	leader atomic.Value // Store as string
+	leader atomic.Value // Store as pdServiceClient
 	// PD follower URLs
-	followers atomic.Value // Store as []string
+	followers         sync.Map // Store as map[string]pdServiceClient
+	apiCandidateNodes [apiKindCount]*pdServiceBalancer
+	// PD follower URLs. Only for tso.
+	followerAddresses atomic.Value // Store as []string
 
 	clusterID uint64
 	// addr -> a gRPC connection
@@ -402,6 +445,7 @@ func newPDServiceDiscovery(
 		ctx:                 ctx,
 		cancel:              cancel,
 		wg:                  wg,
+		apiCandidateNodes:   [apiKindCount]*pdServiceBalancer{newPDServiceBalancer(emptyErrorFn)},
 		serviceModeUpdateCb: serviceModeUpdateCb,
 		updateKeyspaceIDCb:  updateKeyspaceIDCb,
 		keyspaceID:          keyspaceID,
@@ -439,9 +483,10 @@ func (c *pdServiceDiscovery) Init() error {
 		log.Warn("[pd] failed to check service mode and will check later", zap.Error(err))
 	}
 
-	c.wg.Add(2)
+	c.wg.Add(3)
 	go c.updateMemberLoop()
 	go c.updateServiceModeLoop()
+	go c.memberHealthCheckLoop()
 
 	c.isInitialized = true
 	return nil
@@ -518,6 +563,46 @@ func (c *pdServiceDiscovery) updateServiceModeLoop() {
 		}
 	}
 }
+func (c *pdServiceDiscovery) memberHealthCheckLoop() {
+	defer c.wg.Done()
+
+	memberCheckLoopCtx, memberCheckLoopCancel := context.WithCancel(c.ctx)
+	defer memberCheckLoopCancel()
+
+	ticker := time.NewTicker(MemberHealthCheckInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		case <-ticker.C:
+			c.checkLeaderHealth(memberCheckLoopCtx)
+			c.checkFollowerHealth(memberCheckLoopCtx)
+		}
+	}
+}
+
+func (c *pdServiceDiscovery) checkLeaderHealth(ctx context.Context) {
+	ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
+	defer cancel()
+	leader := c.getLeaderServiceClient()
+	leader.checkNetworkAvailable(ctx)
+}
+
+func (c *pdServiceDiscovery) checkFollowerHealth(ctx context.Context) {
+	c.followers.Range(func(key, value any) bool {
+		// To ensure that the leader's health check is not delayed, shorten the duration.
+		ctx, cancel := context.WithTimeout(ctx, MemberHealthCheckInterval/3)
+		defer cancel()
+		serviceClient := value.(*pdServiceClient)
+		serviceClient.checkNetworkAvailable(ctx)
+		return true
+	})
+	for _, balancer := range c.apiCandidateNodes {
+		balancer.check()
+	}
+}
 
 // Close releases all resources.
 func (c *pdServiceDiscovery) Close() {
@@ -606,12 +691,45 @@ func (c *pdServiceDiscovery) GetServingAddr() string {
 	return c.getLeaderAddr()
 }
 
-// GetBackupAddrs gets the addresses of the current reachable and healthy followers
-// in a quorum-based cluster.
+// GetBackupAddrs gets the addresses of the current reachable followers
+// in a quorum-based cluster. Used for tso currently.
 func (c *pdServiceDiscovery) GetBackupAddrs() []string {
 	return c.getFollowerAddrs()
 }
 
+// getLeaderServiceClient returns the leader ServiceClient.
+func (c *pdServiceDiscovery) getLeaderServiceClient() *pdServiceClient {
+	leader := c.leader.Load()
+	if leader == nil {
+		return nil
+	}
+	return leader.(*pdServiceClient)
+}
+
+// getServiceClientByKind returns the ServiceClient of the specific kind.
+func (c *pdServiceDiscovery) getServiceClientByKind(kind apiKind) ServiceClient {
+	client := c.apiCandidateNodes[kind].get()
+	if client == nil {
+		return nil
+	}
+	return client
+}
+
+// GetServiceClient returns the leader/primary ServiceClient if it is healthy.
+func (c *pdServiceDiscovery) GetServiceClient() ServiceClient {
+	leaderClient := c.getLeaderServiceClient()
+	if c.option.enableForwarding && !leaderClient.Available() {
+		if followerClient := c.getServiceClientByKind(forwardAPIKind); followerClient != nil {
+			log.Debug("[pd] use follower client", zap.String("addr", followerClient.GetAddress()))
+			return followerClient
+		}
+	}
+	if leaderClient == nil {
+		return nil
+	}
+	return leaderClient
+}
+
 // ScheduleCheckMemberChanged is used to check if there is any membership
 // change among the leader and the followers.
 func (c *pdServiceDiscovery) ScheduleCheckMemberChanged() {
@@ -657,16 +775,12 @@ func (c *pdServiceDiscovery) SetTSOGlobalServAddrUpdatedCallback(callback tsoGlo
 
 // getLeaderAddr returns the leader address.
 func (c *pdServiceDiscovery) getLeaderAddr() string {
-	leaderAddr := c.leader.Load()
-	if leaderAddr == nil {
-		return ""
-	}
-	return leaderAddr.(string)
+	return c.getLeaderServiceClient().GetAddress()
 }
 
 // getFollowerAddrs returns the follower address.
 func (c *pdServiceDiscovery) getFollowerAddrs() []string {
-	followerAddrs := c.followers.Load()
+	followerAddrs := c.followerAddresses.Load()
 	if followerAddrs == nil {
 		return []string{}
 	}
@@ -764,8 +878,7 @@ func (c *pdServiceDiscovery) updateMember() error {
 		}
 
 		c.updateURLs(members.GetMembers())
-		c.updateFollowers(members.GetMembers(), members.GetLeader())
-		if err := c.switchLeader(members.GetLeader().GetClientUrls()); err != nil {
+		if err := c.updateServiceClient(members.GetMembers(), members.GetLeader()); err != nil {
 			return err
 		}
 
@@ -837,42 +950,104 @@ func (c *pdServiceDiscovery) updateURLs(members []*pdpb.Member) {
 	log.Info("[pd] update member urls", zap.Strings("old-urls", oldURLs), zap.Strings("new-urls", urls))
 }
 
-func (c *pdServiceDiscovery) switchLeader(addrs []string) error {
+func (c *pdServiceDiscovery) switchLeader(addrs []string) (bool, error) {
 	// FIXME: How to safely compare leader urls? For now, only allows one client url.
 	addr := addrs[0]
-	oldLeader := c.getLeaderAddr()
-	if addr == oldLeader {
-		return nil
+	oldLeader := c.getLeaderServiceClient()
+	if addr == oldLeader.GetAddress() && oldLeader.GetClientConn() != nil {
+		return false, nil
 	}
 
-	if _, err := c.GetOrCreateGRPCConn(addr); err != nil {
-		log.Warn("[pd] failed to connect leader", zap.String("leader", addr), errs.ZapError(err))
+	newConn, err := c.GetOrCreateGRPCConn(addr)
+	// If the gRPC connection is created successfully or the leader is new, save it anyway.
+	if addr != oldLeader.GetAddress() || newConn != nil {
+		// Set PD leader and Global TSO Allocator (which is also the PD leader)
+		leaderClient := newPDServiceClient(addr, addr, newConn, true)
+		c.leader.Store(leaderClient)
 	}
-	// Set PD leader and Global TSO Allocator (which is also the PD leader)
-	c.leader.Store(addr)
 	// Run callbacks
 	if c.tsoGlobalAllocLeaderUpdatedCb != nil {
 		if err := c.tsoGlobalAllocLeaderUpdatedCb(addr); err != nil {
-			return err
+			return true, err
 		}
 	}
 	for _, cb := range c.leaderSwitchedCbs {
 		cb()
 	}
-	log.Info("[pd] switch leader", zap.String("new-leader", addr), zap.String("old-leader", oldLeader))
-	return nil
+	log.Info("[pd] switch leader", zap.String("new-leader", addr), zap.String("old-leader", oldLeader.GetAddress()))
+	return true, err
 }
 
-func (c *pdServiceDiscovery) updateFollowers(members []*pdpb.Member, leader *pdpb.Member) {
-	var addrs []string
+func (c *pdServiceDiscovery) updateFollowers(members []*pdpb.Member, leader *pdpb.Member) (changed bool) {
+	followers := make(map[string]*pdServiceClient)
+	c.followers.Range(func(key, value any) bool {
+		followers[key.(string)] = value.(*pdServiceClient)
+		return true
+	})
+	var followerAddrs []string
 	for _, member := range members {
 		if member.GetMemberId() != leader.GetMemberId() {
 			if len(member.GetClientUrls()) > 0 {
-				addrs = append(addrs, member.GetClientUrls()...)
+				followerAddrs = append(followerAddrs, member.GetClientUrls()...)
+
+				// FIXME: How to safely compare urls (also for leader)? For now, only allows one client url.
+				addr := member.GetClientUrls()[0]
+				if client, ok := c.followers.Load(addr); ok {
+					if client.(*pdServiceClient).GetClientConn() == nil {
+						conn, err := c.GetOrCreateGRPCConn(addr)
+						if err != nil || conn == nil {
+							log.Warn("[pd] failed to connect follower", zap.String("follower", addr), errs.ZapError(err))
+							continue
+						}
+						follower := newPDServiceClient(addr, leader.GetClientUrls()[0], conn, false)
+						c.followers.Store(addr, follower)
+						changed = true
+					}
+					delete(followers, addr)
+				} else {
+					changed = true
+					conn, err := c.GetOrCreateGRPCConn(addr)
+					follower := newPDServiceClient(addr, leader.GetClientUrls()[0], conn, false)
+					if err != nil || conn == nil {
+						log.Warn("[pd] failed to connect follower", zap.String("follower", addr), errs.ZapError(err))
+					}
+					c.followers.LoadOrStore(addr, follower)
+				}
 			}
 		}
 	}
-	c.followers.Store(addrs)
+	if len(followers) > 0 {
+		changed = true
+		for key := range followers {
+			c.followers.Delete(key)
+		}
+	}
+	c.followerAddresses.Store(followerAddrs)
+	return
+}
+
+func (c *pdServiceDiscovery) updateServiceClient(members []*pdpb.Member, leader *pdpb.Member) error {
+	leaderChanged, err := c.switchLeader(leader.GetClientUrls())
+	followerChanged := c.updateFollowers(members, leader)
+	// don't need to recreate balancer if no changes.
+	if !followerChanged && !leaderChanged {
+		return err
+	}
+	// Even if err is not nil, still update the candidates.
+	clients := make([]ServiceClient, 0)
+	c.followers.Range(func(_, value any) bool {
+		clients = append(clients, value.(*pdServiceClient))
+		return true
+	})
+	leaderClient := c.getLeaderServiceClient()
+	if leaderClient != nil {
+		clients = append(clients, leaderClient)
+	}
+	// create candidate services for all kinds of requests.
+	for i := 0; i < int(apiKindCount); i++ {
+		c.apiCandidateNodes[i].set(clients)
	}
+	return err
 }
 
 func (c *pdServiceDiscovery) switchTSOAllocatorLeaders(allocatorMap map[string]*pdpb.Member) error {
diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go
index 5f14c406797..7caaacd0dfe 100644
--- a/client/tso_service_discovery.go
+++ b/client/tso_service_discovery.go
@@ -373,6 +373,11 @@ func (c *tsoServiceDiscovery) SetTSOGlobalServAddrUpdatedCallback(callback tsoGl
 	c.globalAllocPrimariesUpdatedCb = callback
 }
 
+// GetServiceClient implements ServiceDiscovery.
+func (c *tsoServiceDiscovery) GetServiceClient() ServiceClient {
+	return c.apiSvcDiscovery.GetServiceClient()
+}
+
 // getPrimaryAddr returns the primary address.
 func (c *tsoServiceDiscovery) getPrimaryAddr() string {
 	c.keyspaceGroupSD.RLock()
diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go
index 1fd8d75dec4..e1e841342ad 100644
--- a/tests/integrations/client/client_test.go
+++ b/tests/integrations/client/client_test.go
@@ -518,18 +518,69 @@ func TestCustomTimeout(t *testing.T) {
 	re.Less(time.Since(start), 2*time.Second)
 }
 
-func TestGetRegionByFollowerForwarding(t *testing.T) {
-	re := require.New(t)
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	pd.LeaderHealthCheckInterval = 100 * time.Millisecond
-	cluster, err := tests.NewTestCluster(ctx, 3)
+type followerForwardAndHandleTestSuite struct {
+	suite.Suite
+	ctx   context.Context
+	clean context.CancelFunc
+
+	cluster   *tests.TestCluster
+	endpoints []string
+	regionID  uint64
+}
+
+func TestFollowerForwardAndHandleTestSuite(t *testing.T) {
+	suite.Run(t, new(followerForwardAndHandleTestSuite))
+}
+
+func (suite *followerForwardAndHandleTestSuite) SetupSuite() {
+	re := suite.Require()
+	suite.ctx, suite.clean = context.WithCancel(context.Background())
+	pd.MemberHealthCheckInterval = 100 * time.Millisecond
+	cluster, err := tests.NewTestCluster(suite.ctx, 3)
 	re.NoError(err)
-	defer cluster.Destroy()
+	suite.cluster = cluster
+	suite.endpoints = runServer(re, cluster)
+	cluster.WaitLeader()
+	leader := cluster.GetLeaderServer()
+	grpcPDClient := testutil.MustNewGrpcClient(re, leader.GetAddr())
+	suite.regionID = regionIDAllocator.alloc()
+	testutil.Eventually(re, func() bool {
+		regionHeartbeat, err := grpcPDClient.RegionHeartbeat(suite.ctx)
+		re.NoError(err)
+		region := &metapb.Region{
+			Id: suite.regionID,
+			RegionEpoch: &metapb.RegionEpoch{
+				ConfVer: 1,
+				Version: 1,
+			},
+			Peers: peers,
+		}
+		req := &pdpb.RegionHeartbeatRequest{
+			Header: newHeader(leader.GetServer()),
+			Region: region,
+			Leader: peers[0],
+		}
+		err = regionHeartbeat.Send(req)
+		re.NoError(err)
+		_, err = regionHeartbeat.Recv()
+		return err == nil
+	})
+}
 
-	endpoints := runServer(re, cluster)
-	cli := setupCli(re, ctx, endpoints, pd.WithForwardingOption(true))
+func (suite *followerForwardAndHandleTestSuite) TearDownTest() {
+}
+
+func (suite *followerForwardAndHandleTestSuite) TearDownSuite() {
+	suite.cluster.Destroy()
+	suite.clean()
+}
+
+func (suite *followerForwardAndHandleTestSuite) TestGetRegionByFollowerForwarding() {
+	re := suite.Require()
+	ctx, cancel := context.WithCancel(suite.ctx)
+	defer cancel()
+	cli := setupCli(re, ctx, suite.endpoints, pd.WithForwardingOption(true))
 
re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)")) var lastTS uint64 @@ -564,7 +609,7 @@ func TestGetTsoByFollowerForwarding1(t *testing.T) { lastTS = tsoutil.ComposeTS(physical, logical) return true } - t.Log(err) + suite.T().Log(err) return false }) @@ -575,17 +620,11 @@ func TestGetTsoByFollowerForwarding1(t *testing.T) { } // case 2: unreachable -> leader transfer -> normal -func TestGetTsoByFollowerForwarding2(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) +func (suite *followerForwardAndHandleTestSuite) TestGetTsoByFollowerForwarding2() { + re := suite.Require() + ctx, cancel := context.WithCancel(suite.ctx) defer cancel() - pd.LeaderHealthCheckInterval = 100 * time.Millisecond - cluster, err := tests.NewTestCluster(ctx, 3) - re.NoError(err) - defer cluster.Destroy() - - endpoints := runServer(re, cluster) - cli := setupCli(re, ctx, endpoints, pd.WithForwardingOption(true)) + cli := setupCli(re, ctx, suite.endpoints, pd.WithForwardingOption(true)) re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)")) var lastTS uint64 @@ -595,13 +634,13 @@ func TestGetTsoByFollowerForwarding2(t *testing.T) { lastTS = tsoutil.ComposeTS(physical, logical) return true } - t.Log(err) + suite.T().Log(err) return false }) lastTS = checkTS(re, cli, lastTS) - re.NoError(cluster.GetLeaderServer().ResignLeader()) - re.NotEmpty(cluster.WaitLeader()) + re.NoError(suite.cluster.GetLeaderServer().ResignLeader()) + re.NotEmpty(suite.cluster.WaitLeader()) lastTS = checkTS(re, cli, lastTS) re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork")) @@ -610,45 +649,18 @@ func TestGetTsoByFollowerForwarding2(t *testing.T) { } // case 3: network partition between client and follower A -> transfer leader to follower A -> normal -func TestGetTsoAndRegionByFollowerForwarding(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) +func (suite *followerForwardAndHandleTestSuite) TestGetTsoAndRegionByFollowerForwarding() { + re := suite.Require() + ctx, cancel := context.WithCancel(suite.ctx) defer cancel() - pd.LeaderHealthCheckInterval = 100 * time.Millisecond - cluster, err := tests.NewTestCluster(ctx, 3) - re.NoError(err) - defer cluster.Destroy() - endpoints := runServer(re, cluster) - re.NotEmpty(cluster.WaitLeader()) + cluster := suite.cluster leader := cluster.GetLeaderServer() - grpcPDClient := testutil.MustNewGrpcClient(re, leader.GetAddr()) - testutil.Eventually(re, func() bool { - regionHeartbeat, err := grpcPDClient.RegionHeartbeat(ctx) - re.NoError(err) - regionID := regionIDAllocator.alloc() - region := &metapb.Region{ - Id: regionID, - RegionEpoch: &metapb.RegionEpoch{ - ConfVer: 1, - Version: 1, - }, - Peers: peers, - } - req := &pdpb.RegionHeartbeatRequest{ - Header: newHeader(leader.GetServer()), - Region: region, - Leader: peers[0], - } - err = regionHeartbeat.Send(req) - re.NoError(err) - _, err = regionHeartbeat.Recv() - return err == nil - }) + follower := cluster.GetServer(cluster.GetFollower()) re.NoError(failpoint.Enable("github.com/tikv/pd/client/grpcutil/unreachableNetwork2", fmt.Sprintf("return(\"%s\")", follower.GetAddr()))) - cli := setupCli(re, ctx, endpoints, pd.WithForwardingOption(true)) + cli := setupCli(re, ctx, suite.endpoints, pd.WithForwardingOption(true)) var lastTS uint64 testutil.Eventually(re, func() bool { physical, logical, err := cli.GetTS(context.TODO()) @@ -656,7 +668,7 @@ func 
TestGetTsoAndRegionByFollowerForwarding(t *testing.T) { lastTS = tsoutil.ComposeTS(physical, logical) return true } - t.Log(err) + suite.T().Log(err) return false }) lastTS = checkTS(re, cli, lastTS) @@ -672,7 +684,7 @@ func TestGetTsoAndRegionByFollowerForwarding(t *testing.T) { lastTS = tsoutil.ComposeTS(physical, logical) return true } - t.Log(err) + suite.T().Log(err) return false }) lastTS = checkTS(re, cli, lastTS) @@ -691,7 +703,7 @@ func TestGetTsoAndRegionByFollowerForwarding(t *testing.T) { lastTS = tsoutil.ComposeTS(physical, logical) return true } - t.Log(err) + suite.T().Log(err) return false }) lastTS = checkTS(re, cli, lastTS)
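
A usage sketch of the new discovery API (not part of the patch): it mirrors what getClientAndContext does after this change, assuming an already-initialized pd ServiceDiscovery instance. The getMembersExample helper, its error message, and the empty request header are illustrative assumptions only.

package example

import (
	"context"
	"errors"

	"github.com/pingcap/kvproto/pkg/pdpb"
	pd "github.com/tikv/pd/client"
)

// getMembersExample issues a GetMembers RPC through the ServiceClient picked
// by the discovery layer introduced in this patch.
func getMembersExample(ctx context.Context, discovery pd.ServiceDiscovery) (*pdpb.GetMembersResponse, error) {
	// GetServiceClient returns the leader ServiceClient, or, when forwarding
	// is enabled and the leader fails its health check, a follower
	// ServiceClient that can forward requests to the leader.
	serviceClient := discovery.GetServiceClient()
	if serviceClient == nil {
		return nil, errors.New("[pd] no service client available") // hypothetical error
	}
	// For a follower client, BuildGRPCTargetContext(ctx, true) attaches the
	// leader address to the context as forwarding metadata; for the leader
	// itself it returns ctx unchanged.
	cctx := serviceClient.BuildGRPCTargetContext(ctx, true)
	return pdpb.NewPDClient(serviceClient.GetClientConn()).GetMembers(cctx, &pdpb.GetMembersRequest{})
}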