Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: batch get region size (#7252) #7693

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1453,8 +1453,8 @@ func (c *RaftCluster) checkStores() {
}
} else if c.IsPrepared() {
threshold := c.getThreshold(stores, store)
log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold))
regionSize := float64(store.GetRegionSize())
log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold), zap.Float64("region-size", regionSize))
if regionSize >= threshold {
if err := c.ReadyToServe(storeID); err != nil {
log.Error("change store to serving failed",
Expand Down
31 changes: 28 additions & 3 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"go.uber.org/zap"
)

var scanRegionLimit = 1000

// BasicCluster provides basic data member and interface for a tikv cluster.
type BasicCluster struct {
Stores struct {
Expand Down Expand Up @@ -384,9 +386,32 @@ func (bc *BasicCluster) GetOverlaps(region *RegionInfo) []*RegionInfo {

// GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range.
func (bc *BasicCluster) GetRegionSizeByRange(startKey, endKey []byte) int64 {
bc.Regions.mu.RLock()
defer bc.Regions.mu.RUnlock()
return bc.Regions.GetRegionSizeByRange(startKey, endKey)
var size int64
for {
bc.Regions.mu.RLock()
var cnt int
bc.Regions.tree.scanRange(startKey, func(region *RegionInfo) bool {
if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 {
return false
}
if cnt >= scanRegionLimit {
return false
}
cnt++
startKey = region.GetEndKey()
size += region.GetApproximateSize()
return true
})
bc.Regions.mu.RUnlock()
if cnt == 0 {
break
}
if len(startKey) == 0 {
break
}
}

return size
}

func (bc *BasicCluster) getWriteRate(
Expand Down
13 changes: 0 additions & 13 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1191,19 +1191,6 @@ func (r *RegionsInfo) ScanRangeWithIterator(startKey []byte, iterator func(regio
r.tree.scanRange(startKey, iterator)
}

// GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range.
func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 {
var size int64
r.tree.scanRange(startKey, func(region *RegionInfo) bool {
if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 {
return false
}
size += region.GetApproximateSize()
return true
})
return size
}

// GetAdjacentRegions returns region's info that is adjacent with specific region
func (r *RegionsInfo) GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo) {
p, n := r.tree.getAdjacentRegions(region)
Expand Down
113 changes: 113 additions & 0 deletions server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"strconv"
"strings"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/server/id"
)
Expand Down Expand Up @@ -662,6 +664,117 @@ func BenchmarkRandomRegion(b *testing.B) {
}
}

func BenchmarkRandomSetRegion(b *testing.B) {
cluster := NewBasicCluster()
var items []*RegionInfo
for i := 0; i < 1000000; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer)
cluster.Regions.SetRegion(region)
items = append(items, region)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
item := items[i%len(items)]
item.approximateKeys = int64(200000)
item.approximateSize = int64(20)
cluster.Regions.SetRegion(item)
}
}

func TestGetRegionSizeByRange(t *testing.T) {
cluster := NewBasicCluster()
nums := 1000010
for i := 0; i < nums; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
endKey := []byte(fmt.Sprintf("%20d", i+1))
if i == nums-1 {
endKey = []byte("")
}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: endKey,
}, peer, SetApproximateSize(10))
cluster.Regions.SetRegion(region)
}
totalSize := cluster.GetRegionSizeByRange([]byte(""), []byte(""))
require.Equal(t, int64(nums*10), totalSize)
for i := 1; i < 10; i++ {
verifyNum := nums / i
endKey := fmt.Sprintf("%20d", verifyNum)
totalSize := cluster.GetRegionSizeByRange([]byte(""), []byte(endKey))
require.Equal(t, int64(verifyNum*10), totalSize)
}
}

func BenchmarkRandomSetRegionWithGetRegionSizeByRange(b *testing.B) {
cluster := NewBasicCluster()
var items []*RegionInfo
for i := 0; i < 1000000; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer, SetApproximateSize(10))
cluster.Regions.SetRegion(region)
items = append(items, region)
}
b.ResetTimer()
go func() {
for {
cluster.GetRegionSizeByRange([]byte(""), []byte(""))
time.Sleep(time.Millisecond)
}
}()
for i := 0; i < b.N; i++ {
item := items[i%len(items)]
item.approximateKeys = int64(200000)
cluster.Regions.SetRegion(item)
}
}

func BenchmarkRandomSetRegionWithGetRegionSizeByRangeParallel(b *testing.B) {
cluster := NewBasicCluster()
var items []*RegionInfo
for i := 0; i < 1000000; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer)
cluster.Regions.SetRegion(region)
items = append(items, region)
}
b.ResetTimer()
go func() {
for {
cluster.GetRegionSizeByRange([]byte(""), []byte(""))
time.Sleep(time.Millisecond)
}
}()

b.RunParallel(
func(pb *testing.PB) {
for pb.Next() {
item := items[rand.Intn(len(items))]
n := item.Clone(SetApproximateSize(20))
cluster.Regions.SetRegion(n)
}
},
)
}

const keyLength = 100

func randomBytes(n int) []byte {
Expand Down
6 changes: 3 additions & 3 deletions tests/server/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ func (s *testProgressSuite) TestPreparingProgress(c *C) {
pdctl.MustPutStore(c, leader.GetServer(), store)
}
for i := 0; i < 100; i++ {
pdctl.MustPutRegion(c, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("p%d", i)), []byte(fmt.Sprintf("%d", i+1)), core.SetApproximateSize(10))
pdctl.MustPutRegion(c, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("%20d", i)), []byte(fmt.Sprintf("%20d", i+1)), core.SetApproximateSize(10))
}
// no store preparing
output := sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusNotFound)
Expand All @@ -713,8 +713,8 @@ func (s *testProgressSuite) TestPreparingProgress(c *C) {
c.Assert(p.LeftSeconds, Equals, math.MaxFloat64)

// update size
pdctl.MustPutRegion(c, cluster, 1000, 4, []byte(fmt.Sprintf("%d", 1000)), []byte(fmt.Sprintf("%d", 1001)), core.SetApproximateSize(10))
pdctl.MustPutRegion(c, cluster, 1001, 5, []byte(fmt.Sprintf("%d", 1001)), []byte(fmt.Sprintf("%d", 1002)), core.SetApproximateSize(40))
pdctl.MustPutRegion(c, cluster, 1000, 4, []byte(fmt.Sprintf("%20d", 1000)), []byte(fmt.Sprintf("%20d", 1001)), core.SetApproximateSize(10))
pdctl.MustPutRegion(c, cluster, 1001, 5, []byte(fmt.Sprintf("%20d", 1001)), []byte(fmt.Sprintf("%20d", 1002)), core.SetApproximateSize(40))
time.Sleep(2 * time.Second)
output = sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusOK)
c.Assert(json.Unmarshal(output, &p), IsNil)
Expand Down
Loading