From b5cfaa6dd98507fb672d222f875bd854a17403b6 Mon Sep 17 00:00:00 2001 From: Nathan Date: Thu, 9 Mar 2023 19:37:13 +0800 Subject: [PATCH 1/2] rule_manager: generate default rules based on whether enable witness (#6127) ref tikv/pd#5220, close tikv/pd#5220 rule_manager: generate default rules based on whether enable witness Signed-off-by: Wenbo Zhang Co-authored-by: Ti Chi Robot --- server/config/persist_options.go | 7 +++ server/schedule/checker/rule_checker_test.go | 2 +- server/schedule/config/config.go | 1 + server/schedule/placement/rule_manager.go | 52 ++++++++++++++----- .../schedule/placement/rule_manager_test.go | 47 ++++++++++++----- 5 files changed, 84 insertions(+), 25 deletions(-) diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 7ddd73f99104..ef6e4b4ba9a0 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -172,6 +172,13 @@ func (o *PersistOptions) SetPlacementRulesCacheEnabled(enabled bool) { o.SetReplicationConfig(v) } +// SetWitnessEnabled set EanbleWitness +func (o *PersistOptions) SetWitnessEnabled(enabled bool) { + v := o.GetScheduleConfig().Clone() + v.EnableWitness = enabled + o.SetScheduleConfig(v) +} + // GetStrictlyMatchLabel returns whether check label strict. func (o *PersistOptions) GetStrictlyMatchLabel() bool { return o.GetReplicationConfig().StrictlyMatchLabel diff --git a/server/schedule/checker/rule_checker_test.go b/server/schedule/checker/rule_checker_test.go index 322540c4ac35..1f5adb2fa1df 100644 --- a/server/schedule/checker/rule_checker_test.go +++ b/server/schedule/checker/rule_checker_test.go @@ -435,7 +435,7 @@ func (suite *ruleCheckerTestSuite) TestFixRuleWitness5() { }, }) suite.Error(err) - suite.Equal(errs.ErrRuleContent.FastGenByArgs(fmt.Sprintf("define multiple witness by count %d", 2)).Error(), err.Error()) + suite.Equal(errs.ErrRuleContent.FastGenByArgs(fmt.Sprintf("define too many witness by count %d", 2)).Error(), err.Error()) } func (suite *ruleCheckerTestSuite) TestFixRuleWitness6() { diff --git a/server/schedule/config/config.go b/server/schedule/config/config.go index 52972322381a..6b35337a1db7 100644 --- a/server/schedule/config/config.go +++ b/server/schedule/config/config.go @@ -85,6 +85,7 @@ type Config interface { SetSplitMergeInterval(time.Duration) SetMaxReplicas(int) SetPlacementRulesCacheEnabled(bool) + SetWitnessEnabled(bool) } // StoreConfig is the interface that wraps the StoreConfig related methods. diff --git a/server/schedule/placement/rule_manager.go b/server/schedule/placement/rule_manager.go index d6706d319425..2842b4503f9b 100644 --- a/server/schedule/placement/rule_manager.go +++ b/server/schedule/placement/rule_manager.go @@ -81,17 +81,45 @@ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string) error } if len(m.ruleConfig.rules) == 0 { // migrate from old config. 
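// Illustrative note (not part of the patch): when witness is allowed and maxReplica >= 3,
// the change below replaces the single default rule with a voter rule plus a witness rule.
// Assuming the usual odd maxReplica, the split works out as in this hypothetical helper:
//
//	func splitReplicas(maxReplica int) (voters, witnesses int) {
//		witnesses = maxReplica / 2               // integer division, e.g. 3 -> 1, 5 -> 2
//		return maxReplica - witnesses, witnesses // e.g. 3 -> 2 voters, 5 -> 3 voters
//	}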
- defaultRule := &Rule{ - GroupID: "pd", - ID: "default", - Role: Voter, - Count: maxReplica, - LocationLabels: locationLabels, - } - if err := m.storage.SaveRule(defaultRule.StoreKey(), defaultRule); err != nil { - return err + var defaultRules []*Rule + if m.conf != nil && m.conf.IsWitnessAllowed() && maxReplica >= 3 { + // Because maxReplica is actually always an odd number, so directly divided by 2 + witnessCount := maxReplica / 2 + defaultRules = append(defaultRules, + []*Rule{ + { + GroupID: "pd", + ID: "default", + Role: Voter, + Count: maxReplica - witnessCount, + LocationLabels: locationLabels, + }, + { + GroupID: "pd", + ID: "witness", + Role: Voter, + Count: witnessCount, + IsWitness: true, + LocationLabels: locationLabels, + }, + }..., + ) + } else { + defaultRules = append(defaultRules, &Rule{ + GroupID: "pd", + ID: "default", + Role: Voter, + Count: maxReplica, + LocationLabels: locationLabels, + }) + } + for _, defaultRule := range defaultRules { + if err := m.storage.SaveRule(defaultRule.StoreKey(), defaultRule); err != nil { + // TODO: Need to delete the previously successfully saved Rules? + return err + } + m.ruleConfig.setRule(defaultRule) } - m.ruleConfig.setRule(defaultRule) } m.ruleConfig.adjust() ruleList, err := buildRuleList(m.ruleConfig) @@ -206,8 +234,8 @@ func (m *RuleManager) adjustRule(r *Rule, groupID string) (err error) { if r.Role == Leader && r.Count > 1 { return errs.ErrRuleContent.FastGenByArgs(fmt.Sprintf("define multiple leaders by count %d", r.Count)) } - if r.IsWitness && r.Count > 1 { - return errs.ErrRuleContent.FastGenByArgs(fmt.Sprintf("define multiple witness by count %d", r.Count)) + if r.IsWitness && r.Count > m.conf.GetMaxReplicas()/2 { + return errs.ErrRuleContent.FastGenByArgs(fmt.Sprintf("define too many witness by count %d", r.Count)) } for _, c := range r.LabelConstraints { if !validateOp(c.Op) { diff --git a/server/schedule/placement/rule_manager_test.go b/server/schedule/placement/rule_manager_test.go index 93d6ff155095..894f78f1fefa 100644 --- a/server/schedule/placement/rule_manager_test.go +++ b/server/schedule/placement/rule_manager_test.go @@ -28,11 +28,12 @@ import ( "github.com/tikv/pd/pkg/storage/kv" ) -func newTestManager(t *testing.T) (endpoint.RuleStorage, *RuleManager) { +func newTestManager(t *testing.T, enableWitness bool) (endpoint.RuleStorage, *RuleManager) { re := require.New(t) store := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil) var err error manager := NewRuleManager(store, nil, mockconfig.NewTestOptions()) + manager.conf.SetWitnessEnabled(enableWitness) err = manager.Initialize(3, []string{"zone", "rack", "host"}) re.NoError(err) return store, manager @@ -40,7 +41,7 @@ func newTestManager(t *testing.T) (endpoint.RuleStorage, *RuleManager) { func TestDefault(t *testing.T) { re := require.New(t) - _, manager := newTestManager(t) + _, manager := newTestManager(t, false) rules := manager.GetAllRules() re.Len(rules, 1) re.Equal("pd", rules[0].GroupID) @@ -52,9 +53,31 @@ func TestDefault(t *testing.T) { re.Equal([]string{"zone", "rack", "host"}, rules[0].LocationLabels) } +func TestDefault2(t *testing.T) { + re := require.New(t) + _, manager := newTestManager(t, true) + rules := manager.GetAllRules() + re.Len(rules, 2) + re.Equal("pd", rules[0].GroupID) + re.Equal("default", rules[0].ID) + re.Equal(0, rules[0].Index) + re.Empty(rules[0].StartKey) + re.Empty(rules[0].EndKey) + re.Equal(Voter, rules[0].Role) + re.Equal([]string{"zone", "rack", "host"}, rules[0].LocationLabels) + re.Equal("pd", rules[1].GroupID) + 
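// Illustrative sketch (based on the Initialize change above, not an exact dump): with
// witness enabled and maxReplica = 3, the two generated default rules are expected to
// look roughly like
//
//	{GroupID: "pd", ID: "default", Role: Voter, Count: 2, LocationLabels: ["zone", "rack", "host"]}
//	{GroupID: "pd", ID: "witness", Role: Voter, Count: 1, IsWitness: true, LocationLabels: ["zone", "rack", "host"]}
//
// which is what these assertions verify. Relatedly, the adjustRule change above caps a
// witness rule's Count at GetMaxReplicas()/2, so with the default max-replicas of 3 a rule
// with IsWitness and Count = 2 is rejected with "define too many witness by count 2".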
re.Equal("witness", rules[1].ID) + re.Equal(0, rules[1].Index) + re.Empty(rules[1].StartKey) + re.Empty(rules[1].EndKey) + re.Equal(Voter, rules[1].Role) + re.True(rules[1].IsWitness) + re.Equal([]string{"zone", "rack", "host"}, rules[1].LocationLabels) +} + func TestAdjustRule(t *testing.T) { re := require.New(t) - _, manager := newTestManager(t) + _, manager := newTestManager(t, false) rules := []Rule{ {GroupID: "group", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: "voter", Count: 3}, {GroupID: "", ID: "id", StartKeyHex: "123abc", EndKeyHex: "123abf", Role: "voter", Count: 3}, @@ -106,7 +129,7 @@ func TestAdjustRule(t *testing.T) { func TestLeaderCheck(t *testing.T) { re := require.New(t) - _, manager := newTestManager(t) + _, manager := newTestManager(t, false) re.Regexp(".*needs at least one leader or voter.*", manager.SetRule(&Rule{GroupID: "pd", ID: "default", Role: "learner", Count: 3}).Error()) re.Regexp(".*define multiple leaders by count 2.*", manager.SetRule(&Rule{GroupID: "g2", ID: "33", Role: "leader", Count: 2}).Error()) re.Regexp(".*multiple leader replicas.*", manager.Batch([]RuleOp{ @@ -123,7 +146,7 @@ func TestLeaderCheck(t *testing.T) { func TestSaveLoad(t *testing.T) { re := require.New(t) - store, manager := newTestManager(t) + store, manager := newTestManager(t, false) rules := []*Rule{ {GroupID: "pd", ID: "default", Role: "voter", Count: 5}, {GroupID: "foo", ID: "baz", StartKeyHex: "", EndKeyHex: "abcd", Role: "voter", Count: 1}, @@ -144,7 +167,7 @@ func TestSaveLoad(t *testing.T) { func TestSetAfterGet(t *testing.T) { re := require.New(t) - store, manager := newTestManager(t) + store, manager := newTestManager(t, false) rule := manager.GetRule("pd", "default") rule.Count = 1 manager.SetRule(rule) @@ -166,7 +189,7 @@ func checkRules(t *testing.T, rules []*Rule, expect [][2]string) { func TestKeys(t *testing.T) { re := require.New(t) - _, manager := newTestManager(t) + _, manager := newTestManager(t, false) rules := []*Rule{ {GroupID: "1", ID: "1", Role: "voter", Count: 1, StartKeyHex: "", EndKeyHex: ""}, {GroupID: "2", ID: "2", Role: "voter", Count: 1, StartKeyHex: "11", EndKeyHex: "ff"}, @@ -255,7 +278,7 @@ func TestKeys(t *testing.T) { } func TestDeleteByIDPrefix(t *testing.T) { - _, manager := newTestManager(t) + _, manager := newTestManager(t, false) manager.SetRules([]*Rule{ {GroupID: "g1", ID: "foo1", Role: "voter", Count: 1}, {GroupID: "g2", ID: "foo1", Role: "voter", Count: 1}, @@ -275,7 +298,7 @@ func TestDeleteByIDPrefix(t *testing.T) { func TestRangeGap(t *testing.T) { re := require.New(t) - _, manager := newTestManager(t) + _, manager := newTestManager(t, false) err := manager.DeleteRule("pd", "default") re.Error(err) @@ -298,7 +321,7 @@ func TestRangeGap(t *testing.T) { func TestGroupConfig(t *testing.T) { re := require.New(t) - _, manager := newTestManager(t) + _, manager := newTestManager(t, false) pd1 := &RuleGroup{ID: "pd"} re.Equal(pd1, manager.GetRuleGroup("pd")) @@ -334,7 +357,7 @@ func TestGroupConfig(t *testing.T) { func TestRuleVersion(t *testing.T) { re := require.New(t) - _, manager := newTestManager(t) + _, manager := newTestManager(t, false) rule1 := manager.GetRule("pd", "default") re.Equal(uint64(0), rule1.Version) // create new rule @@ -425,7 +448,7 @@ func TestCheckApplyRules(t *testing.T) { func TestCacheManager(t *testing.T) { re := require.New(t) - _, manager := newTestManager(t) + _, manager := newTestManager(t, false) manager.conf.SetPlacementRulesCacheEnabled(true) rules := addExtraRules(0) 
re.NoError(manager.SetRules(rules)) From 3a17daba7bbb89be17141122c8febafb45a28cd7 Mon Sep 17 00:00:00 2001 From: Bin Shi <39923490+binshi-bing@users.noreply.github.com> Date: Thu, 9 Mar 2023 18:15:13 -0800 Subject: [PATCH 2/2] Improve PD/TSO-MCS Service Discovery and TSO Client. (#6090) ref tikv/pd#5836 Further(final) refactor for PD/TSO-MCS Service Discovery and TSO Client. Signed-off-by: Bin Shi --- client/client.go | 188 +++----- client/client_test.go | 4 +- client/keyspace_client.go | 6 +- client/meta_storage_client.go | 4 +- ...base_client.go => pd_service_discovery.go} | 289 ++++++------ client/resource_manager_client.go | 8 +- client/tso_client.go | 444 +++++++----------- ...equest_dispatcher.go => tso_dispatcher.go} | 132 +++--- client/tso_service_discovery.go | 306 ++++++++++++ tests/client/Makefile | 2 +- tests/client/client_test.go | 18 +- .../resource_manager/resource_manager_test.go | 6 +- 12 files changed, 771 insertions(+), 636 deletions(-) rename client/{base_client.go => pd_service_discovery.go} (60%) rename client/{tso_request_dispatcher.go => tso_dispatcher.go} (81%) create mode 100644 client/tso_service_discovery.go diff --git a/client/client.go b/client/client.go index e38e60e0d61b..77a6bdf4c05e 100644 --- a/client/client.go +++ b/client/client.go @@ -143,8 +143,6 @@ type Client interface { KeyspaceClient // ResourceManagerClient manages resource group metadata and token assignment. ResourceManagerClient - // TSOClient is the client of TSO service - TSOClient // Close closes the client. Close() } @@ -242,30 +240,19 @@ func WithMaxErrorRetry(count int) ClientOption { var _ Client = (*client)(nil) type client struct { - bc BaseClient - tsobc BaseClient - tsoStreamBuilderFactory - // tsoDispatcher is used to dispatch different TSO requests to - // the corresponding dc-location TSO channel. - tsoDispatcher sync.Map // Same as map[string]chan *tsoRequest - // dc-location -> deadline - tsDeadline sync.Map // Same as map[string]chan deadline - // dc-location -> *lastTSO - lastTSMap sync.Map // Same as map[string]*lastTSO - + keyspaceID uint32 + // svcDiscovery is for pd service discovery + svcDiscovery ServiceDiscovery + tsoClient *tsoClient tokenDispatcher *tokenDispatcher // For internal usage. - checkTSDeadlineCh chan struct{} - checkTSODispatcherCh chan struct{} - updateTSOConnectionCtxsCh chan struct{} - updateTokenConnectionCh chan struct{} - leaderNetworkFailure int32 - wg sync.WaitGroup + updateTokenConnectionCh chan struct{} + leaderNetworkFailure int32 ctx context.Context cancel context.CancelFunc - + wg sync.WaitGroup option *option } @@ -289,10 +276,15 @@ func NewClient(svrAddrs []string, security SecurityOption, opts ...ClientOption) func NewClientWithContext(ctx context.Context, svrAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) { log.Info("[pd] create pd client with endpoints", zap.Strings("pd-address", svrAddrs)) c, clientCtx, clientCancel, tlsCfg := createClient(ctx, &security) - c.tsoStreamBuilderFactory = &pdTSOStreamBuilderFactory{} - c.bc = newPDBaseClient(clientCtx, clientCancel, &c.wg, addrsToUrls(svrAddrs), tlsCfg, c.option) - c.tsobc = c.bc - if err := c.setup(opts...); err != nil { + // Inject the client options. 
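// Illustrative caller-side sketch of the refactored client (endpoints are placeholders and
// error handling is elided; GetTSAsync and TSFuture.Wait are the methods shown further below):
//
//	cli, err := pd.NewClientWithContext(ctx, []string{"http://127.0.0.1:2379"}, pd.SecurityOption{})
//	if err != nil {
//		// handle error
//	}
//	defer cli.Close()
//	future := cli.GetTSAsync(ctx)           // dispatched through the embedded tsoClient
//	physical, logical, err := future.Wait() // blocks until the TSO response arrives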
+ for _, opt := range opts { + opt(c) + } + + c.svcDiscovery = newPDServiceDiscovery(clientCtx, clientCancel, &c.wg, addrsToUrls(svrAddrs), tlsCfg, c.option) + c.tsoClient = newTSOClient(clientCtx, clientCancel, &c.wg, c.option, c.keyspaceID, c.svcDiscovery, c.svcDiscovery.(tsoAllocatorEventSource), &pdTSOStreamBuilderFactory{}) + if err := c.setup(); err != nil { + c.cancel() return nil, err } return c, nil @@ -303,18 +295,21 @@ func NewClientWithContext(ctx context.Context, svrAddrs []string, security Secur // Merge NewClientWithContext with this API after we let client detect service mode provided on the server side. // Before that, internal tools call this function to use mcs service. func NewTSOClientWithContext(ctx context.Context, keyspaceID uint32, svrAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) { - log.Info("[pd(tso)] create tso client with endpoints", zap.Strings("pd(api)-address", svrAddrs)) + log.Info("[tso] create tso client with endpoints", zap.Strings("pd(api)-address", svrAddrs)) c, clientCtx, clientCancel, tlsCfg := createClient(ctx, &security) - c.tsoStreamBuilderFactory = &tsoTSOStreamBuilderFactory{} - c.bc = newPDBaseClient(clientCtx, clientCancel, &c.wg, addrsToUrls(svrAddrs), tlsCfg, c.option) - c.tsobc = newTSOMcsClient(clientCtx, clientCancel, &c.wg, MetaStorageClient(c), keyspaceID, addrsToUrls(svrAddrs), tlsCfg, c.option) - if err := c.setup(opts...); err != nil { - return nil, err + // Inject the client options. + for _, opt := range opts { + opt(c) } - if err := c.tsobc.Init(); err != nil { + + c.keyspaceID = keyspaceID + c.svcDiscovery = newPDServiceDiscovery(clientCtx, clientCancel, &c.wg, addrsToUrls(svrAddrs), tlsCfg, c.option) + tsoSvcDiscovery := newTSOServiceDiscovery(clientCtx, clientCancel, &c.wg, MetaStorageClient(c), keyspaceID, addrsToUrls(svrAddrs), tlsCfg, c.option) + c.tsoClient = newTSOClient(clientCtx, clientCancel, &c.wg, c.option, c.keyspaceID, tsoSvcDiscovery, tsoSvcDiscovery.(tsoAllocatorEventSource), &tsoTSOStreamBuilderFactory{}) + if err := c.setup(); err != nil { + c.cancel() return nil, err } - c.updateTSODispatcher() return c, nil } @@ -331,43 +326,46 @@ func createClient(ctx context.Context, security *SecurityOption) (*client, conte clientCtx, clientCancel := context.WithCancel(ctx) c := &client{ - checkTSDeadlineCh: make(chan struct{}), - checkTSODispatcherCh: make(chan struct{}, 1), - updateTSOConnectionCtxsCh: make(chan struct{}, 1), - updateTokenConnectionCh: make(chan struct{}, 1), - ctx: clientCtx, - cancel: clientCancel, - option: newOption(), + updateTokenConnectionCh: make(chan struct{}, 1), + ctx: clientCtx, + cancel: clientCancel, + option: newOption(), } return c, clientCtx, clientCancel, tlsCfg } -func (c *client) setup(opts ...ClientOption) error { - // Inject the client options. - for _, opt := range opts { - opt(c) - } +func (c *client) setup() error { // Init the client base. - if err := c.bc.Init(); err != nil { + if err := c.svcDiscovery.Init(); err != nil { return err } // Register callbacks - c.tsobc.AddTSOAllocatorServingAddrSwitchedCallback(c.scheduleCheckTSODispatcher) - c.tsobc.AddServiceAddrsSwitchedCallback(c.scheduleUpdateTSOConnectionCtxs) - c.bc.AddServingAddrSwitchedCallback(c.scheduleUpdateTokenConnection) + c.svcDiscovery.AddServingAddrSwitchedCallback(c.scheduleUpdateTokenConnection) // Create dispatchers - c.updateTSODispatcher() c.createTokenDispatcher() // Start the daemons. 
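// Illustrative caller-side sketch for the TSO microservice path (the keyspace ID and
// endpoints are placeholders; error handling is elided):
//
//	cli, err := pd.NewTSOClientWithContext(ctx, 1 /* keyspaceID */,
//		[]string{"http://127.0.0.1:2379"}, pd.SecurityOption{})
//	if err != nil {
//		// handle error
//	}
//	defer cli.Close()
//
// Internally, as wired up in NewTSOClientWithContext above, this builds a pdServiceDiscovery
// for the API service plus a dedicated TSO service discovery object, and the tsoClient is
// constructed on top of the latter.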
- c.wg.Add(3) - go c.tsLoop() - go c.tsCancelLoop() + c.wg.Add(1) go c.leaderCheckLoop() - return nil + + return c.tsoClient.setup() +} + +func (c *client) Close() { + c.cancel() + c.wg.Wait() + + c.tsoClient.Close() + c.svcDiscovery.Close() + + if c.tokenDispatcher != nil { + tokenErr := errors.WithStack(errClosing) + c.tokenDispatcher.tokenBatchController.revokePendingTokenRequest(tokenErr) + c.tokenDispatcher.dispatcherCancel() + } } func (c *client) scheduleUpdateTokenConnection() { @@ -379,17 +377,17 @@ func (c *client) scheduleUpdateTokenConnection() { // GetClusterID returns the ClusterID. func (c *client) GetClusterID(ctx context.Context) uint64 { - return c.bc.GetClusterID(ctx) + return c.svcDiscovery.GetClusterID(ctx) } // GetLeaderAddr returns the leader address. func (c *client) GetLeaderAddr() string { - return c.bc.GetServingAddr() + return c.svcDiscovery.GetServingAddr() } -// GetBaseClient returns BaseClient which contains service discovery client logic -func (c *client) GetBaseClient() BaseClient { - return c.bc +// GetServiceDiscovery returns the client-side service discovery object +func (c *client) GetServiceDiscovery() ServiceDiscovery { + return c.svcDiscovery } // UpdateOption updates the client option. @@ -437,7 +435,7 @@ func (c *client) leaderCheckLoop() { func (c *client) checkLeaderHealth(ctx context.Context) { ctx, cancel := context.WithTimeout(ctx, c.option.timeout) defer cancel() - if client := c.bc.GetServingEndpointClientConn(); client != nil { + if client := c.svcDiscovery.GetServingEndpointClientConn(); client != nil { healthCli := healthpb.NewHealthClient(client) resp, err := healthCli.Check(ctx, &healthpb.HealthCheckRequest{Service: ""}) rpcErr, ok := status.FromError(err) @@ -468,33 +466,9 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) { return resp.GetMembers(), nil } -func (c *client) Close() { - c.cancel() - c.wg.Wait() - - c.tsoDispatcher.Range(func(_, dispatcherInterface interface{}) bool { - if dispatcherInterface != nil { - dispatcher := dispatcherInterface.(*tsoDispatcher) - tsoErr := errors.WithStack(errClosing) - dispatcher.tsoBatchController.revokePendingTSORequest(tsoErr) - dispatcher.dispatcherCancel() - } - return true - }) - - c.bc.Close() - c.tsobc.Close() - - if c.tokenDispatcher != nil { - tokenErr := errors.WithStack(errClosing) - c.tokenDispatcher.tokenBatchController.revokePendingTokenRequest(tokenErr) - c.tokenDispatcher.dispatcherCancel() - } -} - // leaderClient gets the client of current PD leader. func (c *client) leaderClient() pdpb.PDClient { - if client := c.bc.GetServingEndpointClientConn(); client != nil { + if client := c.svcDiscovery.GetServingEndpointClientConn(); client != nil { return pdpb.NewPDClient(client) } return nil @@ -504,7 +478,7 @@ func (c *client) leaderClient() pdpb.PDClient { // backup service endpoints randomly. Backup service endpoints are followers in a // quorum-based cluster or secondaries in a primary/secondary configured cluster. 
func (c *client) backupClientConn() (*grpc.ClientConn, string) { - addrs := c.bc.GetBackupAddrs() + addrs := c.svcDiscovery.GetBackupAddrs() if len(addrs) < 1 { return nil, "" } @@ -514,7 +488,7 @@ func (c *client) backupClientConn() (*grpc.ClientConn, string) { ) for i := 0; i < len(addrs); i++ { addr := addrs[rand.Intn(len(addrs))] - if cc, err = c.bc.GetOrCreateGRPCConn(addr); err != nil { + if cc, err = c.svcDiscovery.GetOrCreateGRPCConn(addr); err != nil { continue } healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout) @@ -538,27 +512,6 @@ func (c *client) getClient() pdpb.PDClient { return c.leaderClient() } -type tsoRequest struct { - start time.Time - clientCtx context.Context - requestCtx context.Context - done chan error - physical int64 - logical int64 - dcLocation string - keyspaceID uint32 -} - -var tsoReqPool = sync.Pool{ - New: func() interface{} { - return &tsoRequest{ - done: make(chan error, 1), - physical: 0, - logical: 0, - } - }, -} - func (c *client) GetTSAsync(ctx context.Context) TSFuture { return c.GetLocalTSAsync(ctx, globalDCLocation) } @@ -568,15 +521,18 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur span = opentracing.StartSpan("GetLocalTSAsync", opentracing.ChildOf(span.Context())) ctx = opentracing.ContextWithSpan(ctx, span) } + req := tsoReqPool.Get().(*tsoRequest) req.requestCtx = ctx req.clientCtx = c.ctx req.start = time.Now() + req.keyspaceID = c.keyspaceID req.dcLocation = dcLocation - if err := c.dispatchRequest(dcLocation, req); err != nil { + + if err := c.tsoClient.dispatchRequest(dcLocation, req); err != nil { // Wait for a while and try again time.Sleep(50 * time.Millisecond) - if err = c.dispatchRequest(dcLocation, req); err != nil { + if err = c.tsoClient.dispatchRequest(dcLocation, req); err != nil { req.done <- err } } @@ -652,7 +608,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs var resp *pdpb.GetRegionResponse for _, url := range memberURLs { - conn, err := c.bc.GetOrCreateGRPCConn(url) + conn, err := c.svcDiscovery.GetOrCreateGRPCConn(url) if err != nil { log.Error("[pd] can't get grpc connection", zap.String("member-URL", url), errs.ZapError(err)) continue @@ -673,7 +629,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs if resp == nil { cmdFailDurationGetRegion.Observe(time.Since(start).Seconds()) - c.bc.ScheduleCheckMemberChanged() + c.svcDiscovery.ScheduleCheckMemberChanged() errorMsg := fmt.Sprintf("[pd] can't get region info from member URLs: %+v", memberURLs) return nil, errors.WithStack(errors.New(errorMsg)) } @@ -1015,7 +971,7 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...R func (c *client) requestHeader() *pdpb.RequestHeader { return &pdpb.RequestHeader{ - ClusterId: c.bc.GetClusterID(c.ctx), + ClusterId: c.svcDiscovery.GetClusterID(c.ctx), } } @@ -1183,10 +1139,16 @@ func (c *client) respForErr(observer prometheus.Observer, start time.Time, err e if err != nil || header.GetError() != nil { observer.Observe(time.Since(start).Seconds()) if err != nil { - c.bc.ScheduleCheckMemberChanged() + c.svcDiscovery.ScheduleCheckMemberChanged() return errors.WithStack(err) } return errors.WithStack(errors.New(header.GetError().String())) } return nil } + +// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map +// For test only. 
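// (Illustrative note: the returned sync.Map is the tsoClient's tsoAllocators map, keyed by
// dc-location with the global TSO allocator stored under the globalDCLocation key; each
// value is the allocator's serving URL stored as a string.)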
+func (c *client) GetTSOAllocators() *sync.Map { + return c.tsoClient.GetTSOAllocators() +} diff --git a/client/client_test.go b/client/client_test.go index 2ce9f660cbdb..43c0cc5c3083 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -55,7 +55,7 @@ func TestUpdateURLs(t *testing.T) { } return } - cli := &pdBaseClient{option: newOption()} + cli := &pdServiceDiscovery{option: newOption()} cli.urls.Store([]string{}) cli.updateURLs(members[1:]) re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetURLs()) @@ -90,7 +90,7 @@ func TestGRPCDialOption(t *testing.T) { start := time.Now() ctx, cancel := context.WithTimeout(context.TODO(), 500*time.Millisecond) defer cancel() - cli := &pdBaseClient{ + cli := &pdServiceDiscovery{ checkMembershipCh: make(chan struct{}, 1), ctx: ctx, cancel: cancel, diff --git a/client/keyspace_client.go b/client/keyspace_client.go index 0cdcd264b1cf..9864ad8a2ed4 100644 --- a/client/keyspace_client.go +++ b/client/keyspace_client.go @@ -38,7 +38,7 @@ type KeyspaceClient interface { // keyspaceClient returns the KeyspaceClient from current PD leader. func (c *client) keyspaceClient() keyspacepb.KeyspaceClient { - if client := c.bc.GetServingEndpointClientConn(); client != nil { + if client := c.svcDiscovery.GetServingEndpointClientConn(); client != nil { return keyspacepb.NewKeyspaceClient(client) } return nil @@ -63,7 +63,7 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key if err != nil { cmdFailedDurationLoadKeyspace.Observe(time.Since(start).Seconds()) - c.bc.ScheduleCheckMemberChanged() + c.svcDiscovery.ScheduleCheckMemberChanged() return nil, err } @@ -142,7 +142,7 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp if err != nil { cmdFailedDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds()) - c.bc.ScheduleCheckMemberChanged() + c.svcDiscovery.ScheduleCheckMemberChanged() return nil, err } diff --git a/client/meta_storage_client.go b/client/meta_storage_client.go index 082e4956bb3e..0b0bd023c473 100644 --- a/client/meta_storage_client.go +++ b/client/meta_storage_client.go @@ -39,7 +39,7 @@ type MetaStorageClient interface { // metaStorageClient gets the meta storage client from current PD leader. func (c *client) metaStorageClient() meta_storagepb.MetaStorageClient { - if client := c.bc.GetServingEndpointClientConn(); client != nil { + if client := c.svcDiscovery.GetServingEndpointClientConn(); client != nil { return meta_storagepb.NewMetaStorageClient(client) } return nil @@ -214,7 +214,7 @@ func (c *client) respForMetaStorageErr(observer prometheus.Observer, start time. if err != nil || header.GetError() != nil { observer.Observe(time.Since(start).Seconds()) if err != nil { - c.bc.ScheduleCheckMemberChanged() + c.svcDiscovery.ScheduleCheckMemberChanged() return errors.WithStack(err) } return errors.WithStack(errors.New(header.GetError().String())) diff --git a/client/base_client.go b/client/pd_service_discovery.go similarity index 60% rename from client/base_client.go rename to client/pd_service_discovery.go index 80a35e3b0b20..ced0048641da 100644 --- a/client/base_client.go +++ b/client/pd_service_discovery.go @@ -16,7 +16,6 @@ package pd import ( "context" - "fmt" "reflect" "sort" "sync" @@ -39,9 +38,9 @@ const ( memberUpdateInterval = time.Minute ) -// BaseClient defines the general interface for service discovery on a quorum-based cluster -// or a primary/secondy configured cluster. 
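// Illustrative sketch of how a consumer drives the renamed interface below (sd is any
// ServiceDiscovery implementation, e.g. the pdServiceDiscovery defined later in this file;
// the addresses are whatever the cluster reports):
//
//	leaderAddr := sd.GetServingAddr()  // leader (or primary) address
//	backupAddrs := sd.GetBackupAddrs() // follower (or secondary) addresses
//	conn, err := sd.GetOrCreateGRPCConn(leaderAddr)
//	if err == nil {
//		pdClient := pdpb.NewPDClient(conn) // as client.leaderClient() does
//		_ = pdClient
//	}
//	sd.ScheduleCheckMemberChanged() // non-blocking trigger for a membership re-check
//	_ = backupAddrs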
-type BaseClient interface { +// ServiceDiscovery defines the general interface for service discovery on a quorum-based cluster +// or a primary/secondary configured cluster. +type ServiceDiscovery interface { // Init initialize the concrete client underlying Init() error // Close releases all resources @@ -51,11 +50,13 @@ type BaseClient interface { // GetURLs returns the URLs of the servers. GetURLs() []string // GetServingEndpointClientConn returns the grpc client connection of the serving endpoint - // which is the leader in a quorum-based cluster or the primary in a primary/secondy + // which is the leader in a quorum-based cluster or the primary in a primary/secondary // configured cluster. GetServingEndpointClientConn() *grpc.ClientConn + // GetClientConns returns the mapping {addr -> a gRPC connection} + GetClientConns() *sync.Map // GetServingAddr returns the serving endpoint which is the leader in a quorum-based cluster - // or the primary in a primary/secondy configured cluster. + // or the primary in a primary/secondary configured cluster. GetServingAddr() string // GetBackupAddrs gets the addresses of the current reachable and healthy backup service // endpoints randomly. Backup service endpoints are followers in a quorum-based cluster or @@ -65,10 +66,10 @@ type BaseClient interface { GetOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) // ScheduleCheckMemberChanged is used to trigger a check to see if there is any membership change // among the leader/followers in a quorum-based cluster or among the primary/secondaries in a - // primary/secondy configured cluster. + // primary/secondary configured cluster. ScheduleCheckMemberChanged() // CheckMemberChanged immediately check if there is any membership change among the leader/followers - // in a quorum-based cluster or among the primary/secondaries in a primary/secondy configured cluster. + // in a quorum-based cluster or among the primary/secondaries in a primary/secondary configured cluster. CheckMemberChanged() error // AddServingAddrSwitchedCallback adds callbacks which will be called when the leader // in a quorum-based cluster or the primary in a primary/secondary configured cluster @@ -78,25 +79,27 @@ type BaseClient interface { // in a quorum-based cluster or any primary/secondary in a primary/secondary configured cluster // is changed. AddServiceAddrsSwitchedCallback(callbacks ...func()) +} + +type tsoLocalServAddrsUpdatedFunc func(map[string]string) error +type tsoGlobalServAddrUpdatedFunc func(string) error - // TODO: Separate the following TSO related service discovery methods from the above methods. - - // GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map - GetTSOAllocators() *sync.Map - // GetTSOAllocatorServingAddrByDCLocation returns the tso allocator of the given dcLocation - GetTSOAllocatorServingAddrByDCLocation(dcLocation string) (string, bool) - // GetTSOAllocatorClientConnByDCLocation returns the tso allocator grpc client connection - // of the given dcLocation - GetTSOAllocatorClientConnByDCLocation(dcLocation string) (*grpc.ClientConn, string) - // AddTSOAllocatorServingAddrSwitchedCallback adds callbacks which will be called - // when any global/local tso allocator service endpoint is switched. - AddTSOAllocatorServingAddrSwitchedCallback(callbacks ...func()) +type tsoAllocatorEventSource interface { + // SetTSOLocalServAddrsUpdatedCallback adds a callback which will be called when the local tso + // allocator leader list is updated. 
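// Illustrative shapes of the two callback types above (the addresses are example
// placeholders; the dc-location keys come from the PD members' TSO allocator info):
//
//	var localCb tsoLocalServAddrsUpdatedFunc = func(allocs map[string]string) error {
//		// e.g. allocs = {"dc-1": "http://10.0.0.1:2379", "dc-2": "http://10.0.0.2:2379"}
//		return nil
//	}
//	var globalCb tsoGlobalServAddrUpdatedFunc = func(addr string) error {
//		// e.g. addr = "http://10.0.0.1:2379", the global TSO allocator serving address
//		return nil
//	}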
+ SetTSOLocalServAddrsUpdatedCallback(callback tsoLocalServAddrsUpdatedFunc) + // SetTSOGlobalServAddrUpdatedCallback adds a callback which will be called when the global tso + // allocator leader is updated. + SetTSOGlobalServAddrUpdatedCallback(callback tsoGlobalServAddrUpdatedFunc) } -var _ BaseClient = (*pdBaseClient)(nil) +var _ ServiceDiscovery = (*pdServiceDiscovery)(nil) +var _ tsoAllocatorEventSource = (*pdServiceDiscovery)(nil) + +// pdServiceDiscovery is the service discovery client of PD/API service which is quorum based +type pdServiceDiscovery struct { + isInitialized bool -// pdBaseClient is the service discovery client of PD/API service which is quorum based -type pdBaseClient struct { urls atomic.Value // Store as []string // PD leader URL leader atomic.Value // Store as string @@ -106,33 +109,35 @@ type pdBaseClient struct { clusterID uint64 // addr -> a gRPC connection clientConns sync.Map // Store as map[string]*grpc.ClientConn - // dc-location -> TSO allocator leader URL - tsoAllocators sync.Map // Store as map[string]string - - // leaderSwitchedCallbacks will be called after the leader swichted - leaderSwitchedCallbacks []func() - // membersChangedCallbacks will be called after there is any membership - // change in the leader and followers - membersChangedCallbacks []func() - // tsoAllocatorLeaderSwitchedCallback will be called when any global/local - // tso allocator leader is switched. - tsoAllocatorLeaderSwitchedCallback []func() + + // leaderSwitchedCbs will be called after the leader swichted + leaderSwitchedCbs []func() + // membersChangedCbs will be called after there is any membership change in the + // leader and followers + membersChangedCbs []func() + // tsoLocalAllocLeadersUpdatedCb will be called when the local tso allocator + // leader list is updated. The input is a map {DC Localtion -> Leader Addr} + tsoLocalAllocLeadersUpdatedCb tsoLocalServAddrsUpdatedFunc + // tsoGlobalAllocLeaderUpdatedCb will be called when the global tso allocator + // leader is updated. + tsoGlobalAllocLeaderUpdatedCb tsoGlobalServAddrUpdatedFunc checkMembershipCh chan struct{} - wg *sync.WaitGroup - ctx context.Context - cancel context.CancelFunc + wg *sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + closeOnce sync.Once tlsCfg *tlsutil.TLSConfig // Client option. option *option } -// newPDBaseClient returns a new baseClient. -func newPDBaseClient(ctx context.Context, cancel context.CancelFunc, - wg *sync.WaitGroup, urls []string, tlsCfg *tlsutil.TLSConfig, option *option) BaseClient { - bc := &pdBaseClient{ +// newPDServiceDiscovery returns a new baseClient. 
+func newPDServiceDiscovery(ctx context.Context, cancel context.CancelFunc, + wg *sync.WaitGroup, urls []string, tlsCfg *tlsutil.TLSConfig, option *option) *pdServiceDiscovery { + pdsd := &pdServiceDiscovery{ checkMembershipCh: make(chan struct{}, 1), ctx: ctx, cancel: cancel, @@ -140,27 +145,32 @@ func newPDBaseClient(ctx context.Context, cancel context.CancelFunc, tlsCfg: tlsCfg, option: option, } - bc.urls.Store(urls) - return bc + pdsd.urls.Store(urls) + return pdsd } -func (c *pdBaseClient) Init() error { - if err := c.initRetry(c.initClusterID); err != nil { - c.cancel() - return err - } - if err := c.initRetry(c.updateMember); err != nil { - c.cancel() - return err +func (c *pdServiceDiscovery) Init() error { + if !c.isInitialized { + if err := c.initRetry(c.initClusterID); err != nil { + c.cancel() + return err + } + if err := c.initRetry(c.updateMember); err != nil { + c.cancel() + return err + } + log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID)) + + c.wg.Add(1) + go c.memberLoop() + + c.isInitialized = true } - log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID)) - c.wg.Add(1) - go c.memberLoop() return nil } -func (c *pdBaseClient) initRetry(f func() error) error { +func (c *pdServiceDiscovery) initRetry(f func() error) error { var err error for i := 0; i < c.option.maxRetryTimes; i++ { if err = f(); err == nil { @@ -175,7 +185,7 @@ func (c *pdBaseClient) initRetry(f func() error) error { return errors.WithStack(err) } -func (c *pdBaseClient) memberLoop() { +func (c *pdServiceDiscovery) memberLoop() { defer c.wg.Done() ctx, cancel := context.WithCancel(c.ctx) @@ -198,56 +208,59 @@ func (c *pdBaseClient) memberLoop() { } // Close releases all resources -func (c *pdBaseClient) Close() { - c.clientConns.Range(func(key, cc interface{}) bool { - if err := cc.(*grpc.ClientConn).Close(); err != nil { - log.Error("[pd] failed to close gRPC clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err)) - } - c.clientConns.Delete(key) - return true +func (c *pdServiceDiscovery) Close() { + c.closeOnce.Do(func() { + log.Info("close pd service discovery") + c.clientConns.Range(func(key, cc interface{}) bool { + if err := cc.(*grpc.ClientConn).Close(); err != nil { + log.Error("[pd] failed to close gRPC clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err)) + } + c.clientConns.Delete(key) + return true + }) }) } // GetClusterID returns the ClusterID. -func (c *pdBaseClient) GetClusterID(context.Context) uint64 { +func (c *pdServiceDiscovery) GetClusterID(context.Context) uint64 { return c.clusterID } // GetURLs returns the URLs of the servers. // For testing use. It should only be called when the client is closed. -func (c *pdBaseClient) GetURLs() []string { +func (c *pdServiceDiscovery) GetURLs() []string { return c.urls.Load().([]string) } -// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map -func (c *pdBaseClient) GetTSOAllocators() *sync.Map { - return &c.tsoAllocators -} - // GetServingAddr returns the grpc client connection of the serving endpoint -// which is the leader in a quorum-based cluster or the primary in a primary/secondy +// which is the leader in a quorum-based cluster or the primary in a primary/secondary // configured cluster. 
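// (Note on the isInitialized guard in Init above: in the plain PD client path both
// client.setup() and tsoClient.setup() call Init() on the same pdServiceDiscovery instance,
// so the guard turns the second call into a no-op instead of re-running cluster-ID discovery
// and starting another memberLoop goroutine.)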
-func (c *pdBaseClient) GetServingEndpointClientConn() *grpc.ClientConn { +func (c *pdServiceDiscovery) GetServingEndpointClientConn() *grpc.ClientConn { if cc, ok := c.clientConns.Load(c.getLeaderAddr()); ok { return cc.(*grpc.ClientConn) } return nil } +// GetClientConns returns the mapping {addr -> a gRPC connection} +func (c *pdServiceDiscovery) GetClientConns() *sync.Map { + return &c.clientConns +} + // GetServingAddr returns the leader address -func (c *pdBaseClient) GetServingAddr() string { +func (c *pdServiceDiscovery) GetServingAddr() string { return c.getLeaderAddr() } // GetBackupAddrs gets the addresses of the current reachable and healthy followers // in a quorum-based cluster. -func (c *pdBaseClient) GetBackupAddrs() []string { +func (c *pdServiceDiscovery) GetBackupAddrs() []string { return c.getFollowerAddrs() } // ScheduleCheckMemberChanged is used to check if there is any membership // change among the leader and the followers. -func (c *pdBaseClient) ScheduleCheckMemberChanged() { +func (c *pdServiceDiscovery) ScheduleCheckMemberChanged() { select { case c.checkMembershipCh <- struct{}{}: default: @@ -255,31 +268,37 @@ func (c *pdBaseClient) ScheduleCheckMemberChanged() { } // Immediately check if there is any membership change among the leader/followers in a -// quorum-based cluster or among the primary/secondaries in a primary/secondy configured cluster. -func (c *pdBaseClient) CheckMemberChanged() error { +// quorum-based cluster or among the primary/secondaries in a primary/secondary configured cluster. +func (c *pdServiceDiscovery) CheckMemberChanged() error { return c.updateMember() } // AddServingAddrSwitchedCallback adds callbacks which will be called // when the leader is switched. -func (c *pdBaseClient) AddServingAddrSwitchedCallback(callbacks ...func()) { - c.leaderSwitchedCallbacks = append(c.leaderSwitchedCallbacks, callbacks...) +func (c *pdServiceDiscovery) AddServingAddrSwitchedCallback(callbacks ...func()) { + c.leaderSwitchedCbs = append(c.leaderSwitchedCbs, callbacks...) } // AddServiceAddrsSwitchedCallback adds callbacks which will be called when // any leader/follower is changed. -func (c *pdBaseClient) AddServiceAddrsSwitchedCallback(callbacks ...func()) { - c.membersChangedCallbacks = append(c.membersChangedCallbacks, callbacks...) +func (c *pdServiceDiscovery) AddServiceAddrsSwitchedCallback(callbacks ...func()) { + c.membersChangedCbs = append(c.membersChangedCbs, callbacks...) } -// AddTSOAllocatorServingAddrSwitchedCallback adds callbacks which will be called -// when any global/local tso allocator leader is switched. -func (c *pdBaseClient) AddTSOAllocatorServingAddrSwitchedCallback(callbacks ...func()) { - c.tsoAllocatorLeaderSwitchedCallback = append(c.tsoAllocatorLeaderSwitchedCallback, callbacks...) +// SetTSOLocalServAddrsUpdatedCallback adds a callback which will be called when the local tso +// allocator leader list is updated. +func (c *pdServiceDiscovery) SetTSOLocalServAddrsUpdatedCallback(callback tsoLocalServAddrsUpdatedFunc) { + c.tsoLocalAllocLeadersUpdatedCb = callback +} + +// SetTSOGlobalServAddrUpdatedCallback adds a callback which will be called when the global tso +// allocator leader is updated. +func (c *pdServiceDiscovery) SetTSOGlobalServAddrUpdatedCallback(callback tsoGlobalServAddrUpdatedFunc) { + c.tsoGlobalAllocLeaderUpdatedCb = callback } // getLeaderAddr returns the leader address. 
-func (c *pdBaseClient) getLeaderAddr() string { +func (c *pdServiceDiscovery) getLeaderAddr() string { leaderAddr := c.leader.Load() if leaderAddr == nil { return "" @@ -288,7 +307,7 @@ func (c *pdBaseClient) getLeaderAddr() string { } // getFollowerAddrs returns the follower address. -func (c *pdBaseClient) getFollowerAddrs() []string { +func (c *pdServiceDiscovery) getFollowerAddrs() []string { followerAddrs := c.followers.Load() if followerAddrs == nil { return []string{} @@ -296,45 +315,7 @@ func (c *pdBaseClient) getFollowerAddrs() []string { return followerAddrs.([]string) } -// GetTSOAllocatorServingAddrByDCLocation returns the tso allocator of the given dcLocation -func (c *pdBaseClient) GetTSOAllocatorServingAddrByDCLocation(dcLocation string) (string, bool) { - url, exist := c.tsoAllocators.Load(dcLocation) - if !exist { - return "", false - } - return url.(string), true -} - -// GetTSOAllocatorClientConnByDCLocation returns the tso allocator grpc client connection of the given dcLocation -func (c *pdBaseClient) GetTSOAllocatorClientConnByDCLocation(dcLocation string) (*grpc.ClientConn, string) { - url, ok := c.tsoAllocators.Load(dcLocation) - if !ok { - panic(fmt.Sprintf("the allocator leader in %s should exist", dcLocation)) - } - cc, ok := c.clientConns.Load(url) - if !ok { - panic(fmt.Sprintf("the client connection of %s in %s should exist", url, dcLocation)) - } - return cc.(*grpc.ClientConn), url.(string) -} - -func (c *pdBaseClient) gcAllocatorLeaderAddr(curAllocatorMap map[string]*pdpb.Member) { - // Clean up the old TSO allocators - c.tsoAllocators.Range(func(dcLocationKey, _ interface{}) bool { - dcLocation := dcLocationKey.(string) - // Skip the Global TSO Allocator - if dcLocation == globalDCLocation { - return true - } - if _, exist := curAllocatorMap[dcLocation]; !exist { - log.Info("[pd] delete unused tso allocator", zap.String("dc-location", dcLocation)) - c.tsoAllocators.Delete(dcLocation) - } - return true - }) -} - -func (c *pdBaseClient) initClusterID() error { +func (c *pdServiceDiscovery) initClusterID() error { ctx, cancel := context.WithCancel(c.ctx) defer cancel() var clusterID uint64 @@ -364,7 +345,7 @@ func (c *pdBaseClient) initClusterID() error { return nil } -func (c *pdBaseClient) updateMember() error { +func (c *pdServiceDiscovery) updateMember() error { for i, u := range c.GetURLs() { failpoint.Inject("skipFirstUpdateMember", func() { if i == 0 { @@ -383,7 +364,7 @@ func (c *pdBaseClient) updateMember() error { err = errs.ErrClientGetLeader.FastGenByArgs("leader address don't exist") } // Still need to update TsoAllocatorLeaders, even if there is no PD leader - errTSO = c.switchTSOAllocatorLeader(members.GetTsoAllocatorLeaders()) + errTSO = c.switchTSOAllocatorLeaders(members.GetTsoAllocatorLeaders()) } // Failed to get members @@ -404,10 +385,6 @@ func (c *pdBaseClient) updateMember() error { if err := c.switchLeader(members.GetLeader().GetClientUrls()); err != nil { return err } - // Run callbacks to refelect any change in the local/global tso allocator. - for _, cb := range c.tsoAllocatorLeaderSwitchedCallback { - cb() - } // If `switchLeader` succeeds but `switchTSOAllocatorLeader` has an error, // the error of `switchTSOAllocatorLeader` will be returned. 
@@ -416,7 +393,7 @@ func (c *pdBaseClient) updateMember() error { return errs.ErrClientGetMember.FastGenByArgs(c.GetURLs()) } -func (c *pdBaseClient) getMembers(ctx context.Context, url string, timeout time.Duration) (*pdpb.GetMembersResponse, error) { +func (c *pdServiceDiscovery) getMembers(ctx context.Context, url string, timeout time.Duration) (*pdpb.GetMembersResponse, error) { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() cc, err := c.GetOrCreateGRPCConn(url) @@ -435,7 +412,7 @@ func (c *pdBaseClient) getMembers(ctx context.Context, url string, timeout time. return members, nil } -func (c *pdBaseClient) updateURLs(members []*pdpb.Member) { +func (c *pdServiceDiscovery) updateURLs(members []*pdpb.Member) { urls := make([]string, 0, len(members)) for _, m := range members { urls = append(urls, m.GetClientUrls()...) @@ -451,14 +428,14 @@ func (c *pdBaseClient) updateURLs(members []*pdpb.Member) { // Update the connection contexts when member changes if TSO Follower Proxy is enabled. if c.option.getEnableTSOFollowerProxy() { // Run callbacks to refelect the membership changes in the leader and followers. - for _, cb := range c.membersChangedCallbacks { + for _, cb := range c.membersChangedCbs { cb() } } log.Info("[pd] update member urls", zap.Strings("old-urls", oldURLs), zap.Strings("new-urls", urls)) } -func (c *pdBaseClient) switchLeader(addrs []string) error { +func (c *pdServiceDiscovery) switchLeader(addrs []string) error { // FIXME: How to safely compare leader urls? For now, only allows one client url. addr := addrs[0] oldLeader := c.getLeaderAddr() @@ -472,16 +449,20 @@ func (c *pdBaseClient) switchLeader(addrs []string) error { } // Set PD leader and Global TSO Allocator (which is also the PD leader) c.leader.Store(addr) - c.tsoAllocators.Store(globalDCLocation, addr) // Run callbacks - for _, cb := range c.leaderSwitchedCallbacks { + if c.tsoGlobalAllocLeaderUpdatedCb != nil { + if err := c.tsoGlobalAllocLeaderUpdatedCb(addr); err != nil { + return err + } + } + for _, cb := range c.leaderSwitchedCbs { cb() } log.Info("[pd] switch leader", zap.String("new-leader", addr), zap.String("old-leader", oldLeader)) return nil } -func (c *pdBaseClient) updateFollowers(members []*pdpb.Member, leader *pdpb.Member) { +func (c *pdServiceDiscovery) updateFollowers(members []*pdpb.Member, leader *pdpb.Member) { var addrs []string for _, member := range members { if member.GetMemberId() != leader.GetMemberId() { @@ -493,39 +474,31 @@ func (c *pdBaseClient) updateFollowers(members []*pdpb.Member, leader *pdpb.Memb c.followers.Store(addrs) } -func (c *pdBaseClient) switchTSOAllocatorLeader(allocatorMap map[string]*pdpb.Member) error { +func (c *pdServiceDiscovery) switchTSOAllocatorLeaders(allocatorMap map[string]*pdpb.Member) error { if len(allocatorMap) == 0 { return nil } + + allocMap := make(map[string]string) // Switch to the new one for dcLocation, member := range allocatorMap { if len(member.GetClientUrls()) == 0 { continue } - addr := member.GetClientUrls()[0] - oldAddr, exist := c.GetTSOAllocatorServingAddrByDCLocation(dcLocation) - if exist && addr == oldAddr { - continue - } - if _, err := c.GetOrCreateGRPCConn(addr); err != nil { - log.Warn("[pd] failed to connect dc tso allocator leader", - zap.String("dc-location", dcLocation), - zap.String("leader", addr), - errs.ZapError(err)) + allocMap[dcLocation] = member.GetClientUrls()[0] + } + + // Run the callback to refelect any possible change in the local tso allocators. 
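// Illustrative example of the map handed to the callback below (addresses are placeholders):
//
//	allocMap = map[string]string{
//		"dc-1": "http://10.0.0.1:2379",
//		"dc-2": "http://10.0.0.2:2379",
//	}
//
// If the callback returns an error, e.g. the TSO client cannot establish a gRPC connection
// to one of the new allocator addresses, that error propagates back out of updateMember.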
+ if c.tsoLocalAllocLeadersUpdatedCb != nil { + if err := c.tsoLocalAllocLeadersUpdatedCb(allocMap); err != nil { return err } - c.tsoAllocators.Store(dcLocation, addr) - log.Info("[pd] switch dc tso allocator leader", - zap.String("dc-location", dcLocation), - zap.String("new-leader", addr), - zap.String("old-leader", oldAddr)) } - // Garbage collection of the old TSO allocator leaders - c.gcAllocatorLeaderAddr(allocatorMap) + return nil } // GetOrCreateGRPCConn returns the corresponding grpc client connection of the given addr -func (c *pdBaseClient) GetOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) { +func (c *pdServiceDiscovery) GetOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) { return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, addr, c.tlsCfg, c.option.gRPCDialOptions...) } diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go index cd0040b228d0..fe9abcbe9b2f 100644 --- a/client/resource_manager_client.go +++ b/client/resource_manager_client.go @@ -51,7 +51,7 @@ type ResourceManagerClient interface { // resourceManagerClient gets the ResourceManager client of current PD leader. func (c *client) resourceManagerClient() rmpb.ResourceManagerClient { - if cc, err := c.bc.GetOrCreateGRPCConn(c.GetLeaderAddr()); err == nil { + if cc, err := c.svcDiscovery.GetOrCreateGRPCConn(c.GetLeaderAddr()); err == nil { return rmpb.NewResourceManagerClient(cc) } return nil @@ -60,7 +60,7 @@ func (c *client) resourceManagerClient() rmpb.ResourceManagerClient { // gRPCErrorHandler is used to handle the gRPC error returned by the resource manager service. func (c *client) gRPCErrorHandler(err error) { if strings.Contains(err.Error(), errNotPrimary) { - c.bc.ScheduleCheckMemberChanged() + c.svcDiscovery.ScheduleCheckMemberChanged() } } @@ -303,7 +303,7 @@ func (c *client) handleResourceTokenDispatcher(dispatcherCtx context.Context, tb // If the stream is still nil, return an error. if stream == nil { firstRequest.done <- errors.Errorf("failed to get the stream connection") - c.bc.ScheduleCheckMemberChanged() + c.svcDiscovery.ScheduleCheckMemberChanged() connection.reset() continue } @@ -315,7 +315,7 @@ func (c *client) handleResourceTokenDispatcher(dispatcherCtx context.Context, tb default: } if err = c.processTokenRequests(stream, firstRequest); err != nil { - c.bc.ScheduleCheckMemberChanged() + c.svcDiscovery.ScheduleCheckMemberChanged() connection.reset() log.Info("[resource_manager] token request error", zap.Error(err)) } diff --git a/client/tso_client.go b/client/tso_client.go index 50ac5d5194ea..871a8da48475 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -17,222 +17,131 @@ package pd import ( "context" "fmt" - "path" + "math/rand" "sync" - "sync/atomic" "time" - "github.com/gogo/protobuf/proto" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/client/errs" - "github.com/tikv/pd/client/grpcutil" - "github.com/tikv/pd/client/tlsutil" "go.uber.org/zap" "google.golang.org/grpc" + healthpb "google.golang.org/grpc/health/grpc_health_v1" ) -// TSOClient manages resource group info and token request. 
+// TSOClient defines basic interface of the TSO client +// For test only type TSOClient interface { - // GetTSWithinKeyspace gets a timestamp within the given keyspace from the TSO service - GetTSWithinKeyspace(ctx context.Context, keyspaceID uint32) (int64, int64, error) - // GetTSWithinKeyspaceAsync gets a timestamp within the given keyspace from the TSO service, - // without block the caller. - GetTSWithinKeyspaceAsync(ctx context.Context, keyspaceID uint32) TSFuture - // GetLocalTSWithinKeyspace gets a local timestamp within the given keyspace from the TSO service - GetLocalTSWithinKeyspace(ctx context.Context, dcLocation string, keyspaceID uint32) (int64, int64, error) - // GetLocalTSWithinKeyspaceAsync gets a local timestamp within the given keyspace from the TSO service, - // without block the caller. - GetLocalTSWithinKeyspaceAsync(ctx context.Context, dcLocation string, keyspaceID uint32) TSFuture + // GetTSOAllocators returns {dc-location -> TSO allocator serving URL} connection map + GetTSOAllocators() *sync.Map } -// GetTSWithinKeyspace gets a timestamp within the given keyspace from the TSO service -// TODO: Refactor and share the TSO streaming framework in the PD client. The implementation -// here is in a basic manner and only for testing and integration purpose -- no batching, -// no async, no pooling, no forwarding, no retry and no deliberate error handling. -func (c *client) GetTSWithinKeyspace(ctx context.Context, keyspaceID uint32) (physical int64, logical int64, err error) { - resp := c.GetTSWithinKeyspaceAsync(ctx, keyspaceID) - return resp.Wait() -} - -// GetLocalTSWithinKeyspace gets a local timestamp within the given keyspace from the TSO service -func (c *client) GetLocalTSWithinKeyspace(ctx context.Context, dcLocation string, keyspaceID uint32) (physical int64, logical int64, err error) { - resp := c.GetLocalTSWithinKeyspaceAsync(ctx, dcLocation, keyspaceID) - return resp.Wait() -} - -// GetTSWithinKeyspaceAsync gets a timestamp within the given keyspace from the TSO service, -// without block the caller. -func (c *client) GetTSWithinKeyspaceAsync(ctx context.Context, keyspaceID uint32) TSFuture { - return c.GetLocalTSWithinKeyspaceAsync(ctx, globalDCLocation, keyspaceID) +type tsoRequest struct { + start time.Time + clientCtx context.Context + requestCtx context.Context + done chan error + physical int64 + logical int64 + keyspaceID uint32 + dcLocation string } -// GetLocalTSWithinKeyspaceAsync gets a local timestamp within the given keyspace from the TSO service, -// without block the caller. 
-// TODO: implement the following API -func (c *client) GetLocalTSWithinKeyspaceAsync(ctx context.Context, dcLocation string, keyspaceID uint32) TSFuture { - if span := opentracing.SpanFromContext(ctx); span != nil { - span = opentracing.StartSpan("GetLocalTSWithinKeyspaceAsync", opentracing.ChildOf(span.Context())) - ctx = opentracing.ContextWithSpan(ctx, span) - } - req := tsoReqPool.Get().(*tsoRequest) - req.requestCtx = ctx - req.clientCtx = c.ctx - req.start = time.Now() - req.dcLocation = dcLocation - req.keyspaceID = keyspaceID - if err := c.dispatchRequest(dcLocation, req); err != nil { - // Wait for a while and try again - time.Sleep(50 * time.Millisecond) - if err = c.dispatchRequest(dcLocation, req); err != nil { - req.done <- err +var tsoReqPool = sync.Pool{ + New: func() interface{} { + return &tsoRequest{ + done: make(chan error, 1), + physical: 0, + logical: 0, } - } - return req + }, } -const ( - // tsoPrimaryPrefix defines the key prefix for keyspace group primary election. - // The entire key is in the format of "/ms//tso//primary" in which - // is 5 digits integer with leading zeros. For now we use 0 as the default cluster id. - tsoPrimaryPrefix = "/ms/0/tso" -) - -var _ BaseClient = (*tsoMcsClient)(nil) - -// tsoMcsClient is the service discovery client of TSO microservice which is primary/standby configured -type tsoMcsClient struct { - keyspaceID uint32 - // primary key is the etcd path used for discoverying the serving endpoint of this keyspace - primaryKey string - urls atomic.Value // Store as []string - // TSO Primary URL - primary atomic.Value // Store as string - // TSO Secondary URLs - secondaries atomic.Value // Store as []string - metacli MetaStorageClient - - // addr -> a gRPC connection - clientConns sync.Map // Store as map[string]*grpc.ClientConn - // dc-location -> TSO allocator primary URL - tsoAllocators sync.Map // Store as map[string]string - - // primarySwitchedCallbacks will be called after the primary swichted - primarySwitchedCallbacks []func() - // membersChangedCallbacks will be called after there is any membership - // change in the primary and followers - membersChangedCallbacks []func() - // tsoAllocatorLeaderSwitchedCallback will be called when any keyspace group tso - // allocator primary is switched. - tsoAllocatorLeaderSwitchedCallback []func() - - checkMembershipCh chan struct{} - - wg *sync.WaitGroup +type tsoClient struct { ctx context.Context cancel context.CancelFunc - - tlsCfg *tlsutil.TLSConfig - - // Client option. + wg *sync.WaitGroup option *option + + keyspaceID uint32 + svcDiscovery ServiceDiscovery + tsoStreamBuilderFactory + // tsoAllocators defines the mapping {dc-location -> TSO allocator leader URL} + tsoAllocators sync.Map // Store as map[string]string + // tsoAllocServingAddrSwitchedCallback will be called when any global/local + // tso allocator leader is switched. + tsoAllocServingAddrSwitchedCallback []func() + + // tsoDispatcher is used to dispatch different TSO requests to + // the corresponding dc-location TSO channel. + tsoDispatcher sync.Map // Same as map[string]chan *tsoRequest + // dc-location -> deadline + tsDeadline sync.Map // Same as map[string]chan deadline + // dc-location -> *lastTSO + lastTSMap sync.Map // Same as map[string]*lastTSO + + checkTSDeadlineCh chan struct{} + checkTSODispatcherCh chan struct{} + updateTSOConnectionCtxsCh chan struct{} } -// newTSOMcsClient returns a new BaseClient of a TSO microservice. 
-func newTSOMcsClient(ctx context.Context, cancel context.CancelFunc, wg *sync.WaitGroup, metacli MetaStorageClient, - keyspaceID uint32, urls []string, tlsCfg *tlsutil.TLSConfig, option *option) BaseClient { - bc := &tsoMcsClient{ - ctx: ctx, - cancel: cancel, - wg: wg, - metacli: metacli, - keyspaceID: keyspaceID, - primaryKey: path.Join(tsoPrimaryPrefix, fmt.Sprintf("%05d", 0), "primary"), - tlsCfg: tlsCfg, - option: option, - checkMembershipCh: make(chan struct{}, 1), +// newTSOClient returns a new TSO client. +func newTSOClient(ctx context.Context, cancel context.CancelFunc, wg *sync.WaitGroup, option *option, + keyspaceID uint32, svcDiscovery ServiceDiscovery, eventSrc tsoAllocatorEventSource, factory tsoStreamBuilderFactory) *tsoClient { + c := &tsoClient{ + ctx: ctx, + cancel: cancel, + wg: wg, + option: option, + keyspaceID: keyspaceID, + svcDiscovery: svcDiscovery, + tsoStreamBuilderFactory: factory, + checkTSDeadlineCh: make(chan struct{}), + checkTSODispatcherCh: make(chan struct{}, 1), + updateTSOConnectionCtxsCh: make(chan struct{}, 1), } - bc.urls.Store(urls) - return bc -} + eventSrc.SetTSOLocalServAddrsUpdatedCallback(c.updateTSOLocalServAddrs) + eventSrc.SetTSOGlobalServAddrUpdatedCallback(c.updateTSOGlobalServAddr) + c.svcDiscovery.AddServiceAddrsSwitchedCallback(c.scheduleUpdateTSOConnectionCtxs) -// Init initialize the concrete client underlying -func (c *tsoMcsClient) Init() error { - if err := c.initRetry(c.updateMember); err != nil { - c.cancel() - return err - } - c.wg.Add(1) - go c.startCheckMemberLoop() - return nil + return c } -func (c *tsoMcsClient) initRetry(f func() error) error { - var err error - for i := 0; i < c.option.maxRetryTimes; i++ { - if err = f(); err == nil { - return nil - } - select { - case <-c.ctx.Done(): - return err - case <-time.After(time.Second): - } +func (c *tsoClient) setup() error { + if err := c.svcDiscovery.Init(); err != nil { + return err } - return errors.WithStack(err) -} -func (c *tsoMcsClient) startCheckMemberLoop() { - defer c.wg.Done() - - ctx, cancel := context.WithCancel(c.ctx) - defer cancel() - - for { - select { - case <-c.checkMembershipCh: - case <-time.After(memberUpdateInterval): - case <-ctx.Done(): - return - } - if err := c.updateMember(); err != nil { - log.Error("[pd(tso)] failed to update member", errs.ZapError(err)) - } - } + // Start the daemons. + c.wg.Add(2) + go c.tsoDispatcherCheckLoop() + go c.tsCancelLoop() + return nil } -// Close releases all resources -func (c *tsoMcsClient) Close() { - c.clientConns.Range(func(key, cc interface{}) bool { - if err := cc.(*grpc.ClientConn).Close(); err != nil { - log.Error("[pd(tso)] failed to close gRPC clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err)) +// Close closes the TSO client +func (c *tsoClient) Close() { + log.Info("close tso client") + c.tsoDispatcher.Range(func(_, dispatcherInterface interface{}) bool { + if dispatcherInterface != nil { + dispatcher := dispatcherInterface.(*tsoDispatcher) + tsoErr := errors.WithStack(errClosing) + dispatcher.tsoBatchController.revokePendingTSORequest(tsoErr) + dispatcher.dispatcherCancel() } - c.clientConns.Delete(key) return true }) + c.svcDiscovery.Close() } -// GetClusterID returns the ID of the cluster -func (c *tsoMcsClient) GetClusterID(context.Context) uint64 { - return 0 -} - -// GetURLs returns the URLs of the servers. -// For testing use. It should only be called when the client is closed. 
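// (Illustrative recap of the wiring in newTSOClient above: the TSO client subscribes to the
// discovery layer roughly as
//
//	eventSrc.SetTSOLocalServAddrsUpdatedCallback(c.updateTSOLocalServAddrs)
//	eventSrc.SetTSOGlobalServAddrUpdatedCallback(c.updateTSOGlobalServAddr)
//	c.svcDiscovery.AddServiceAddrsSwitchedCallback(c.scheduleUpdateTSOConnectionCtxs)
//
// so allocator address changes discovered by the PD or TSO service discovery flow into the
// tsoClient without the discovery layer knowing anything about TSO dispatching.)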
-func (c *tsoMcsClient) GetURLs() []string { - return c.urls.Load().([]string) -} - -// GetTSOAllocators returns {dc-location -> TSO allocator primary URL} connection map -func (c *tsoMcsClient) GetTSOAllocators() *sync.Map { +// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map +func (c *tsoClient) GetTSOAllocators() *sync.Map { return &c.tsoAllocators } // GetTSOAllocatorServingAddrByDCLocation returns the tso allocator of the given dcLocation -func (c *tsoMcsClient) GetTSOAllocatorServingAddrByDCLocation(dcLocation string) (string, bool) { +func (c *tsoClient) GetTSOAllocatorServingAddrByDCLocation(dcLocation string) (string, bool) { url, exist := c.tsoAllocators.Load(dcLocation) if !exist { return "", false @@ -242,136 +151,113 @@ func (c *tsoMcsClient) GetTSOAllocatorServingAddrByDCLocation(dcLocation string) // GetTSOAllocatorClientConnByDCLocation returns the tso allocator grpc client connection // of the given dcLocation -func (c *tsoMcsClient) GetTSOAllocatorClientConnByDCLocation(dcLocation string) (*grpc.ClientConn, string) { +func (c *tsoClient) GetTSOAllocatorClientConnByDCLocation(dcLocation string) (*grpc.ClientConn, string) { url, ok := c.tsoAllocators.Load(dcLocation) if !ok { panic(fmt.Sprintf("the allocator leader in %s should exist", dcLocation)) } - cc, ok := c.clientConns.Load(url) + cc, ok := c.svcDiscovery.GetClientConns().Load(url) if !ok { panic(fmt.Sprintf("the client connection of %s in %s should exist", url, dcLocation)) } return cc.(*grpc.ClientConn), url.(string) } -// GetServingAddr returns the grpc client connection of the serving endpoint -// which is the primary in a primary/secondy configured cluster. -func (c *tsoMcsClient) GetServingEndpointClientConn() *grpc.ClientConn { - if cc, ok := c.clientConns.Load(c.getPrimaryAddr()); ok { - return cc.(*grpc.ClientConn) - } - return nil -} - -// GetServingAddr returns the serving endpoint which is the primary in a -// primary/secondy configured cluster. -func (c *tsoMcsClient) GetServingAddr() string { - return c.getPrimaryAddr() -} - -// GetBackupAddrs gets the addresses of the current reachable and healthy -// backup service endpoints randomly. Backup service endpoints are secondaries in -// a primary/secondary configured cluster. -func (c *tsoMcsClient) GetBackupAddrs() []string { - return c.getSecondaryAddrs() -} - -// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given addr. -func (c *tsoMcsClient) GetOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) { - return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, addr, c.tlsCfg, c.option.gRPCDialOptions...) +// AddTSOAllocatorServingAddrSwitchedCallback adds callbacks which will be called +// when any global/local tso allocator service endpoint is switched. +func (c *tsoClient) AddTSOAllocatorServingAddrSwitchedCallback(callbacks ...func()) { + c.tsoAllocServingAddrSwitchedCallback = append(c.tsoAllocServingAddrSwitchedCallback, callbacks...) } -// ScheduleCheckMemberChanged is used to trigger a check to see if there is any change in ervice endpoints. -func (c *tsoMcsClient) ScheduleCheckMemberChanged() { - select { - case c.checkMembershipCh <- struct{}{}: - default: +func (c *tsoClient) updateTSOLocalServAddrs(allocatorMap map[string]string) error { + if len(allocatorMap) == 0 { + return nil } -} -// Immediately checkif there is any membership change among the primary/secondaries in -// a primary/secondy configured cluster. 
-func (c *tsoMcsClient) CheckMemberChanged() error { - return c.updateMember() -} + updated := false -// AddServingAddrSwitchedCallback adds callbacks which will be called when the primary in -// a primary/secondary configured cluster is switched. -func (c *tsoMcsClient) AddServingAddrSwitchedCallback(callbacks ...func()) { - c.primarySwitchedCallbacks = append(c.primarySwitchedCallbacks, callbacks...) -} + // Switch to the new one + for dcLocation, addr := range allocatorMap { + if len(addr) == 0 { + continue + } + oldAddr, exist := c.GetTSOAllocatorServingAddrByDCLocation(dcLocation) + if exist && addr == oldAddr { + continue + } + updated = true + if _, err := c.svcDiscovery.GetOrCreateGRPCConn(addr); err != nil { + log.Warn("[tso] failed to connect dc tso allocator serving address", + zap.String("dc-location", dcLocation), + zap.String("serving-address", addr), + errs.ZapError(err)) + return err + } + c.tsoAllocators.Store(dcLocation, addr) + log.Info("[tso] switch dc tso allocator serving address", + zap.String("dc-location", dcLocation), + zap.String("new-address", addr), + zap.String("old-address", oldAddr)) + } -// AddServiceAddrsSwitchedCallback adds callbacks which will be called when any primary/secondary -// in a primary/secondary configured cluster is changed. -func (c *tsoMcsClient) AddServiceAddrsSwitchedCallback(callbacks ...func()) { - c.membersChangedCallbacks = append(c.membersChangedCallbacks, callbacks...) -} + // Garbage collection of the old TSO allocator primaries + c.gcAllocatorServingAddr(allocatorMap) -// AddTSOAllocatorServingAddrSwitchedCallback adds callbacks which will be called -// when any keyspace group tso allocator primary is switched. -func (c *tsoMcsClient) AddTSOAllocatorServingAddrSwitchedCallback(callbacks ...func()) { - c.tsoAllocatorLeaderSwitchedCallback = append(c.tsoAllocatorLeaderSwitchedCallback, callbacks...) -} - -// getPrimaryAddr returns the primary address. -func (c *tsoMcsClient) getPrimaryAddr() string { - primaryAddr := c.primary.Load() - if primaryAddr == nil { - return "" + if updated { + c.scheduleCheckTSODispatcher() } - return primaryAddr.(string) -} -// getSecondaryAddrs returns the secondary addresses. -func (c *tsoMcsClient) getSecondaryAddrs() []string { - secondaryAddrs := c.secondaries.Load() - if secondaryAddrs == nil { - return []string{} - } - return secondaryAddrs.([]string) + return nil } -func (c *tsoMcsClient) switchPrimary(addrs []string) error { - // FIXME: How to safely compare primary urls? For now, only allows one client url. 
- addr := addrs[0] - oldPrimary := c.getPrimaryAddr() - if addr == oldPrimary { - return nil - } - - if _, err := c.GetOrCreateGRPCConn(addr); err != nil { - log.Warn("[pd(tso)] failed to connect primary", zap.String("primary", addr), errs.ZapError(err)) - return err - } - // Set PD primary and Global TSO Allocator (which is also the PD primary) - c.primary.Store(addr) +func (c *tsoClient) updateTSOGlobalServAddr(addr string) error { c.tsoAllocators.Store(globalDCLocation, addr) - // Run callbacks - for _, cb := range c.primarySwitchedCallbacks { - cb() - } - log.Info("[pd(tso)] switch primary", zap.String("new-primary", addr), zap.String("old-primary", oldPrimary)) + log.Info("[tso] switch dc tso allocator serving address", + zap.String("dc-location", globalDCLocation), + zap.String("new-address", addr)) + c.scheduleCheckTSODispatcher() return nil } -func (c *tsoMcsClient) updateMember() error { - resp, err := c.metacli.Get(c.ctx, []byte(c.primaryKey)) - if err != nil { - log.Error("[pd(tso)] failed to get the keyspace serving endpoint", errs.ZapError(err)) - return err - } +func (c *tsoClient) gcAllocatorServingAddr(curAllocatorMap map[string]string) { + // Clean up the old TSO allocators + c.tsoAllocators.Range(func(dcLocationKey, _ interface{}) bool { + dcLocation := dcLocationKey.(string) + // Skip the Global TSO Allocator + if dcLocation == globalDCLocation { + return true + } + if _, exist := curAllocatorMap[dcLocation]; !exist { + log.Info("[tso] delete unused tso allocator", zap.String("dc-location", dcLocation)) + c.tsoAllocators.Delete(dcLocation) + } + return true + }) +} - if resp == nil || len(resp.Kvs) == 0 { - log.Error("[pd(tso)] didn't find the keyspace serving endpoint") - return errs.ErrClientGetLeader - } else if resp.Count > 1 { - return errs.ErrClientGetMultiResponse.FastGenByArgs(resp.Kvs) +// backupClientConn gets a grpc client connection of the current reachable and healthy +// backup service endpoints randomly. Backup service endpoints are followers in a +// quorum-based cluster or secondaries in a primary/secondary configured cluster. 
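+// The chosen connection is verified with a gRPC health check before it is returned, so the
+// returned address is expected to be reachable at the time of selection.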
+func (c *tsoClient) backupClientConn() (*grpc.ClientConn, string) { + addrs := c.svcDiscovery.GetBackupAddrs() + if len(addrs) < 1 { + return nil, "" } - - value := resp.Kvs[0].Value - member := &pdpb.Member{} - if err := proto.Unmarshal(value, member); err != nil { - return errs.ErrClientProtoUnmarshal.Wrap(err).GenWithStackByCause() + var ( + cc *grpc.ClientConn + err error + ) + for i := 0; i < len(addrs); i++ { + addr := addrs[rand.Intn(len(addrs))] + if cc, err = c.svcDiscovery.GetOrCreateGRPCConn(addr); err != nil { + continue + } + healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout) + resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""}) + healthCancel() + if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING { + return cc, addr + } } - return c.switchPrimary(addrsToUrls([]string{member.Name})) + return nil, "" } diff --git a/client/tso_request_dispatcher.go b/client/tso_dispatcher.go similarity index 81% rename from client/tso_request_dispatcher.go rename to client/tso_dispatcher.go index dcc664aa77b9..84c13a20fb67 100644 --- a/client/tso_request_dispatcher.go +++ b/client/tso_dispatcher.go @@ -53,26 +53,26 @@ const ( maxRetryTimes = 6 ) -func (c *client) scheduleCheckTSODispatcher() { +func (c *tsoClient) scheduleCheckTSODispatcher() { select { case c.checkTSODispatcherCh <- struct{}{}: default: } } -func (c *client) scheduleUpdateTSOConnectionCtxs() { +func (c *tsoClient) scheduleUpdateTSOConnectionCtxs() { select { case c.updateTSOConnectionCtxsCh <- struct{}{}: default: } } -func (c *client) dispatchRequest(dcLocation string, request *tsoRequest) error { +func (c *tsoClient) dispatchRequest(dcLocation string, request *tsoRequest) error { dispatcher, ok := c.tsoDispatcher.Load(dcLocation) if !ok { err := errs.ErrClientGetTSO.FastGenByArgs(fmt.Sprintf("unknown dc-location %s to the client", dcLocation)) - log.Error("[pd/tso] dispatch tso request error", zap.String("dc-location", dcLocation), errs.ZapError(err)) - c.tsobc.ScheduleCheckMemberChanged() + log.Error("[tso] dispatch tso request error", zap.String("dc-location", dcLocation), errs.ZapError(err)) + c.svcDiscovery.ScheduleCheckMemberChanged() return err } dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request @@ -110,9 +110,9 @@ func (req *tsoRequest) Wait() (physical int64, logical int64, err error) { } } -func (c *client) updateTSODispatcher() { +func (c *tsoClient) updateTSODispatcher() { // Set up the new TSO dispatcher and batch controller. 
- c.tsobc.GetTSOAllocators().Range(func(dcLocationKey, _ interface{}) bool { + c.GetTSOAllocators().Range(func(dcLocationKey, _ interface{}) bool { dcLocation := dcLocationKey.(string) if !c.checkTSODispatcher(dcLocation) { c.createTSODispatcher(dcLocation) @@ -120,14 +120,14 @@ func (c *client) updateTSODispatcher() { return true }) // Clean up the unused TSO dispatcher - c.tsoDispatcher.Range(func(dcLocationKey, _ interface{}) bool { + c.tsoDispatcher.Range(func(dcLocationKey, dispatcher interface{}) bool { dcLocation := dcLocationKey.(string) // Skip the Global TSO Allocator if dcLocation == globalDCLocation { return true } - if dispatcher, exist := c.tsobc.GetTSOAllocators().Load(dcLocation); !exist { - log.Info("[pd/tso] delete unused tso dispatcher", zap.String("dc-location", dcLocation)) + if _, exist := c.GetTSOAllocators().Load(dcLocation); !exist { + log.Info("[tso] delete unused tso dispatcher", zap.String("dc-location", dcLocation)) dispatcher.(*tsoDispatcher).dispatcherCancel() c.tsoDispatcher.Delete(dcLocation) } @@ -141,7 +141,7 @@ type deadline struct { cancel context.CancelFunc } -func (c *client) tsCancelLoop() { +func (c *tsoClient) tsCancelLoop() { defer c.wg.Done() tsCancelLoopCtx, tsCancelLoopCancel := context.WithCancel(c.ctx) @@ -151,7 +151,7 @@ func (c *client) tsCancelLoop() { defer ticker.Stop() for { // Watch every dc-location's tsDeadlineCh - c.tsobc.GetTSOAllocators().Range(func(dcLocation, _ interface{}) bool { + c.GetTSOAllocators().Range(func(dcLocation, _ interface{}) bool { c.watchTSDeadline(tsCancelLoopCtx, dcLocation.(string)) return true }) @@ -166,7 +166,7 @@ func (c *client) tsCancelLoop() { } } -func (c *client) watchTSDeadline(ctx context.Context, dcLocation string) { +func (c *tsoClient) watchTSDeadline(ctx context.Context, dcLocation string) { if _, exist := c.tsDeadline.Load(dcLocation); !exist { tsDeadlineCh := make(chan deadline, 1) c.tsDeadline.Store(dcLocation, tsDeadlineCh) @@ -176,7 +176,7 @@ func (c *client) watchTSDeadline(ctx context.Context, dcLocation string) { case d := <-tsDeadlineCh: select { case <-d.timer: - log.Error("[pd/tso] tso request is canceled due to timeout", zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSOTimeout)) + log.Error("[tso] tso request is canceled due to timeout", zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSOTimeout)) d.cancel() case <-d.done: continue @@ -191,14 +191,14 @@ func (c *client) watchTSDeadline(ctx context.Context, dcLocation string) { } } -func (c *client) scheduleCheckTSDeadline() { +func (c *tsoClient) scheduleCheckTSDeadline() { select { case c.checkTSDeadlineCh <- struct{}{}: default: } } -func (c *client) tsLoop() { +func (c *tsoClient) tsoDispatcherCheckLoop() { defer c.wg.Done() loopCtx, loopCancel := context.WithCancel(c.ctx) @@ -212,12 +212,13 @@ func (c *client) tsLoop() { case <-ticker.C: case <-c.checkTSODispatcherCh: case <-loopCtx.Done(): + log.Info("exit tso dispacther loop") return } } } -func (c *client) checkAllocator( +func (c *tsoClient) checkAllocator( dispatcherCtx context.Context, forwardCancel context.CancelFunc, dc, forwardedHostTrim, addrTrim, url string, @@ -227,12 +228,12 @@ func (c *client) checkAllocator( forwardCancel() requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(0) }() - cc, u := c.tsobc.GetTSOAllocatorClientConnByDCLocation(dc) + cc, u := c.GetTSOAllocatorClientConnByDCLocation(dc) healthCli := healthpb.NewHealthClient(cc) for { // the pd/allocator leader change, we need to re-establish the stream if u != 
url { - log.Info("[pd/tso] the leader of the allocator leader is changed", zap.String("dc", dc), zap.String("origin", url), zap.String("new", u)) + log.Info("[tso] the leader of the allocator leader is changed", zap.String("dc", dc), zap.String("origin", url), zap.String("new", u)) return } healthCtx, healthCancel := context.WithTimeout(dispatcherCtx, c.option.timeout) @@ -246,7 +247,7 @@ func (c *client) checkAllocator( cctx, cancel := context.WithCancel(dispatcherCtx) stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) if err == nil && stream != nil { - log.Info("[pd/tso] recover the original tso stream since the network has become normal", zap.String("dc", dc), zap.String("url", url)) + log.Info("[tso] recover the original tso stream since the network has become normal", zap.String("dc", dc), zap.String("url", url)) updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel}) return } @@ -257,12 +258,12 @@ func (c *client) checkAllocator( case <-time.After(time.Second): // To ensure we can get the latest allocator leader // and once the leader is changed, we can exit this function. - _, u = c.tsobc.GetTSOAllocatorClientConnByDCLocation(dc) + _, u = c.GetTSOAllocatorClientConnByDCLocation(dc) } } } -func (c *client) checkTSODispatcher(dcLocation string) bool { +func (c *tsoClient) checkTSODispatcher(dcLocation string) bool { dispatcher, ok := c.tsoDispatcher.Load(dcLocation) if !ok || dispatcher == nil { return false @@ -270,7 +271,7 @@ func (c *client) checkTSODispatcher(dcLocation string) bool { return true } -func (c *client) createTSODispatcher(dcLocation string) { +func (c *tsoClient) createTSODispatcher(dcLocation string) { dispatcherCtx, dispatcherCancel := context.WithCancel(c.ctx) dispatcher := &tsoDispatcher{ dispatcherCancel: dispatcherCancel, @@ -278,16 +279,22 @@ func (c *client) createTSODispatcher(dcLocation string) { make(chan *tsoRequest, defaultMaxTSOBatchSize*2), defaultMaxTSOBatchSize), } - // Each goroutine is responsible for handling the tso stream request for its dc-location. - // The only case that will make the dispatcher goroutine exit - // is that the loopCtx is done, otherwise there is no circumstance - // this goroutine should exit. - go c.handleDispatcher(dispatcherCtx, dcLocation, dispatcher.tsoBatchController) - c.tsoDispatcher.Store(dcLocation, dispatcher) - log.Info("[pd/tso] tso dispatcher created", zap.String("dc-location", dcLocation)) + + if _, ok := c.tsoDispatcher.LoadOrStore(dcLocation, dispatcher); !ok { + // Successfully stored the value. Start the following goroutine. + // Each goroutine is responsible for handling the tso stream request for its dc-location. + // The only case that will make the dispatcher goroutine exit + // is that the loopCtx is done, otherwise there is no circumstance + // this goroutine should exit. + c.wg.Add(1) + go c.handleDispatcher(dispatcherCtx, dcLocation, dispatcher.tsoBatchController) + log.Info("[tso] tso dispatcher created", zap.String("dc-location", dcLocation)) + } else { + dispatcherCancel() + } } -func (c *client) handleDispatcher( +func (c *tsoClient) handleDispatcher( dispatcherCtx context.Context, dc string, tbc *tsoBatchController) { @@ -302,12 +309,13 @@ func (c *client) handleDispatcher( opts []opentracing.StartSpanOption ) defer func() { - log.Info("[pd/tso] exit tso dispatcher", zap.String("dc-location", dc)) + log.Info("[tso] exit tso dispatcher", zap.String("dc-location", dc)) // Cancel all connections. 
connectionCtxs.Range(func(_, cc interface{}) bool { cc.(*tsoConnectionContext).cancel() return true }) + c.wg.Done() }() // Call updateTSOConnectionCtxs once to init the connectionCtxs first. c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) @@ -365,10 +373,10 @@ tsoBatchLoop: maxBatchWaitInterval := c.option.getMaxTSOBatchWaitInterval() if err = tbc.fetchPendingRequests(dispatcherCtx, maxBatchWaitInterval); err != nil { if err == context.Canceled { - log.Info("[pd/tso] stop fetching the pending tso requests due to context canceled", + log.Info("[tso] stop fetching the pending tso requests due to context canceled", zap.String("dc-location", dc)) } else { - log.Error("[pd/tso] fetch pending tso requests error", + log.Error("[tso] fetch pending tso requests error", zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSO, err)) } return @@ -386,7 +394,7 @@ tsoBatchLoop: } // Check stream and retry if necessary. if stream == nil { - log.Info("[pd/tso] tso stream is not ready", zap.String("dc", dc)) + log.Info("[tso] tso stream is not ready", zap.String("dc", dc)) if c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) { continue streamChoosingLoop } @@ -395,8 +403,8 @@ tsoBatchLoop: return case <-streamLoopTimer.C: err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr) - log.Error("[pd/tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err)) - c.tsobc.ScheduleCheckMemberChanged() + log.Error("[tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err)) + c.svcDiscovery.ScheduleCheckMemberChanged() c.finishTSORequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err)) continue tsoBatchLoop case <-time.After(retryInterval): @@ -405,7 +413,7 @@ tsoBatchLoop: } select { case <-streamCtx.Done(): - log.Info("[pd/tso] tso stream is canceled", zap.String("dc", dc), zap.String("stream-addr", streamAddr)) + log.Info("[tso] tso stream is canceled", zap.String("dc", dc), zap.String("stream-addr", streamAddr)) // Set `stream` to nil and remove this stream from the `connectionCtxs` due to being canceled. connectionCtxs.Delete(streamAddr) cancel() @@ -442,15 +450,15 @@ tsoBatchLoop: return default: } - c.tsobc.ScheduleCheckMemberChanged() - log.Error("[pd/tso] getTS error", zap.String("dc-location", dc), zap.String("stream-addr", streamAddr), errs.ZapError(errs.ErrClientGetTSO, err)) + c.svcDiscovery.ScheduleCheckMemberChanged() + log.Error("[tso] getTS error", zap.String("dc-location", dc), zap.String("stream-addr", streamAddr), errs.ZapError(errs.ErrClientGetTSO, err)) // Set `stream` to nil and remove this stream from the `connectionCtxs` due to error. connectionCtxs.Delete(streamAddr) cancel() stream = nil // Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we better call `updateMember` ASAP. if IsLeaderChange(err) { - if err := c.tsobc.CheckMemberChanged(); err != nil { + if err := c.svcDiscovery.CheckMemberChanged(); err != nil { select { case <-dispatcherCtx.Done(): return @@ -469,13 +477,13 @@ tsoBatchLoop: } // TSO Follower Proxy only supports the Global TSO proxy now. -func (c *client) allowTSOFollowerProxy(dc string) bool { +func (c *tsoClient) allowTSOFollowerProxy(dc string) bool { return dc == globalDCLocation && c.option.getEnableTSOFollowerProxy() } // chooseStream uses the reservoir sampling algorithm to randomly choose a connection. // connectionCtxs will only have only one stream to choose when the TSO Follower Proxy is off. 
-func (c *client) chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConnectionContext) { +func (c *tsoClient) chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConnectionContext) { idx := 0 connectionCtxs.Range(func(_, cc interface{}) bool { j := rand.Intn(idx + 1) @@ -497,14 +505,14 @@ type tsoConnectionContext struct { cancel context.CancelFunc } -func (c *client) updateTSOConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) bool { +func (c *tsoClient) updateTSOConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) bool { // Normal connection creating, it will be affected by the `enableForwarding`. createTSOConnection := c.tryConnectToTSO if c.allowTSOFollowerProxy(dc) { createTSOConnection = c.tryConnectToTSOWithProxy } if err := createTSOConnection(updaterCtx, dc, connectionCtxs); err != nil { - log.Error("[pd/tso] update connection contexts failed", zap.String("dc", dc), errs.ZapError(err)) + log.Error("[tso] update connection contexts failed", zap.String("dc", dc), errs.ZapError(err)) return false } return true @@ -514,7 +522,7 @@ func (c *client) updateTSOConnectionCtxs(updaterCtx context.Context, dc string, // and enableForwarding is true, it will create a new connection to a follower to do the forwarding, // while a new daemon will be created also to switch back to a normal leader connection ASAP the // connection comes back to normal. -func (c *client) tryConnectToTSO( +func (c *tsoClient) tryConnectToTSO( dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map, @@ -543,8 +551,8 @@ func (c *client) tryConnectToTSO( // retry several times before falling back to the follower when the network problem happens for i := 0; i < maxRetryTimes; i++ { - c.tsobc.ScheduleCheckMemberChanged() - cc, url = c.tsobc.GetTSOAllocatorClientConnByDCLocation(dc) + c.svcDiscovery.ScheduleCheckMemberChanged() + cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc) cctx, cancel := context.WithCancel(dispatcherCtx) stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) failpoint.Inject("unreachableNetwork", func() { @@ -579,8 +587,8 @@ func (c *client) tryConnectToTSO( // encounter the network error backupClientConn, addr := c.backupClientConn() if backupClientConn != nil { - log.Info("[pd/tso] fall back to use follower to forward tso stream", zap.String("dc", dc), zap.String("addr", addr)) - forwardedHost, ok := c.tsobc.GetTSOAllocatorServingAddrByDCLocation(dc) + log.Info("[tso] fall back to use follower to forward tso stream", zap.String("dc", dc), zap.String("addr", addr)) + forwardedHost, ok := c.GetTSOAllocatorServingAddrByDCLocation(dc) if !ok { return errors.Errorf("cannot find the allocator leader in %s", dc) } @@ -606,9 +614,9 @@ func (c *client) tryConnectToTSO( // getAllTSOStreamBuilders returns a TSO stream builder for every service endpoint of TSO leader/followers // or of keyspace group primary/secondaries. 
-func (c *client) getAllTSOStreamBuilders() map[string]tsoStreamBuilder { +func (c *tsoClient) getAllTSOStreamBuilders() map[string]tsoStreamBuilder { var ( - addrs = c.tsobc.GetURLs() + addrs = c.svcDiscovery.GetURLs() streamBuilders = make(map[string]tsoStreamBuilder, len(addrs)) cc *grpc.ClientConn err error @@ -617,7 +625,7 @@ func (c *client) getAllTSOStreamBuilders() map[string]tsoStreamBuilder { if len(addrs) == 0 { continue } - if cc, err = c.tsobc.GetOrCreateGRPCConn(addr); err != nil { + if cc, err = c.svcDiscovery.GetOrCreateGRPCConn(addr); err != nil { continue } healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout) @@ -632,10 +640,10 @@ func (c *client) getAllTSOStreamBuilders() map[string]tsoStreamBuilder { // tryConnectToTSOWithProxy will create multiple streams to all the service endpoints to work as // a TSO proxy to reduce the pressure of the main serving service endpoint. -func (c *client) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map) error { +func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc string, connectionCtxs *sync.Map) error { tsoStreamBuilders := c.getAllTSOStreamBuilders() - leaderAddr := c.GetLeaderAddr() - forwardedHost, ok := c.tsobc.GetTSOAllocatorServingAddrByDCLocation(dc) + leaderAddr := c.svcDiscovery.GetServingAddr() + forwardedHost, ok := c.GetTSOAllocatorServingAddrByDCLocation(dc) if !ok { return errors.Errorf("cannot find the allocator leader in %s", dc) } @@ -655,7 +663,7 @@ func (c *client) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc stri cctx, cancel := context.WithCancel(dispatcherCtx) // Do not proxy the leader client. if addr != leaderAddr { - log.Info("[pd/tso] use follower to forward tso stream to do the proxy", zap.String("dc", dc), zap.String("addr", addr)) + log.Info("[tso] use follower to forward tso stream to do the proxy", zap.String("dc", dc), zap.String("addr", addr)) cctx = grpcutil.BuildForwardContext(cctx, forwardedHost) } // Create the TSO stream. @@ -669,7 +677,7 @@ func (c *client) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc stri connectionCtxs.Store(addr, &tsoConnectionContext{addr, stream, cctx, cancel}) continue } - log.Error("[pd/tso] create the tso stream failed", zap.String("dc", dc), zap.String("addr", addr), errs.ZapError(err)) + log.Error("[tso] create the tso stream failed", zap.String("dc", dc), zap.String("addr", addr), errs.ZapError(err)) cancel() } return nil @@ -684,7 +692,7 @@ func extractSpanReference(tbc *tsoBatchController, opts []opentracing.StartSpanO return opts } -func (c *client) processTSORequests(stream tsoStream, dcLocation string, tbc *tsoBatchController, opts []opentracing.StartSpanOption) error { +func (c *tsoClient) processTSORequests(stream tsoStream, dcLocation string, tbc *tsoBatchController, opts []opentracing.StartSpanOption) error { if len(opts) > 0 { span := opentracing.StartSpan("pdclient.processTSORequests", opts...) 
 		defer span.Finish()
@@ -692,7 +700,7 @@ func (c *client) processTSORequests(stream tsoStream, dcLocation string, tbc *ts
 	requests := tbc.getCollectedRequests()
 	count := int64(len(requests))
-	physical, logical, suffixBits, err := stream.processRequests(c.tsobc.GetClusterID(c.ctx), dcLocation, requests, tbc.batchStartTime)
+	physical, logical, suffixBits, err := stream.processRequests(c.svcDiscovery.GetClusterID(c.ctx), dcLocation, requests, tbc.batchStartTime)
 	if err != nil {
 		c.finishTSORequest(requests, 0, 0, 0, err)
 		return err
@@ -709,7 +717,7 @@ func addLogical(logical, count int64, suffixBits uint32) int64 {
 	return logical + count<<suffixBits
 }
diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go
new file mode 100644
--- /dev/null
+++ b/client/tso_service_discovery.go
+const (
+	// tsoPrimaryPrefix defines the key prefix for keyspace group primary election.
+	// The entire key is in the format of "/ms/<cluster-id>/tso/<group-id>/primary" in which
+	// <group-id> is 5 digits integer with leading zeros. For now we use 0 as the default cluster id.
+	tsoPrimaryPrefix = "/ms/0/tso"
+)
+
+var _ ServiceDiscovery = (*tsoServiceDiscovery)(nil)
+var _ tsoAllocatorEventSource = (*tsoServiceDiscovery)(nil)
+
+// tsoServiceDiscovery is the service discovery client of the independent TSO service
+type tsoServiceDiscovery struct {
+	keyspaceID uint32
+	urls       atomic.Value // Store as []string
+	// primary key is the etcd path used for discovering the serving endpoint of this keyspace
+	primaryKey string
+	// TSO Primary URL
+	primary atomic.Value // Store as string
+	// TSO Secondary URLs
+	secondaries atomic.Value // Store as []string
+	metacli     MetaStorageClient
+
+	// addr -> a gRPC connection
+	clientConns sync.Map // Store as map[string]*grpc.ClientConn
+
+	// primarySwitchedCbs will be called after the primary is switched
+	primarySwitchedCbs []func()
+	// membersChangedCbs will be called after there is any membership
+	// change in the primary and followers
+	membersChangedCbs []func()
+	// localAllocPrimariesUpdatedCb will be called when the local tso allocator primary list is updated.
+	// The input is a map {DC Location -> Leader Addr}
+	localAllocPrimariesUpdatedCb tsoLocalServAddrsUpdatedFunc
+	// globalAllocPrimariesUpdatedCb will be called when the global tso allocator primary is updated.
+	globalAllocPrimariesUpdatedCb tsoGlobalServAddrUpdatedFunc
+
+	checkMembershipCh chan struct{}
+
+	wg     *sync.WaitGroup
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	tlsCfg *tlsutil.TLSConfig
+
+	// Client option.
+	option *option
+}
+
+// newTSOServiceDiscovery returns a new client-side service discovery for the independent TSO service.
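+// It periodically reads the keyspace primary key under tsoPrimaryPrefix through the given
+// MetaStorageClient and keeps track of the serving primary and secondary addresses.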
+func newTSOServiceDiscovery(ctx context.Context, cancel context.CancelFunc, wg *sync.WaitGroup, metacli MetaStorageClient,
+	keyspaceID uint32, urls []string, tlsCfg *tlsutil.TLSConfig, option *option) ServiceDiscovery {
+	bc := &tsoServiceDiscovery{
+		ctx:               ctx,
+		cancel:            cancel,
+		wg:                wg,
+		metacli:           metacli,
+		keyspaceID:        keyspaceID,
+		primaryKey:        path.Join(tsoPrimaryPrefix, fmt.Sprintf("%05d", 0), "primary"),
+		tlsCfg:            tlsCfg,
+		option:            option,
+		checkMembershipCh: make(chan struct{}, 1),
+	}
+	bc.urls.Store(urls)
+
+	return bc
+}
+
+// Init initializes the underlying concrete client
+func (c *tsoServiceDiscovery) Init() error {
+	if err := c.initRetry(c.updateMember); err != nil {
+		c.cancel()
+		return err
+	}
+	c.wg.Add(1)
+	go c.startCheckMemberLoop()
+	return nil
+}
+
+func (c *tsoServiceDiscovery) initRetry(f func() error) error {
+	var err error
+	for i := 0; i < c.option.maxRetryTimes; i++ {
+		if err = f(); err == nil {
+			return nil
+		}
+		select {
+		case <-c.ctx.Done():
+			return err
+		case <-time.After(time.Second):
+		}
+	}
+	return errors.WithStack(err)
+}
+
+func (c *tsoServiceDiscovery) startCheckMemberLoop() {
+	defer c.wg.Done()
+
+	ctx, cancel := context.WithCancel(c.ctx)
+	defer cancel()
+
+	for {
+		select {
+		case <-c.checkMembershipCh:
+		case <-time.After(memberUpdateInterval):
+		case <-ctx.Done():
+			return
+		}
+		if err := c.updateMember(); err != nil {
+			log.Error("[tso] failed to update member", errs.ZapError(err))
+		}
+	}
+}
+
+// Close releases all resources
+func (c *tsoServiceDiscovery) Close() {
+	log.Info("close tso service discovery")
+	c.clientConns.Range(func(key, cc interface{}) bool {
+		if err := cc.(*grpc.ClientConn).Close(); err != nil {
+			log.Error("[tso] failed to close gRPC clientConn", errs.ZapError(errs.ErrCloseGRPCConn, err))
+		}
+		c.clientConns.Delete(key)
+		return true
+	})
+}
+
+// GetClusterID returns the ID of the cluster
+func (c *tsoServiceDiscovery) GetClusterID(context.Context) uint64 {
+	return 0
+}
+
+// GetURLs returns the URLs of the servers.
+// For testing use. It should only be called when the client is closed.
+func (c *tsoServiceDiscovery) GetURLs() []string {
+	return c.urls.Load().([]string)
+}
+
+// GetServingEndpointClientConn returns the grpc client connection of the serving endpoint
+// which is the primary in a primary/secondary configured cluster.
+func (c *tsoServiceDiscovery) GetServingEndpointClientConn() *grpc.ClientConn {
+	if cc, ok := c.clientConns.Load(c.getPrimaryAddr()); ok {
+		return cc.(*grpc.ClientConn)
+	}
+	return nil
+}
+
+// GetClientConns returns the mapping {addr -> a gRPC connection}
+func (c *tsoServiceDiscovery) GetClientConns() *sync.Map {
+	return &c.clientConns
+}
+
+// GetServingAddr returns the serving endpoint which is the primary in a
+// primary/secondary configured cluster.
+func (c *tsoServiceDiscovery) GetServingAddr() string {
+	return c.getPrimaryAddr()
+}
+
+// GetBackupAddrs gets the addresses of the current reachable and healthy
+// backup service endpoints randomly. Backup service endpoints are secondaries in
+// a primary/secondary configured cluster.
+func (c *tsoServiceDiscovery) GetBackupAddrs() []string {
+	return c.getSecondaryAddrs()
+}
+
+// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given addr.
+func (c *tsoServiceDiscovery) GetOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) {
+	return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, addr, c.tlsCfg, c.option.gRPCDialOptions...)
+}
+
+// ScheduleCheckMemberChanged is used to trigger a check to see if there is any change in service endpoints.
+func (c *tsoServiceDiscovery) ScheduleCheckMemberChanged() {
+	select {
+	case c.checkMembershipCh <- struct{}{}:
+	default:
+	}
+}
+
+// Immediately check if there is any membership change among the primary/secondaries in
+// a primary/secondary configured cluster.
+func (c *tsoServiceDiscovery) CheckMemberChanged() error {
+	return c.updateMember()
+}
+
+// AddServingAddrSwitchedCallback adds callbacks which will be called when the primary in
+// a primary/secondary configured cluster is switched.
+func (c *tsoServiceDiscovery) AddServingAddrSwitchedCallback(callbacks ...func()) {
+	c.primarySwitchedCbs = append(c.primarySwitchedCbs, callbacks...)
+}
+
+// AddServiceAddrsSwitchedCallback adds callbacks which will be called when any primary/secondary
+// in a primary/secondary configured cluster is changed.
+func (c *tsoServiceDiscovery) AddServiceAddrsSwitchedCallback(callbacks ...func()) {
+	c.membersChangedCbs = append(c.membersChangedCbs, callbacks...)
+}
+
+// SetTSOLocalServAddrsUpdatedCallback adds a callback which will be called when the local tso
+// allocator leader list is updated.
+func (c *tsoServiceDiscovery) SetTSOLocalServAddrsUpdatedCallback(callback tsoLocalServAddrsUpdatedFunc) {
+	c.localAllocPrimariesUpdatedCb = callback
+}
+
+// SetTSOGlobalServAddrUpdatedCallback adds a callback which will be called when the global tso
+// allocator leader is updated.
+func (c *tsoServiceDiscovery) SetTSOGlobalServAddrUpdatedCallback(callback tsoGlobalServAddrUpdatedFunc) {
+	c.globalAllocPrimariesUpdatedCb = callback
+}
+
+// getPrimaryAddr returns the primary address.
+func (c *tsoServiceDiscovery) getPrimaryAddr() string {
+	primaryAddr := c.primary.Load()
+	if primaryAddr == nil {
+		return ""
+	}
+	return primaryAddr.(string)
+}
+
+// getSecondaryAddrs returns the secondary addresses.
+func (c *tsoServiceDiscovery) getSecondaryAddrs() []string {
+	secondaryAddrs := c.secondaries.Load()
+	if secondaryAddrs == nil {
+		return []string{}
+	}
+	return secondaryAddrs.([]string)
+}
+
+func (c *tsoServiceDiscovery) switchPrimary(addrs []string) error {
+	// FIXME: How to safely compare primary urls? For now, only one client url is allowed.
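+	// Use the first url as the primary candidate and reconnect only when it differs from the current primary.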
+ addr := addrs[0] + oldPrimary := c.getPrimaryAddr() + if addr == oldPrimary { + return nil + } + + if _, err := c.GetOrCreateGRPCConn(addr); err != nil { + log.Warn("[tso] failed to connect primary", zap.String("primary", addr), errs.ZapError(err)) + return err + } + // Set PD primary and Global TSO Allocator (which is also the PD primary) + c.primary.Store(addr) + // Run callbacks + if c.globalAllocPrimariesUpdatedCb != nil { + if err := c.globalAllocPrimariesUpdatedCb(addr); err != nil { + return err + } + } + for _, cb := range c.primarySwitchedCbs { + cb() + } + log.Info("[tso] switch primary", zap.String("new-primary", addr), zap.String("old-primary", oldPrimary)) + return nil +} + +func (c *tsoServiceDiscovery) updateMember() error { + resp, err := c.metacli.Get(c.ctx, []byte(c.primaryKey)) + if err != nil { + log.Error("[tso] failed to get the keyspace serving endpoint", errs.ZapError(err)) + return err + } + + if resp == nil || len(resp.Kvs) == 0 { + log.Error("[tso] didn't find the keyspace serving endpoint") + return errs.ErrClientGetLeader + } else if resp.Count > 1 { + return errs.ErrClientGetMultiResponse.FastGenByArgs(resp.Kvs) + } + + value := resp.Kvs[0].Value + member := &pdpb.Member{} + if err := proto.Unmarshal(value, member); err != nil { + return errs.ErrClientProtoUnmarshal.Wrap(err).GenWithStackByCause() + } + return c.switchPrimary(addrsToUrls([]string{member.Name})) +} diff --git a/tests/client/Makefile b/tests/client/Makefile index 744433267041..5663a2f3b458 100644 --- a/tests/client/Makefile +++ b/tests/client/Makefile @@ -27,7 +27,7 @@ tidy: git diff --quiet go.mod go.sum test: enable-codegen - CGO_ENABLED=1 go test -tags deadlock -race -cover || { $(MAKE) disable-codegen && exit 1; } + CGO_ENABLED=1 go test -v -tags deadlock -race -cover || { $(MAKE) disable-codegen && exit 1; } $(MAKE) disable-codegen basic-test: diff --git a/tests/client/client_test.go b/tests/client/client_test.go index 38681b16ded2..6c328e6052e6 100644 --- a/tests/client/client_test.go +++ b/tests/client/client_test.go @@ -102,7 +102,7 @@ func TestClientLeaderChange(t *testing.T) { endpoints := runServer(re, cluster) cli := setupCli(re, ctx, endpoints) - innerCli, ok := cli.(interface{ GetBaseClient() pd.BaseClient }) + innerCli, ok := cli.(interface{ GetServiceDiscovery() pd.ServiceDiscovery }) re.True(ok) var ts1, ts2 uint64 @@ -118,14 +118,14 @@ func TestClientLeaderChange(t *testing.T) { re.True(cluster.CheckTSOUnique(ts1)) leader := cluster.GetLeader() - waitLeader(re, innerCli.GetBaseClient(), cluster.GetServer(leader).GetConfig().ClientUrls) + waitLeader(re, innerCli.GetServiceDiscovery(), cluster.GetServer(leader).GetConfig().ClientUrls) err = cluster.GetServer(leader).Stop() re.NoError(err) leader = cluster.WaitLeader() re.NotEmpty(leader) - waitLeader(re, innerCli.GetBaseClient(), cluster.GetServer(leader).GetConfig().ClientUrls) + waitLeader(re, innerCli.GetServiceDiscovery(), cluster.GetServer(leader).GetConfig().ClientUrls) // Check TS won't fall back after leader changed. testutil.Eventually(re, func() bool { @@ -142,7 +142,7 @@ func TestClientLeaderChange(t *testing.T) { // Check URL list. 
cli.Close() - urls := innerCli.GetBaseClient().GetURLs() + urls := innerCli.GetServiceDiscovery().GetURLs() sort.Strings(urls) sort.Strings(endpoints) re.Equal(endpoints, urls) @@ -288,14 +288,14 @@ func TestTSOAllocatorLeader(t *testing.T) { allocatorLeaderMap[dcLocation] = pdName } cli := setupCli(re, ctx, endpoints) - innerCli, ok := cli.(interface{ GetBaseClient() pd.BaseClient }) + innerCli, ok := cli.(interface{ GetServiceDiscovery() pd.ServiceDiscovery }) re.True(ok) // Check allocator leaders URL map. cli.Close() - for dcLocation, url := range getTSOAllocatorServingEndpointURLs(innerCli.GetBaseClient()) { + for dcLocation, url := range getTSOAllocatorServingEndpointURLs(cli.(pd.TSOClient)) { if dcLocation == tso.GlobalDCLocation { - urls := innerCli.GetBaseClient().GetURLs() + urls := innerCli.GetServiceDiscovery().GetURLs() sort.Strings(urls) sort.Strings(endpoints) re.Equal(endpoints, urls) @@ -512,7 +512,7 @@ func requestGlobalAndLocalTSO( wg.Wait() } -func getTSOAllocatorServingEndpointURLs(c pd.BaseClient) map[string]string { +func getTSOAllocatorServingEndpointURLs(c pd.TSOClient) map[string]string { allocatorLeaders := make(map[string]string) c.GetTSOAllocators().Range(func(dcLocation, url interface{}) bool { allocatorLeaders[dcLocation.(string)] = url.(string) @@ -666,7 +666,7 @@ func setupCli(re *require.Assertions, ctx context.Context, endpoints []string, o return cli } -func waitLeader(re *require.Assertions, cli pd.BaseClient, leader string) { +func waitLeader(re *require.Assertions, cli pd.ServiceDiscovery, leader string) { testutil.Eventually(re, func() bool { cli.ScheduleCheckMemberChanged() return cli.GetServingAddr() == leader diff --git a/tests/mcs/resource_manager/resource_manager_test.go b/tests/mcs/resource_manager/resource_manager_test.go index 99d31762f808..1ab913911963 100644 --- a/tests/mcs/resource_manager/resource_manager_test.go +++ b/tests/mcs/resource_manager/resource_manager_test.go @@ -104,12 +104,12 @@ func (suite *resourceManagerClientTestSuite) SetupSuite() { } func (suite *resourceManagerClientTestSuite) waitLeader(cli pd.Client, leaderAddr string) { - innerCli, ok := cli.(interface{ GetBaseClient() pd.BaseClient }) + innerCli, ok := cli.(interface{ GetServiceDiscovery() pd.ServiceDiscovery }) suite.True(ok) suite.NotNil(innerCli) testutil.Eventually(suite.Require(), func() bool { - innerCli.GetBaseClient().ScheduleCheckMemberChanged() - return innerCli.GetBaseClient().GetServingAddr() == leaderAddr + innerCli.GetServiceDiscovery().ScheduleCheckMemberChanged() + return innerCli.GetServiceDiscovery().GetServingAddr() == leaderAddr }) }