Skip to content

Commit

Permalink
core: batch get region size (#7252) (#7332)
Browse files Browse the repository at this point in the history
close #7248

Signed-off-by: nolouch <nolouch@gmail.com>

Co-authored-by: nolouch <nolouch@gmail.com>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 8, 2023
1 parent 710ffcd commit da1e92d
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 13 deletions.
36 changes: 27 additions & 9 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ import (
"go.uber.org/zap"
)

const randomRegionMaxRetry = 10
const (
randomRegionMaxRetry = 10
scanRegionLimit = 1000
)

// errRegionIsStale is error info for region is stale.
func errRegionIsStale(region *metapb.Region, origin *metapb.Region) error {
Expand Down Expand Up @@ -1610,16 +1613,31 @@ func (r *RegionsInfo) ScanRegionWithIterator(startKey []byte, iterator func(regi

// GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range.
func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 {
r.t.RLock()
defer r.t.RUnlock()
var size int64
r.tree.scanRange(startKey, func(region *RegionInfo) bool {
if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 {
return false
for {
r.t.RLock()
var cnt int
r.tree.scanRange(startKey, func(region *RegionInfo) bool {
if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 {
return false
}
if cnt >= scanRegionLimit {
return false
}
cnt++
startKey = region.GetEndKey()
size += region.GetApproximateSize()
return true
})
r.t.RUnlock()
if cnt == 0 {
break
}
size += region.GetApproximateSize()
return true
})
if len(startKey) == 0 {
break
}
}

return size
}

Expand Down
120 changes: 120 additions & 0 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (
"crypto/rand"
"fmt"
"math"
mrand "math/rand"
"strconv"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -658,6 +660,124 @@ func BenchmarkRandomRegion(b *testing.B) {
}
}

func BenchmarkRandomSetRegion(b *testing.B) {
regions := NewRegionsInfo()
var items []*RegionInfo
for i := 0; i < 1000000; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer)
origin, overlaps, rangeChanged := regions.SetRegion(region)
regions.UpdateSubTree(region, origin, overlaps, rangeChanged)
items = append(items, region)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
item := items[i%len(items)]
item.approximateKeys = int64(200000)
item.approximateSize = int64(20)
origin, overlaps, rangeChanged := regions.SetRegion(item)
regions.UpdateSubTree(item, origin, overlaps, rangeChanged)
}
}

func TestGetRegionSizeByRange(t *testing.T) {
regions := NewRegionsInfo()
nums := 1000010
for i := 0; i < nums; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
endKey := []byte(fmt.Sprintf("%20d", i+1))
if i == nums-1 {
endKey = []byte("")
}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: endKey,
}, peer, SetApproximateSize(10))
origin, overlaps, rangeChanged := regions.SetRegion(region)
regions.UpdateSubTree(region, origin, overlaps, rangeChanged)
}
totalSize := regions.GetRegionSizeByRange([]byte(""), []byte(""))
require.Equal(t, int64(nums*10), totalSize)
for i := 1; i < 10; i++ {
verifyNum := nums / i
endKey := fmt.Sprintf("%20d", verifyNum)
totalSize := regions.GetRegionSizeByRange([]byte(""), []byte(endKey))
require.Equal(t, int64(verifyNum*10), totalSize)
}
}

func BenchmarkRandomSetRegionWithGetRegionSizeByRange(b *testing.B) {
regions := NewRegionsInfo()
var items []*RegionInfo
for i := 0; i < 1000000; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer, SetApproximateSize(10))
origin, overlaps, rangeChanged := regions.SetRegion(region)
regions.UpdateSubTree(region, origin, overlaps, rangeChanged)
items = append(items, region)
}
b.ResetTimer()
go func() {
for {
regions.GetRegionSizeByRange([]byte(""), []byte(""))
time.Sleep(time.Millisecond)
}
}()
for i := 0; i < b.N; i++ {
item := items[i%len(items)]
item.approximateKeys = int64(200000)
origin, overlaps, rangeChanged := regions.SetRegion(item)
regions.UpdateSubTree(item, origin, overlaps, rangeChanged)
}
}

func BenchmarkRandomSetRegionWithGetRegionSizeByRangeParallel(b *testing.B) {
regions := NewRegionsInfo()
var items []*RegionInfo
for i := 0; i < 1000000; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer)
origin, overlaps, rangeChanged := regions.SetRegion(region)
regions.UpdateSubTree(region, origin, overlaps, rangeChanged)
items = append(items, region)
}
b.ResetTimer()
go func() {
for {
regions.GetRegionSizeByRange([]byte(""), []byte(""))
time.Sleep(time.Millisecond)
}
}()

b.RunParallel(
func(pb *testing.PB) {
for pb.Next() {
item := items[mrand.Intn(len(items))]
n := item.Clone(SetApproximateSize(20))
origin, overlaps, rangeChanged := regions.SetRegion(n)
regions.UpdateSubTree(item, origin, overlaps, rangeChanged)
}
},
)
}

const keyLength = 100

func randomBytes(n int) []byte {
Expand Down
3 changes: 2 additions & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1841,12 +1841,13 @@ func (c *RaftCluster) checkStores() {
if err := c.ReadyToServe(storeID); err != nil {
log.Error("change store to serving failed",
zap.Stringer("store", store.GetMeta()),
zap.Int("region-count", c.GetTotalRegionCount()),
errs.ZapError(err))
}
} else if c.IsPrepared() {
threshold := c.getThreshold(stores, store)
log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold))
regionSize := float64(store.GetRegionSize())
log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold), zap.Float64("region-size", regionSize))
if regionSize >= threshold {
if err := c.ReadyToServe(storeID); err != nil {
log.Error("change store to serving failed",
Expand Down
6 changes: 3 additions & 3 deletions tests/server/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ func TestPreparingProgress(t *testing.T) {
tests.MustPutStore(re, cluster, store)
}
for i := 0; i < 100; i++ {
tests.MustPutRegion(re, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("p%d", i)), []byte(fmt.Sprintf("%d", i+1)), core.SetApproximateSize(10))
tests.MustPutRegion(re, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("%20d", i)), []byte(fmt.Sprintf("%20d", i+1)), core.SetApproximateSize(10))
}
// no store preparing
output := sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusNotFound)
Expand All @@ -941,8 +941,8 @@ func TestPreparingProgress(t *testing.T) {
re.Equal(math.MaxFloat64, p.LeftSeconds)

// update size
tests.MustPutRegion(re, cluster, 1000, 4, []byte(fmt.Sprintf("%d", 1000)), []byte(fmt.Sprintf("%d", 1001)), core.SetApproximateSize(10))
tests.MustPutRegion(re, cluster, 1001, 5, []byte(fmt.Sprintf("%d", 1001)), []byte(fmt.Sprintf("%d", 1002)), core.SetApproximateSize(40))
tests.MustPutRegion(re, cluster, 1000, 4, []byte(fmt.Sprintf("%20d", 1000)), []byte(fmt.Sprintf("%20d", 1001)), core.SetApproximateSize(10))
tests.MustPutRegion(re, cluster, 1001, 5, []byte(fmt.Sprintf("%20d", 1001)), []byte(fmt.Sprintf("%20d", 1002)), core.SetApproximateSize(40))
time.Sleep(2 * time.Second)
output = sendRequest(re, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusOK)
re.NoError(json.Unmarshal(output, &p))
Expand Down

0 comments on commit da1e92d

Please sign in to comment.