Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: fix keyspace update in tsoSvcDiscovery #6612

Merged
merged 8 commits into from
Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,14 @@ type serviceModeKeeper struct {
tsoSvcDiscovery ServiceDiscovery
}

func (k *serviceModeKeeper) SetKeyspaceID(keyspaceID uint32) {
k.Lock()
defer k.Unlock()
if k.serviceMode == pdpb.ServiceMode_API_SVC_MODE {
k.tsoSvcDiscovery.SetKeyspaceID(keyspaceID)
}
}

func (k *serviceModeKeeper) close() {
k.Lock()
defer k.Unlock()
Expand Down Expand Up @@ -471,9 +479,6 @@ func newClientWithKeyspaceName(
ctx context.Context, keyspaceName string, svrAddrs []string,
security SecurityOption, opts ...ClientOption,
) (Client, error) {
log.Info("[pd] create pd client with endpoints and keyspace",
zap.Strings("pd-address", svrAddrs), zap.String("keyspace-name", keyspaceName))

tlsCfg := &tlsutil.TLSConfig{
CAPath: security.CAPath,
CertPath: security.CertPath,
Expand Down Expand Up @@ -510,8 +515,12 @@ func newClientWithKeyspaceName(
if err := c.initRetry(c.loadKeyspaceMeta, keyspaceName); err != nil {
return nil, err
}
// We call "c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID)" after service mode already switching to API mode
// and tso service discovery already initialized, so here we need to set the tso_service_discovery's keyspace id too.
c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID)

c.serviceModeKeeper.SetKeyspaceID(c.keyspaceID)
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
log.Info("[pd] create pd client with endpoints and keyspace",
zap.Strings("pd-address", svrAddrs), zap.String("keyspace-name", keyspaceName), zap.Uint32("keyspace-id", c.keyspaceID))
return c, nil
}

Expand Down Expand Up @@ -593,15 +602,15 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
)
switch newMode {
case pdpb.ServiceMode_PD_SVC_MODE:
newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID,
newTSOCli = newTSOClient(c.ctx, c.option,
c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{})
case pdpb.ServiceMode_API_SVC_MODE:
newTSOSvcDiscovery = newTSOServiceDiscovery(
c.ctx, MetaStorageClient(c), c.pdSvcDiscovery,
c.GetClusterID(c.ctx), c.keyspaceID, c.tlsCfg, c.option)
// At this point, the keyspace group isn't known yet. Starts from the default keyspace group,
// and will be updated later.
newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID,
newTSOCli = newTSOClient(c.ctx, c.option,
newTSOSvcDiscovery, &tsoTSOStreamBuilderFactory{})
if err := newTSOSvcDiscovery.Init(); err != nil {
log.Error("[pd] failed to initialize tso service discovery. keep the current service mode",
Expand Down
4 changes: 1 addition & 3 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ type tsoClient struct {
wg sync.WaitGroup
option *option

keyspaceID uint32
svcDiscovery ServiceDiscovery
tsoStreamBuilderFactory
// tsoAllocators defines the mapping {dc-location -> TSO allocator leader URL}
Expand All @@ -94,15 +93,14 @@ type tsoClient struct {

// newTSOClient returns a new TSO client.
func newTSOClient(
ctx context.Context, option *option, keyspaceID uint32,
ctx context.Context, option *option,
svcDiscovery ServiceDiscovery, factory tsoStreamBuilderFactory,
) *tsoClient {
ctx, cancel := context.WithCancel(ctx)
c := &tsoClient{
ctx: ctx,
cancel: cancel,
option: option,
keyspaceID: keyspaceID,
svcDiscovery: svcDiscovery,
tsoStreamBuilderFactory: factory,
checkTSDeadlineCh: make(chan struct{}),
Expand Down
3 changes: 2 additions & 1 deletion pkg/balancer/round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func (r *RoundRobin[T]) Next() (t T) {
func (r *RoundRobin[T]) GetAll() []T {
r.RLock()
defer r.RUnlock()
return r.nodes
// return a copy to avoid data race
return append(r.nodes[:0:0], r.nodes...)
}

// Put puts one into balancer.
Expand Down
6 changes: 6 additions & 0 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,12 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit(
return err
}
if tsoutil.CompareTimestamp(&splitSourceTSO, &splitTSO) <= 0 {
log.Debug("the split source TSO is not greater than the newly split TSO",
zap.Int64("split-source-tso-physical", splitSourceTSO.Physical),
zap.Int64("split-source-tso-logical", splitSourceTSO.Logical),
zap.Int64("split-tso-physical", splitTSO.Physical),
zap.Int64("split-tso-logical", splitTSO.Logical),
)
return nil
}
// If the split source TSO is greater than the newly split TSO, we need to update the split
Expand Down
104 changes: 104 additions & 0 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/server/apiv2/handlers"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
handlersutil "github.com/tikv/pd/tests/server/apiv2/handlers"
Expand Down Expand Up @@ -465,3 +466,106 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
}

func TestTwiceSplitKeyspaceGroup(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))

// Init api server config but not start.
tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, serverName string) {
conf.Keyspace.PreAlloc = []string{
"keyspace_a", "keyspace_b",
}
})
re.NoError(err)
pdAddr := tc.GetConfig().GetClientURL()

// Start pd client and wait pd server start.
var clients sync.Map
go func() {
apiCtx := pd.NewAPIContextV2("keyspace_b") // its keyspace id is 2.
cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, []string{pdAddr}, pd.SecurityOption{})
re.NoError(err)
clients.Store("keyspace_b", cli)
}()
go func() {
apiCtx := pd.NewAPIContextV2("keyspace_a") // its keyspace id is 1.
cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, []string{pdAddr}, pd.SecurityOption{})
re.NoError(err)
clients.Store("keyspace_a", cli)
}()

// Start api server and tso server.
err = tc.RunInitialServers()
re.NoError(err)
defer tc.Destroy()
tc.WaitLeader()
leaderServer := tc.GetServer(tc.GetLeader())
re.NoError(leaderServer.BootstrapCluster())

tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, pdAddr)
re.NoError(err)
defer tsoCluster.Destroy()
tsoCluster.WaitForDefaultPrimaryServing(re)

// Wait pd clients are ready.
testutil.Eventually(re, func() bool {
count := 0
clients.Range(func(key, value interface{}) bool {
count++
return true
})
return count == 2
})
clientA, ok := clients.Load("keyspace_a")
re.True(ok)
clientB, ok := clients.Load("keyspace_b")
re.True(ok)

// First split keyspace group 0 to 1 with keyspace 2.
kgm := leaderServer.GetServer().GetKeyspaceGroupManager()
re.NotNil(kgm)
testutil.Eventually(re, func() bool {
err = kgm.SplitKeyspaceGroupByID(0, 1, []uint32{2})
return err == nil
})

// Trigger checkTSOSplit to ensure the split is finished.
testutil.Eventually(re, func() bool {
_, _, err = clientB.(pd.Client).GetTS(ctx)
re.NoError(err)
kg := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0)
return !kg.IsSplitting()
})
clientB.(pd.Client).Close()

// Then split keyspace group 0 to 2 with keyspace 1.
testutil.Eventually(re, func() bool {
err = kgm.SplitKeyspaceGroupByID(0, 2, []uint32{1})
return err == nil
})

// Trigger checkTSOSplit to ensure the split is finished.
testutil.Eventually(re, func() bool {
_, _, err = clientA.(pd.Client).GetTS(ctx)
re.NoError(err)
kg := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0)
return !kg.IsSplitting()
})
clientA.(pd.Client).Close()

// Check the keyspace group 0 is split to 1 and 2.
kg0 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0)
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 1)
kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 2)
re.Equal([]uint32{0}, kg0.Keyspaces)
re.Equal([]uint32{2}, kg1.Keyspaces)
re.Equal([]uint32{1}, kg2.Keyspaces)
re.False(kg0.IsSplitting())
re.False(kg1.IsSplitting())
re.False(kg2.IsSplitting())

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
}