diff --git a/client/client.go b/client/client.go index c2f673c5da0..0597f7344ee 100644 --- a/client/client.go +++ b/client/client.go @@ -277,6 +277,14 @@ type serviceModeKeeper struct { tsoSvcDiscovery ServiceDiscovery } +func (k *serviceModeKeeper) SetKeyspaceID(keyspaceID uint32) { + k.Lock() + defer k.Unlock() + if k.serviceMode == pdpb.ServiceMode_API_SVC_MODE { + k.tsoSvcDiscovery.SetKeyspaceID(keyspaceID) + } +} + func (k *serviceModeKeeper) close() { k.Lock() defer k.Unlock() @@ -471,9 +479,6 @@ func newClientWithKeyspaceName( ctx context.Context, keyspaceName string, svrAddrs []string, security SecurityOption, opts ...ClientOption, ) (Client, error) { - log.Info("[pd] create pd client with endpoints and keyspace", - zap.Strings("pd-address", svrAddrs), zap.String("keyspace-name", keyspaceName)) - tlsCfg := &tlsutil.TLSConfig{ CAPath: security.CAPath, CertPath: security.CertPath, @@ -510,8 +515,12 @@ func newClientWithKeyspaceName( if err := c.initRetry(c.loadKeyspaceMeta, keyspaceName); err != nil { return nil, err } + // We call "c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID)" after service mode already switching to API mode + // and tso service discovery already initialized, so here we need to set the tso_service_discovery's keyspace id too. c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID) - + c.serviceModeKeeper.SetKeyspaceID(c.keyspaceID) + log.Info("[pd] create pd client with endpoints and keyspace", + zap.Strings("pd-address", svrAddrs), zap.String("keyspace-name", keyspaceName), zap.Uint32("keyspace-id", c.keyspaceID)) return c, nil } @@ -593,7 +602,7 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) { ) switch newMode { case pdpb.ServiceMode_PD_SVC_MODE: - newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID, + newTSOCli = newTSOClient(c.ctx, c.option, c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{}) case pdpb.ServiceMode_API_SVC_MODE: newTSOSvcDiscovery = newTSOServiceDiscovery( @@ -601,7 +610,7 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) { c.GetClusterID(c.ctx), c.keyspaceID, c.tlsCfg, c.option) // At this point, the keyspace group isn't known yet. Starts from the default keyspace group, // and will be updated later. - newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID, + newTSOCli = newTSOClient(c.ctx, c.option, newTSOSvcDiscovery, &tsoTSOStreamBuilderFactory{}) if err := newTSOSvcDiscovery.Init(); err != nil { log.Error("[pd] failed to initialize tso service discovery. keep the current service mode", diff --git a/client/tso_client.go b/client/tso_client.go index c326e3e7160..d4dfaa03a91 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -70,7 +70,6 @@ type tsoClient struct { wg sync.WaitGroup option *option - keyspaceID uint32 svcDiscovery ServiceDiscovery tsoStreamBuilderFactory // tsoAllocators defines the mapping {dc-location -> TSO allocator leader URL} @@ -94,7 +93,7 @@ type tsoClient struct { // newTSOClient returns a new TSO client. func newTSOClient( - ctx context.Context, option *option, keyspaceID uint32, + ctx context.Context, option *option, svcDiscovery ServiceDiscovery, factory tsoStreamBuilderFactory, ) *tsoClient { ctx, cancel := context.WithCancel(ctx) @@ -102,7 +101,6 @@ func newTSOClient( ctx: ctx, cancel: cancel, option: option, - keyspaceID: keyspaceID, svcDiscovery: svcDiscovery, tsoStreamBuilderFactory: factory, checkTSDeadlineCh: make(chan struct{}), diff --git a/pkg/balancer/round_robin.go b/pkg/balancer/round_robin.go index cef35c43a5f..5013a447d3e 100644 --- a/pkg/balancer/round_robin.go +++ b/pkg/balancer/round_robin.go @@ -51,7 +51,8 @@ func (r *RoundRobin[T]) Next() (t T) { func (r *RoundRobin[T]) GetAll() []T { r.RLock() defer r.RUnlock() - return r.nodes + // return a copy to avoid data race + return append(r.nodes[:0:0], r.nodes...) } // Put puts one into balancer. diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 3159518387f..d962cfd0306 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -829,6 +829,12 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit( return err } if tsoutil.CompareTimestamp(&splitSourceTSO, &splitTSO) <= 0 { + log.Debug("the split source TSO is not greater than the newly split TSO", + zap.Int64("split-source-tso-physical", splitSourceTSO.Physical), + zap.Int64("split-source-tso-logical", splitSourceTSO.Logical), + zap.Int64("split-tso-physical", splitTSO.Physical), + zap.Int64("split-tso-logical", splitTSO.Logical), + ) return nil } // If the split source TSO is greater than the newly split TSO, we need to update the split diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 8849f7a4ab5..ed3bfe35280 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -36,6 +36,7 @@ import ( "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/server/apiv2/handlers" + "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/integrations/mcs" handlersutil "github.com/tikv/pd/tests/server/apiv2/handlers" @@ -465,3 +466,106 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() { re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion")) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) } + +func TestTwiceSplitKeyspaceGroup(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) + + // Init api server config but not start. + tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, serverName string) { + conf.Keyspace.PreAlloc = []string{ + "keyspace_a", "keyspace_b", + } + }) + re.NoError(err) + pdAddr := tc.GetConfig().GetClientURL() + + // Start pd client and wait pd server start. + var clients sync.Map + go func() { + apiCtx := pd.NewAPIContextV2("keyspace_b") // its keyspace id is 2. + cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, []string{pdAddr}, pd.SecurityOption{}) + re.NoError(err) + clients.Store("keyspace_b", cli) + }() + go func() { + apiCtx := pd.NewAPIContextV2("keyspace_a") // its keyspace id is 1. + cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, []string{pdAddr}, pd.SecurityOption{}) + re.NoError(err) + clients.Store("keyspace_a", cli) + }() + + // Start api server and tso server. + err = tc.RunInitialServers() + re.NoError(err) + defer tc.Destroy() + tc.WaitLeader() + leaderServer := tc.GetServer(tc.GetLeader()) + re.NoError(leaderServer.BootstrapCluster()) + + tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, pdAddr) + re.NoError(err) + defer tsoCluster.Destroy() + tsoCluster.WaitForDefaultPrimaryServing(re) + + // Wait pd clients are ready. + testutil.Eventually(re, func() bool { + count := 0 + clients.Range(func(key, value interface{}) bool { + count++ + return true + }) + return count == 2 + }) + clientA, ok := clients.Load("keyspace_a") + re.True(ok) + clientB, ok := clients.Load("keyspace_b") + re.True(ok) + + // First split keyspace group 0 to 1 with keyspace 2. + kgm := leaderServer.GetServer().GetKeyspaceGroupManager() + re.NotNil(kgm) + testutil.Eventually(re, func() bool { + err = kgm.SplitKeyspaceGroupByID(0, 1, []uint32{2}) + return err == nil + }) + + // Trigger checkTSOSplit to ensure the split is finished. + testutil.Eventually(re, func() bool { + _, _, err = clientB.(pd.Client).GetTS(ctx) + re.NoError(err) + kg := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0) + return !kg.IsSplitting() + }) + clientB.(pd.Client).Close() + + // Then split keyspace group 0 to 2 with keyspace 1. + testutil.Eventually(re, func() bool { + err = kgm.SplitKeyspaceGroupByID(0, 2, []uint32{1}) + return err == nil + }) + + // Trigger checkTSOSplit to ensure the split is finished. + testutil.Eventually(re, func() bool { + _, _, err = clientA.(pd.Client).GetTS(ctx) + re.NoError(err) + kg := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0) + return !kg.IsSplitting() + }) + clientA.(pd.Client).Close() + + // Check the keyspace group 0 is split to 1 and 2. + kg0 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0) + kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 1) + kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 2) + re.Equal([]uint32{0}, kg0.Keyspaces) + re.Equal([]uint32{2}, kg1.Keyspaces) + re.Equal([]uint32{1}, kg2.Keyspaces) + re.False(kg0.IsSplitting()) + re.False(kg1.IsSplitting()) + re.False(kg2.IsSplitting()) + + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) +}