Skip to content

Commit

Permalink
Fix data race between read APIs and finshiMe:wq keyspace group manager
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed Jun 30, 2023
1 parent fa721e7 commit 5605eca
Showing 1 changed file with 60 additions and 38 deletions.
98 changes: 60 additions & 38 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,39 @@ func (s *state) getKeyspaceGroupMeta(
return s.ams[groupID], s.kgs[groupID]
}

func (s *state) checkTSOSplit(
targetGroupID uint32,
) (splitTargetAM, splitSourceAM *AllocatorManager, err error) {
s.RLock()
defer s.RUnlock()
splitTargetAM, splitTargetGroup := s.ams[targetGroupID], s.kgs[targetGroupID]
// Only the split target keyspace group needs to check the TSO split.
if !splitTargetGroup.IsSplitTarget() {
return nil, nil, nil // it isn't in the split state
}
sourceGroupID := splitTargetGroup.SplitSource()
splitSourceAM, splitSourceGroup := s.ams[sourceGroupID], s.kgs[sourceGroupID]
if splitSourceAM == nil || splitSourceGroup == nil {
log.Error("the split source keyspace group is not initialized",
zap.Uint32("source", sourceGroupID))
return nil, nil, errs.ErrKeyspaceGroupNotInitialized.FastGenByArgs(sourceGroupID)
}
return splitTargetAM, splitSourceAM, nil
}

// Reject any request if the keyspace group is in merging state,
// we need to wait for the merging checker to finish the TSO merging.
func (s *state) checkTSOMerge(
groupID uint32,
) error {
s.RLock()
defer s.RUnlock()
if s.kgs[groupID] == nil || !s.kgs[groupID].IsMerging() {
return nil
}
return errs.ErrKeyspaceGroupIsMerging.FastGenByArgs(groupID)
}

// getKeyspaceGroupMetaWithCheck returns the keyspace group meta of the given keyspace.
// It also checks if the keyspace is served by the given keyspace group. If not, it returns the meta
// of the keyspace group to which the keyspace currently belongs and returns NotServed (by the given
Expand Down Expand Up @@ -957,7 +990,7 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest(
if err != nil {
return pdpb.Timestamp{}, curKeyspaceGroupID, err
}
err = kgm.checkTSOMerge(curKeyspaceGroupID)
err = kgm.state.checkTSOMerge(curKeyspaceGroupID)
if err != nil {
return pdpb.Timestamp{}, curKeyspaceGroupID, err
}
Expand Down Expand Up @@ -1032,27 +1065,19 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit(
keyspaceGroupID uint32,
dcLocation string,
) error {
splitAM, splitGroup := kgm.getKeyspaceGroupMeta(keyspaceGroupID)
// Only the split target keyspace group needs to check the TSO split.
if !splitGroup.IsSplitTarget() {
return nil
}
splitSource := splitGroup.SplitSource()
splitSourceAM, splitSourceGroup := kgm.getKeyspaceGroupMeta(splitSource)
if splitSourceAM == nil || splitSourceGroup == nil {
log.Error("the split source keyspace group is not initialized",
zap.Uint32("source", splitSource))
return errs.ErrKeyspaceGroupNotInitialized.FastGenByArgs(splitSource)
splitTargetAM, splitSourceAM, err := kgm.state.checkTSOSplit(keyspaceGroupID)
if err != nil || splitTargetAM == nil {
return err
}
splitAllocator, err := splitAM.GetAllocator(dcLocation)
splitTargetAllocator, err := splitTargetAM.GetAllocator(dcLocation)
if err != nil {
return err
}
splitSourceAllocator, err := splitSourceAM.GetAllocator(dcLocation)
if err != nil {
return err
}
splitTSO, err := splitAllocator.GenerateTSO(1)
splitTargetTSO, err := splitTargetAllocator.GenerateTSO(1)
if err != nil {
return err
}
Expand All @@ -1061,19 +1086,19 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit(
return err
}
// If the split source TSO is not greater than the newly split TSO, we don't need to do anything.
if tsoutil.CompareTimestamp(&splitSourceTSO, &splitTSO) <= 0 {
if tsoutil.CompareTimestamp(&splitSourceTSO, &splitTargetTSO) <= 0 {
log.Info("the split source tso is less than the newly split tso",
zap.Int64("split-source-tso-physical", splitSourceTSO.Physical),
zap.Int64("split-source-tso-logical", splitSourceTSO.Logical),
zap.Int64("split-tso-physical", splitTSO.Physical),
zap.Int64("split-tso-logical", splitTSO.Logical))
zap.Int64("split-tso-physical", splitTargetTSO.Physical),
zap.Int64("split-tso-logical", splitTargetTSO.Logical))
// Finish the split state directly.
return kgm.finishSplitKeyspaceGroup(keyspaceGroupID)
}
// If the split source TSO is greater than the newly split TSO, we need to update the split
// TSO to make sure the following TSO will be greater than the split keyspaces ever had
// in the past.
err = splitAllocator.SetTSO(tsoutil.GenerateTS(&pdpb.Timestamp{
err = splitTargetAllocator.SetTSO(tsoutil.GenerateTS(&pdpb.Timestamp{
Physical: splitSourceTSO.Physical + 1,
Logical: splitSourceTSO.Logical,
}), true, true)
Expand All @@ -1083,8 +1108,8 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit(
log.Info("the split source tso is greater than the newly split tso",
zap.Int64("split-source-tso-physical", splitSourceTSO.Physical),
zap.Int64("split-source-tso-logical", splitSourceTSO.Logical),
zap.Int64("split-tso-physical", splitTSO.Physical),
zap.Int64("split-tso-logical", splitTSO.Logical))
zap.Int64("split-tso-physical", splitTargetTSO.Physical),
zap.Int64("split-tso-logical", splitTargetTSO.Logical))
// Finish the split state.
return kgm.finishSplitKeyspaceGroup(keyspaceGroupID)
}
Expand Down Expand Up @@ -1116,9 +1141,13 @@ func (kgm *KeyspaceGroupManager) finishSplitKeyspaceGroup(id uint32) error {
zap.Int("status-code", statusCode))
return errs.ErrSendRequest.FastGenByArgs()
}
// Pre-update the split keyspace group split state in memory.
splitGroup.SplitState = nil
kgm.kgs[id] = splitGroup
// Pre-update the split keyspace group's split state in memory.
// Note: to avoid data race with state read APIs, we always replace the group in memory as a whole.
// For now, we only have scenarios to update split state/merge state, and the other fields are always
// loaded from etcd without any modification, so we can simply copy the group and replace the state.
newSplitGroup := *splitGroup
newSplitGroup.SplitState = nil
kgm.kgs[id] = &newSplitGroup
return nil
}

Expand Down Expand Up @@ -1146,9 +1175,14 @@ func (kgm *KeyspaceGroupManager) finishMergeKeyspaceGroup(id uint32) error {
zap.Int("status-code", statusCode))
return errs.ErrSendRequest.FastGenByArgs()
}
// Pre-update the split keyspace group split state in memory.
mergeTarget.MergeState = nil
kgm.kgs[id] = mergeTarget

// Pre-update the merge target keyspace group's merge state in memory.
// Note: to avoid data race with state read APIs, we always replace the group in memory as a whole.
// For now, we only have scenarios to update split state/merge state, and the other fields are always
// loaded from etcd without any modification, so we can simply copy the group and replace the state.
newTargetGroup := *mergeTarget
newTargetGroup.MergeState = nil
kgm.kgs[id] = &newTargetGroup
return nil
}

Expand Down Expand Up @@ -1286,15 +1320,3 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget
return
}
}

// Reject any request if the keyspace group is in merging state,
// we need to wait for the merging checker to finish the TSO merging.
func (kgm *KeyspaceGroupManager) checkTSOMerge(
keyspaceGroupID uint32,
) error {
_, group := kgm.getKeyspaceGroupMeta(keyspaceGroupID)
if !group.IsMerging() {
return nil
}
return errs.ErrKeyspaceGroupIsMerging.FastGenByArgs(keyspaceGroupID)
}

0 comments on commit 5605eca

Please sign in to comment.