Merge branch 'master' into add_log_defer

HuSharp authored Mar 10, 2023
2 parents 88a02b4 + 3a17dab commit 1e8c9cc
Showing 17 changed files with 855 additions and 661 deletions. (Only the first three files are reproduced below.)
188 changes: 75 additions & 113 deletions client/client.go
@@ -143,8 +143,6 @@ type Client interface {
	KeyspaceClient
	// ResourceManagerClient manages resource group metadata and token assignment.
	ResourceManagerClient
	// TSOClient is the client of TSO service
	TSOClient
	// Close closes the client.
	Close()
}
@@ -242,30 +240,19 @@ func WithMaxErrorRetry(count int) ClientOption {
var _ Client = (*client)(nil)

type client struct {
	bc    BaseClient
	tsobc BaseClient
	tsoStreamBuilderFactory
	// tsoDispatcher is used to dispatch different TSO requests to
	// the corresponding dc-location TSO channel.
	tsoDispatcher sync.Map // Same as map[string]chan *tsoRequest
	// dc-location -> deadline
	tsDeadline sync.Map // Same as map[string]chan deadline
	// dc-location -> *lastTSO
	lastTSMap sync.Map // Same as map[string]*lastTSO

	keyspaceID uint32
	// svcDiscovery is for pd service discovery
	svcDiscovery    ServiceDiscovery
	tsoClient       *tsoClient
	tokenDispatcher *tokenDispatcher

	// For internal usage.
	checkTSDeadlineCh         chan struct{}
	checkTSODispatcherCh      chan struct{}
	updateTSOConnectionCtxsCh chan struct{}
	updateTokenConnectionCh   chan struct{}
	leaderNetworkFailure      int32
	wg                        sync.WaitGroup
	updateTokenConnectionCh chan struct{}
	leaderNetworkFailure    int32

	ctx    context.Context
	cancel context.CancelFunc

	wg     sync.WaitGroup
	option *option
}

@@ -289,10 +276,15 @@ func NewClient(svrAddrs []string, security SecurityOption, opts ...ClientOption)
func NewClientWithContext(ctx context.Context, svrAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
	log.Info("[pd] create pd client with endpoints", zap.Strings("pd-address", svrAddrs))
	c, clientCtx, clientCancel, tlsCfg := createClient(ctx, &security)
	c.tsoStreamBuilderFactory = &pdTSOStreamBuilderFactory{}
	c.bc = newPDBaseClient(clientCtx, clientCancel, &c.wg, addrsToUrls(svrAddrs), tlsCfg, c.option)
	c.tsobc = c.bc
	if err := c.setup(opts...); err != nil {
	// Inject the client options.
	for _, opt := range opts {
		opt(c)
	}

	c.svcDiscovery = newPDServiceDiscovery(clientCtx, clientCancel, &c.wg, addrsToUrls(svrAddrs), tlsCfg, c.option)
	c.tsoClient = newTSOClient(clientCtx, clientCancel, &c.wg, c.option, c.keyspaceID, c.svcDiscovery, c.svcDiscovery.(tsoAllocatorEventSource), &pdTSOStreamBuilderFactory{})
	if err := c.setup(); err != nil {
		c.cancel()
		return nil, err
	}
	return c, nil
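A minimal caller-side sketch of the constructor above, for orientation. The import alias and the endpoint address are illustrative assumptions, not part of this diff:

package main

import (
	"context"
	"log"

	pd "github.com/tikv/pd/client"
)

func main() {
	ctx := context.Background()
	// A zero-valued SecurityOption assumes a cluster without TLS.
	cli, err := pd.NewClientWithContext(ctx, []string{"127.0.0.1:2379"}, pd.SecurityOption{})
	if err != nil {
		log.Fatal(err)
	}
	// Close tears down the service discovery and TSO sub-clients (see Close below).
	defer cli.Close()
	log.Println("cluster id:", cli.GetClusterID(ctx))
}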
@@ -303,18 +295,21 @@ func NewTSOClientWithContext(ctx context.Context, svrAddrs []string, security Secur
// Merge NewClientWithContext with this API after we let client detect service mode provided on the server side.
// Before that, internal tools call this function to use mcs service.
func NewTSOClientWithContext(ctx context.Context, keyspaceID uint32, svrAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
	log.Info("[pd(tso)] create tso client with endpoints", zap.Strings("pd(api)-address", svrAddrs))
	log.Info("[tso] create tso client with endpoints", zap.Strings("pd(api)-address", svrAddrs))
	c, clientCtx, clientCancel, tlsCfg := createClient(ctx, &security)
	c.tsoStreamBuilderFactory = &tsoTSOStreamBuilderFactory{}
	c.bc = newPDBaseClient(clientCtx, clientCancel, &c.wg, addrsToUrls(svrAddrs), tlsCfg, c.option)
	c.tsobc = newTSOMcsClient(clientCtx, clientCancel, &c.wg, MetaStorageClient(c), keyspaceID, addrsToUrls(svrAddrs), tlsCfg, c.option)
	if err := c.setup(opts...); err != nil {
		return nil, err
	// Inject the client options.
	for _, opt := range opts {
		opt(c)
	}
	if err := c.tsobc.Init(); err != nil {

	c.keyspaceID = keyspaceID
	c.svcDiscovery = newPDServiceDiscovery(clientCtx, clientCancel, &c.wg, addrsToUrls(svrAddrs), tlsCfg, c.option)
	tsoSvcDiscovery := newTSOServiceDiscovery(clientCtx, clientCancel, &c.wg, MetaStorageClient(c), keyspaceID, addrsToUrls(svrAddrs), tlsCfg, c.option)
	c.tsoClient = newTSOClient(clientCtx, clientCancel, &c.wg, c.option, c.keyspaceID, tsoSvcDiscovery, tsoSvcDiscovery.(tsoAllocatorEventSource), &tsoTSOStreamBuilderFactory{})
	if err := c.setup(); err != nil {
		c.cancel()
		return nil, err
	}
	c.updateTSODispatcher()
	return c, nil
}

@@ -331,43 +326,46 @@ func createClient(ctx context.Context, security *SecurityOption) (*client, conte

	clientCtx, clientCancel := context.WithCancel(ctx)
	c := &client{
		checkTSDeadlineCh:         make(chan struct{}),
		checkTSODispatcherCh:      make(chan struct{}, 1),
		updateTSOConnectionCtxsCh: make(chan struct{}, 1),
		updateTokenConnectionCh:   make(chan struct{}, 1),
		ctx:                       clientCtx,
		cancel:                    clientCancel,
		option:                    newOption(),
		updateTokenConnectionCh: make(chan struct{}, 1),
		ctx:                     clientCtx,
		cancel:                  clientCancel,
		option:                  newOption(),
	}

	return c, clientCtx, clientCancel, tlsCfg
}

func (c *client) setup(opts ...ClientOption) error {
	// Inject the client options.
	for _, opt := range opts {
		opt(c)
	}
func (c *client) setup() error {
	// Init the client base.
	if err := c.bc.Init(); err != nil {
	if err := c.svcDiscovery.Init(); err != nil {
		return err
	}

	// Register callbacks
	c.tsobc.AddTSOAllocatorServingAddrSwitchedCallback(c.scheduleCheckTSODispatcher)
	c.tsobc.AddServiceAddrsSwitchedCallback(c.scheduleUpdateTSOConnectionCtxs)
	c.bc.AddServingAddrSwitchedCallback(c.scheduleUpdateTokenConnection)
	c.svcDiscovery.AddServingAddrSwitchedCallback(c.scheduleUpdateTokenConnection)

	// Create dispatchers
	c.updateTSODispatcher()
	c.createTokenDispatcher()

	// Start the daemons.
	c.wg.Add(3)
	go c.tsLoop()
	go c.tsCancelLoop()
	c.wg.Add(1)
	go c.leaderCheckLoop()
	return nil

	return c.tsoClient.setup()
}

func (c *client) Close() {
	c.cancel()
	c.wg.Wait()

	c.tsoClient.Close()
	c.svcDiscovery.Close()

	if c.tokenDispatcher != nil {
		tokenErr := errors.WithStack(errClosing)
		c.tokenDispatcher.tokenBatchController.revokePendingTokenRequest(tokenErr)
		c.tokenDispatcher.dispatcherCancel()
	}
}

func (c *client) scheduleUpdateTokenConnection() {
@@ -379,17 +377,17 @@ func (c *client) scheduleUpdateTokenConnection() {

// GetClusterID returns the ClusterID.
func (c *client) GetClusterID(ctx context.Context) uint64 {
	return c.bc.GetClusterID(ctx)
	return c.svcDiscovery.GetClusterID(ctx)
}

// GetLeaderAddr returns the leader address.
func (c *client) GetLeaderAddr() string {
	return c.bc.GetServingAddr()
	return c.svcDiscovery.GetServingAddr()
}

// GetBaseClient returns BaseClient which contains service discovery client logic
func (c *client) GetBaseClient() BaseClient {
	return c.bc
// GetServiceDiscovery returns the client-side service discovery object
func (c *client) GetServiceDiscovery() ServiceDiscovery {
	return c.svcDiscovery
}

// UpdateOption updates the client option.
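Callers migrating off the removed GetBaseClient can switch to the new accessor. A sketch, assuming GetServiceDiscovery is exposed on the Client value built as above:

	sd := cli.GetServiceDiscovery()
	// Per the delegation above, this equals cli.GetLeaderAddr().
	leaderAddr := sd.GetServingAddr()
	clusterID := sd.GetClusterID(ctx)
	log.Println("leader:", leaderAddr, "cluster id:", clusterID)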
@@ -437,7 +435,7 @@ func (c *client) leaderCheckLoop() {
func (c *client) checkLeaderHealth(ctx context.Context) {
	ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
	defer cancel()
	if client := c.bc.GetServingEndpointClientConn(); client != nil {
	if client := c.svcDiscovery.GetServingEndpointClientConn(); client != nil {
		healthCli := healthpb.NewHealthClient(client)
		resp, err := healthCli.Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
		rpcErr, ok := status.FromError(err)
@@ -468,33 +466,9 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
	return resp.GetMembers(), nil
}

func (c *client) Close() {
	c.cancel()
	c.wg.Wait()

	c.tsoDispatcher.Range(func(_, dispatcherInterface interface{}) bool {
		if dispatcherInterface != nil {
			dispatcher := dispatcherInterface.(*tsoDispatcher)
			tsoErr := errors.WithStack(errClosing)
			dispatcher.tsoBatchController.revokePendingTSORequest(tsoErr)
			dispatcher.dispatcherCancel()
		}
		return true
	})

	c.bc.Close()
	c.tsobc.Close()

	if c.tokenDispatcher != nil {
		tokenErr := errors.WithStack(errClosing)
		c.tokenDispatcher.tokenBatchController.revokePendingTokenRequest(tokenErr)
		c.tokenDispatcher.dispatcherCancel()
	}
}

// leaderClient gets the client of current PD leader.
func (c *client) leaderClient() pdpb.PDClient {
	if client := c.bc.GetServingEndpointClientConn(); client != nil {
	if client := c.svcDiscovery.GetServingEndpointClientConn(); client != nil {
		return pdpb.NewPDClient(client)
	}
	return nil
@@ -504,7 +478,7 @@ func (c *client) leaderClient() pdpb.PDClient {
// backup service endpoints randomly. Backup service endpoints are followers in a
// quorum-based cluster or secondaries in a primary/secondary configured cluster.
func (c *client) backupClientConn() (*grpc.ClientConn, string) {
	addrs := c.bc.GetBackupAddrs()
	addrs := c.svcDiscovery.GetBackupAddrs()
	if len(addrs) < 1 {
		return nil, ""
	}
@@ -514,7 +488,7 @@ func (c *client) backupClientConn() (*grpc.ClientConn, string) {
	)
	for i := 0; i < len(addrs); i++ {
		addr := addrs[rand.Intn(len(addrs))]
		if cc, err = c.bc.GetOrCreateGRPCConn(addr); err != nil {
		if cc, err = c.svcDiscovery.GetOrCreateGRPCConn(addr); err != nil {
			continue
		}
		healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout)
@@ -538,27 +512,6 @@ func (c *client) getClient() pdpb.PDClient {
	return c.leaderClient()
}

type tsoRequest struct {
	start      time.Time
	clientCtx  context.Context
	requestCtx context.Context
	done       chan error
	physical   int64
	logical    int64
	dcLocation string
	keyspaceID uint32
}

var tsoReqPool = sync.Pool{
	New: func() interface{} {
		return &tsoRequest{
			done:     make(chan error, 1),
			physical: 0,
			logical:  0,
		}
	},
}

func (c *client) GetTSAsync(ctx context.Context) TSFuture {
	return c.GetLocalTSAsync(ctx, globalDCLocation)
}
@@ -568,15 +521,18 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
		span = opentracing.StartSpan("GetLocalTSAsync", opentracing.ChildOf(span.Context()))
		ctx = opentracing.ContextWithSpan(ctx, span)
	}

	req := tsoReqPool.Get().(*tsoRequest)
	req.requestCtx = ctx
	req.clientCtx = c.ctx
	req.start = time.Now()
	req.keyspaceID = c.keyspaceID
	req.dcLocation = dcLocation
	if err := c.dispatchRequest(dcLocation, req); err != nil {

	if err := c.tsoClient.dispatchRequest(dcLocation, req); err != nil {
		// Wait for a while and try again
		time.Sleep(50 * time.Millisecond)
		if err = c.dispatchRequest(dcLocation, req); err != nil {
		if err = c.tsoClient.dispatchRequest(dcLocation, req); err != nil {
			req.done <- err
		}
	}
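The dispatch path above backs the usual asynchronous TSO pattern on the caller side. A sketch, assuming TSFuture.Wait returns (physical, logical int64, err error) as in the public client API:

func getTS(ctx context.Context, cli pd.Client) (uint64, error) {
	future := cli.GetTSAsync(ctx)
	physical, logical, err := future.Wait()
	if err != nil {
		return 0, err
	}
	// Conventional TiKV timestamp composition: 18 bits of logical time.
	return uint64(physical)<<18 | uint64(logical), nil
}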
@@ -652,7 +608,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs

	var resp *pdpb.GetRegionResponse
	for _, url := range memberURLs {
		conn, err := c.bc.GetOrCreateGRPCConn(url)
		conn, err := c.svcDiscovery.GetOrCreateGRPCConn(url)
		if err != nil {
			log.Error("[pd] can't get grpc connection", zap.String("member-URL", url), errs.ZapError(err))
			continue
@@ -673,7 +629,7 @@

	if resp == nil {
		cmdFailDurationGetRegion.Observe(time.Since(start).Seconds())
		c.bc.ScheduleCheckMemberChanged()
		c.svcDiscovery.ScheduleCheckMemberChanged()
		errorMsg := fmt.Sprintf("[pd] can't get region info from member URLs: %+v", memberURLs)
		return nil, errors.WithStack(errors.New(errorMsg))
	}
@@ -1015,7 +971,7 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...R

func (c *client) requestHeader() *pdpb.RequestHeader {
	return &pdpb.RequestHeader{
		ClusterId: c.bc.GetClusterID(c.ctx),
		ClusterId: c.svcDiscovery.GetClusterID(c.ctx),
	}
}

@@ -1183,10 +1139,16 @@ func (c *client) respForErr(observer prometheus.Observer, start time.Time, err e
	if err != nil || header.GetError() != nil {
		observer.Observe(time.Since(start).Seconds())
		if err != nil {
			c.bc.ScheduleCheckMemberChanged()
			c.svcDiscovery.ScheduleCheckMemberChanged()
			return errors.WithStack(err)
		}
		return errors.WithStack(errors.New(header.GetError().String()))
	}
	return nil
}

// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map
// For test only.
func (c *client) GetTSOAllocators() *sync.Map {
	return c.tsoClient.GetTSOAllocators()
}
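A test-side sketch of consuming that map, assuming its keys and values are the dc-location strings and allocator leader URLs named in the comment:

func logTSOAllocators(t *testing.T, c *client) {
	c.GetTSOAllocators().Range(func(dcLocation, url interface{}) bool {
		t.Logf("dc-location %v -> TSO allocator leader %v", dcLocation, url)
		return true
	})
}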
4 changes: 2 additions & 2 deletions client/client_test.go
@@ -55,7 +55,7 @@ func TestUpdateURLs(t *testing.T) {
		}
		return
	}
	cli := &pdBaseClient{option: newOption()}
	cli := &pdServiceDiscovery{option: newOption()}
	cli.urls.Store([]string{})
	cli.updateURLs(members[1:])
	re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetURLs())
@@ -90,7 +90,7 @@ func TestGRPCDialOption(t *testing.T) {
	start := time.Now()
	ctx, cancel := context.WithTimeout(context.TODO(), 500*time.Millisecond)
	defer cancel()
	cli := &pdBaseClient{
	cli := &pdServiceDiscovery{
		checkMembershipCh: make(chan struct{}, 1),
		ctx:               ctx,
		cancel:            cancel,
6 changes: 3 additions & 3 deletions client/keyspace_client.go
@@ -38,7 +38,7 @@ type KeyspaceClient interface {

// keyspaceClient returns the KeyspaceClient from current PD leader.
func (c *client) keyspaceClient() keyspacepb.KeyspaceClient {
	if client := c.bc.GetServingEndpointClientConn(); client != nil {
	if client := c.svcDiscovery.GetServingEndpointClientConn(); client != nil {
		return keyspacepb.NewKeyspaceClient(client)
	}
	return nil
@@ -63,7 +63,7 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key

	if err != nil {
		cmdFailedDurationLoadKeyspace.Observe(time.Since(start).Seconds())
		c.bc.ScheduleCheckMemberChanged()
		c.svcDiscovery.ScheduleCheckMemberChanged()
		return nil, err
	}

@@ -142,7 +142,7 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp

	if err != nil {
		cmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds())
		c.bc.ScheduleCheckMemberChanged()
		c.svcDiscovery.ScheduleCheckMemberChanged()
		return nil, err
	}
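A hedged sketch of calling the keyspace surface whose failure path now reschedules member checks through svcDiscovery; the *keyspacepb.KeyspaceMeta return type and its getters are assumed from the keyspacepb package imported by this file:

func loadKeyspace(ctx context.Context, cli pd.Client) {
	meta, err := cli.LoadKeyspace(ctx, "DEFAULT")
	if err != nil {
		// On failure the client has already scheduled a member-change check.
		log.Println("load keyspace failed:", err)
		return
	}
	log.Println("keyspace:", meta.GetName(), "state:", meta.GetState())
}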
