Skip to content

Commit

Permalink
Pick some tso mcs changes (tikv#122)
Browse files Browse the repository at this point in the history
* Revert "mcs: pick some priority about member of keyspace group (tikv#120)"

This reverts commit e962b88.

* keyspace, apiv2: implement the keyspace group merging API (tikv#6594)

ref tikv#6589

Implement the keyspace group merging API.

Signed-off-by: JmPotato <ghzpotato@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

* keyspace: prohibit merging the default keyspace group (tikv#6606)

ref tikv#6589

Prohibit merging the default keyspace group.

Signed-off-by: JmPotato <ghzpotato@gmail.com>

* keyspace: add priority of tso node for the keyspace group (tikv#6602)

ref tikv#6599

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

* tests: reduce unnecessary time.sleep in keyspace group (tikv#6632)

ref tikv#6599

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

* tools: add merge commands for pd-ctl (tikv#6675)

ref tikv#6589

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

* mcs, tso: fix expensive async forwardTSORequest() and its timeout mechanism. (tikv#6664)

ref tikv#6659

Fix expensive async forwardTSORequest() and its timeout mechanism.

In order to handle the timeout case for forwardStream send/recv, the existing logic is to create 
context.withTimeout(forwardCtx,...) for every request, then start a new goroutine "forwardTSORequest", 
which is very expensive as shown by the profiling in tikv#6659. 

This change create a watchDeadline routine per forward stream and reuse it for all the forward requests
in which forwardTSORequest is called synchronously. Compared to the existing logic, the new change
is much cheaper and the latency is much stable.

Signed-off-by: Bin Shi <binshi.bing@gmail.com>

* keyspace, apiv2: support to split keyspace group with the keyspace ID range (tikv#6646)

ref tikv#6232

Support to split keyspace group with the keyspace ID range.

Signed-off-by: JmPotato <ghzpotato@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

* mcs: add log for finishing split (tikv#6656)

ref tikv#5895

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

* tso: fix checkTSOSplit to finish split correctly (tikv#6652)

ref tikv#6232

Fix `checkTSOSplit` to finish split correctly.

Signed-off-by: JmPotato <ghzpotato@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

* tests: fix TestTSOKeyspaceGroupSplitClient to avoid unexpected panic (tikv#6655)

close tikv#6634

Fix `TestTSOKeyspaceGroupSplitClient` to avoid unexpected panic

Signed-off-by: JmPotato <ghzpotato@gmail.com>

* tso, tests: implement the keyspace group merge checker (tikv#6625)

ref tikv#6589

Implement the keyspace group merge checker.

Signed-off-by: JmPotato <ghzpotato@gmail.com>

* mcs, tso: support weighted-election for TSO keyspace group primary election (tikv#6617)

close tikv#6616

Add the tso server registry watch loop in tso's keyspace group manager.
re-distribute TSO keyspace group primaries according to their replica priorities

Signed-off-by: Bin Shi <binshi.bing@gmail.com>

* Add keyspace group info in the timestamp fallback log in the client. (tikv#6654)

ref tikv#5895

Add keyspace group info in the timestamp fallback log in the client.

Signed-off-by: Bin Shi <binshi.bing@gmail.com>

---------

Signed-off-by: JmPotato <ghzpotato@gmail.com>
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
Co-authored-by: JmPotato <ghzpotato@gmail.com>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
Co-authored-by: lhy1024 <admin@liudos.us>
Co-authored-by: Bin Shi <39923490+binshi-bing@users.noreply.github.com>
  • Loading branch information
5 people authored Jun 26, 2023
1 parent 1671629 commit 40866fd
Show file tree
Hide file tree
Showing 30 changed files with 1,829 additions and 303 deletions.
4 changes: 2 additions & 2 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (c *tsoClient) updateTSOLocalServAddrs(allocatorMap map[string]string) erro
return err
}
c.tsoAllocators.Store(dcLocation, addr)
log.Info("[tso] switch dc tso allocator serving address",
log.Info("[tso] switch dc tso local allocator serving address",
zap.String("dc-location", dcLocation),
zap.String("new-address", addr),
zap.String("old-address", oldAddr))
Expand All @@ -227,7 +227,7 @@ func (c *tsoClient) updateTSOLocalServAddrs(allocatorMap map[string]string) erro

func (c *tsoClient) updateTSOGlobalServAddr(addr string) error {
c.tsoAllocators.Store(globalDCLocation, addr)
log.Info("[tso] switch dc tso allocator serving address",
log.Info("[tso] switch dc tso global allocator serving address",
zap.String("dc-location", globalDCLocation),
zap.String("new-address", addr))
c.scheduleCheckTSODispatcher()
Expand Down
34 changes: 26 additions & 8 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ type tsoDispatcher struct {
}

type lastTSO struct {
physical int64
logical int64
keyspaceGroupID uint32
physical int64
logical int64
}

const (
Expand Down Expand Up @@ -708,7 +709,7 @@ func (c *tsoClient) processRequests(

requests := tbc.getCollectedRequests()
count := int64(len(requests))
physical, logical, suffixBits, err := stream.processRequests(
respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests(
c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), c.svcDiscovery.GetKeyspaceGroupID(),
dcLocation, requests, tbc.batchStartTime)
if err != nil {
Expand All @@ -717,33 +718,50 @@ func (c *tsoClient) processRequests(
}
// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits)
c.compareAndSwapTS(dcLocation, physical, firstLogical, suffixBits, count)
c.compareAndSwapTS(dcLocation, respKeyspaceGroupID, physical, firstLogical, suffixBits, count)
c.finishRequest(requests, physical, firstLogical, suffixBits, nil)
return nil
}

func (c *tsoClient) compareAndSwapTS(dcLocation string, physical, firstLogical int64, suffixBits uint32, count int64) {
func (c *tsoClient) compareAndSwapTS(
dcLocation string, respKeyspaceGroupID uint32,
physical, firstLogical int64, suffixBits uint32, count int64,
) {
largestLogical := tsoutil.AddLogical(firstLogical, count-1, suffixBits)
lastTSOInterface, loaded := c.lastTSMap.LoadOrStore(dcLocation, &lastTSO{
physical: physical,
keyspaceGroupID: respKeyspaceGroupID,
physical: physical,
// Save the largest logical part here
logical: largestLogical,
})
if !loaded {
return
}
lastTSOPointer := lastTSOInterface.(*lastTSO)
lastKeyspaceGroupID := lastTSOPointer.keyspaceGroupID
lastPhysical := lastTSOPointer.physical
lastLogical := lastTSOPointer.logical

if lastKeyspaceGroupID != respKeyspaceGroupID {
log.Info("[tso] keyspace group changed",
zap.String("dc-location", dcLocation),
zap.Uint32("old-group-id", lastKeyspaceGroupID),
zap.Uint32("new-group-id", respKeyspaceGroupID))
}

// The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical
// to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then
// all TSOs we get will be [6, 7, 8, 9, 10].
if tsoutil.TSLessEqual(physical, firstLogical, lastPhysical, lastLogical) {
panic(errors.Errorf(
"%s timestamp fallback, new ts (%d, %d) <= the last one (%d, %d). keyspace: %d, keyspace group: %d",
"%s timestamp fallback, new ts (%d, %d) <= the last one (%d, %d). "+
"last keyspace group: %d, keyspace in request: %d, "+
"keyspace group in request: %d, keyspace group in response: %d",
dcLocation, physical, firstLogical, lastPhysical, lastLogical,
c.svcDiscovery.GetKeyspaceID(), c.svcDiscovery.GetKeyspaceGroupID()))
lastKeyspaceGroupID, c.svcDiscovery.GetKeyspaceID(),
c.svcDiscovery.GetKeyspaceGroupID(), respKeyspaceGroupID))
}
lastTSOPointer.keyspaceGroupID = respKeyspaceGroupID
lastTSOPointer.physical = physical
// Same as above, we save the largest logical part here.
lastTSOPointer.logical = largestLogical
Expand Down
8 changes: 5 additions & 3 deletions client/tso_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type tsoStream interface {
processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
requests []*tsoRequest, batchStartTime time.Time,
) (physical, logical int64, suffixBits uint32, err error)
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error)
}

type pdTSOStream struct {
Expand All @@ -111,7 +111,7 @@ type pdTSOStream struct {

func (s *pdTSOStream) processRequests(
clusterID uint64, _, _ uint32, dcLocation string, requests []*tsoRequest, batchStartTime time.Time,
) (physical, logical int64, suffixBits uint32, err error) {
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
count := int64(len(requests))
req := &pdpb.TsoRequest{
Expand Down Expand Up @@ -149,6 +149,7 @@ func (s *pdTSOStream) processRequests(
}

ts := resp.GetTimestamp()
respKeyspaceGroupID = defaultKeySpaceGroupID
physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits()
return
}
Expand All @@ -160,7 +161,7 @@ type tsoTSOStream struct {
func (s *tsoTSOStream) processRequests(
clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string,
requests []*tsoRequest, batchStartTime time.Time,
) (physical, logical int64, suffixBits uint32, err error) {
) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) {
start := time.Now()
count := int64(len(requests))
req := &tsopb.TsoRequest{
Expand Down Expand Up @@ -200,6 +201,7 @@ func (s *tsoTSOStream) processRequests(
}

ts := resp.GetTimestamp()
respKeyspaceGroupID = resp.GetHeader().GetKeyspaceGroupId()
physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits()
return
}
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,11 @@ error = '''
the keyspace group id is invalid, %s
'''

["PD:tso:ErrKeyspaceGroupIsMerging"]
error = '''
the keyspace group %d is merging
'''

["PD:tso:ErrKeyspaceGroupNotInitialized"]
error = '''
the keyspace group %d isn't initialized
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (
ErrKeyspaceGroupNotInitialized = errors.Normalize("the keyspace group %d isn't initialized", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupNotInitialized"))
ErrKeyspaceNotAssigned = errors.Normalize("the keyspace %d isn't assigned to any keyspace group", errors.RFCCodeText("PD:tso:ErrKeyspaceNotAssigned"))
ErrGetMinTS = errors.Normalize("get min ts failed, %s", errors.RFCCodeText("PD:tso:ErrGetMinTS"))
ErrKeyspaceGroupIsMerging = errors.Normalize("the keyspace group %d is merging", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIsMerging"))
)

// member errors
Expand Down
44 changes: 33 additions & 11 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ const (
UserKindKey = "user_kind"
// TSOKeyspaceGroupIDKey is the key for tso keyspace group id in keyspace config.
TSOKeyspaceGroupIDKey = "tso_keyspace_group_id"
// keyspacePatrolBatchSize is the batch size for keyspace assignment patrol.
// the limit of etcd txn op is 128, keyspacePatrolBatchSize need to be less than it.
// maxEtcdTxnOps is the max value of operations in an etcd txn. The default limit of etcd txn op is 128.
// We use 120 here to leave some space for other operations.
// See: https://github.com/etcd-io/etcd/blob/d3e43d4de6f6d9575b489dd7850a85e37e0f6b6c/server/embed/config.go#L61
keyspacePatrolBatchSize = 120
maxEtcdTxnOps = 120
)

// Config is the interface for keyspace config.
Expand Down Expand Up @@ -652,7 +652,16 @@ func (manager *Manager) allocID() (uint32, error) {
}

// PatrolKeyspaceAssignment is used to patrol all keyspaces and assign them to the keyspace groups.
func (manager *Manager) PatrolKeyspaceAssignment() error {
func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID uint32) error {
if startKeyspaceID > manager.nextPatrolStartID {
manager.nextPatrolStartID = startKeyspaceID
}
if endKeyspaceID != 0 && endKeyspaceID < manager.nextPatrolStartID {
log.Info("[keyspace] end keyspace id is smaller than the next patrol start id, skip patrol",
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("next-patrol-start-id", manager.nextPatrolStartID))
return nil
}
var (
// Some statistics info.
start = time.Now()
Expand All @@ -670,7 +679,9 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
zap.Duration("cost", time.Since(start)),
zap.Uint64("patrolled-keyspace-count", patrolledKeyspaceCount),
zap.Uint64("assigned-keyspace-count", assignedKeyspaceCount),
zap.Int("batch-size", keyspacePatrolBatchSize),
zap.Int("batch-size", maxEtcdTxnOps),
zap.Uint32("start-keyspace-id", startKeyspaceID),
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("current-start-id", currentStartID),
zap.Uint32("next-start-id", nextStartID),
)
Expand All @@ -689,7 +700,10 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
if defaultKeyspaceGroup.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, keyspacePatrolBatchSize)
if defaultKeyspaceGroup.IsMerging() {
return ErrKeyspaceGroupInMerging
}
keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, maxEtcdTxnOps)
if err != nil {
return err
}
Expand All @@ -699,9 +713,9 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
currentStartID = keyspaces[0].GetId()
nextStartID = keyspaces[keyspaceNum-1].GetId() + 1
}
// If there are less than `keyspacePatrolBatchSize` keyspaces,
// we have reached the end of the keyspace list.
moreToPatrol = keyspaceNum == keyspacePatrolBatchSize
// If there are less than `maxEtcdTxnOps` keyspaces or the next start ID reaches the end,
// there is no need to patrol again.
moreToPatrol = keyspaceNum == maxEtcdTxnOps
var (
assigned = false
keyspaceIDsToUnlock = make([]uint32, 0, keyspaceNum)
Expand All @@ -715,6 +729,10 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
if ks == nil {
continue
}
if endKeyspaceID != 0 && ks.Id > endKeyspaceID {
moreToPatrol = false
break
}
patrolledKeyspaceCount++
manager.metaLock.Lock(ks.Id)
if ks.Config == nil {
Expand All @@ -736,7 +754,9 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
err = manager.store.SaveKeyspaceMeta(txn, ks)
if err != nil {
log.Error("[keyspace] failed to save keyspace meta during patrol",
zap.Int("batch-size", keyspacePatrolBatchSize),
zap.Int("batch-size", maxEtcdTxnOps),
zap.Uint32("start-keyspace-id", startKeyspaceID),
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("current-start-id", currentStartID),
zap.Uint32("next-start-id", nextStartID),
zap.Uint32("keyspace-id", ks.Id), zap.Error(err))
Expand All @@ -748,7 +768,9 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
err = manager.kgm.store.SaveKeyspaceGroup(txn, defaultKeyspaceGroup)
if err != nil {
log.Error("[keyspace] failed to save default keyspace group meta during patrol",
zap.Int("batch-size", keyspacePatrolBatchSize),
zap.Int("batch-size", maxEtcdTxnOps),
zap.Uint32("start-keyspace-id", startKeyspaceID),
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("current-start-id", currentStartID),
zap.Uint32("next-start-id", nextStartID), zap.Error(err))
return err
Expand Down
55 changes: 49 additions & 6 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignment() {
re.NotNil(defaultKeyspaceGroup)
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(111))
// Patrol the keyspace assignment.
err = suite.manager.PatrolKeyspaceAssignment()
err = suite.manager.PatrolKeyspaceAssignment(0, 0)
re.NoError(err)
// Check if the keyspace is attached to the default group.
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
Expand All @@ -405,7 +405,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignment() {
func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
re := suite.Require()
// Create some keyspaces without any keyspace group.
for i := 1; i < keyspacePatrolBatchSize*2+1; i++ {
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
now := time.Now().Unix()
err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
Id: uint32(i),
Expand All @@ -420,21 +420,64 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < keyspacePatrolBatchSize*2+1; i++ {
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
// Patrol the keyspace assignment.
err = suite.manager.PatrolKeyspaceAssignment()
err = suite.manager.PatrolKeyspaceAssignment(0, 0)
re.NoError(err)
// Check if all the keyspaces are attached to the default group.
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < keyspacePatrolBatchSize*2+1; i++ {
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
re.Contains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
}

func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() {
re := suite.Require()
// Create some keyspaces without any keyspace group.
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
now := time.Now().Unix()
err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
Id: uint32(i),
Name: strconv.Itoa(i),
State: keyspacepb.KeyspaceState_ENABLED,
CreatedAt: now,
StateChangedAt: now,
})
re.NoError(err)
}
// Check if all the keyspaces are not attached to the default group.
defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
// Patrol the keyspace assignment with range [maxEtcdTxnOps/2, maxEtcdTxnOps/2+maxEtcdTxnOps+1]
// to make sure the range crossing the boundary of etcd transaction operation limit.
var (
startKeyspaceID = uint32(maxEtcdTxnOps / 2)
endKeyspaceID = startKeyspaceID + maxEtcdTxnOps + 1
)
err = suite.manager.PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID)
re.NoError(err)
// Check if only the keyspaces within the range are attached to the default group.
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < maxEtcdTxnOps*2+1; i++ {
keyspaceID := uint32(i)
if keyspaceID >= startKeyspaceID && keyspaceID <= endKeyspaceID {
re.Contains(defaultKeyspaceGroup.Keyspaces, keyspaceID)
} else {
re.NotContains(defaultKeyspaceGroup.Keyspaces, keyspaceID)
}
}
}

// Benchmark the keyspace assignment patrol.
func BenchmarkPatrolKeyspaceAssignment1000(b *testing.B) {
benchmarkPatrolKeyspaceAssignmentN(1000, b)
Expand Down Expand Up @@ -471,7 +514,7 @@ func benchmarkPatrolKeyspaceAssignmentN(
// Benchmark the keyspace assignment patrol.
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := suite.manager.PatrolKeyspaceAssignment()
err := suite.manager.PatrolKeyspaceAssignment(0, 0)
re.NoError(err)
}
b.StopTimer()
Expand Down
Loading

0 comments on commit 40866fd

Please sign in to comment.