diff --git a/client/client.go b/client/client.go index 92cbd3d523f..8c8299daeab 100644 --- a/client/client.go +++ b/client/client.go @@ -268,6 +268,15 @@ func WithForwardingOption(enableForwarding bool) ClientOption { } } +// WithTSOServerProxyOption configures the client to use TSO server proxy, +// i.e., the client will send TSO requests to the API leader (the TSO server +// proxy) which will forward the requests to the TSO servers. +func WithTSOServerProxyOption(useTSOServerProxy bool) ClientOption { + return func(c *client) { + c.option.useTSOServerProxy = useTSOServerProxy + } +} + // WithMaxErrorRetry configures the client max retry times when connect meets error. func WithMaxErrorRetry(count int) ClientOption { return func(c *client) { @@ -648,6 +657,11 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) { c.Lock() defer c.Unlock() + if c.option.useTSOServerProxy { + // If we are using TSO server proxy, we always use PD_SVC_MODE. + newMode = pdpb.ServiceMode_PD_SVC_MODE + } + if newMode == c.serviceMode { return } diff --git a/client/option.go b/client/option.go index 2a6c285cfb7..0109bfc4ed0 100644 --- a/client/option.go +++ b/client/option.go @@ -51,12 +51,13 @@ const ( // It provides the ability to change some PD client's options online from the outside. type option struct { // Static options. - gRPCDialOptions []grpc.DialOption - timeout time.Duration - maxRetryTimes int - enableForwarding bool - metricsLabels prometheus.Labels - initMetrics bool + gRPCDialOptions []grpc.DialOption + timeout time.Duration + maxRetryTimes int + enableForwarding bool + useTSOServerProxy bool + metricsLabels prometheus.Labels + initMetrics bool // Dynamic options. dynamicOptions [dynamicOptionCount]atomic.Value diff --git a/tools/pd-tso-bench/main.go b/tools/pd-tso-bench/main.go index b4101bda270..3726373779e 100644 --- a/tools/pd-tso-bench/main.go +++ b/tools/pd-tso-bench/main.go @@ -62,6 +62,7 @@ var ( maxTSOSendIntervalMilliseconds = flag.Int("max-send-interval-ms", 0, "max tso send interval in milliseconds, 60s by default") keyspaceID = flag.Uint("keyspace-id", 0, "the id of the keyspace to access") keyspaceName = flag.String("keyspace-name", "", "the name of the keyspace to access") + useTSOServerProxy = flag.Bool("use-tso-server-proxy", false, "whether send tso requests to tso server proxy instead of tso service directly") wg sync.WaitGroup ) @@ -424,6 +425,9 @@ func createPDClient(ctx context.Context) (pd.Client, error) { ) opts := make([]pd.ClientOption, 0) + if *useTSOServerProxy { + opts = append(opts, pd.WithTSOServerProxyOption(true)) + } opts = append(opts, pd.WithGRPCDialOptions( grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: keepaliveTime,