Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: add more tests for multiple keyspace groups #6395

Merged
merged 7 commits into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,9 @@ func (s *Server) GetMember(keyspaceID, keyspaceGroupID uint32) (tso.ElectionMemb
return member, nil
}

// ResignPrimary resigns the primary of the given keyspace and keyspace group.
func (s *Server) ResignPrimary() error {
member, err := s.keyspaceGroupManager.GetElectionMember(
mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
// ResignPrimary resigns the primary of the given keyspace.
func (s *Server) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error {
member, err := s.keyspaceGroupManager.GetElectionMember(keyspaceID, keyspaceGroupID)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ func (kgm *KeyspaceGroupManager) FindGroupByKeyspaceID(
return curAM, curKeyspaceGroup, curKeyspaceGroupID, nil
}

// GetElectionMember returns the election member of the given keyspace group
// GetElectionMember returns the election member of the keyspace group serving the given keyspace.
func (kgm *KeyspaceGroupManager) GetElectionMember(
keyspaceID, keyspaceGroupID uint32,
) (ElectionMember, error) {
Expand Down
13 changes: 9 additions & 4 deletions tests/integrations/mcs/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package mcs

import (
"context"
"fmt"
"time"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -92,12 +93,16 @@ func (tc *TestTSOCluster) DestroyServer(addr string) {
}

// ResignPrimary resigns the primary TSO server.
func (tc *TestTSOCluster) ResignPrimary() {
tc.GetPrimary(mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID).ResignPrimary()
func (tc *TestTSOCluster) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error {
primaryServer := tc.GetPrimaryServer(keyspaceID, keyspaceGroupID)
if primaryServer == nil {
return fmt.Errorf("no tso server serves this keyspace %d", keyspaceID)
}
return primaryServer.ResignPrimary(keyspaceID, keyspaceGroupID)
}

// GetPrimary returns the primary TSO server.
func (tc *TestTSOCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server {
// GetPrimaryServer returns the primary TSO server of the given keyspace
func (tc *TestTSOCluster) GetPrimaryServer(keyspaceID, keyspaceGroupID uint32) *tso.Server {
for _, server := range tc.servers {
if server.IsKeyspaceServing(keyspaceID, keyspaceGroupID) {
return server
Expand Down
87 changes: 85 additions & 2 deletions tests/integrations/mcs/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
)

var once sync.Once
Expand All @@ -48,13 +49,25 @@ func InitLogger(cfg *tso.Config) (err error) {
return err
}

// SetupClientWithKeyspace creates a TSO client for test.
func SetupClientWithKeyspace(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client {
// SetupClientWithDefaultKeyspaceName creates a TSO client with default keyspace name for test.
func SetupClientWithDefaultKeyspaceName(
ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption,
) pd.Client {
cli, err := pd.NewClientWithKeyspaceName(ctx, "", endpoints, pd.SecurityOption{}, opts...)
re.NoError(err)
return cli
}

// SetupClientWithKeyspaceID creates a TSO client with the given keyspace id for test.
func SetupClientWithKeyspaceID(
ctx context.Context, re *require.Assertions,
keyspaceID uint32, endpoints []string, opts ...pd.ClientOption,
) pd.Client {
cli, err := pd.NewClientWithKeyspace(ctx, keyspaceID, endpoints, pd.SecurityOption{}, opts...)
re.NoError(err)
return cli
}

// StartSingleResourceManagerTestServer creates and starts a resource manager server with default config for testing.
func StartSingleResourceManagerTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*rm.Server, func()) {
cfg := rm.NewConfig()
Expand Down Expand Up @@ -137,3 +150,73 @@ func WaitForTSOServiceAvailable(ctx context.Context, pdClient pd.Client) error {
}
return errors.WithStack(err)
}

// CheckMultiKeyspacesTSO checks the correctness of TSO for multiple keyspaces.
func CheckMultiKeyspacesTSO(
ctx context.Context, re *require.Assertions,
clients []pd.Client, parallelAct func(),
) {
ctx, cancel := context.WithCancel(ctx)
wg := sync.WaitGroup{}
wg.Add(len(clients))

for _, client := range clients {
go func(cli pd.Client) {
defer wg.Done()
var ts, lastTS uint64
for {
select {
case <-ctx.Done():
// Make sure the lastTS is not empty
re.NotEmpty(lastTS)
return
default:
}
physical, logical, err := cli.GetTS(ctx)
// omit the error check since there are many kinds of errors
if err != nil {
continue
}
ts = tsoutil.ComposeTS(physical, logical)
re.Less(lastTS, ts)
lastTS = ts
}
}(client)
}

wg.Add(1)
go func() {
defer wg.Done()
parallelAct()
cancel()
}()

wg.Wait()
}

// WaitForMultiKeyspacesTSOAvailable waits for the given keyspaces being served by the tso server side
func WaitForMultiKeyspacesTSOAvailable(
ctx context.Context, re *require.Assertions,
keyspaceIDs []uint32, backendEndpoints []string,
) []pd.Client {
wg := sync.WaitGroup{}
wg.Add(len(keyspaceIDs))

clients := make([]pd.Client, 0, len(keyspaceIDs))
for _, keyspaceID := range keyspaceIDs {
cli := SetupClientWithKeyspaceID(ctx, re, keyspaceID, backendEndpoints)
re.NotNil(cli)
clients = append(clients, cli)

go func() {
defer wg.Done()
testutil.Eventually(re, func() bool {
_, _, err := cli.GetTS(ctx)
return err == nil
})
}()
}

wg.Wait()
return clients
}
79 changes: 77 additions & 2 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownTest() {
}

func cleanupKeyspaceGroups(re *require.Assertions, server *tests.TestServer) {
for _, group := range handlersutil.MustLoadKeyspaceGroups(re, server, "0", "0") {
keyspaceGroups := handlersutil.MustLoadKeyspaceGroups(re, server, "0", "0")
for _, group := range keyspaceGroups {
// Do not delete default keyspace group.
if group.ID == mcsutils.DefaultKeyspaceGroupID {
continue
Expand Down Expand Up @@ -130,6 +131,80 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByDefaultKeysp
}
}
}

keyspaceIDs := []uint32{0, 1, 2, 3, 1000}
clients := mcs.WaitForMultiKeyspacesTSOAvailable(
suite.ctx, re, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()})
re.Equal(len(keyspaceIDs), len(clients))
mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, func() {
time.Sleep(3 * time.Second)
})
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKeyspaceGroups() {
// Create multiple keyspace groups, and every keyspace should be served by one of them
// on a tso server.
re := suite.Require()

params := []struct {
keyspaceGroupID uint32
keyspaceIDs []uint32
}{
{0, []uint32{0, 10}},
{1, []uint32{1, 11}},
{2, []uint32{2, 12}},
}

for _, param := range params {
if param.keyspaceGroupID == 0 {
// we have already created default keyspace group, so we can skip it.
// keyspace 10 isn't assigned to any keyspace group, so they will be
// served by default keyspace group.
continue
}
handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
KeyspaceGroups: []*endpoint.KeyspaceGroup{
{
ID: param.keyspaceGroupID,
UserKind: endpoint.Standard.String(),
Members: suite.tsoCluster.GetKeyspaceGroupMember(),
Keyspaces: param.keyspaceIDs,
},
},
})
}

testutil.Eventually(re, func() bool {
for _, param := range params {
for _, keyspaceID := range param.keyspaceIDs {
served := false
for _, server := range suite.tsoCluster.GetServers() {
if server.IsKeyspaceServing(keyspaceID, param.keyspaceGroupID) {
tam, err := server.GetTSOAllocatorManager(param.keyspaceGroupID)
re.NoError(err)
re.NotNil(tam)
served = true
}
}
if !served {
return false
}
}
}
return true
}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))

keyspaceIDs := make([]uint32, 0)
for _, param := range params {
keyspaceIDs = append(keyspaceIDs, param.keyspaceIDs...)
}

clients := mcs.WaitForMultiKeyspacesTSOAvailable(
suite.ctx, re, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()})
re.Equal(len(keyspaceIDs), len(clients))
mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, func() {
time.Sleep(3 * time.Second)
})
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() {
Expand Down Expand Up @@ -160,7 +235,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() {
})
ts.Physical += time.Hour.Milliseconds()
// Set the TSO of the keyspace group 1 to a large value.
err = suite.tsoCluster.GetPrimary(222, 1).GetHandler().ResetTS(tsoutil.GenerateTS(&ts), false, true, 1)
err = suite.tsoCluster.GetPrimaryServer(222, 1).GetHandler().ResetTS(tsoutil.GenerateTS(&ts), false, true, 1)
re.NoError(err)
// Split the keyspace group 1 to 2.
handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{
Expand Down
23 changes: 12 additions & 11 deletions tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func checkTSOPath(re *require.Assertions, isAPIServiceMode bool) {
_, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints, tempurl.Alloc())
defer cleanup()

cli := mcs.SetupClientWithKeyspace(ctx, re, []string{backendEndpoints})
cli := mcs.SetupClientWithDefaultKeyspaceName(ctx, re, []string{backendEndpoints})
physical, logical, err := cli.GetTS(ctx)
re.NoError(err)
ts := tsoutil.ComposeTS(physical, logical)
Expand Down Expand Up @@ -349,13 +349,14 @@ func (suite *APIServerForwardTestSuite) checkAvailableTSO() {

type CommonTestSuite struct {
suite.Suite
ctx context.Context
cancel context.CancelFunc
cluster *tests.TestCluster
tsoCluster *mcs.TestTSOCluster
pdLeader *tests.TestServer
tsoPrimary *tso.Server
backendEndpoints string
ctx context.Context
cancel context.CancelFunc
cluster *tests.TestCluster
tsoCluster *mcs.TestTSOCluster
pdLeader *tests.TestServer
// tsoDefaultPrimaryServer is the primary server of the default keyspace group
tsoDefaultPrimaryServer *tso.Server
backendEndpoints string
}

func TestCommonTestSuite(t *testing.T) {
Expand All @@ -380,7 +381,7 @@ func (suite *CommonTestSuite) SetupSuite() {
suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints)
suite.NoError(err)
suite.tsoCluster.WaitForDefaultPrimaryServing(re)
suite.tsoPrimary = suite.tsoCluster.GetPrimary(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID)
suite.tsoDefaultPrimaryServer = suite.tsoCluster.GetPrimaryServer(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID)
}

func (suite *CommonTestSuite) TearDownSuite() {
Expand All @@ -401,14 +402,14 @@ func (suite *CommonTestSuite) TearDownSuite() {
func (suite *CommonTestSuite) TestAdvertiseAddr() {
re := suite.Require()

conf := suite.tsoPrimary.GetConfig()
conf := suite.tsoDefaultPrimaryServer.GetConfig()
re.Equal(conf.GetListenAddr(), conf.GetAdvertiseListenAddr())
}

func (suite *CommonTestSuite) TestMetrics() {
re := suite.Require()

resp, err := http.Get(suite.tsoPrimary.GetConfig().GetAdvertiseListenAddr() + "/metrics")
resp, err := http.Get(suite.tsoDefaultPrimaryServer.GetConfig().GetAdvertiseListenAddr() + "/metrics")
re.NoError(err)
defer resp.Body.Close()
re.Equal(http.StatusOK, resp.StatusCode)
Expand Down
Loading