From 80f620cb827dbbd2f0d6fcdfa93d04d3d052e60f Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Thu, 6 Apr 2023 18:54:49 -0700 Subject: [PATCH 01/11] Apply multi-keyspace-group membership to tso service and handle inconsistency issue Signed-off-by: Bin Shi --- pkg/errs/errno.go | 1 + pkg/mcs/tso/server/grpc_service.go | 33 ++++----- pkg/mcs/tso/server/server.go | 6 +- pkg/storage/endpoint/tso_keyspace_group.go | 3 + pkg/tso/keyspace_group_manager.go | 78 ++++++++++++++++++---- pkg/tso/local_allocator.go | 3 +- 6 files changed, 92 insertions(+), 32 deletions(-) diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index d14275bb5e3..c1ee62ed46f 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -52,6 +52,7 @@ var ( ErrLoadKeyspaceGroupsTimeout = errors.Normalize("load keyspace groups timeout", errors.RFCCodeText("ErrLoadKeyspaceGroupsTimeout")) ErrLoadKeyspaceGroupsTerminated = errors.Normalize("load keyspace groups terminated", errors.RFCCodeText("ErrLoadKeyspaceGroupsTerminated")) ErrLoadKeyspaceGroupsRetryExhaustd = errors.Normalize("load keyspace groups retry exhausted, %s", errors.RFCCodeText("ErrLoadKeyspaceGroupsRetryExhaustd")) + ErrKeyspaceNotAssigned = errors.Normalize("the keyspace isn't assigned to any keyspace group, %s", errors.RFCCodeText("PD:tso:ErrKeyspaceNotAssigned")) ) // member errors diff --git a/pkg/mcs/tso/server/grpc_service.go b/pkg/mcs/tso/server/grpc_service.go index c5a7f88ed30..21a5bef0d60 100644 --- a/pkg/mcs/tso/server/grpc_service.go +++ b/pkg/mcs/tso/server/grpc_service.go @@ -25,7 +25,6 @@ import ( "github.com/pkg/errors" bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/mcs/registry" - "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/tsoutil" @@ -131,16 +130,19 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error { return status.Errorf(codes.Unknown, "server not started") } if request.GetHeader().GetClusterId() != s.clusterID { - return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId()) + return status.Errorf( + codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", + s.clusterID, request.GetHeader().GetClusterId()) } count := request.GetCount() - ts, err := s.keyspaceGroupManager.HandleTSORequest(utils.DefaultKeySpaceGroupID, request.GetDcLocation(), count) + ts, keyspaceGroupBelongTo, err := s.keyspaceGroupManager.HandleTSORequest( + request.Header.KeyspaceId, request.Header.KeyspaceGroupId, request.GetDcLocation(), count) if err != nil { return status.Errorf(codes.Unknown, err.Error()) } tsoHandleDuration.Observe(time.Since(start).Seconds()) response := &tsopb.TsoResponse{ - Header: s.header(), + Header: s.header(keyspaceGroupBelongTo), Timestamp: &ts, Count: count, } @@ -150,23 +152,24 @@ func (s *Service) Tso(stream tsopb.TSO_TsoServer) error { } } -func (s *Service) header() *tsopb.ResponseHeader { +func (s *Service) header(keyspaceGroupBelongTo uint32) *tsopb.ResponseHeader { if s.clusterID == 0 { - return s.wrapErrorToHeader(tsopb.ErrorType_NOT_BOOTSTRAPPED, "cluster id is not ready") + return s.wrapErrorToHeader( + tsopb.ErrorType_NOT_BOOTSTRAPPED, "cluster id is not ready", keyspaceGroupBelongTo) } - return &tsopb.ResponseHeader{ClusterId: s.clusterID} + return &tsopb.ResponseHeader{ClusterId: s.clusterID, KeyspaceGroupId: keyspaceGroupBelongTo} } -func (s *Service) wrapErrorToHeader(errorType tsopb.ErrorType, message string) *tsopb.ResponseHeader { - return s.errorHeader(&tsopb.Error{ - Type: errorType, - Message: message, - }) +func (s *Service) wrapErrorToHeader( + errorType tsopb.ErrorType, message string, keyspaceGroupBelongTo uint32, +) *tsopb.ResponseHeader { + return s.errorHeader(&tsopb.Error{Type: errorType, Message: message}, keyspaceGroupBelongTo) } -func (s *Service) errorHeader(err *tsopb.Error) *tsopb.ResponseHeader { +func (s *Service) errorHeader(err *tsopb.Error, keyspaceGroupBelongTo uint32) *tsopb.ResponseHeader { return &tsopb.ResponseHeader{ - ClusterId: s.clusterID, - Error: err, + ClusterId: s.clusterID, + Error: err, + KeyspaceGroupId: keyspaceGroupBelongTo, } } diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 642952b16e4..a54ce8e9118 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -208,7 +208,8 @@ func (s *Server) IsServing() bool { return false } - member, err := s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID) + member, err := s.keyspaceGroupManager.GetElectionMember( + mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeySpaceGroupID) if err != nil { log.Error("failed to get election member", errs.ZapError(err)) return false @@ -219,7 +220,8 @@ func (s *Server) IsServing() bool { // GetLeaderListenUrls gets service endpoints from the leader in election group. // The entry at the index 0 is the primary's service endpoint. func (s *Server) GetLeaderListenUrls() []string { - member, err := s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID) + member, err := s.keyspaceGroupManager.GetElectionMember( + mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeySpaceGroupID) if err != nil { log.Error("failed to get election member", errs.ZapError(err)) return nil diff --git a/pkg/storage/endpoint/tso_keyspace_group.go b/pkg/storage/endpoint/tso_keyspace_group.go index 91268d2c739..39be78493fe 100644 --- a/pkg/storage/endpoint/tso_keyspace_group.go +++ b/pkg/storage/endpoint/tso_keyspace_group.go @@ -73,6 +73,9 @@ type KeyspaceGroup struct { Members []KeyspaceGroupMember `json:"members"` // Keyspaces are the keyspace IDs which belong to the keyspace group. Keyspaces []uint32 `json:"keyspaces"` + // KeyspaceLookupTable is for fast lookup if an given keyspace belongs to this keyspace group. + // It's not persisted and will be built when loading from storage. + KeyspaceLookupTable map[uint32]struct{} `json:"-"` } // KeyspaceGroupStorage is the interface for keyspace group storage. diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 2f97299fede..5d7d349fca7 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -57,8 +57,8 @@ const ( // The replicas campaign for the leaders which provide the tso service for the corresponding // keyspace groups. type KeyspaceGroupManager struct { - // ams stores the allocator managers of the keyspace groups. Each keyspace group is assigned - // with an allocator manager managing its global/local tso allocators. + // ams stores the allocator managers of the keyspace groups. Each keyspace group is + // assigned with an allocator manager managing its global/local tso allocators. // Use a fixed size array to maximize the efficiency of concurrent access to // different keyspace groups for tso service. ams [mcsutils.MaxKeyspaceGroupCountInUse]atomic.Pointer[AllocatorManager] @@ -446,6 +446,10 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro if kgm.ams[group.ID].Load() != nil { log.Info("keyspace group already initialized, so update meta only", zap.Uint32("keyspace-group-id", group.ID)) + group.KeyspaceLookupTable = make(map[uint32]struct{}) + for _, kid := range group.Keyspaces { + group.KeyspaceLookupTable[kid] = struct{}{} + } kgm.ksgs[group.ID].Store(group) return } @@ -476,6 +480,11 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro } kgm.ams[group.ID].Store(NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true)) + + group.KeyspaceLookupTable = make(map[uint32]struct{}) + for _, kid := range group.Keyspaces { + group.KeyspaceLookupTable[kid] = struct{}{} + } kgm.ksgs[group.ID].Store(group) } else { // Not assigned to me. If this host/pod owns this keyspace group, it should resign. @@ -495,20 +504,40 @@ func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(id uint32) { } // GetAllocatorManager returns the AllocatorManager of the given keyspace group -func (kgm *KeyspaceGroupManager) GetAllocatorManager(id uint32) (*AllocatorManager, error) { - if err := kgm.checkKeySpaceGroupID(id); err != nil { +func (kgm *KeyspaceGroupManager) GetAllocatorManager(keyspaceGroupID uint32) (*AllocatorManager, error) { + if err := kgm.checkKeySpaceGroupID(keyspaceGroupID); err != nil { + return nil, err + } + if am := kgm.ams[keyspaceGroupID].Load(); am != nil { + return am, nil + } + return nil, kgm.genNotServedErr(keyspaceGroupID) +} + +// GetAllocatorManager returns the AllocatorManager of the given keyspace group +func (kgm *KeyspaceGroupManager) GetAllocatorManager(keyspaceID, keyspaceGroupID uint32) (*AllocatorManager, error) { + if err := kgm.checkKeySpaceGroupID(keyspaceGroupID); err != nil { return nil, err } - if am := kgm.ams[id].Load(); am != nil { + if am := kgm.ams[keyspaceGroupID].Load(); am != nil { + ksg := kgm.ksgs[keyspaceGroupID].Load() + if ksg == nil { + return nil, kgm.genNotServedErr(keyspaceGroupID) + } + if _, ok := ksg.KeyspaceLookupTable[keyspaceID]; !ok { + return nil, kgm.genNotServedErr(keyspaceGroupID) + } return am, nil } - return nil, errs.ErrGetAllocatorManager.FastGenByArgs( - fmt.Sprintf("requested keyspace group with id %d %s by this host/pod", id, errs.NotServedErr)) + return nil, kgm.genNotServedErr(keyspaceGroupID) } // GetElectionMember returns the election member of the given keyspace group -func (kgm *KeyspaceGroupManager) GetElectionMember(id uint32) (ElectionMember, error) { - am, err := kgm.GetAllocatorManager(id) +// TODO: support multiple keyspace groups for GetElectionMember +func (kgm *KeyspaceGroupManager) GetElectionMember( + keyspaceID, keyspaceGroupID uint32, +) (ElectionMember, error) { + am, err := kgm.GetAllocatorManager(keyspaceID, keyspaceGroupID) if err != nil { return nil, err } @@ -516,12 +545,26 @@ func (kgm *KeyspaceGroupManager) GetElectionMember(id uint32) (ElectionMember, e } // HandleTSORequest forwards TSO allocation requests to correct TSO Allocators of the given keyspace group. -func (kgm *KeyspaceGroupManager) HandleTSORequest(id uint32, dcLocation string, count uint32) (pdpb.Timestamp, error) { - am, err := kgm.GetAllocatorManager(id) +func (kgm *KeyspaceGroupManager) HandleTSORequest( + keyspaceID, keyspaceGroupID uint32, + dcLocation string, count uint32, +) (ts pdpb.Timestamp, currentKeyspaceGroupID uint32, err error) { + am, err := kgm.GetAllocatorManager(keyspaceID, keyspaceGroupID) if err != nil { - return pdpb.Timestamp{}, err + if strings.Contains(err.Error(), errs.NotServedErr) { + for _, ksgp := range kgm.ksgs { + if ksg := ksgp.Load(); ksg == nil { + continue; + } else if _, ok := ksg.KeyspaceLookupTable[keyspaceID]; ok { + return pdpb.Timestamp{}, ksg.ID,err + } + } + return pdpb.Timestamp{}, keyspaceGroupID, err + } + return pdpb.Timestamp{}, keyspaceGroupID, err } - return am.HandleRequest(dcLocation, count) + ts, err = am.HandleRequest(dcLocation, count) + return ts, keyspaceGroupID, err } func (kgm *KeyspaceGroupManager) checkKeySpaceGroupID(id uint32) error { @@ -529,5 +572,12 @@ func (kgm *KeyspaceGroupManager) checkKeySpaceGroupID(id uint32) error { return nil } return errs.ErrKeyspaceGroupIDInvalid.FastGenByArgs( - fmt.Sprintf("invalid keyspace group id %d which shouldn't >= %d", id, mcsutils.MaxKeyspaceGroupCountInUse)) + fmt.Sprintf("invalid keyspace group id %d which shouldn't >= %d", + id, mcsutils.MaxKeyspaceGroupCountInUse)) } + +func (kgm *KeyspaceGroupManager) genNotServedErr(keyspaceGroupID uint32) error { + return errs.ErrGetAllocatorManager.FastGenByArgs( + fmt.Sprintf("requested keyspace group with id %d %s by this host/pod", + keyspaceGroupID, errs.NotServedErr)) +} \ No newline at end of file diff --git a/pkg/tso/local_allocator.go b/pkg/tso/local_allocator.go index db369079279..9b0d1dc1869 100644 --- a/pkg/tso/local_allocator.go +++ b/pkg/tso/local_allocator.go @@ -105,7 +105,8 @@ func (lta *LocalTSOAllocator) SetTSO(tso uint64, ignoreSmaller, skipUpperBoundCh func (lta *LocalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) { if !lta.leadership.Check() { tsoCounter.WithLabelValues("not_leader", lta.timestampOracle.dcLocation).Inc() - return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested pd %s of %s allocator", errs.NotLeaderErr, lta.timestampOracle.dcLocation)) + return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs( + fmt.Sprintf("requested pd %s of %s allocator", errs.NotLeaderErr, lta.timestampOracle.dcLocation)) } return lta.timestampOracle.getTS(lta.leadership, count, lta.allocatorManager.GetSuffixBits()) } From 2fc6c9a3a0c608dacada79675a8f9ed94b0579ee Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Thu, 6 Apr 2023 19:41:38 -0700 Subject: [PATCH 02/11] Apply multi-keyspace-group membership to tso service and handle inconsistency issue. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Add KeyspaceLookupTable to endpoint.KeyspaceGroup type KeyspaceGroup struct { ... // KeyspaceLookupTable is for fast lookup if a given keyspace belongs to this keyspace group. // It's not persisted and will be built when loading from storage. KeyspaceLookupTable map[uint32]struct{} `json:"-"` } 2. After loading keyspace groups, the Keyspace Group Manager builds KeyspaceLookupTable for every keyspace groups. 3. When Keyspace Group Manager handles tso requests, it uses the keyspaceLookupTable to check if the required keypsace still belongs to the required keyspace group。If not, returns the current keyspace group id in the tso response header. Signed-off-by: Bin Shi --- pkg/storage/endpoint/tso_keyspace_group.go | 2 +- pkg/tso/keyspace_group_manager.go | 24 +++++++++++++--------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/pkg/storage/endpoint/tso_keyspace_group.go b/pkg/storage/endpoint/tso_keyspace_group.go index 39be78493fe..aca0b847dac 100644 --- a/pkg/storage/endpoint/tso_keyspace_group.go +++ b/pkg/storage/endpoint/tso_keyspace_group.go @@ -73,7 +73,7 @@ type KeyspaceGroup struct { Members []KeyspaceGroupMember `json:"members"` // Keyspaces are the keyspace IDs which belong to the keyspace group. Keyspaces []uint32 `json:"keyspaces"` - // KeyspaceLookupTable is for fast lookup if an given keyspace belongs to this keyspace group. + // KeyspaceLookupTable is for fast lookup if a given keyspace belongs to this keyspace group. // It's not persisted and will be built when loading from storage. KeyspaceLookupTable map[uint32]struct{} `json:"-"` } diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 5d7d349fca7..5cdb1223cf5 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -514,8 +514,11 @@ func (kgm *KeyspaceGroupManager) GetAllocatorManager(keyspaceGroupID uint32) (*A return nil, kgm.genNotServedErr(keyspaceGroupID) } -// GetAllocatorManager returns the AllocatorManager of the given keyspace group -func (kgm *KeyspaceGroupManager) GetAllocatorManager(keyspaceID, keyspaceGroupID uint32) (*AllocatorManager, error) { +// GetAMWithMembershipCheck returns the AllocatorManager of the given keyspace group and check if the keyspace +// is served by this keyspace group. +func (kgm *KeyspaceGroupManager) GetAMWithMembershipCheck( + keyspaceID, keyspaceGroupID uint32, +) (*AllocatorManager, error) { if err := kgm.checkKeySpaceGroupID(keyspaceGroupID); err != nil { return nil, err } @@ -537,7 +540,7 @@ func (kgm *KeyspaceGroupManager) GetAllocatorManager(keyspaceID, keyspaceGroupID func (kgm *KeyspaceGroupManager) GetElectionMember( keyspaceID, keyspaceGroupID uint32, ) (ElectionMember, error) { - am, err := kgm.GetAllocatorManager(keyspaceID, keyspaceGroupID) + am, err := kgm.GetAMWithMembershipCheck(keyspaceID, keyspaceGroupID) if err != nil { return nil, err } @@ -549,17 +552,18 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest( keyspaceID, keyspaceGroupID uint32, dcLocation string, count uint32, ) (ts pdpb.Timestamp, currentKeyspaceGroupID uint32, err error) { - am, err := kgm.GetAllocatorManager(keyspaceID, keyspaceGroupID) + am, err := kgm.GetAMWithMembershipCheck(keyspaceID, keyspaceGroupID) if err != nil { + // If the keyspace doesn't belong to this keyspace group, we should check if it belongs to any other + // keyspace groups, and return the correct keyspace group ID to the client. if strings.Contains(err.Error(), errs.NotServedErr) { - for _, ksgp := range kgm.ksgs { - if ksg := ksgp.Load(); ksg == nil { - continue; + for i := 0; i < int(mcsutils.MaxKeyspaceGroupCountInUse); i++ { + if ksg := kgm.ksgs[i].Load(); ksg == nil { + continue } else if _, ok := ksg.KeyspaceLookupTable[keyspaceID]; ok { - return pdpb.Timestamp{}, ksg.ID,err + return pdpb.Timestamp{}, ksg.ID, err } } - return pdpb.Timestamp{}, keyspaceGroupID, err } return pdpb.Timestamp{}, keyspaceGroupID, err } @@ -580,4 +584,4 @@ func (kgm *KeyspaceGroupManager) genNotServedErr(keyspaceGroupID uint32) error { return errs.ErrGetAllocatorManager.FastGenByArgs( fmt.Sprintf("requested keyspace group with id %d %s by this host/pod", keyspaceGroupID, errs.NotServedErr)) -} \ No newline at end of file +} From 4349074965d313634eee4a8da61af9ecdc15d859 Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Sun, 9 Apr 2023 21:22:29 -0700 Subject: [PATCH 03/11] Add unittest Signed-off-by: Bin Shi --- errors.toml | 5 ++ pkg/tso/keyspace_group_manager.go | 39 ++++++------ pkg/tso/keyspace_group_manager_test.go | 84 +++++++++++++++++++++++--- 3 files changed, 102 insertions(+), 26 deletions(-) diff --git a/errors.toml b/errors.toml index 1133a8bf12a..9bf2e77c8a4 100644 --- a/errors.toml +++ b/errors.toml @@ -761,6 +761,11 @@ error = ''' the keyspace group id is invalid, %s ''' +["PD:tso:ErrKeyspaceNotAssigned"] +error = ''' +the keyspace isn't assigned to any keyspace group, %s +''' + ["PD:tso:ErrLogicOverflow"] error = ''' logic part overflow diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 5cdb1223cf5..f7bbe03546e 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -25,6 +25,7 @@ import ( "sync/atomic" "time" + perrors "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" @@ -511,7 +512,7 @@ func (kgm *KeyspaceGroupManager) GetAllocatorManager(keyspaceGroupID uint32) (*A if am := kgm.ams[keyspaceGroupID].Load(); am != nil { return am, nil } - return nil, kgm.genNotServedErr(keyspaceGroupID) + return nil, kgm.genNotServedErr(errs.ErrGetAllocatorManager, keyspaceGroupID) } // GetAMWithMembershipCheck returns the AllocatorManager of the given keyspace group and check if the keyspace @@ -519,20 +520,17 @@ func (kgm *KeyspaceGroupManager) GetAllocatorManager(keyspaceGroupID uint32) (*A func (kgm *KeyspaceGroupManager) GetAMWithMembershipCheck( keyspaceID, keyspaceGroupID uint32, ) (*AllocatorManager, error) { - if err := kgm.checkKeySpaceGroupID(keyspaceGroupID); err != nil { - return nil, err - } if am := kgm.ams[keyspaceGroupID].Load(); am != nil { ksg := kgm.ksgs[keyspaceGroupID].Load() if ksg == nil { - return nil, kgm.genNotServedErr(keyspaceGroupID) + return nil, kgm.genNotServedErr(errs.ErrGetAllocatorManager, keyspaceGroupID) } if _, ok := ksg.KeyspaceLookupTable[keyspaceID]; !ok { - return nil, kgm.genNotServedErr(keyspaceGroupID) + return nil, kgm.genNotServedErr(errs.ErrGetAllocatorManager, keyspaceGroupID) } return am, nil } - return nil, kgm.genNotServedErr(keyspaceGroupID) + return nil, kgm.genNotServedErr(errs.ErrGetAllocatorManager, keyspaceGroupID) } // GetElectionMember returns the election member of the given keyspace group @@ -540,6 +538,9 @@ func (kgm *KeyspaceGroupManager) GetAMWithMembershipCheck( func (kgm *KeyspaceGroupManager) GetElectionMember( keyspaceID, keyspaceGroupID uint32, ) (ElectionMember, error) { + if err := kgm.checkKeySpaceGroupID(keyspaceGroupID); err != nil { + return nil, err + } am, err := kgm.GetAMWithMembershipCheck(keyspaceID, keyspaceGroupID) if err != nil { return nil, err @@ -552,17 +553,18 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest( keyspaceID, keyspaceGroupID uint32, dcLocation string, count uint32, ) (ts pdpb.Timestamp, currentKeyspaceGroupID uint32, err error) { + if err := kgm.checkKeySpaceGroupID(keyspaceGroupID); err != nil { + return pdpb.Timestamp{}, keyspaceGroupID, err + } am, err := kgm.GetAMWithMembershipCheck(keyspaceID, keyspaceGroupID) if err != nil { - // If the keyspace doesn't belong to this keyspace group, we should check if it belongs to any other + // The keyspace doesn't belong to this keyspace group, we should check if it belongs to any other // keyspace groups, and return the correct keyspace group ID to the client. - if strings.Contains(err.Error(), errs.NotServedErr) { - for i := 0; i < int(mcsutils.MaxKeyspaceGroupCountInUse); i++ { - if ksg := kgm.ksgs[i].Load(); ksg == nil { - continue - } else if _, ok := ksg.KeyspaceLookupTable[keyspaceID]; ok { - return pdpb.Timestamp{}, ksg.ID, err - } + for i := 0; i < int(mcsutils.MaxKeyspaceGroupCountInUse); i++ { + if ksg := kgm.ksgs[i].Load(); ksg == nil { + continue + } else if _, ok := ksg.KeyspaceLookupTable[keyspaceID]; ok { + return pdpb.Timestamp{}, ksg.ID, err } } return pdpb.Timestamp{}, keyspaceGroupID, err @@ -580,8 +582,9 @@ func (kgm *KeyspaceGroupManager) checkKeySpaceGroupID(id uint32) error { id, mcsutils.MaxKeyspaceGroupCountInUse)) } -func (kgm *KeyspaceGroupManager) genNotServedErr(keyspaceGroupID uint32) error { - return errs.ErrGetAllocatorManager.FastGenByArgs( - fmt.Sprintf("requested keyspace group with id %d %s by this host/pod", +func (kgm *KeyspaceGroupManager) genNotServedErr(perr *perrors.Error, keyspaceGroupID uint32) error { + return perr.FastGenByArgs( + fmt.Sprintf( + "requested keyspace group with id %d %s by this host/pod", keyspaceGroupID, errs.NotServedErr)) } diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 64f1a462293..104d9275f44 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -181,7 +181,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsTimeout() { addKeyspaceGroupAssignment( suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0)) + mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0), []uint32{0}) // Set the timeout to 1 second and inject the delayLoadKeyspaceGroups to return 3 seconds to let // the loading sleep 3 seconds. @@ -204,7 +204,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsSucceedWithTem addKeyspaceGroupAssignment( suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0)) + mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0), []uint32{0}) // Set the max retry times to 3 and inject the loadKeyspaceGroupsTemporaryFail to return 2 to let // loading from etcd fail 2 times but the whole initialization still succeeds. @@ -226,7 +226,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsFailed() { addKeyspaceGroupAssignment( suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0)) + mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0), []uint32{0}) // Set the max retry times to 3 and inject the loadKeyspaceGroupsTemporaryFail to return 3 to let // loading from etcd fail 3 times which should cause the whole initialization to fail. @@ -307,6 +307,73 @@ func (suite *keyspaceGroupManagerTestSuite) TestWatchAndDynamicallyApplyChanges( }) } +// TestGetAMWithMembershipCheck tests GetAMWithMembershipCheck. +func (suite *keyspaceGroupManagerTestSuite) TestGetAMWithMembershipCheck() { + re := suite.Require() + + mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 1) + re.NotNil(mgr) + defer mgr.Close() + + var ( + am *AllocatorManager + err error + ) + + // Create keyspace group 0 which contains keyspace 0, 1, 2. + addKeyspaceGroupAssignment( + suite.ctx, suite.etcdClient, true, + mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, + uint32(0), []uint32{0, 1, 2}) + + err = mgr.Initialize(true) + re.NoError(err) + + // Should be able to get AM for keyspace 0, 1, 2 in keyspace group 0. + am, err = mgr.GetAMWithMembershipCheck(0, 0) + re.NoError(err) + re.NotNil(am) + am, err = mgr.GetAMWithMembershipCheck(1, 0) + re.NoError(err) + re.NotNil(am) + am, err = mgr.GetAMWithMembershipCheck(2, 0) + re.NoError(err) + re.NotNil(am) + // Should fail because keyspace 3 is not in keyspace group 0. + am, err = mgr.GetAMWithMembershipCheck(3, 0) + re.Error(err) + re.Nil(am) + // Should fail because keyspace group 1 doesn't exist. + am, err = mgr.GetAMWithMembershipCheck(0, 1) + re.Error(err) + re.Nil(am) +} + +// TestHandleTSORequestWithWrongMembership tests the case that HandleTSORequest receives +// a tso request with mismatched keyspace and keyspace group. +func (suite *keyspaceGroupManagerTestSuite) TestHandleTSORequestWithWrongMembership() { + re := suite.Require() + + mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 1) + re.NotNil(mgr) + defer mgr.Close() + + // Create keyspace group 0 which contains keyspace 0, 1, 2. + addKeyspaceGroupAssignment( + suite.ctx, suite.etcdClient, true, + mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, + uint32(0), []uint32{0, 1, 2}) + + err := mgr.Initialize(true) + re.NoError(err) + + // Should fail because keyspace 0 is not in keyspace group 1 and the API returns + // the keyspace group 0 to which the keyspace 0 belongs. + _, keyspaceGroupBelongTo, err := mgr.HandleTSORequest(0, 1, GlobalDCLocation, 1) + re.Error(err) + re.Equal(uint32(0), keyspaceGroupBelongTo) +} + type etcdEvent struct { eventType mvccpb.Event_EventType ksg *endpoint.KeyspaceGroup @@ -374,7 +441,7 @@ func runTestLoadKeyspaceGroupsAssignment( } addKeyspaceGroupAssignment( ctx, etcdClient, assignToMe, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(j)) + mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(j), []uint32{uint32(j)}) } }(i) } @@ -446,7 +513,8 @@ func deleteKeyspaceGroupInEtcd( // addKeyspaceGroupAssignment adds a keyspace group assignment to etcd. func addKeyspaceGroupAssignment( ctx context.Context, etcdClient *clientv3.Client, - assignToMe bool, rootPath, svcAddr string, id uint32, + assignToMe bool, rootPath, svcAddr string, + groupID uint32, keyspaces []uint32, ) error { var location string if assignToMe { @@ -455,12 +523,12 @@ func addKeyspaceGroupAssignment( location = uuid.NewString() } group := &endpoint.KeyspaceGroup{ - ID: id, + ID: groupID, Members: []endpoint.KeyspaceGroupMember{{Address: location}}, - Keyspaces: []uint32{id}, + Keyspaces: keyspaces, } - key := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(id)}, "/") + key := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(groupID)}, "/") value, err := json.Marshal(group) if err != nil { return err From 33a5ee8e83038b61fb7c9528254e2add4c9f3778 Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Mon, 10 Apr 2023 12:19:49 -0700 Subject: [PATCH 04/11] Implement new algorithm to optimize for read Signed-off-by: Bin Shi --- errors.toml | 2 +- pkg/errs/errno.go | 2 +- pkg/tso/keyspace_group_manager.go | 142 +++++++++++++++++++------ pkg/tso/keyspace_group_manager_test.go | 51 ++++++++- 4 files changed, 163 insertions(+), 34 deletions(-) diff --git a/errors.toml b/errors.toml index 9bf2e77c8a4..4777112b5b3 100644 --- a/errors.toml +++ b/errors.toml @@ -763,7 +763,7 @@ the keyspace group id is invalid, %s ["PD:tso:ErrKeyspaceNotAssigned"] error = ''' -the keyspace isn't assigned to any keyspace group, %s +the keyspace %d isn't assigned to any keyspace group ''' ["PD:tso:ErrLogicOverflow"] diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index c1ee62ed46f..827ea513ee6 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -52,7 +52,7 @@ var ( ErrLoadKeyspaceGroupsTimeout = errors.Normalize("load keyspace groups timeout", errors.RFCCodeText("ErrLoadKeyspaceGroupsTimeout")) ErrLoadKeyspaceGroupsTerminated = errors.Normalize("load keyspace groups terminated", errors.RFCCodeText("ErrLoadKeyspaceGroupsTerminated")) ErrLoadKeyspaceGroupsRetryExhaustd = errors.Normalize("load keyspace groups retry exhausted, %s", errors.RFCCodeText("ErrLoadKeyspaceGroupsRetryExhaustd")) - ErrKeyspaceNotAssigned = errors.Normalize("the keyspace isn't assigned to any keyspace group, %s", errors.RFCCodeText("PD:tso:ErrKeyspaceNotAssigned")) + ErrKeyspaceNotAssigned = errors.Normalize("the keyspace %d isn't assigned to any keyspace group", errors.RFCCodeText("PD:tso:ErrKeyspaceNotAssigned")) ) // member errors diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index f7bbe03546e..c799bcef011 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -63,8 +63,11 @@ type KeyspaceGroupManager struct { // Use a fixed size array to maximize the efficiency of concurrent access to // different keyspace groups for tso service. ams [mcsutils.MaxKeyspaceGroupCountInUse]atomic.Pointer[AllocatorManager] - // ksgs stores the keyspace groups' membership/distribution meta. - ksgs [mcsutils.MaxKeyspaceGroupCountInUse]atomic.Pointer[endpoint.KeyspaceGroup] + // kgs stores the keyspace groups' membership/distribution meta. + kgs [mcsutils.MaxKeyspaceGroupCountInUse]atomic.Pointer[endpoint.KeyspaceGroup] + // keyspaceLookupTable is a map from keyspace (id) to its keyspace group (id). + // stored as map[uint32]uint32 + keyspaceLookupTable sync.Map ctx context.Context cancel context.CancelFunc @@ -447,11 +450,11 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro if kgm.ams[group.ID].Load() != nil { log.Info("keyspace group already initialized, so update meta only", zap.Uint32("keyspace-group-id", group.ID)) - group.KeyspaceLookupTable = make(map[uint32]struct{}) - for _, kid := range group.Keyspaces { - group.KeyspaceLookupTable[kid] = struct{}{} - } - kgm.ksgs[group.ID].Store(group) + + oldGroup := kgm.kgs[group.ID].Load() + group.KeyspaceLookupTable = kgm.updateKeyspaceGroupMembership( + group.ID, oldGroup.Keyspaces, group.Keyspaces, oldGroup.KeyspaceLookupTable) + kgm.kgs[group.ID].Store(group) return } @@ -480,28 +483,111 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro storage = kgm.tsoSvcStorage } + group.KeyspaceLookupTable = kgm.buildKeyspaceLookupTable(group.ID, group.Keyspaces) + kgm.kgs[group.ID].Store(group) kgm.ams[group.ID].Store(NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true)) - - group.KeyspaceLookupTable = make(map[uint32]struct{}) - for _, kid := range group.Keyspaces { - group.KeyspaceLookupTable[kid] = struct{}{} - } - kgm.ksgs[group.ID].Store(group) } else { // Not assigned to me. If this host/pod owns this keyspace group, it should resign. kgm.deleteKeyspaceGroup(group.ID) } } +// updateKeyspaceGroupMembership updates the keyspace lookup table for the given keyspace group. +// Mostly, the membership has no change, so we optimize for this case. +func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership( + groupID uint32, oldKeyspaces, newKeyspaces []uint32, + defaultKeyspaceLookupTable map[uint32]struct{}, +) (keyspaceLookupTable map[uint32]struct{}) { + oldLen := len(oldKeyspaces) + newLen := len(newKeyspaces) + + sameMembership := true + i, j := 0, 0 + for i < oldLen || j < newLen { + // We assume that the keyspace IDs are sorted in ascending order. If not, we will + // break the loop and do a full update. + if i < oldLen && i > 0 && oldKeyspaces[i-1] >= oldKeyspaces[i] { + break + } + if j < newLen && j > 0 && newKeyspaces[j-1] >= newKeyspaces[j] { + break + } + + if i < oldLen && j < newLen && oldKeyspaces[i] == newKeyspaces[j] { + i++ + j++ + } else if i < oldLen && j < newLen && oldKeyspaces[i] < newKeyspaces[j] || j == newLen { + sameMembership = false + kgm.keyspaceLookupTable.Delete(oldKeyspaces[i]) + i++ + } else { + sameMembership = false + kgm.keyspaceLookupTable.Store(newKeyspaces[j], groupID) + if keyspaceLookupTable == nil { + keyspaceLookupTable = make(map[uint32]struct{}) + } + keyspaceLookupTable[newKeyspaces[j]] = struct{}{} + j++ + } + } + + if i < oldLen || j < newLen { + log.Warn("keyspace IDs are not sorted in ascending order, do a full update", + zap.Uint32("keyspace-group-id", groupID)) + + // Do a full update, because the keyspace IDs are not sorted in ascending order. + for _, kid := range oldKeyspaces { + kgm.keyspaceLookupTable.Delete(kid) + } + keyspaceLookupTable = kgm.buildKeyspaceLookupTable(groupID, newKeyspaces) + } else if sameMembership { + // The keyspace group membership is not changed, so we reuse the old one. + return defaultKeyspaceLookupTable + } else { + // The keyspace group membership is changed, so we update the keyspace lookup table. + // We haven't added the keyspace IDs which belong to both old and new groups, so add them. + if keyspaceLookupTable == nil { + keyspaceLookupTable = make(map[uint32]struct{}) + } + for i, j = 0, 0; i < oldLen && j < newLen; { + if oldKeyspaces[i] == newKeyspaces[j] { + keyspaceLookupTable[oldKeyspaces[i]] = struct{}{} + i++ + j++ + } else if oldKeyspaces[i] < newKeyspaces[j] { + i++ + } else { + j++ + } + } + } + + return keyspaceLookupTable +} + +func (kgm *KeyspaceGroupManager) buildKeyspaceLookupTable(groupID uint32, keyspaces []uint32) map[uint32]struct{} { + keyspaceLookupTable := make(map[uint32]struct{}) + for _, kid := range keyspaces { + keyspaceLookupTable[kid] = struct{}{} + kgm.keyspaceLookupTable.Store(kid, groupID) + } + return keyspaceLookupTable +} + // deleteKeyspaceGroup deletes the given keyspace group. -func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(id uint32) { - kgm.ksgs[id].Store(nil) - am := kgm.ams[id].Swap(nil) - if am == nil { - return +func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) { + kg := kgm.kgs[groupID].Swap(nil) + if kg != nil { + for _, kid := range kg.Keyspaces { + kgm.keyspaceLookupTable.CompareAndDelete(kid, kg.ID) + } } - am.close() - log.Info("deleted keyspace group", zap.Uint32("keyspace-group-id", id)) + am := kgm.ams[groupID].Swap(nil) + if am != nil { + am.close() + } + + log.Info("deleted keyspace group", zap.Uint32("keyspace-group-id", groupID)) } // GetAllocatorManager returns the AllocatorManager of the given keyspace group @@ -521,7 +607,7 @@ func (kgm *KeyspaceGroupManager) GetAMWithMembershipCheck( keyspaceID, keyspaceGroupID uint32, ) (*AllocatorManager, error) { if am := kgm.ams[keyspaceGroupID].Load(); am != nil { - ksg := kgm.ksgs[keyspaceGroupID].Load() + ksg := kgm.kgs[keyspaceGroupID].Load() if ksg == nil { return nil, kgm.genNotServedErr(errs.ErrGetAllocatorManager, keyspaceGroupID) } @@ -560,14 +646,11 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest( if err != nil { // The keyspace doesn't belong to this keyspace group, we should check if it belongs to any other // keyspace groups, and return the correct keyspace group ID to the client. - for i := 0; i < int(mcsutils.MaxKeyspaceGroupCountInUse); i++ { - if ksg := kgm.ksgs[i].Load(); ksg == nil { - continue - } else if _, ok := ksg.KeyspaceLookupTable[keyspaceID]; ok { - return pdpb.Timestamp{}, ksg.ID, err - } + kgid, loaded := kgm.keyspaceLookupTable.Load(keyspaceID) + if loaded && kgid != nil { + return pdpb.Timestamp{}, kgid.(uint32), err } - return pdpb.Timestamp{}, keyspaceGroupID, err + return pdpb.Timestamp{}, keyspaceGroupID, errs.ErrKeyspaceNotAssigned.FastGenByArgs(keyspaceID) } ts, err = am.HandleRequest(dcLocation, count) return ts, keyspaceGroupID, err @@ -578,8 +661,7 @@ func (kgm *KeyspaceGroupManager) checkKeySpaceGroupID(id uint32) error { return nil } return errs.ErrKeyspaceGroupIDInvalid.FastGenByArgs( - fmt.Sprintf("invalid keyspace group id %d which shouldn't >= %d", - id, mcsutils.MaxKeyspaceGroupCountInUse)) + fmt.Sprintf("%d which shouldn't >= %d", id, mcsutils.MaxKeyspaceGroupCountInUse)) } func (kgm *KeyspaceGroupManager) genNotServedErr(perr *perrors.Error, keyspaceGroupID uint32) error { diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 104d9275f44..09848bca6c6 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -543,8 +543,8 @@ func addKeyspaceGroupAssignment( func collectAssignedKeyspaceGroupIDs(re *require.Assertions, ksgMgr *KeyspaceGroupManager) []int { ids := []int{} - for i := 0; i < len(ksgMgr.ksgs); i++ { - ksg := ksgMgr.ksgs[i].Load() + for i := 0; i < len(ksgMgr.kgs); i++ { + ksg := ksgMgr.kgs[i].Load() if ksg == nil { re.Nil(ksgMgr.ams[i].Load(), fmt.Sprintf("ksg is nil but am is not nil for id %d", i)) } else { @@ -563,3 +563,50 @@ func collectAssignedKeyspaceGroupIDs(re *require.Assertions, ksgMgr *KeyspaceGro return ids } + +func (suite *keyspaceGroupManagerTestSuite) TestUpdateKeyspaceGroupMembership() { + re := suite.Require() + + var keyspaceLookupTable map[uint32]struct{} + + // Start with empty keyspace group. + // Add keyspace 1 to the keyspace group. + oldKeyspaces := []uint32{} + newKeyspaces := []uint32{1} + defaultKeyspaceLookupTable := map[uint32]struct{}{} + kgm := &KeyspaceGroupManager{} + + keyspaceLookupTable = kgm.updateKeyspaceGroupMembership(0, oldKeyspaces, newKeyspaces, defaultKeyspaceLookupTable) + verifyLocalKeyspaceLookupTable(re, keyspaceLookupTable, newKeyspaces) + verifyGlobalKeyspaceLookupTable(re, kgm, keyspaceLookupTable) + + targetKeyspacesList := [][]uint32 { + {1, 2}, {1, 2}, {1, 2, 3, 4}, {5, 6, 7}, {7, 8, 9}, {1, 2, 3, 4, 5, 6, 7, 8, 9}, {8, 9}, {10}, {}, + } + + for _, keyspaces := range targetKeyspacesList { + oldKeyspaces = newKeyspaces + newKeyspaces = keyspaces + defaultKeyspaceLookupTable = keyspaceLookupTable + keyspaceLookupTable = kgm.updateKeyspaceGroupMembership(0, oldKeyspaces, newKeyspaces, defaultKeyspaceLookupTable) + verifyLocalKeyspaceLookupTable(re, keyspaceLookupTable, newKeyspaces) + verifyGlobalKeyspaceLookupTable(re, kgm, keyspaceLookupTable) + } +} + +func verifyLocalKeyspaceLookupTable(re *require.Assertions, keyspaceLookupTable map[uint32]struct{}, newKeyspaces []uint32) { + re.Equal(len(newKeyspaces), len(keyspaceLookupTable), fmt.Sprintf("%v %v", newKeyspaces, keyspaceLookupTable)) + for _, keyspace := range newKeyspaces { + _, ok := keyspaceLookupTable[keyspace] + re.True(ok) + } +} + +func verifyGlobalKeyspaceLookupTable(re *require.Assertions, kgm *KeyspaceGroupManager, keyspaceLookupTable map[uint32]struct{}) { + kgm.keyspaceLookupTable.Range(func(key, value interface{}) bool { + _, ok := keyspaceLookupTable[key.(uint32)] + re.True(ok) + re.Equal(uint32(0), value.(uint32)) + return true + }) +} From b5d83e01d7be60f6d374e57422eb32c87b7aa367 Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Mon, 10 Apr 2023 12:43:17 -0700 Subject: [PATCH 05/11] Refine unittest Signed-off-by: Bin Shi --- pkg/tso/keyspace_group_manager.go | 6 ++++++ pkg/tso/keyspace_group_manager_test.go | 19 +++++++++++++++---- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index c799bcef011..d585f3f017b 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "path" + "sort" "strings" "sync" "sync/atomic" @@ -539,6 +540,11 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership( for _, kid := range oldKeyspaces { kgm.keyspaceLookupTable.Delete(kid) } + // Sort the keyspaces in ascending order + sort.Slice(newKeyspaces, func(i, j int) bool { + return newKeyspaces[i] < newKeyspaces[j] + }) + keyspaceLookupTable = kgm.buildKeyspaceLookupTable(groupID, newKeyspaces) } else if sameMembership { // The keyspace group membership is not changed, so we reuse the old one. diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 09848bca6c6..bafeb11692e 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -569,10 +569,10 @@ func (suite *keyspaceGroupManagerTestSuite) TestUpdateKeyspaceGroupMembership() var keyspaceLookupTable map[uint32]struct{} - // Start with empty keyspace group. + // Start from an empty keyspace group. // Add keyspace 1 to the keyspace group. oldKeyspaces := []uint32{} - newKeyspaces := []uint32{1} + newKeyspaces := []uint32{} defaultKeyspaceLookupTable := map[uint32]struct{}{} kgm := &KeyspaceGroupManager{} @@ -580,8 +580,19 @@ func (suite *keyspaceGroupManagerTestSuite) TestUpdateKeyspaceGroupMembership() verifyLocalKeyspaceLookupTable(re, keyspaceLookupTable, newKeyspaces) verifyGlobalKeyspaceLookupTable(re, kgm, keyspaceLookupTable) - targetKeyspacesList := [][]uint32 { - {1, 2}, {1, 2}, {1, 2, 3, 4}, {5, 6, 7}, {7, 8, 9}, {1, 2, 3, 4, 5, 6, 7, 8, 9}, {8, 9}, {10}, {}, + targetKeyspacesList := [][]uint32{ + {1}, // Add keyspace 1 to the keyspace group. + {1, 2}, // Add keyspace 2 to the keyspace group. + {1, 2}, // No change. + {1, 2, 3, 4}, // Add keyspace 3 and 4 to the keyspace group. + {5, 6, 7}, // Remove keyspace 1, 2, 3, 4 from the keyspace group and add 5, 6, 7 + {7, 8, 9}, // Partially update the keyspace group. + {1, 2, 3, 4, 5, 6, 7, 8, 9}, // Add more keyspace to the keyspace group. + {9, 8, 4, 5, 6}, // Out of order. + {9, 8, 4, 5, 6}, // No change. Out of order. + {8, 9}, // Remove + {10}, // Remove + {}, // End with the empty keyspace group. } for _, keyspaces := range targetKeyspacesList { From c3474a73dc47183b8508b5d18d2c6f907cddf6aa Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Mon, 10 Apr 2023 15:56:39 -0700 Subject: [PATCH 06/11] refine test Signed-off-by: Bin Shi --- pkg/tso/keyspace_group_manager.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index d585f3f017b..6ef36f495b6 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -536,15 +536,14 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership( log.Warn("keyspace IDs are not sorted in ascending order, do a full update", zap.Uint32("keyspace-group-id", groupID)) - // Do a full update, because the keyspace IDs are not sorted in ascending order. - for _, kid := range oldKeyspaces { - kgm.keyspaceLookupTable.Delete(kid) + for ; i < oldLen; i++ { + kgm.keyspaceLookupTable.Delete(oldKeyspaces[i]) } + // Do a full update, because the keyspace IDs are not sorted in ascending order. // Sort the keyspaces in ascending order sort.Slice(newKeyspaces, func(i, j int) bool { return newKeyspaces[i] < newKeyspaces[j] }) - keyspaceLookupTable = kgm.buildKeyspaceLookupTable(groupID, newKeyspaces) } else if sameMembership { // The keyspace group membership is not changed, so we reuse the old one. From 8a1751882e2c70f0812705ed7d4e5222d3f10fd6 Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Mon, 10 Apr 2023 19:53:34 -0700 Subject: [PATCH 07/11] fix go fmt errors Signed-off-by: Bin Shi --- pkg/tso/keyspace_group_manager_test.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index bafeb11692e..9973f406533 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -581,18 +581,18 @@ func (suite *keyspaceGroupManagerTestSuite) TestUpdateKeyspaceGroupMembership() verifyGlobalKeyspaceLookupTable(re, kgm, keyspaceLookupTable) targetKeyspacesList := [][]uint32{ - {1}, // Add keyspace 1 to the keyspace group. - {1, 2}, // Add keyspace 2 to the keyspace group. - {1, 2}, // No change. - {1, 2, 3, 4}, // Add keyspace 3 and 4 to the keyspace group. - {5, 6, 7}, // Remove keyspace 1, 2, 3, 4 from the keyspace group and add 5, 6, 7 - {7, 8, 9}, // Partially update the keyspace group. + {1}, // Add keyspace 1 to the keyspace group. + {1, 2}, // Add keyspace 2 to the keyspace group. + {1, 2}, // No change. + {1, 2, 3, 4}, // Add keyspace 3 and 4 to the keyspace group. + {5, 6, 7}, // Remove keyspace 1, 2, 3, 4 from the keyspace group and add 5, 6, 7 + {7, 8, 9}, // Partially update the keyspace group. {1, 2, 3, 4, 5, 6, 7, 8, 9}, // Add more keyspace to the keyspace group. - {9, 8, 4, 5, 6}, // Out of order. - {9, 8, 4, 5, 6}, // No change. Out of order. - {8, 9}, // Remove - {10}, // Remove - {}, // End with the empty keyspace group. + {9, 8, 4, 5, 6}, // Out of order. + {9, 8, 4, 5, 6}, // No change. Out of order. + {8, 9}, // Remove + {10}, // Remove + {}, // End with the empty keyspace group. } for _, keyspaces := range targetKeyspacesList { From 3b5d712585b1a44f988f0ef3308aaa8a3bb8b703 Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Mon, 10 Apr 2023 20:03:19 -0700 Subject: [PATCH 08/11] refine algorithm Signed-off-by: Bin Shi --- pkg/tso/keyspace_group_manager.go | 31 ++++++++++++-------------- pkg/tso/keyspace_group_manager_test.go | 1 - 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 6ef36f495b6..dded439b702 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -494,14 +494,14 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro } // updateKeyspaceGroupMembership updates the keyspace lookup table for the given keyspace group. -// Mostly, the membership has no change, so we optimize for this case. func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership( groupID uint32, oldKeyspaces, newKeyspaces []uint32, - defaultKeyspaceLookupTable map[uint32]struct{}, -) (keyspaceLookupTable map[uint32]struct{}) { + oldKeyspaceLookupTable map[uint32]struct{}, +) map[uint32]struct{} { oldLen := len(oldKeyspaces) newLen := len(newKeyspaces) + // Mostly, the membership has no change, so we optimize for this case. sameMembership := true i, j := 0, 0 for i < oldLen || j < newLen { @@ -524,14 +524,12 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership( } else { sameMembership = false kgm.keyspaceLookupTable.Store(newKeyspaces[j], groupID) - if keyspaceLookupTable == nil { - keyspaceLookupTable = make(map[uint32]struct{}) - } - keyspaceLookupTable[newKeyspaces[j]] = struct{}{} j++ } } + var newKeyspaceLookupTable map[uint32]struct{} + if i < oldLen || j < newLen { log.Warn("keyspace IDs are not sorted in ascending order, do a full update", zap.Uint32("keyspace-group-id", groupID)) @@ -544,30 +542,29 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership( sort.Slice(newKeyspaces, func(i, j int) bool { return newKeyspaces[i] < newKeyspaces[j] }) - keyspaceLookupTable = kgm.buildKeyspaceLookupTable(groupID, newKeyspaces) + newKeyspaceLookupTable = kgm.buildKeyspaceLookupTable(groupID, newKeyspaces) } else if sameMembership { // The keyspace group membership is not changed, so we reuse the old one. - return defaultKeyspaceLookupTable + newKeyspaceLookupTable = oldKeyspaceLookupTable } else { // The keyspace group membership is changed, so we update the keyspace lookup table. // We haven't added the keyspace IDs which belong to both old and new groups, so add them. - if keyspaceLookupTable == nil { - keyspaceLookupTable = make(map[uint32]struct{}) - } - for i, j = 0, 0; i < oldLen && j < newLen; { - if oldKeyspaces[i] == newKeyspaces[j] { - keyspaceLookupTable[oldKeyspaces[i]] = struct{}{} + newKeyspaceLookupTable = make(map[uint32]struct{}) + for i, j = 0, 0; i < oldLen || j < newLen; { + if i < oldLen && j < newLen && oldKeyspaces[i] == newKeyspaces[j] { + newKeyspaceLookupTable[newKeyspaces[j]] = struct{}{} i++ j++ - } else if oldKeyspaces[i] < newKeyspaces[j] { + } else if i < oldLen && j < newLen && oldKeyspaces[i] < newKeyspaces[j] || j == newLen { i++ } else { + newKeyspaceLookupTable[newKeyspaces[j]] = struct{}{} j++ } } } - return keyspaceLookupTable + return newKeyspaceLookupTable } func (kgm *KeyspaceGroupManager) buildKeyspaceLookupTable(groupID uint32, keyspaces []uint32) map[uint32]struct{} { diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 9973f406533..d0c421678e4 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -570,7 +570,6 @@ func (suite *keyspaceGroupManagerTestSuite) TestUpdateKeyspaceGroupMembership() var keyspaceLookupTable map[uint32]struct{} // Start from an empty keyspace group. - // Add keyspace 1 to the keyspace group. oldKeyspaces := []uint32{} newKeyspaces := []uint32{} defaultKeyspaceLookupTable := map[uint32]struct{}{} From 4a96a7d4171eedc5a0fc2bd4e4fdd5778441d865 Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Mon, 10 Apr 2023 21:21:02 -0700 Subject: [PATCH 09/11] Update comments. Signed-off-by: Bin Shi --- pkg/tso/keyspace_group_manager.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index dded439b702..7bbb92926a4 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -534,10 +534,10 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership( log.Warn("keyspace IDs are not sorted in ascending order, do a full update", zap.Uint32("keyspace-group-id", groupID)) + // Do a full update, because the keyspace IDs are not sorted in ascending order. for ; i < oldLen; i++ { kgm.keyspaceLookupTable.Delete(oldKeyspaces[i]) } - // Do a full update, because the keyspace IDs are not sorted in ascending order. // Sort the keyspaces in ascending order sort.Slice(newKeyspaces, func(i, j int) bool { return newKeyspaces[i] < newKeyspaces[j] @@ -548,7 +548,6 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership( newKeyspaceLookupTable = oldKeyspaceLookupTable } else { // The keyspace group membership is changed, so we update the keyspace lookup table. - // We haven't added the keyspace IDs which belong to both old and new groups, so add them. newKeyspaceLookupTable = make(map[uint32]struct{}) for i, j = 0, 0; i < oldLen || j < newLen; { if i < oldLen && j < newLen && oldKeyspaces[i] == newKeyspaces[j] { From 54aa71c78c5899bcfa0a3890928765c7802bb6ee Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Mon, 10 Apr 2023 22:35:16 -0700 Subject: [PATCH 10/11] Simlify the algorithm by performing sorting always. Signed-off-by: Bin Shi --- pkg/tso/keyspace_group_manager.go | 54 ++++++++++--------------------- 1 file changed, 17 insertions(+), 37 deletions(-) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 7bbb92926a4..af16955163c 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -501,63 +501,43 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership( oldLen := len(oldKeyspaces) newLen := len(newKeyspaces) + // Sort the keyspaces in ascending order + sort.Slice(newKeyspaces, func(i, j int) bool { + return newKeyspaces[i] < newKeyspaces[j] + }) + // Mostly, the membership has no change, so we optimize for this case. sameMembership := true - i, j := 0, 0 - for i < oldLen || j < newLen { - // We assume that the keyspace IDs are sorted in ascending order. If not, we will - // break the loop and do a full update. - if i < oldLen && i > 0 && oldKeyspaces[i-1] >= oldKeyspaces[i] { - break - } - if j < newLen && j > 0 && newKeyspaces[j-1] >= newKeyspaces[j] { - break - } - - if i < oldLen && j < newLen && oldKeyspaces[i] == newKeyspaces[j] { - i++ - j++ - } else if i < oldLen && j < newLen && oldKeyspaces[i] < newKeyspaces[j] || j == newLen { - sameMembership = false - kgm.keyspaceLookupTable.Delete(oldKeyspaces[i]) - i++ - } else { - sameMembership = false - kgm.keyspaceLookupTable.Store(newKeyspaces[j], groupID) - j++ + if oldLen != newLen { + sameMembership = false + } else { + for i := 0; i < oldLen; i++ { + if oldKeyspaces[i] != newKeyspaces[i] { + sameMembership = false + break + } } } var newKeyspaceLookupTable map[uint32]struct{} - if i < oldLen || j < newLen { - log.Warn("keyspace IDs are not sorted in ascending order, do a full update", - zap.Uint32("keyspace-group-id", groupID)) - - // Do a full update, because the keyspace IDs are not sorted in ascending order. - for ; i < oldLen; i++ { - kgm.keyspaceLookupTable.Delete(oldKeyspaces[i]) - } - // Sort the keyspaces in ascending order - sort.Slice(newKeyspaces, func(i, j int) bool { - return newKeyspaces[i] < newKeyspaces[j] - }) - newKeyspaceLookupTable = kgm.buildKeyspaceLookupTable(groupID, newKeyspaces) - } else if sameMembership { + if sameMembership { // The keyspace group membership is not changed, so we reuse the old one. newKeyspaceLookupTable = oldKeyspaceLookupTable } else { // The keyspace group membership is changed, so we update the keyspace lookup table. newKeyspaceLookupTable = make(map[uint32]struct{}) - for i, j = 0, 0; i < oldLen || j < newLen; { + for i, j := 0, 0; i < oldLen || j < newLen; { if i < oldLen && j < newLen && oldKeyspaces[i] == newKeyspaces[j] { newKeyspaceLookupTable[newKeyspaces[j]] = struct{}{} i++ j++ } else if i < oldLen && j < newLen && oldKeyspaces[i] < newKeyspaces[j] || j == newLen { + kgm.keyspaceLookupTable.Delete(oldKeyspaces[i]) i++ } else { newKeyspaceLookupTable[newKeyspaces[j]] = struct{}{} + kgm.keyspaceLookupTable.Store(newKeyspaces[j], groupID) j++ } } From 0df4320d916fca8542180021d14ad0fce03b0bcf Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Mon, 10 Apr 2023 23:42:05 -0700 Subject: [PATCH 11/11] remove unnecessary comment Signed-off-by: Bin Shi --- pkg/tso/keyspace_group_manager.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index af16955163c..95157f74b3a 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -601,7 +601,6 @@ func (kgm *KeyspaceGroupManager) GetAMWithMembershipCheck( } // GetElectionMember returns the election member of the given keyspace group -// TODO: support multiple keyspace groups for GetElectionMember func (kgm *KeyspaceGroupManager) GetElectionMember( keyspaceID, keyspaceGroupID uint32, ) (ElectionMember, error) { @@ -642,7 +641,7 @@ func (kgm *KeyspaceGroupManager) checkKeySpaceGroupID(id uint32) error { return nil } return errs.ErrKeyspaceGroupIDInvalid.FastGenByArgs( - fmt.Sprintf("%d which shouldn't >= %d", id, mcsutils.MaxKeyspaceGroupCountInUse)) + fmt.Sprintf("%d shouldn't >= %d", id, mcsutils.MaxKeyspaceGroupCountInUse)) } func (kgm *KeyspaceGroupManager) genNotServedErr(perr *perrors.Error, keyspaceGroupID uint32) error {