Skip to content

Commit

Permalink
Merge the upstream
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Apr 11, 2023
2 parents 03dcbcb + 4493c08 commit dac4b7b
Show file tree
Hide file tree
Showing 8 changed files with 519 additions and 52 deletions.
37 changes: 34 additions & 3 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,17 @@ func (manager *Manager) Bootstrap() error {
if err != nil && err != ErrKeyspaceExists {
return err
}

if err := manager.kgm.UpdateKeyspaceForGroup(endpoint.Basic, id, defaultKeyspace.GetId(), opAdd); err != nil {
return err
}
// Initialize pre-alloc keyspace.
preAlloc := manager.config.GetPreAlloc()
for _, keyspaceName := range preAlloc {
id, err := manager.kgm.GetAvailableKeyspaceGroupIDByKind(endpoint.Basic)
if err != nil {
return err
}
_, err = manager.CreateKeyspace(&CreateKeyspaceRequest{
keyspace, err := manager.CreateKeyspace(&CreateKeyspaceRequest{
Name: keyspaceName,
Now: now,
Config: map[string]string{
Expand All @@ -151,6 +153,9 @@ func (manager *Manager) Bootstrap() error {
if err != nil && err != ErrKeyspaceExists {
return err
}
if err := manager.kgm.UpdateKeyspaceForGroup(endpoint.Basic, id, keyspace.GetId(), opAdd); err != nil {
return err
}
}
return nil
}
Expand Down Expand Up @@ -199,6 +204,9 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac
)
return nil, err
}
if err := manager.kgm.UpdateKeyspaceForGroup(userKind, id, keyspace.GetId(), opAdd); err != nil {
return nil, err
}
log.Info("[keyspace] keyspace created",
zap.Uint32("ID", keyspace.GetId()),
zap.String("name", keyspace.GetName()),
Expand Down Expand Up @@ -333,6 +341,7 @@ const (
// It returns error if saving failed, operation not allowed, or if keyspace not exists.
func (manager *Manager) UpdateKeyspaceConfig(name string, mutations []*Mutation) (*keyspacepb.KeyspaceMeta, error) {
var meta *keyspacepb.KeyspaceMeta
oldConfig := make(map[string]string)
err := manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
// First get KeyspaceID from Name.
loaded, id, err := manager.store.LoadKeyspaceID(txn, name)
Expand Down Expand Up @@ -360,6 +369,9 @@ func (manager *Manager) UpdateKeyspaceConfig(name string, mutations []*Mutation)
if meta.GetConfig() == nil {
meta.Config = map[string]string{}
}
for k, v := range meta.GetConfig() {
oldConfig[k] = v
}
// Update keyspace config according to mutations.
for _, mutation := range mutations {
switch mutation.Op {
Expand All @@ -371,8 +383,27 @@ func (manager *Manager) UpdateKeyspaceConfig(name string, mutations []*Mutation)
return errIllegalOperation
}
}
newConfig := meta.GetConfig()
oldUserKind := endpoint.StringUserKind(oldConfig[UserKindKey])
newUserKind := endpoint.StringUserKind(newConfig[UserKindKey])
oldID := oldConfig[TSOKeyspaceGroupIDKey]
newID := newConfig[TSOKeyspaceGroupIDKey]
needUpdate := oldUserKind != newUserKind || oldID != newID
if needUpdate {
if err := manager.kgm.UpdateKeyspaceGroup(oldID, newID, oldUserKind, newUserKind, meta.GetId()); err != nil {
return err
}
}
// Save the updated keyspace meta.
return manager.store.SaveKeyspaceMeta(txn, meta)
if err := manager.store.SaveKeyspaceMeta(txn, meta); err != nil {
if needUpdate {
if err := manager.kgm.UpdateKeyspaceGroup(newID, oldID, newUserKind, oldUserKind, meta.GetId()); err != nil {
log.Error("failed to revert keyspace group", zap.Error(err))
}
}
return err
}
return nil
})

if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (suite *keyspaceTestSuite) SetupTest() {
allocator := mockid.NewIDAllocator()
kgm := NewKeyspaceGroupManager(suite.ctx, store)
suite.manager = NewKeyspaceManager(store, nil, allocator, &mockConfig{}, kgm)
suite.NoError(kgm.Bootstrap())
suite.NoError(suite.manager.Bootstrap())
}

Expand Down
199 changes: 180 additions & 19 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,46 +16,98 @@ package keyspace

import (
"context"
"strconv"
"sync"

"github.com/pingcap/errors"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
)

const (
opAdd int = iota
opDelete
)

// GroupManager is the manager of keyspace group related data.
type GroupManager struct {
ctx context.Context
// the lock for the groups
sync.RWMutex
// groups is the cache of keyspace group related information.
// user kind -> keyspace group
groups map[endpoint.UserKind]*indexedHeap
// store is the storage for keyspace group related information.
store endpoint.KeyspaceGroupStorage
}

// NewKeyspaceGroupManager creates a Manager of keyspace group related data.
func NewKeyspaceGroupManager(ctx context.Context, store endpoint.KeyspaceGroupStorage) *GroupManager {
groups := make(map[endpoint.UserKind]*indexedHeap)
for i := 0; i < int(endpoint.UserKindCount); i++ {
groups[endpoint.UserKind(i)] = newIndexedHeap(int(utils.MaxKeyspaceGroupCountInUse))
}
return &GroupManager{
ctx: ctx,
store: store,
ctx: ctx,
store: store,
groups: groups,
}
}

// Bootstrap saves default keyspace group info.
// Bootstrap saves default keyspace group info and init group mapping in the memory.
func (m *GroupManager) Bootstrap() error {
defaultKeyspaceGroup := &endpoint.KeyspaceGroup{
ID: utils.DefaultKeySpaceGroupID,
UserKind: endpoint.Basic.String(),
}
err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{defaultKeyspaceGroup})
// It's possible that default keyspace group already exists in the storage (e.g. PD restart/recover),
// so we ignore the ErrKeyspaceGroupExists.

m.Lock()
defer m.Unlock()
// Ignore the error if default keyspace group already exists in the storage (e.g. PD restart/recover).
err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{defaultKeyspaceGroup}, false)
if err != nil && err != ErrKeyspaceGroupExists {
return err
}

userKind := endpoint.StringUserKind(defaultKeyspaceGroup.UserKind)
// If the group for the userKind does not exist, create a new one.
if _, ok := m.groups[userKind]; !ok {
m.groups[userKind] = newIndexedHeap(int(utils.MaxKeyspaceGroupCountInUse))
}
m.groups[userKind].Put(defaultKeyspaceGroup)

// Load all the keyspace groups from the storage and add to the respective userKind groups.
groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeySpaceGroupID, 0)
if err != nil {
return err
}
for _, group := range groups {
userKind := endpoint.StringUserKind(group.UserKind)
if _, ok := m.groups[userKind]; !ok {
m.groups[userKind] = newIndexedHeap(int(utils.MaxKeyspaceGroupCountInUse))
}
m.groups[userKind].Put(group)
}

return nil
}

// CreateKeyspaceGroups creates keyspace groups.
func (m *GroupManager) CreateKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGroup) error {
return m.saveKeyspaceGroups(keyspaceGroups)
m.Lock()
defer m.Unlock()
if err := m.saveKeyspaceGroups(keyspaceGroups, false); err != nil {
return err
}

for _, keyspaceGroup := range keyspaceGroups {
userKind := endpoint.StringUserKind(keyspaceGroup.UserKind)
m.groups[userKind].Put(keyspaceGroup)
}

return nil
}

// GetKeyspaceGroups gets keyspace groups from the start ID with limit.
Expand All @@ -71,35 +123,59 @@ func (m *GroupManager) GetKeyspaceGroupByID(id uint32) (*endpoint.KeyspaceGroup,
err error
)

m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
kg, err = m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
return err
}
return nil
})
}); err != nil {
return nil, err
}
return kg, nil
}

// DeleteKeyspaceGroupByID deletes the keyspace group by ID.
func (m *GroupManager) DeleteKeyspaceGroupByID(id uint32) error {
m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
func (m *GroupManager) DeleteKeyspaceGroupByID(id uint32) (*endpoint.KeyspaceGroup, error) {
var (
kg *endpoint.KeyspaceGroup
err error
)

m.Lock()
defer m.Unlock()
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
kg, err = m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
return err
}
if kg == nil {
return nil
}
return m.store.DeleteKeyspaceGroup(txn, id)
})
return nil
}); err != nil {
return nil, err
}

userKind := endpoint.StringUserKind(kg.UserKind)
// TODO: move out the keyspace to another group
// we don't need the keyspace group as the return value
m.groups[userKind].Remove(id)

return kg, nil
}

// saveKeyspaceGroups will try to save the given keyspace groups into the storage.
// If any keyspace group already exists, it will return ErrKeyspaceGroupExists.
func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGroup) error {
// If any keyspace group already exists and the overwrite is false, it will return ErrKeyspaceGroupExists.
func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGroup, overwrite bool) error {
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
for _, keyspaceGroup := range keyspaceGroups {
// Check if keyspace group has already existed.
kg, err := m.store.LoadKeyspaceGroup(txn, keyspaceGroup.ID)
oldKG, err := m.store.LoadKeyspaceGroup(txn, keyspaceGroup.ID)
if err != nil {
return err
}
if kg != nil {
if oldKG != nil && !overwrite {
return ErrKeyspaceGroupExists
}
m.store.SaveKeyspaceGroup(txn, &endpoint.KeyspaceGroup{
Expand Down Expand Up @@ -141,8 +217,93 @@ func (m *GroupManager) updateKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceG

// GetAvailableKeyspaceGroupIDByKind returns the available keyspace group ID by user kind.
func (m *GroupManager) GetAvailableKeyspaceGroupIDByKind(userKind endpoint.UserKind) (string, error) {
// TODO: implement it
return "0", nil
m.RLock()
defer m.RUnlock()
groups, ok := m.groups[userKind]
if !ok {
return "", errors.Errorf("user kind %s not found", userKind)
}
kg := groups.Top()
return strconv.FormatUint(uint64(kg.ID), 10), nil
}

// UpdateKeyspaceForGroup updates the keyspace field for the keyspace group.
func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupID string, keyspaceID uint32, mutation int) error {
id, err := strconv.ParseUint(groupID, 10, 64)
if err != nil {
return err
}

m.Lock()
defer m.Unlock()
kg := m.groups[userKind].Get(uint32(id))
if kg == nil {
return errors.Errorf("keyspace group %d not found", id)
}
switch mutation {
case opAdd:
if !slice.Contains(kg.Keyspaces, keyspaceID) {
kg.Keyspaces = append(kg.Keyspaces, keyspaceID)
}
case opDelete:
if slice.Contains(kg.Keyspaces, keyspaceID) {
kg.Keyspaces = slice.Remove(kg.Keyspaces, keyspaceID)
}
}
if err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{kg}, true); err != nil {
return err
}

m.groups[userKind].Put(kg)
return nil
}

// UpdateKeyspaceGroup updates the keyspace group.
func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUserKind, newUserKind endpoint.UserKind, keyspaceID uint32) error {
oldID, err := strconv.ParseUint(oldGroupID, 10, 64)
if err != nil {
return err
}
newID, err := strconv.ParseUint(newGroupID, 10, 64)
if err != nil {
return err
}

m.Lock()
defer m.Unlock()
oldKG := m.groups[oldUserKind].Get(uint32(oldID))
if oldKG == nil {
return errors.Errorf("keyspace group %s not found in %s group", oldGroupID, oldUserKind)
}
newKG := m.groups[newUserKind].Get(uint32(newID))
if newKG == nil {
return errors.Errorf("keyspace group %s not found in %s group", newGroupID, newUserKind)
}

var updateOld, updateNew bool
if !slice.Contains(newKG.Keyspaces, keyspaceID) {
newKG.Keyspaces = append(newKG.Keyspaces, keyspaceID)
updateNew = true
}

if slice.Contains(oldKG.Keyspaces, keyspaceID) {
oldKG.Keyspaces = slice.Remove(oldKG.Keyspaces, keyspaceID)
updateOld = true
}

if err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{oldKG, newKG}, true); err != nil {
return err
}

if updateOld {
m.groups[oldUserKind].Put(oldKG)
}

if updateNew {
m.groups[newUserKind].Put(newKG)
}

return nil
}

// SplitKeyspaceGroupByID splits the keyspace group by ID into a new keyspace group with the given new ID.
Expand Down
Loading

0 comments on commit dac4b7b

Please sign in to comment.