Skip to content

Commit

Permalink
Merge branch 'master' into client/backoffer_append_err
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Mar 28, 2024
2 parents afb5af2 + bc92c13 commit 9a554a4
Show file tree
Hide file tree
Showing 102 changed files with 2,956 additions and 1,000 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/pd-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ on:
- release-5.*
- release-6.*
- release-7.*
- release-8.*
pull_request:
branches:
- master
- release-4.0
- release-5.*
- release-6.*
- release-7.*
- release-8.*
concurrency:
group: ${{ github.ref }}-${{ github.workflow }}
cancel-in-progress: true
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/tso-function-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ on:
- release-5.*
- release-6.*
- release-7.*
- release-8.*
pull_request:
branches:
- master
- release-5.*
- release-6.*
- release-7.*
- release-8.*
concurrency:
group: ${{ github.ref }}-${{ github.workflow }}
cancel-in-progress: true
Expand Down
101 changes: 72 additions & 29 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ type Client interface {
GetClusterID(ctx context.Context) uint64
// GetAllMembers gets the members Info from PD
GetAllMembers(ctx context.Context) ([]*pdpb.Member, error)
// GetLeaderAddr returns current leader's address. It returns "" before
// GetLeaderURL returns current leader's URL. It returns "" before
// syncing leader from server.
GetLeaderAddr() string
GetLeaderURL() string
// GetRegion gets a region and its leader Peer from PD by key.
// The region may expire after split. Caller is responsible for caching and
// taking care of region change.
Expand Down Expand Up @@ -575,7 +575,7 @@ func (c *client) setup() error {
}

// Register callbacks
c.pdSvcDiscovery.AddServingAddrSwitchedCallback(c.scheduleUpdateTokenConnection)
c.pdSvcDiscovery.AddServingURLSwitchedCallback(c.scheduleUpdateTokenConnection)

// Create dispatchers
c.createTokenDispatcher()
Expand Down Expand Up @@ -606,12 +606,22 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
log.Info("[pd] changing service mode",
zap.String("old-mode", c.serviceMode.String()),
zap.String("new-mode", newMode.String()))
c.resetTSOClientLocked(newMode)
oldMode := c.serviceMode
c.serviceMode = newMode
log.Info("[pd] service mode changed",
zap.String("old-mode", oldMode.String()),
zap.String("new-mode", newMode.String()))
}

// Reset a new TSO client.
func (c *client) resetTSOClientLocked(mode pdpb.ServiceMode) {
// Re-create a new TSO client.
var (
newTSOCli *tsoClient
newTSOSvcDiscovery ServiceDiscovery
)
switch newMode {
switch mode {
case pdpb.ServiceMode_PD_SVC_MODE:
newTSOCli = newTSOClient(c.ctx, c.option,
c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{})
Expand Down Expand Up @@ -649,11 +659,6 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
// We are switching from API service mode to PD service mode, so delete the old tso microservice discovery.
oldTSOSvcDiscovery.Close()
}
oldMode := c.serviceMode
c.serviceMode = newMode
log.Info("[pd] service mode changed",
zap.String("old-mode", oldMode.String()),
zap.String("new-mode", newMode.String()))
}

func (c *client) getTSOClient() *tsoClient {
Expand All @@ -662,6 +667,13 @@ func (c *client) getTSOClient() *tsoClient {
return c.tsoClient
}

// ResetTSOClient resets the TSO client, only for test.
func (c *client) ResetTSOClient() {
c.Lock()
defer c.Unlock()
c.resetTSOClientLocked(c.serviceMode)
}

func (c *client) getServiceMode() pdpb.ServiceMode {
c.RLock()
defer c.RUnlock()
Expand All @@ -680,9 +692,9 @@ func (c *client) GetClusterID(context.Context) uint64 {
return c.pdSvcDiscovery.GetClusterID()
}

// GetLeaderAddr returns the leader address.
func (c *client) GetLeaderAddr() string {
return c.pdSvcDiscovery.GetServingAddr()
// GetLeaderURL returns the leader URL.
func (c *client) GetLeaderURL() string {
return c.pdSvcDiscovery.GetServingURL()
}

// GetServiceDiscovery returns the client-side service discovery object
Expand Down Expand Up @@ -745,7 +757,7 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
// follower pd client and the context which holds forward information.
func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, context.Context) {
serviceClient := c.pdSvcDiscovery.GetServiceClient()
if serviceClient == nil {
if serviceClient == nil || serviceClient.GetClientConn() == nil {
return nil, ctx
}
return pdpb.NewPDClient(serviceClient.GetClientConn()), serviceClient.BuildGRPCTargetContext(ctx, true)
Expand All @@ -762,7 +774,7 @@ func (c *client) getRegionAPIClientAndContext(ctx context.Context, allowFollower
}
}
serviceClient = c.pdSvcDiscovery.GetServiceClient()
if serviceClient == nil {
if serviceClient == nil || serviceClient.GetClientConn() == nil {
return nil, ctx
}
return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, !allowFollower)
Expand All @@ -779,26 +791,52 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
defer span.Finish()
}

req := c.getTSORequest(ctx, dcLocation)
if err := c.dispatchTSORequestWithRetry(req); err != nil {
req.tryDone(err)
}
return req
}

func (c *client) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest {
req := tsoReqPool.Get().(*tsoRequest)
req.requestCtx = ctx
req.clientCtx = c.ctx
tsoClient := c.getTSOClient()
// Set needed fields in the request before using it.
req.start = time.Now()
req.clientCtx = c.ctx
req.requestCtx = ctx
req.physical = 0
req.logical = 0
req.dcLocation = dcLocation
return req
}

if tsoClient == nil {
req.done <- errs.ErrClientGetTSO.FastGenByArgs("tso client is nil")
return req
}
const (
dispatchRetryDelay = 50 * time.Millisecond
dispatchRetryCount = 2
)

if err := tsoClient.dispatchRequest(ctx, dcLocation, req); err != nil {
// Wait for a while and try again
time.Sleep(50 * time.Millisecond)
if err = tsoClient.dispatchRequest(ctx, dcLocation, req); err != nil {
req.done <- err
func (c *client) dispatchTSORequestWithRetry(req *tsoRequest) error {
var (
retryable bool
err error
)
for i := 0; i < dispatchRetryCount; i++ {
// Do not delay for the first time.
if i > 0 {
time.Sleep(dispatchRetryDelay)
}
// Get the tsoClient each time, as it may be initialized or switched during the process.
tsoClient := c.getTSOClient()
if tsoClient == nil {
err = errs.ErrClientGetTSO.FastGenByArgs("tso client is nil")
continue
}
retryable, err = tsoClient.dispatchRequest(req)
if !retryable {
break
}
}
return req
return err
}

func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err error) {
Expand Down Expand Up @@ -1402,9 +1440,14 @@ func IsLeaderChange(err error) bool {
strings.Contains(errMsg, errs.NotServedErr)
}

const (
httpSchemePrefix = "http://"
httpsSchemePrefix = "https://"
)

func trimHTTPPrefix(str string) string {
str = strings.TrimPrefix(str, "http://")
str = strings.TrimPrefix(str, "https://")
str = strings.TrimPrefix(str, httpSchemePrefix)
str = strings.TrimPrefix(str, httpsSchemePrefix)
return str
}

Expand Down
6 changes: 6 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ func TestUpdateURLs(t *testing.T) {
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs())
cli.updateURLs(members)
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2], members[0]}), cli.GetServiceURLs())
cli.updateURLs(members[1:])
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs())
cli.updateURLs(members[2:])
re.Equal(getURLs([]*pdpb.Member{members[3], members[2]}), cli.GetServiceURLs())
cli.updateURLs(members[3:])
re.Equal(getURLs([]*pdpb.Member{members[3]}), cli.GetServiceURLs())
}

const testClientURL = "tmp://test.url:5255"
Expand Down
7 changes: 4 additions & 3 deletions client/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
)

Expand All @@ -54,7 +55,7 @@ const (
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
func GetClientConn(ctx context.Context, addr string, tlsCfg *tls.Config, do ...grpc.DialOption) (*grpc.ClientConn, error) {
opt := grpc.WithInsecure() //nolint
opt := grpc.WithTransportCredentials(insecure.NewCredentials())
if tlsCfg != nil {
creds := credentials.NewTLS(tlsCfg)
opt = grpc.WithTransportCredentials(creds)
Expand Down Expand Up @@ -83,8 +84,8 @@ func GetClientConn(ctx context.Context, addr string, tlsCfg *tls.Config, do ...g

// BuildForwardContext creates a context with receiver metadata information.
// It is used in client side.
func BuildForwardContext(ctx context.Context, addr string) context.Context {
md := metadata.Pairs(ForwardMetadataKey, addr)
func BuildForwardContext(ctx context.Context, url string) context.Context {
md := metadata.Pairs(ForwardMetadataKey, url)
return metadata.NewOutgoingContext(ctx, md)
}

Expand Down
13 changes: 13 additions & 0 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ const (
operators = "/pd/api/v1/operators"
// Micro Service
microServicePrefix = "/pd/api/v2/ms"
// Keyspace
KeyspaceConfig = "/pd/api/v2/keyspaces/%s/config"
GetKeyspaceMetaByName = "/pd/api/v2/keyspaces/%s"
)

// RegionByID returns the path of PD HTTP API to get region by ID.
Expand Down Expand Up @@ -201,3 +204,13 @@ func MicroServiceMembers(service string) string {
func MicroServicePrimary(service string) string {
return fmt.Sprintf("%s/primary/%s", microServicePrefix, service)
}

// GetUpdateKeyspaceConfigURL returns the path of PD HTTP API to update keyspace config.
func GetUpdateKeyspaceConfigURL(keyspaceName string) string {
return fmt.Sprintf(KeyspaceConfig, keyspaceName)
}

// GetKeyspaceMetaByNameURL returns the path of PD HTTP API to get keyspace meta by keyspace name.
func GetKeyspaceMetaByNameURL(keyspaceName string) string {
return fmt.Sprintf(GetKeyspaceMetaByName, keyspaceName)
}
27 changes: 16 additions & 11 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,13 @@ func (ci *clientInner) requestWithRetry(
return errs.ErrClientNoAvailableMember
}
for _, cli := range clients {
addr := cli.GetHTTPAddress()
statusCode, err = ci.doRequest(ctx, addr, reqInfo, headerOpts...)
url := cli.GetURL()
statusCode, err = ci.doRequest(ctx, url, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
return err
}
log.Debug("[pd] request addr failed",
zap.String("source", ci.source), zap.Bool("is-leader", cli.IsConnectedToLeader()), zap.String("addr", addr), zap.Error(err))
log.Debug("[pd] request url failed",
zap.String("source", ci.source), zap.Bool("is-leader", cli.IsConnectedToLeader()), zap.String("url", url), zap.Error(err))
}
return err
}
Expand All @@ -160,19 +160,19 @@ func noNeedRetry(statusCode int) bool {

func (ci *clientInner) doRequest(
ctx context.Context,
addr string, reqInfo *requestInfo,
url string, reqInfo *requestInfo,
headerOpts ...HeaderOption,
) (int, error) {
var (
source = ci.source
callerID = reqInfo.callerID
name = reqInfo.name
url = reqInfo.getURL(addr)
method = reqInfo.method
body = reqInfo.body
res = reqInfo.res
respHandler = reqInfo.respHandler
)
url = reqInfo.getURL(url)
logFields := []zap.Field{
zap.String("source", source),
zap.String("name", name),
Expand Down Expand Up @@ -305,7 +305,8 @@ func NewClient(
}
sd := pd.NewDefaultPDServiceDiscovery(ctx, cancel, pdAddrs, c.inner.tlsConf)
if err := sd.Init(); err != nil {
log.Error("[pd] init service discovery failed", zap.String("source", source), zap.Strings("pd-addrs", pdAddrs), zap.Error(err))
log.Error("[pd] init service discovery failed",
zap.String("source", source), zap.Strings("pd-addrs", pdAddrs), zap.Error(err))
return nil
}
c.inner.init(sd)
Expand Down Expand Up @@ -382,9 +383,8 @@ func NewHTTPClientWithRequestChecker(checker requestChecker) *http.Client {
}
}

// newClientWithoutInitServiceDiscovery creates a PD HTTP client
// with the given PD addresses and TLS config without init service discovery.
func newClientWithoutInitServiceDiscovery(
// newClientWithMockServiceDiscovery creates a new PD HTTP client with a mock PD service discovery.
func newClientWithMockServiceDiscovery(
source string,
pdAddrs []string,
opts ...ClientOption,
Expand All @@ -395,7 +395,12 @@ func newClientWithoutInitServiceDiscovery(
for _, opt := range opts {
opt(c)
}
sd := pd.NewDefaultPDServiceDiscovery(ctx, cancel, pdAddrs, c.inner.tlsConf)
sd := pd.NewMockPDServiceDiscovery(pdAddrs, c.inner.tlsConf)
if err := sd.Init(); err != nil {
log.Error("[pd] init mock service discovery failed",
zap.String("source", source), zap.Strings("pd-addrs", pdAddrs), zap.Error(err))
return nil
}
c.inner.init(sd)
return c
}
Loading

0 comments on commit 9a554a4

Please sign in to comment.