Skip to content

Commit

Permalink
*: support forward GetMinTS (#7482)
Browse files Browse the repository at this point in the history
close #7467

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
CabinfeverB and ti-chi-bot[bot] authored Nov 30, 2023
1 parent 5aacd66 commit d91886f
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 20 deletions.
17 changes: 4 additions & 13 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,17 +813,6 @@ func (c *client) backupClientConn() (*grpc.ClientConn, string) {
return nil, ""
}

func (c *client) getClient() pdpb.PDClient {
if c.option.enableForwarding && atomic.LoadInt32(&c.leaderNetworkFailure) == 1 {
backupClientConn, addr := c.backupClientConn()
if backupClientConn != nil {
log.Debug("[pd] use follower client", zap.String("addr", addr))
return pdpb.NewPDClient(backupClientConn)
}
}
return c.leaderClient()
}

func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, context.Context) {
if c.option.enableForwarding && atomic.LoadInt32(&c.leaderNetworkFailure) == 1 {
backupClientConn, addr := c.backupClientConn()
Expand Down Expand Up @@ -892,16 +881,18 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e
default:
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("undefined service mode")
}

ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
// Call GetMinTS API to get the minimal TS from the API leader.
protoClient := c.getClient()
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
return 0, 0, errs.ErrClientGetProtoClient
}

resp, err := protoClient.GetMinTS(ctx, &pdpb.GetMinTSRequest{
Header: c.requestHeader(),
})
cancel()
if err != nil {
if strings.Contains(err.Error(), "Unimplemented") {
// If the method is not supported, we fallback to GetTS.
Expand Down
11 changes: 7 additions & 4 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,13 @@ func (s *GrpcServer) GetClusterInfo(ctx context.Context, _ *pdpb.GetClusterInfoR
func (s *GrpcServer) GetMinTS(
ctx context.Context, request *pdpb.GetMinTSRequest,
) (*pdpb.GetMinTSResponse, error) {
if err := s.validateRequest(request.GetHeader()); err != nil {
return &pdpb.GetMinTSResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) {
return pdpb.NewPDClient(client).GetMinTS(ctx, request)
}
if rsp, err := s.unaryMiddleware(ctx, request, fn); err != nil {
return nil, err
} else if rsp != nil {
return rsp.(*pdpb.GetMinTSResponse), nil
}

var (
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/mcs/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func WaitForMultiKeyspacesTSOAvailable(

clients := make([]pd.Client, 0, len(keyspaceIDs))
for _, keyspaceID := range keyspaceIDs {
cli := SetupClientWithKeyspaceID(ctx, re, keyspaceID, backendEndpoints)
cli := SetupClientWithKeyspaceID(ctx, re, keyspaceID, backendEndpoints, pd.WithForwardingOption(true))
re.NotNil(cli)
clients = append(clients, cli)

Expand Down
15 changes: 13 additions & 2 deletions tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (suite *tsoClientTestSuite) SetupSuite() {
suite.keyspaceIDs = make([]uint32, 0)

if suite.legacy {
client, err := pd.NewClientWithContext(suite.ctx, strings.Split(suite.backendEndpoints, ","), pd.SecurityOption{})
client, err := pd.NewClientWithContext(suite.ctx, strings.Split(suite.backendEndpoints, ","), pd.SecurityOption{}, pd.WithForwardingOption(true))
re.NoError(err)
innerClient, ok := client.(interface{ GetServiceDiscovery() pd.ServiceDiscovery })
re.True(ok)
Expand Down Expand Up @@ -263,7 +263,9 @@ func (suite *tsoClientTestSuite) TestDiscoverTSOServiceWithLegacyPath() {
// TestGetMinTS tests the correctness of GetMinTS.
func (suite *tsoClientTestSuite) TestGetMinTS() {
re := suite.Require()
suite.waitForAllKeyspaceGroupsInServing(re)
if !suite.legacy {
suite.waitForAllKeyspaceGroupsInServing(re)
}

var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber * len(suite.clients))
Expand Down Expand Up @@ -293,6 +295,15 @@ func (suite *tsoClientTestSuite) TestGetMinTS() {
}
}
wg.Wait()

re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork1", "return(true)"))
time.Sleep(time.Second)
testutil.Eventually(re, func() bool {
var err error
_, _, err = suite.clients[0].GetMinTS(suite.ctx)
return err == nil
})
re.NoError(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork1"))
}

// More details can be found in this issue: https://github.com/tikv/pd/issues/4884
Expand Down

0 comments on commit d91886f

Please sign in to comment.