Skip to content

Commit

Permalink
reduce get overlap operations
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Oct 21, 2022
1 parent 1a485f7 commit 2519bab
Show file tree
Hide file tree
Showing 13 changed files with 103 additions and 46 deletions.
8 changes: 8 additions & 0 deletions pkg/rangetree/range_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func NewRangeTree(degree int, factory DebrisFactory) *RangeTree {
}
}

func (r *RangeTree) Factory(startKey, endKey []byte, old RangeItem) []RangeItem {
return r.factory(startKey, endKey, old)
}

// Update insert the item and delete overlaps.
func (r *RangeTree) Update(item RangeItem) []RangeItem {
overlaps := r.GetOverlaps(item)
Expand All @@ -62,6 +66,10 @@ func (r *RangeTree) Update(item RangeItem) []RangeItem {
return overlaps
}

func (r *RangeTree) ReplaceOrInsert(item RangeItem) {
r.tree.ReplaceOrInsert(item)
}

// GetOverlaps returns the range items that has some intersections with the given items.
func (r *RangeTree) GetOverlaps(item RangeItem) []RangeItem {
// note that Find() gets the last item that is less or equal than the item.
Expand Down
4 changes: 2 additions & 2 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ var regionGuide = core.GenerateRegionGuideFunc(true)

// processRegionHeartbeat updates the region information.
func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
origin, err := c.core.PreCheckPutRegion(region)
origin, _, err := c.core.PreCheckPutRegion(region)
if err != nil {
return err
}
Expand Down Expand Up @@ -812,7 +812,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
// check its validation again here.
//
// However it can't solve the race condition of concurrent heartbeats from the same region.
if _, err := c.core.PreCheckPutRegion(region); err != nil {
if _, _, err := c.core.PreCheckPutRegion(region); err != nil {
c.Unlock()
return err
}
Expand Down
10 changes: 5 additions & 5 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1658,7 +1658,7 @@ func Test(t *testing.T) {
re.Nil(cache.GetRegionByKey(regionKey))
checkRegions(re, cache, regions[0:i])

cache.SetRegion(region)
cache.SetRegion(region, false)
checkRegion(re, cache.GetRegion(i), region)
checkRegion(re, cache.GetRegionByKey(regionKey), region)
checkRegions(re, cache, regions[0:(i+1)])
Expand All @@ -1671,7 +1671,7 @@ func Test(t *testing.T) {
// Update leader to peer np-1.
newRegion := region.Clone(core.WithLeader(region.GetPeers()[np-1]))
regions[i] = newRegion
cache.SetRegion(newRegion)
cache.SetRegion(newRegion, false)
checkRegion(re, cache.GetRegion(i), newRegion)
checkRegion(re, cache.GetRegionByKey(regionKey), newRegion)
checkRegions(re, cache, regions[0:(i+1)])
Expand All @@ -1684,7 +1684,7 @@ func Test(t *testing.T) {
// Reset leader to peer 0.
newRegion = region.Clone(core.WithLeader(region.GetPeers()[0]))
regions[i] = newRegion
cache.SetRegion(newRegion)
cache.SetRegion(newRegion, false)
checkRegion(re, cache.GetRegion(i), newRegion)
checkRegions(re, cache, regions[0:(i+1)])
checkRegion(re, cache.GetRegionByKey(regionKey), newRegion)
Expand All @@ -1705,7 +1705,7 @@ func Test(t *testing.T) {
// check overlaps
// clone it otherwise there are two items with the same key in the tree
overlapRegion := regions[n-1].Clone(core.WithStartKey(regions[n-2].GetStartKey()))
cache.SetRegion(overlapRegion)
cache.SetRegion(overlapRegion, false)
re.Nil(cache.GetRegion(n - 2))
re.NotNil(cache.GetRegion(n - 1))

Expand All @@ -1714,7 +1714,7 @@ func Test(t *testing.T) {
for j := 0; j < cache.GetStoreLeaderCount(i); j++ {
region := filter.SelectOneRegion(tc.RandLeaderRegions(i, []core.KeyRange{core.NewKeyRange("", "")}), nil, pendingFilter, downFilter)
newRegion := region.Clone(core.WithPendingPeers(region.GetPeers()))
cache.SetRegion(newRegion)
cache.SetRegion(newRegion, false)
}
re.Nil(filter.SelectOneRegion(tc.RandLeaderRegions(i, []core.KeyRange{core.NewKeyRange("", "")}), nil, pendingFilter, downFilter))
}
Expand Down
23 changes: 15 additions & 8 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,16 +363,16 @@ func isRegionRecreated(region *RegionInfo) bool {
}

// PreCheckPutRegion checks if the region is valid to put.
func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, error) {
func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, []*RegionInfo, error) {
origin, overlaps := bc.getRelevantRegions(region)
for _, item := range overlaps {
// PD ignores stale regions' heartbeats, unless it is recreated recently by unsafe recover operation.
if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() && !isRegionRecreated(region) {
return nil, errRegionIsStale(region.GetMeta(), item.GetMeta())
return nil, nil, errRegionIsStale(region.GetMeta(), item.GetMeta())
}
}
if origin == nil {
return nil, nil
return nil, nil, nil
}

r := region.GetRegionEpoch()
Expand All @@ -381,17 +381,24 @@ func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, erro
isTermBehind := region.GetTerm() > 0 && region.GetTerm() < origin.GetTerm()
// Region meta is stale, return an error.
if (isTermBehind || r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer()) && !isRegionRecreated(region) {
return origin, errRegionIsStale(region.GetMeta(), origin.GetMeta())
return origin, nil, errRegionIsStale(region.GetMeta(), origin.GetMeta())
}

return origin, nil
return origin, overlaps, nil
}

// PutRegion put a region.
func (bc *BasicCluster) PutRegion(region *RegionInfo) []*RegionInfo {
bc.Lock()
defer bc.Unlock()
return bc.Regions.SetRegion(region)
return bc.Regions.SetRegion(region, false)
}

// PutRegionWithOverlaps put a region with given overlaps.
func (bc *BasicCluster) PutRegionWithOverlaps(region *RegionInfo, overlaps []*RegionInfo) []*RegionInfo {
bc.Lock()
defer bc.Unlock()
return bc.Regions.SetRegion(region, true)
}

// GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range.
Expand All @@ -403,13 +410,13 @@ func (bc *BasicCluster) GetRegionSizeByRange(startKey, endKey []byte) int64 {

// CheckAndPutRegion checks if the region is valid to put, if valid then put.
func (bc *BasicCluster) CheckAndPutRegion(region *RegionInfo) []*RegionInfo {
origin, err := bc.PreCheckPutRegion(region)
origin, overlaps, err := bc.PreCheckPutRegion(region)
if err != nil {
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
// return the state region to delete.
return []*RegionInfo{region}
}
return bc.PutRegion(region)
return bc.PutRegionWithOverlaps(region, overlaps)
}

// RemoveRegionIfExist removes RegionInfo from regionTree and regionMap if exists.
Expand Down
13 changes: 10 additions & 3 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ func (r *RegionsInfo) GetRegion(regionID uint64) *RegionInfo {

// SetRegion sets the RegionInfo to regionTree and regionMap, also update leaders and followers by region peers
// overlaps: Other regions that overlap with the specified region, excluding itself.
func (r *RegionsInfo) SetRegion(region *RegionInfo) (overlaps []*RegionInfo) {
func (r *RegionsInfo) SetRegion(region *RegionInfo, passCheck bool, ol ...*RegionInfo) (overlaps []*RegionInfo) {
var item *regionItem // Pointer to the *RegionInfo of this ID.
rangeChanged := true // This Region is new, or its range has changed.
if item = r.regions.Get(region.GetID()); item != nil {
Expand Down Expand Up @@ -765,8 +765,15 @@ func (r *RegionsInfo) SetRegion(region *RegionInfo) (overlaps []*RegionInfo) {
}

if rangeChanged {
// It has been removed and all information needs to be updated again.
overlaps = r.tree.update(item)
if passCheck {
if len(ol) != 0 {
overlaps = r.tree.updateWithOverlaps(item, ol)
} else {
r.tree.tree.ReplaceOrInsert(item)
}
} else {
overlaps = r.tree.update(item)
}
for _, old := range overlaps {
r.RemoveRegion(r.GetRegion(old.GetID()))
}
Expand Down
16 changes: 8 additions & 8 deletions server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ func TestSetRegion(t *testing.T) {
StartKey: []byte(fmt.Sprintf("%20d", i*10)),
EndKey: []byte(fmt.Sprintf("%20d", (i+1)*10)),
}, peer1)
regions.SetRegion(region)
regions.SetRegion(region, false)
}

peer1 := &metapb.Peer{StoreId: uint64(4), Id: uint64(101)}
Expand All @@ -479,12 +479,12 @@ func TestSetRegion(t *testing.T) {
EndKey: []byte(fmt.Sprintf("%20d", 211)),
}, peer1)
region.pendingPeers = append(region.pendingPeers, peer3)
regions.SetRegion(region)
regions.SetRegion(region, false)
checkRegions(re, regions)
re.Equal(97, regions.tree.length())
re.Len(regions.GetRegions(), 97)

regions.SetRegion(region)
regions.SetRegion(region, false)
peer1 = &metapb.Peer{StoreId: uint64(2), Id: uint64(101)}
peer2 = &metapb.Peer{StoreId: uint64(3), Id: uint64(102), Role: metapb.PeerRole_Learner}
peer3 = &metapb.Peer{StoreId: uint64(1), Id: uint64(103)}
Expand All @@ -495,7 +495,7 @@ func TestSetRegion(t *testing.T) {
EndKey: []byte(fmt.Sprintf("%20d", 212)),
}, peer1)
region.pendingPeers = append(region.pendingPeers, peer3)
regions.SetRegion(region)
regions.SetRegion(region, false)
checkRegions(re, regions)
re.Equal(97, regions.tree.length())
re.Len(regions.GetRegions(), 97)
Expand All @@ -504,7 +504,7 @@ func TestSetRegion(t *testing.T) {
region = region.Clone(WithStartKey([]byte(fmt.Sprintf("%20d", 175))), WithNewRegionID(201))
re.NotNil(regions.GetRegion(21))
re.NotNil(regions.GetRegion(18))
regions.SetRegion(region)
regions.SetRegion(region, false)
checkRegions(re, regions)
re.Equal(96, regions.tree.length())
re.Len(regions.GetRegions(), 96)
Expand All @@ -519,7 +519,7 @@ func TestSetRegion(t *testing.T) {
SetWrittenBytes(40),
SetWrittenKeys(10),
SetReportInterval(5))
regions.SetRegion(region)
regions.SetRegion(region, false)
checkRegions(re, regions)
re.Equal(96, regions.tree.length())
re.Len(regions.GetRegions(), 96)
Expand Down Expand Up @@ -630,7 +630,7 @@ func BenchmarkRandomRegion(b *testing.B) {
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer)
regions.SetRegion(region)
regions.SetRegion(region, false)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down Expand Up @@ -683,6 +683,6 @@ func BenchmarkAddRegion(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
regions.SetRegion(items[i])
regions.SetRegion(items[i], false)
}
}
35 changes: 35 additions & 0 deletions server/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,41 @@ func (t *regionTree) update(item *regionItem) []*RegionInfo {
return result
}

// updateWithOverlaps updates the tree with the region with pre obtained overlaps.
func (t *regionTree) updateWithOverlaps(item *regionItem, overlaps []*RegionInfo) []*RegionInfo {
region := item.region
t.totalSize += region.approximateSize
regionWriteBytesRate, regionWriteKeysRate := region.GetWriteRate()
t.totalWriteBytesRate += regionWriteBytesRate
t.totalWriteKeysRate += regionWriteKeysRate

result := make([]*RegionInfo, len(overlaps))
for i, old := range overlaps {
o := &regionItem{old}
t.tree.Remove(o)
children := t.tree.Factory(item.GetStartKey(), item.GetEndKey(), o)
for _, child := range children {
if c := bytes.Compare(child.GetStartKey(), child.GetEndKey()); c < 0 {
t.tree.ReplaceOrInsert(child)
} else if c > 0 && len(child.GetEndKey()) == 0 {
t.tree.ReplaceOrInsert(child)
}
}
t.tree.ReplaceOrInsert(item)
result[i] = old
log.Debug("overlapping region",
zap.Uint64("region-id", old.GetID()),
logutil.ZapRedactStringer("delete-region", RegionToHexMeta(old.GetMeta())),
logutil.ZapRedactStringer("update-region", RegionToHexMeta(region.GetMeta())))
t.totalSize -= old.approximateSize
regionWriteBytesRate, regionWriteKeysRate = old.GetWriteRate()
t.totalWriteBytesRate -= regionWriteBytesRate
t.totalWriteKeysRate -= regionWriteKeysRate
}

return result
}

// updateStat is used to update statistics when regionItem.region is directly replaced.
func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) {
t.totalSize += region.approximateSize
Expand Down
2 changes: 1 addition & 1 deletion server/region_syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
region = core.NewRegionInfo(r, regionLeader, core.SetFromHeartbeat(false))
}

origin, err := bc.PreCheckPutRegion(region)
origin, _, err := bc.PreCheckPutRegion(region)
if err != nil {
log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
continue
Expand Down
2 changes: 1 addition & 1 deletion server/schedule/range_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type RangeCluster struct {
func GenRangeCluster(cluster Cluster, startKey, endKey []byte) *RangeCluster {
subCluster := core.NewBasicCluster()
for _, r := range cluster.ScanRegions(startKey, endKey, -1) {
subCluster.Regions.SetRegion(r)
subCluster.Regions.SetRegion(r, false)
}
return &RangeCluster{
Cluster: cluster,
Expand Down
6 changes: 3 additions & 3 deletions server/schedulers/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,7 +1303,7 @@ func TestScatterRangeBalance(t *testing.T) {
core.SetApproximateKeys(1),
core.SetApproximateSize(1),
)
tc.Regions.SetRegion(regionInfo)
tc.Regions.SetRegion(regionInfo, false)
}
for i := 0; i < 100; i++ {
_, err := tc.AllocPeer(1)
Expand Down Expand Up @@ -1371,7 +1371,7 @@ func TestBalanceLeaderLimit(t *testing.T) {
core.SetApproximateSize(96),
)

tc.Regions.SetRegion(regionInfo)
tc.Regions.SetRegion(regionInfo, false)
}

for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -1481,7 +1481,7 @@ func TestBalanceWhenRegionNotHeartbeat(t *testing.T) {
core.SetApproximateSize(96),
)

tc.Regions.SetRegion(regionInfo)
tc.Regions.SetRegion(regionInfo, false)
}

for i := 1; i <= 3; i++ {
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestRejectLeader(t *testing.T) {
break
}
}
tc.Regions.SetRegion(region)
tc.Regions.SetRegion(region, false)
ops, _ = sl.Schedule(tc, false)
testutil.CheckTransferLeader(re, ops[0], operator.OpLeader, 1, 2)
}
Expand Down
Loading

0 comments on commit 2519bab

Please sign in to comment.