Skip to content

Commit

Permalink
keyspace, apiv2: implement the keyspace group merging API (tikv#6594)
Browse files Browse the repository at this point in the history
ref tikv#6589

Implement the keyspace group merging API.

Signed-off-by: JmPotato <ghzpotato@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and rleungx committed Aug 2, 2023
1 parent 094599e commit 491e99c
Show file tree
Hide file tree
Showing 7 changed files with 327 additions and 22 deletions.
21 changes: 12 additions & 9 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ const (
UserKindKey = "user_kind"
// TSOKeyspaceGroupIDKey is the key for tso keyspace group id in keyspace config.
TSOKeyspaceGroupIDKey = "tso_keyspace_group_id"
// keyspacePatrolBatchSize is the batch size for keyspace assignment patrol.
// the limit of etcd txn op is 128, keyspacePatrolBatchSize need to be less than it.
// maxEtcdTxnOps is the max value of operations in an etcd txn. The default limit of etcd txn op is 128.
// We use 120 here to leave some space for other operations.
// See: https://github.com/etcd-io/etcd/blob/d3e43d4de6f6d9575b489dd7850a85e37e0f6b6c/server/embed/config.go#L61
keyspacePatrolBatchSize = 120
maxEtcdTxnOps = 120
)

// Config is the interface for keyspace config.
Expand Down Expand Up @@ -669,7 +669,7 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
zap.Duration("cost", time.Since(start)),
zap.Uint64("patrolled-keyspace-count", patrolledKeyspaceCount),
zap.Uint64("assigned-keyspace-count", assignedKeyspaceCount),
zap.Int("batch-size", keyspacePatrolBatchSize),
zap.Int("batch-size", maxEtcdTxnOps),
zap.Uint32("current-start-id", currentStartID),
zap.Uint32("next-start-id", nextStartID),
)
Expand All @@ -688,7 +688,10 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
if defaultKeyspaceGroup.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, keyspacePatrolBatchSize)
if defaultKeyspaceGroup.IsMerging() {
return ErrKeyspaceGroupInMerging
}
keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, maxEtcdTxnOps)
if err != nil {
return err
}
Expand All @@ -698,9 +701,9 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
currentStartID = keyspaces[0].GetId()
nextStartID = keyspaces[keyspaceNum-1].GetId() + 1
}
// If there are less than `keyspacePatrolBatchSize` keyspaces,
// If there are less than `maxEtcdTxnOps` keyspaces,
// we have reached the end of the keyspace list.
moreToPatrol = keyspaceNum == keyspacePatrolBatchSize
moreToPatrol = keyspaceNum == maxEtcdTxnOps
var (
assigned = false
keyspaceIDsToUnlock = make([]uint32, 0, keyspaceNum)
Expand Down Expand Up @@ -735,7 +738,7 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
err = manager.store.SaveKeyspaceMeta(txn, ks)
if err != nil {
log.Error("[keyspace] failed to save keyspace meta during patrol",
zap.Int("batch-size", keyspacePatrolBatchSize),
zap.Int("batch-size", maxEtcdTxnOps),
zap.Uint32("current-start-id", currentStartID),
zap.Uint32("next-start-id", nextStartID),
zap.Uint32("keyspace-id", ks.Id), zap.Error(err))
Expand All @@ -747,7 +750,7 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
err = manager.kgm.store.SaveKeyspaceGroup(txn, defaultKeyspaceGroup)
if err != nil {
log.Error("[keyspace] failed to save default keyspace group meta during patrol",
zap.Int("batch-size", keyspacePatrolBatchSize),
zap.Int("batch-size", maxEtcdTxnOps),
zap.Uint32("current-start-id", currentStartID),
zap.Uint32("next-start-id", nextStartID), zap.Error(err))
return err
Expand Down
6 changes: 3 additions & 3 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignment() {
func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
re := suite.Require()
// Create some keyspaces without any keyspace group.
for i := 1; i < keyspacePatrolBatchSize*2+1; i++ {
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
now := time.Now().Unix()
err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
Id: uint32(i),
Expand All @@ -420,7 +420,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < keyspacePatrolBatchSize*2+1; i++ {
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
// Patrol the keyspace assignment.
Expand All @@ -430,7 +430,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < keyspacePatrolBatchSize*2+1; i++ {
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
re.Contains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
}
Expand Down
143 changes: 138 additions & 5 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package keyspace
import (
"context"
"encoding/json"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -334,6 +335,9 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro
if oldKG.IsSplitting() && overwrite {
return ErrKeyspaceGroupInSplit
}
if oldKG.IsMerging() && overwrite {
return ErrKeyspaceGroupInMerging
}
newKG := &endpoint.KeyspaceGroup{
ID: keyspaceGroup.ID,
UserKind: keyspaceGroup.UserKind,
Expand Down Expand Up @@ -409,6 +413,9 @@ func (m *GroupManager) updateKeyspaceForGroupLocked(userKind endpoint.UserKind,
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
if kg.IsMerging() {
return ErrKeyspaceGroupInMerging
}

changed := false

Expand Down Expand Up @@ -463,6 +470,9 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse
if oldKG.IsSplitting() || newKG.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
if oldKG.IsMerging() || newKG.IsMerging() {
return ErrKeyspaceGroupInMerging
}

var updateOld, updateNew bool
if !slice.Contains(newKG.Keyspaces, keyspaceID) {
Expand Down Expand Up @@ -510,6 +520,10 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3
if splitSourceKg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
// A keyspace group can not be split when it is in merging.
if splitSourceKg.IsMerging() {
return ErrKeyspaceGroupInMerging
}
// Check if the source keyspace group has enough replicas.
if len(splitSourceKg.Members) < utils.KeyspaceGroupDefaultReplicaCount {
return ErrKeyspaceGroupNotEnoughReplicas
Expand Down Expand Up @@ -612,11 +626,7 @@ func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error {
if err != nil {
return err
}
err = m.store.SaveKeyspaceGroup(txn, splitSourceKg)
if err != nil {
return err
}
return nil
return m.store.SaveKeyspaceGroup(txn, splitSourceKg)
}); err != nil {
return err
}
Expand Down Expand Up @@ -657,6 +667,9 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
if kg.IsMerging() {
return ErrKeyspaceGroupInMerging
}
exists := make(map[string]struct{})
for _, member := range kg.Members {
exists[member.Address] = struct{}{}
Expand Down Expand Up @@ -713,6 +726,9 @@ func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
if kg.IsMerging() {
return ErrKeyspaceGroupInMerging
}
members := make([]endpoint.KeyspaceGroupMember, 0, len(nodes))
for _, node := range nodes {
members = append(members, endpoint.KeyspaceGroupMember{Address: node})
Expand All @@ -737,3 +753,120 @@ func (m *GroupManager) IsExistNode(addr string) bool {
}
return false
}

// MergeKeyspaceGroups merges the keyspace group in the list into the target keyspace group.
func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uint32) error {
mergeListNum := len(mergeList)
if mergeListNum == 0 {
return nil
}
// The transaction below will:
// - Load and delete the keyspace groups in the merge list.
// - Load and update the target keyspace group.
// So we pre-check the number of operations to avoid exceeding the maximum number of etcd transaction.
if (mergeListNum+1)*2 > maxEtcdTxnOps {
return ErrExceedMaxEtcdTxnOps
}
var (
groups = make(map[uint32]*endpoint.KeyspaceGroup, mergeListNum+1)
mergeTargetKg *endpoint.KeyspaceGroup
)
m.Lock()
defer m.Unlock()
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
// Load and check all keyspace groups first.
for _, kgID := range append(mergeList, mergeTargetID) {
kg, err := m.store.LoadKeyspaceGroup(txn, kgID)
if err != nil {
return err
}
if kg == nil {
return ErrKeyspaceGroupNotExists
}
// A keyspace group can not be merged if it's in splitting.
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
// A keyspace group can not be split when it is in merging.
if kg.IsMerging() {
return ErrKeyspaceGroupInMerging
}
groups[kgID] = kg
}
mergeTargetKg = groups[mergeTargetID]
keyspaces := make(map[uint32]struct{})
for _, keyspace := range mergeTargetKg.Keyspaces {
keyspaces[keyspace] = struct{}{}
}
// Delete the keyspace groups in merge list and move the keyspaces in it to the target keyspace group.
for _, kgID := range mergeList {
kg := groups[kgID]
for _, keyspace := range kg.Keyspaces {
keyspaces[keyspace] = struct{}{}
}
if err := m.store.DeleteKeyspaceGroup(txn, kg.ID); err != nil {
return err
}
}
mergedKeyspaces := make([]uint32, 0, len(keyspaces))
for keyspace := range keyspaces {
mergedKeyspaces = append(mergedKeyspaces, keyspace)
}
sort.Slice(mergedKeyspaces, func(i, j int) bool {
return mergedKeyspaces[i] < mergedKeyspaces[j]
})
mergeTargetKg.Keyspaces = mergedKeyspaces
// Update the merge state of the target keyspace group.
mergeTargetKg.MergeState = &endpoint.MergeState{
MergeList: mergeList,
}
return m.store.SaveKeyspaceGroup(txn, mergeTargetKg)
}); err != nil {
return err
}
// Update the keyspace group cache.
m.groups[endpoint.StringUserKind(mergeTargetKg.UserKind)].Put(mergeTargetKg)
for _, kgID := range mergeList {
kg := groups[kgID]
m.groups[endpoint.StringUserKind(kg.UserKind)].Remove(kgID)
}
return nil
}

// FinishMergeKeyspaceByID finishes the merging keyspace group by the merge target ID.
func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error {
var mergeTargetKg *endpoint.KeyspaceGroup
m.Lock()
defer m.Unlock()
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
// Load the merge target keyspace group first.
mergeTargetKg, err = m.store.LoadKeyspaceGroup(txn, mergeTargetID)
if err != nil {
return err
}
if mergeTargetKg == nil {
return ErrKeyspaceGroupNotExists
}
// Check if it's in the merging state.
if !mergeTargetKg.IsMergeTarget() {
return ErrKeyspaceGroupNotInMerging
}
// Make sure all merging keyspace groups are deleted.
for _, kgID := range mergeTargetKg.MergeState.MergeList {
kg, err := m.store.LoadKeyspaceGroup(txn, kgID)
if err != nil {
return err
}
if kg != nil {
return ErrKeyspaceGroupNotInMerging
}
}
mergeTargetKg.MergeState = nil
return m.store.SaveKeyspaceGroup(txn, mergeTargetKg)
}); err != nil {
return err
}
// Update the keyspace group cache.
m.groups[endpoint.StringUserKind(mergeTargetKg.UserKind)].Put(mergeTargetKg)
return nil
}
77 changes: 76 additions & 1 deletion pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,12 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupOperations() {
re.NoError(err)
re.Len(kgs, 2)
// get the default keyspace group
kg, err := suite.kgm.GetKeyspaceGroupByID(0)
kg, err := suite.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.Equal(uint32(0), kg.ID)
re.Equal(endpoint.Basic.String(), kg.UserKind)
re.False(kg.IsSplitting())
// get the keyspace group 3
kg, err = suite.kgm.GetKeyspaceGroupByID(3)
re.NoError(err)
re.Equal(uint32(3), kg.ID)
Expand Down Expand Up @@ -320,3 +321,77 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
err = suite.kgm.SplitKeyspaceGroupByID(2, 5, []uint32{111, 222, 444})
re.ErrorIs(err, ErrKeyspaceNotInKeyspaceGroup)
}

func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupMerge() {
re := suite.Require()

keyspaceGroups := []*endpoint.KeyspaceGroup{
{
ID: uint32(1),
UserKind: endpoint.Basic.String(),
Keyspaces: []uint32{111, 222, 333},
Members: make([]endpoint.KeyspaceGroupMember, utils.KeyspaceGroupDefaultReplicaCount),
},
{
ID: uint32(3),
UserKind: endpoint.Basic.String(),
Keyspaces: []uint32{444, 555},
},
}
err := suite.kgm.CreateKeyspaceGroups(keyspaceGroups)
re.NoError(err)
// split the keyspace group 1 to 2
err = suite.kgm.SplitKeyspaceGroupByID(1, 2, []uint32{333})
re.NoError(err)
// finish the split of the keyspace group 2
err = suite.kgm.FinishSplitKeyspaceByID(2)
re.NoError(err)
// check the keyspace group 1 and 2
kg1, err := suite.kgm.GetKeyspaceGroupByID(1)
re.NoError(err)
re.Equal(uint32(1), kg1.ID)
re.Equal([]uint32{111, 222}, kg1.Keyspaces)
re.False(kg1.IsSplitting())
re.False(kg1.IsMerging())
kg2, err := suite.kgm.GetKeyspaceGroupByID(2)
re.NoError(err)
re.Equal(uint32(2), kg2.ID)
re.Equal([]uint32{333}, kg2.Keyspaces)
re.False(kg2.IsSplitting())
re.False(kg2.IsMerging())
re.Equal(kg1.UserKind, kg2.UserKind)
re.Equal(kg1.Members, kg2.Members)
// merge the keyspace group 2 and 3 back into 1
err = suite.kgm.MergeKeyspaceGroups(1, []uint32{2, 3})
re.NoError(err)
// check the keyspace group 2 and 3
kg2, err = suite.kgm.GetKeyspaceGroupByID(2)
re.NoError(err)
re.Nil(kg2)
kg3, err := suite.kgm.GetKeyspaceGroupByID(3)
re.NoError(err)
re.Nil(kg3)
// check the keyspace group 1
kg1, err = suite.kgm.GetKeyspaceGroupByID(1)
re.NoError(err)
re.Equal(uint32(1), kg1.ID)
re.Equal([]uint32{111, 222, 333, 444, 555}, kg1.Keyspaces)
re.False(kg1.IsSplitting())
re.True(kg1.IsMerging())
// finish the merging
err = suite.kgm.FinishMergeKeyspaceByID(1)
re.NoError(err)
kg1, err = suite.kgm.GetKeyspaceGroupByID(1)
re.NoError(err)
re.Equal(uint32(1), kg1.ID)
re.Equal([]uint32{111, 222, 333, 444, 555}, kg1.Keyspaces)
re.False(kg1.IsSplitting())
re.False(kg1.IsMerging())

// merge a non-existing keyspace group
err = suite.kgm.MergeKeyspaceGroups(4, []uint32{5})
re.ErrorIs(err, ErrKeyspaceGroupNotExists)
// merge with the number of keyspace groups exceeds the limit
err = suite.kgm.MergeKeyspaceGroups(1, make([]uint32, maxEtcdTxnOps/2))
re.ErrorIs(err, ErrExceedMaxEtcdTxnOps)
}
Loading

0 comments on commit 491e99c

Please sign in to comment.