From 832b165fa1ecfd1ef6a63d54c97600dacfe34946 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 25 May 2023 20:41:39 +0800 Subject: [PATCH] mcs: fix the members field is null (#6518) close tikv/pd#6519 Signed-off-by: Ryan Leung --- pkg/keyspace/keyspace.go | 9 +++-- pkg/keyspace/tso_keyspace_group.go | 17 +++++++--- .../mcs/tso/keyspace_group_manager_test.go | 34 ++++++++++++++++++- tests/server/apiv2/handlers/keyspace_test.go | 2 +- tests/server/apiv2/handlers/testutil.go | 3 +- 5 files changed, 56 insertions(+), 9 deletions(-) diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index adfded5862a..49c6ff8cd7e 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -390,7 +390,9 @@ func (manager *Manager) LoadKeyspaceByID(spaceID uint32) (*keyspacepb.KeyspaceMe } return nil }) - meta.Id = spaceID + if meta != nil { + meta.Id = spaceID + } return meta, err } @@ -671,8 +673,10 @@ func (manager *Manager) PatrolKeyspaceAssignment() error { ) }() for moreToPatrol { + var defaultKeyspaceGroup *endpoint.KeyspaceGroup err = manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error { - defaultKeyspaceGroup, err := manager.kgm.store.LoadKeyspaceGroup(txn, utils.DefaultKeyspaceGroupID) + var err error + defaultKeyspaceGroup, err = manager.kgm.store.LoadKeyspaceGroup(txn, utils.DefaultKeyspaceGroupID) if err != nil { return err } @@ -752,6 +756,7 @@ func (manager *Manager) PatrolKeyspaceAssignment() error { if err != nil { return err } + manager.kgm.groups[endpoint.StringUserKind(defaultKeyspaceGroup.UserKind)].Put(defaultKeyspaceGroup) // If all keyspaces in the current batch are assigned, update the next start ID. manager.nextPatrolStartID = nextStartID } diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index 46810be92d5..4012f13a909 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -22,6 +22,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/pkg/balancer" "github.com/tikv/pd/pkg/mcs/discovery" @@ -149,6 +150,10 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() { defer logutil.LogPanic() defer m.wg.Done() ticker := time.NewTicker(allocNodesToKeyspaceGroupsInterval) + failpoint.Inject("acceleratedAllocNodes", func() { + ticker.Stop() + ticker = time.NewTicker(time.Millisecond * 100) + }) defer ticker.Stop() for { select { @@ -162,7 +167,7 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() { } groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0) if err != nil { - log.Error("failed to load the all keyspace group", zap.Error(err)) + log.Error("failed to load all keyspace groups", zap.Error(err)) continue } withError := false @@ -171,7 +176,7 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() { nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, utils.KeyspaceGroupDefaultReplicaCount) if err != nil { withError = true - log.Error("failed to alloc nodes for keyspace group", zap.Error(err)) + log.Error("failed to alloc nodes for keyspace group", zap.Uint32("keyspace-group-id", group.ID), zap.Error(err)) continue } group.Members = nodes @@ -626,9 +631,12 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount defer cancel() ticker := time.NewTicker(allocNodesInterval) defer ticker.Stop() + + var kg *endpoint.KeyspaceGroup nodes := make([]endpoint.KeyspaceGroupMember, 0, desiredReplicaCount) err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error { - kg, err := m.store.LoadKeyspaceGroup(txn, id) + var err error + kg, err = m.store.LoadKeyspaceGroup(txn, id) if err != nil { return err } @@ -672,7 +680,8 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount if err != nil { return nil, err } - log.Info("alloc nodes for keyspace group", zap.Uint32("id", id), zap.Reflect("nodes", nodes)) + m.groups[endpoint.StringUserKind(kg.UserKind)].Put(kg) + log.Info("alloc nodes for keyspace group", zap.Uint32("keyspace-group-id", id), zap.Reflect("nodes", nodes)) return nodes, nil } diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 4b650bf1e25..aca79b7a5aa 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -23,16 +23,17 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" - "github.com/tikv/pd/client/testutil" "github.com/tikv/pd/pkg/election" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/storage/endpoint" tsopkg "github.com/tikv/pd/pkg/tso" + "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/server/apiv2/handlers" "github.com/tikv/pd/tests" @@ -425,3 +426,34 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() cancel() wg.Wait() } + +func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() { + re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion", "return(true)")) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) + kg := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0) + re.Equal(uint32(0), kg.ID) + re.Equal([]uint32{0}, kg.Keyspaces) + re.False(kg.IsSplitting()) + // wait for finishing alloc nodes + testutil.Eventually(re, func() bool { + kg = handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0) + return len(kg.Members) == 2 + }) + testConfig := map[string]string{ + "config": "1", + "tso_keyspace_group_id": "0", + "user_kind": "basic", + } + handlersutil.MustCreateKeyspace(re, suite.pdLeaderServer, &handlers.CreateKeyspaceParams{ + Name: "test_keyspace", + Config: testConfig, + }) + kg = handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0) + testutil.Eventually(re, func() bool { + kg = handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 0) + return len(kg.Members) == 2 + }) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) +} diff --git a/tests/server/apiv2/handlers/keyspace_test.go b/tests/server/apiv2/handlers/keyspace_test.go index ad456bd5be6..7fd8de013f7 100644 --- a/tests/server/apiv2/handlers/keyspace_test.go +++ b/tests/server/apiv2/handlers/keyspace_test.go @@ -152,7 +152,7 @@ func mustMakeTestKeyspaces(re *require.Assertions, server *tests.TestServer, cou Name: fmt.Sprintf("test_keyspace_%d", i), Config: testConfig, } - resultMeta[i] = mustCreateKeyspace(re, server, createRequest) + resultMeta[i] = MustCreateKeyspace(re, server, createRequest) } return resultMeta } diff --git a/tests/server/apiv2/handlers/testutil.go b/tests/server/apiv2/handlers/testutil.go index 3ab41bfb0cc..b638f1bbba4 100644 --- a/tests/server/apiv2/handlers/testutil.go +++ b/tests/server/apiv2/handlers/testutil.go @@ -79,7 +79,8 @@ func sendUpdateStateRequest(re *require.Assertions, server *tests.TestServer, na return true, meta.KeyspaceMeta } -func mustCreateKeyspace(re *require.Assertions, server *tests.TestServer, request *handlers.CreateKeyspaceParams) *keyspacepb.KeyspaceMeta { +// MustCreateKeyspace creates a keyspace with HTTP API. +func MustCreateKeyspace(re *require.Assertions, server *tests.TestServer, request *handlers.CreateKeyspaceParams) *keyspacepb.KeyspaceMeta { data, err := json.Marshal(request) re.NoError(err) httpReq, err := http.NewRequest(http.MethodPost, server.GetAddr()+keyspacesPrefix, bytes.NewBuffer(data))