From d91886f29a272737541bec41e100ea879bc63b44 Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Thu, 30 Nov 2023 16:50:48 +0800 Subject: [PATCH] *: support forward GetMinTS (#7482) close tikv/pd#7467 Signed-off-by: Cabinfever_B Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/client.go | 17 ++++------------- server/grpc_service.go | 11 +++++++---- tests/integrations/mcs/testutil.go | 2 +- tests/integrations/tso/client_test.go | 15 +++++++++++++-- 4 files changed, 25 insertions(+), 20 deletions(-) diff --git a/client/client.go b/client/client.go index b320be6d3d5..7053ed2be96 100644 --- a/client/client.go +++ b/client/client.go @@ -813,17 +813,6 @@ func (c *client) backupClientConn() (*grpc.ClientConn, string) { return nil, "" } -func (c *client) getClient() pdpb.PDClient { - if c.option.enableForwarding && atomic.LoadInt32(&c.leaderNetworkFailure) == 1 { - backupClientConn, addr := c.backupClientConn() - if backupClientConn != nil { - log.Debug("[pd] use follower client", zap.String("addr", addr)) - return pdpb.NewPDClient(backupClientConn) - } - } - return c.leaderClient() -} - func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, context.Context) { if c.option.enableForwarding && atomic.LoadInt32(&c.leaderNetworkFailure) == 1 { backupClientConn, addr := c.backupClientConn() @@ -892,16 +881,18 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e default: return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("undefined service mode") } - + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) // Call GetMinTS API to get the minimal TS from the API leader. - protoClient := c.getClient() + protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { + cancel() return 0, 0, errs.ErrClientGetProtoClient } resp, err := protoClient.GetMinTS(ctx, &pdpb.GetMinTSRequest{ Header: c.requestHeader(), }) + cancel() if err != nil { if strings.Contains(err.Error(), "Unimplemented") { // If the method is not supported, we fallback to GetTS. diff --git a/server/grpc_service.go b/server/grpc_service.go index b0384a7d629..bb20fcbe484 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -271,10 +271,13 @@ func (s *GrpcServer) GetClusterInfo(ctx context.Context, _ *pdpb.GetClusterInfoR func (s *GrpcServer) GetMinTS( ctx context.Context, request *pdpb.GetMinTSRequest, ) (*pdpb.GetMinTSResponse, error) { - if err := s.validateRequest(request.GetHeader()); err != nil { - return &pdpb.GetMinTSResponse{ - Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) { + return pdpb.NewPDClient(client).GetMinTS(ctx, request) + } + if rsp, err := s.unaryMiddleware(ctx, request, fn); err != nil { + return nil, err + } else if rsp != nil { + return rsp.(*pdpb.GetMinTSResponse), nil } var ( diff --git a/tests/integrations/mcs/testutil.go b/tests/integrations/mcs/testutil.go index bbedd65209d..d23da905f78 100644 --- a/tests/integrations/mcs/testutil.go +++ b/tests/integrations/mcs/testutil.go @@ -106,7 +106,7 @@ func WaitForMultiKeyspacesTSOAvailable( clients := make([]pd.Client, 0, len(keyspaceIDs)) for _, keyspaceID := range keyspaceIDs { - cli := SetupClientWithKeyspaceID(ctx, re, keyspaceID, backendEndpoints) + cli := SetupClientWithKeyspaceID(ctx, re, keyspaceID, backendEndpoints, pd.WithForwardingOption(true)) re.NotNil(cli) clients = append(clients, cli) diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 8ff20d68f52..b021e73a2f9 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -98,7 +98,7 @@ func (suite *tsoClientTestSuite) SetupSuite() { suite.keyspaceIDs = make([]uint32, 0) if suite.legacy { - client, err := pd.NewClientWithContext(suite.ctx, strings.Split(suite.backendEndpoints, ","), pd.SecurityOption{}) + client, err := pd.NewClientWithContext(suite.ctx, strings.Split(suite.backendEndpoints, ","), pd.SecurityOption{}, pd.WithForwardingOption(true)) re.NoError(err) innerClient, ok := client.(interface{ GetServiceDiscovery() pd.ServiceDiscovery }) re.True(ok) @@ -263,7 +263,9 @@ func (suite *tsoClientTestSuite) TestDiscoverTSOServiceWithLegacyPath() { // TestGetMinTS tests the correctness of GetMinTS. func (suite *tsoClientTestSuite) TestGetMinTS() { re := suite.Require() - suite.waitForAllKeyspaceGroupsInServing(re) + if !suite.legacy { + suite.waitForAllKeyspaceGroupsInServing(re) + } var wg sync.WaitGroup wg.Add(tsoRequestConcurrencyNumber * len(suite.clients)) @@ -293,6 +295,15 @@ func (suite *tsoClientTestSuite) TestGetMinTS() { } } wg.Wait() + + re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork1", "return(true)")) + time.Sleep(time.Second) + testutil.Eventually(re, func() bool { + var err error + _, _, err = suite.clients[0].GetMinTS(suite.ctx) + return err == nil + }) + re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork1")) } // More details can be found in this issue: https://github.com/tikv/pd/issues/4884