Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply multi-keyspace-group membership to tso service and handle inconsistency issue #6282

Merged
merged 11 commits into from
Apr 11, 2023
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,11 @@ error = '''
the keyspace group id is invalid, %s
'''

["PD:tso:ErrKeyspaceNotAssigned"]
error = '''
the keyspace %d isn't assigned to any keyspace group
'''

["PD:tso:ErrLogicOverflow"]
error = '''
logic part overflow
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var (
ErrLoadKeyspaceGroupsTimeout = errors.Normalize("load keyspace groups timeout", errors.RFCCodeText("ErrLoadKeyspaceGroupsTimeout"))
ErrLoadKeyspaceGroupsTerminated = errors.Normalize("load keyspace groups terminated", errors.RFCCodeText("ErrLoadKeyspaceGroupsTerminated"))
ErrLoadKeyspaceGroupsRetryExhaustd = errors.Normalize("load keyspace groups retry exhausted, %s", errors.RFCCodeText("ErrLoadKeyspaceGroupsRetryExhaustd"))
ErrKeyspaceNotAssigned = errors.Normalize("the keyspace %d isn't assigned to any keyspace group", errors.RFCCodeText("PD:tso:ErrKeyspaceNotAssigned"))
)

// member errors
Expand Down
33 changes: 18 additions & 15 deletions pkg/mcs/tso/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pkg/errors"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
Expand Down Expand Up @@ -131,16 +130,19 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
return status.Errorf(codes.Unknown, "server not started")
}
if request.GetHeader().GetClusterId() != s.clusterID {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId())
return status.Errorf(
codes.FailedPrecondition, "mismatch cluster id, need %d but got %d",
s.clusterID, request.GetHeader().GetClusterId())
}
count := request.GetCount()
ts, err := s.keyspaceGroupManager.HandleTSORequest(utils.DefaultKeySpaceGroupID, request.GetDcLocation(), count)
ts, keyspaceGroupBelongTo, err := s.keyspaceGroupManager.HandleTSORequest(
request.Header.KeyspaceId, request.Header.KeyspaceGroupId, request.GetDcLocation(), count)
if err != nil {
return status.Errorf(codes.Unknown, err.Error())
}
tsoHandleDuration.Observe(time.Since(start).Seconds())
response := &tsopb.TsoResponse{
Header: s.header(),
Header: s.header(keyspaceGroupBelongTo),
Timestamp: &ts,
Count: count,
}
Expand All @@ -150,23 +152,24 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error {
}
}

func (s *Service) header() *tsopb.ResponseHeader {
func (s *Service) header(keyspaceGroupBelongTo uint32) *tsopb.ResponseHeader {
if s.clusterID == 0 {
return s.wrapErrorToHeader(tsopb.ErrorType_NOT_BOOTSTRAPPED, "cluster id is not ready")
return s.wrapErrorToHeader(
tsopb.ErrorType_NOT_BOOTSTRAPPED, "cluster id is not ready", keyspaceGroupBelongTo)
}
return &tsopb.ResponseHeader{ClusterId: s.clusterID}
return &tsopb.ResponseHeader{ClusterId: s.clusterID, KeyspaceGroupId: keyspaceGroupBelongTo}
}

func (s *Service) wrapErrorToHeader(errorType tsopb.ErrorType, message string) *tsopb.ResponseHeader {
return s.errorHeader(&tsopb.Error{
Type: errorType,
Message: message,
})
func (s *Service) wrapErrorToHeader(
errorType tsopb.ErrorType, message string, keyspaceGroupBelongTo uint32,
) *tsopb.ResponseHeader {
return s.errorHeader(&tsopb.Error{Type: errorType, Message: message}, keyspaceGroupBelongTo)
}

func (s *Service) errorHeader(err *tsopb.Error) *tsopb.ResponseHeader {
func (s *Service) errorHeader(err *tsopb.Error, keyspaceGroupBelongTo uint32) *tsopb.ResponseHeader {
return &tsopb.ResponseHeader{
ClusterId: s.clusterID,
Error: err,
ClusterId: s.clusterID,
Error: err,
KeyspaceGroupId: keyspaceGroupBelongTo,
}
}
6 changes: 4 additions & 2 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ func (s *Server) IsServing() bool {
return false
}

member, err := s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID)
member, err := s.keyspaceGroupManager.GetElectionMember(
mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeySpaceGroupID)
if err != nil {
log.Error("failed to get election member", errs.ZapError(err))
return false
Expand All @@ -219,7 +220,8 @@ func (s *Server) IsServing() bool {
// GetLeaderListenUrls gets service endpoints from the leader in election group.
// The entry at the index 0 is the primary's service endpoint.
func (s *Server) GetLeaderListenUrls() []string {
member, err := s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID)
member, err := s.keyspaceGroupManager.GetElectionMember(
mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeySpaceGroupID)
if err != nil {
log.Error("failed to get election member", errs.ZapError(err))
return nil
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/endpoint/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type KeyspaceGroup struct {
Members []KeyspaceGroupMember `json:"members"`
// Keyspaces are the keyspace IDs which belong to the keyspace group.
Keyspaces []uint32 `json:"keyspaces"`
// KeyspaceLookupTable is for fast lookup if a given keyspace belongs to this keyspace group.
// It's not persisted and will be built when loading from storage.
KeyspaceLookupTable map[uint32]struct{} `json:"-"`
}

// KeyspaceGroupStorage is the interface for keyspace group storage.
Expand Down
169 changes: 144 additions & 25 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
"errors"
"fmt"
"path"
"sort"
"strings"
"sync"
"sync/atomic"
"time"

perrors "github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -57,13 +59,16 @@ const (
// The replicas campaign for the leaders which provide the tso service for the corresponding
// keyspace groups.
type KeyspaceGroupManager struct {
// ams stores the allocator managers of the keyspace groups. Each keyspace group is assigned
// with an allocator manager managing its global/local tso allocators.
// ams stores the allocator managers of the keyspace groups. Each keyspace group is
// assigned with an allocator manager managing its global/local tso allocators.
// Use a fixed size array to maximize the efficiency of concurrent access to
// different keyspace groups for tso service.
ams [mcsutils.MaxKeyspaceGroupCountInUse]atomic.Pointer[AllocatorManager]
// ksgs stores the keyspace groups' membership/distribution meta.
ksgs [mcsutils.MaxKeyspaceGroupCountInUse]atomic.Pointer[endpoint.KeyspaceGroup]
// kgs stores the keyspace groups' membership/distribution meta.
kgs [mcsutils.MaxKeyspaceGroupCountInUse]atomic.Pointer[endpoint.KeyspaceGroup]
// keyspaceLookupTable is a map from keyspace (id) to its keyspace group (id).
// stored as map[uint32]uint32
keyspaceLookupTable sync.Map

ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -446,7 +451,11 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
if kgm.ams[group.ID].Load() != nil {
log.Info("keyspace group already initialized, so update meta only",
zap.Uint32("keyspace-group-id", group.ID))
kgm.ksgs[group.ID].Store(group)

oldGroup := kgm.kgs[group.ID].Load()
group.KeyspaceLookupTable = kgm.updateKeyspaceGroupMembership(
group.ID, oldGroup.Keyspaces, group.Keyspaces, oldGroup.KeyspaceLookupTable)
kgm.kgs[group.ID].Store(group)
return
}

Expand Down Expand Up @@ -475,59 +484,169 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
storage = kgm.tsoSvcStorage
}

group.KeyspaceLookupTable = kgm.buildKeyspaceLookupTable(group.ID, group.Keyspaces)
kgm.kgs[group.ID].Store(group)
kgm.ams[group.ID].Store(NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true))
kgm.ksgs[group.ID].Store(group)
} else {
// Not assigned to me. If this host/pod owns this keyspace group, it should resign.
kgm.deleteKeyspaceGroup(group.ID)
}
}

// updateKeyspaceGroupMembership updates the keyspace lookup table for the given keyspace group.
func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership(
groupID uint32, oldKeyspaces, newKeyspaces []uint32,
oldKeyspaceLookupTable map[uint32]struct{},
) map[uint32]struct{} {
oldLen := len(oldKeyspaces)
newLen := len(newKeyspaces)

// Sort the keyspaces in ascending order
sort.Slice(newKeyspaces, func(i, j int) bool {
return newKeyspaces[i] < newKeyspaces[j]
})

// Mostly, the membership has no change, so we optimize for this case.
sameMembership := true
if oldLen != newLen {
rleungx marked this conversation as resolved.
Show resolved Hide resolved
sameMembership = false
} else {
for i := 0; i < oldLen; i++ {
if oldKeyspaces[i] != newKeyspaces[i] {
sameMembership = false
break
}
}
}

var newKeyspaceLookupTable map[uint32]struct{}

if sameMembership {
// The keyspace group membership is not changed, so we reuse the old one.
newKeyspaceLookupTable = oldKeyspaceLookupTable
} else {
// The keyspace group membership is changed, so we update the keyspace lookup table.
newKeyspaceLookupTable = make(map[uint32]struct{})
for i, j := 0, 0; i < oldLen || j < newLen; {
if i < oldLen && j < newLen && oldKeyspaces[i] == newKeyspaces[j] {
newKeyspaceLookupTable[newKeyspaces[j]] = struct{}{}
i++
j++
} else if i < oldLen && j < newLen && oldKeyspaces[i] < newKeyspaces[j] || j == newLen {
kgm.keyspaceLookupTable.Delete(oldKeyspaces[i])
i++
} else {
newKeyspaceLookupTable[newKeyspaces[j]] = struct{}{}
kgm.keyspaceLookupTable.Store(newKeyspaces[j], groupID)
j++
}
}
}

return newKeyspaceLookupTable
}

// buildKeyspaceLookupTable creates the membership set for the given keyspaces and records
// each keyspace -> groupID mapping in the manager-wide keyspaceLookupTable.
// It returns the membership set, keyed by keyspace ID.
func (kgm *KeyspaceGroupManager) buildKeyspaceLookupTable(groupID uint32, keyspaces []uint32) map[uint32]struct{} {
	members := make(map[uint32]struct{})
	for idx := range keyspaces {
		keyspaceID := keyspaces[idx]
		members[keyspaceID] = struct{}{}
		kgm.keyspaceLookupTable.Store(keyspaceID, groupID)
	}
	return members
}

// deleteKeyspaceGroup deletes the given keyspace group.
func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(id uint32) {
kgm.ksgs[id].Store(nil)
am := kgm.ams[id].Swap(nil)
if am == nil {
return
func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) {
kg := kgm.kgs[groupID].Swap(nil)
if kg != nil {
for _, kid := range kg.Keyspaces {
kgm.keyspaceLookupTable.CompareAndDelete(kid, kg.ID)
}
}
am.close()
log.Info("deleted keyspace group", zap.Uint32("keyspace-group-id", id))
am := kgm.ams[groupID].Swap(nil)
if am != nil {
am.close()
}
Comment on lines +560 to +569
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make it atomic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. We'll leave it as it is for now, and we may improve it in the future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And maybe we can improve it later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The unreliable/inconsistent window is very small, and the client can simply retry to get past it. After all, there are other unreliable/inconsistent windows during kg-split/keyspace-movement/kg-movement which can't be avoided. It won't cause incorrect results for TSO requests, because the timeline-split problem is handled specifically by kg-split/keyspace-movement, and primary election among the replicas within a kg doesn't introduce a timeline-split problem.


log.Info("deleted keyspace group", zap.Uint32("keyspace-group-id", groupID))
}

// GetAllocatorManager returns the AllocatorManager of the given keyspace group
func (kgm *KeyspaceGroupManager) GetAllocatorManager(id uint32) (*AllocatorManager, error) {
if err := kgm.checkKeySpaceGroupID(id); err != nil {
func (kgm *KeyspaceGroupManager) GetAllocatorManager(keyspaceGroupID uint32) (*AllocatorManager, error) {
if err := kgm.checkKeySpaceGroupID(keyspaceGroupID); err != nil {
return nil, err
}
if am := kgm.ams[id].Load(); am != nil {
if am := kgm.ams[keyspaceGroupID].Load(); am != nil {
return am, nil
}
return nil, errs.ErrGetAllocatorManager.FastGenByArgs(
fmt.Sprintf("requested keyspace group with id %d %s by this host/pod", id, errs.NotServedErr))
return nil, kgm.genNotServedErr(errs.ErrGetAllocatorManager, keyspaceGroupID)
}

// GetAMWithMembershipCheck returns the AllocatorManager of the given keyspace group after
// checking that the given keyspace is a member of that keyspace group.
//
// It returns the error from checkKeySpaceGroupID when keyspaceGroupID is out of range, and an
// error generated from errs.ErrGetAllocatorManager when this manager holds no allocator
// manager or no meta for the group, or when keyspaceID isn't found in the group's
// KeyspaceLookupTable.
func (kgm *KeyspaceGroupManager) GetAMWithMembershipCheck(
	keyspaceID, keyspaceGroupID uint32,
) (*AllocatorManager, error) {
	// ams and kgs are fixed-size arrays, so reject an out-of-range id with an error
	// instead of panicking on the index expressions below.
	if err := kgm.checkKeySpaceGroupID(keyspaceGroupID); err != nil {
		return nil, err
	}

	am := kgm.ams[keyspaceGroupID].Load()
	if am == nil {
		return nil, kgm.genNotServedErr(errs.ErrGetAllocatorManager, keyspaceGroupID)
	}

	// The allocator manager and the group meta are stored separately, so the meta can be
	// missing even though the allocator manager was found.
	kg := kgm.kgs[keyspaceGroupID].Load()
	if kg == nil {
		return nil, kgm.genNotServedErr(errs.ErrGetAllocatorManager, keyspaceGroupID)
	}
	if _, ok := kg.KeyspaceLookupTable[keyspaceID]; !ok {
		return nil, kgm.genNotServedErr(errs.ErrGetAllocatorManager, keyspaceGroupID)
	}
	return am, nil
}

// GetElectionMember returns the election member of the given keyspace group
func (kgm *KeyspaceGroupManager) GetElectionMember(id uint32) (ElectionMember, error) {
am, err := kgm.GetAllocatorManager(id)
func (kgm *KeyspaceGroupManager) GetElectionMember(
keyspaceID, keyspaceGroupID uint32,
) (ElectionMember, error) {
if err := kgm.checkKeySpaceGroupID(keyspaceGroupID); err != nil {
return nil, err
}
am, err := kgm.GetAMWithMembershipCheck(keyspaceID, keyspaceGroupID)
if err != nil {
return nil, err
}
return am.getMember(), nil
}

// HandleTSORequest forwards TSO allocation requests to correct TSO Allocators of the given keyspace group.
func (kgm *KeyspaceGroupManager) HandleTSORequest(id uint32, dcLocation string, count uint32) (pdpb.Timestamp, error) {
am, err := kgm.GetAllocatorManager(id)
func (kgm *KeyspaceGroupManager) HandleTSORequest(
keyspaceID, keyspaceGroupID uint32,
dcLocation string, count uint32,
) (ts pdpb.Timestamp, currentKeyspaceGroupID uint32, err error) {
if err := kgm.checkKeySpaceGroupID(keyspaceGroupID); err != nil {
return pdpb.Timestamp{}, keyspaceGroupID, err
}
am, err := kgm.GetAMWithMembershipCheck(keyspaceID, keyspaceGroupID)
if err != nil {
return pdpb.Timestamp{}, err
// The keyspace doesn't belong to this keyspace group, we should check if it belongs to any other
// keyspace groups, and return the correct keyspace group ID to the client.
kgid, loaded := kgm.keyspaceLookupTable.Load(keyspaceID)
if loaded && kgid != nil {
return pdpb.Timestamp{}, kgid.(uint32), err
}
return pdpb.Timestamp{}, keyspaceGroupID, errs.ErrKeyspaceNotAssigned.FastGenByArgs(keyspaceID)
}
return am.HandleRequest(dcLocation, count)
ts, err = am.HandleRequest(dcLocation, count)
return ts, keyspaceGroupID, err
}

func (kgm *KeyspaceGroupManager) checkKeySpaceGroupID(id uint32) error {
if id < mcsutils.MaxKeyspaceGroupCountInUse {
return nil
}
return errs.ErrKeyspaceGroupIDInvalid.FastGenByArgs(
fmt.Sprintf("invalid keyspace group id %d which shouldn't >= %d", id, mcsutils.MaxKeyspaceGroupCountInUse))
fmt.Sprintf("%d shouldn't >= %d", id, mcsutils.MaxKeyspaceGroupCountInUse))
}

// genNotServedErr generates an error from perr whose argument is a message naming the
// requested keyspace group id together with errs.NotServedErr.
func (kgm *KeyspaceGroupManager) genNotServedErr(perr *perrors.Error, keyspaceGroupID uint32) error {
	msg := fmt.Sprintf(
		"requested keyspace group with id %d %s by this host/pod",
		keyspaceGroupID, errs.NotServedErr)
	return perr.FastGenByArgs(msg)
}
Loading