From b5e9460ae1bbc169d38fc7cd69a7dc6316b8b5af Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 8 Nov 2023 11:18:42 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #7252 close tikv/pd#7248 Signed-off-by: ti-chi-bot --- server/cluster/cluster.go | 3 +- server/core/region.go | 37 +++++++++-- server/core/region_test.go | 123 +++++++++++++++++++++++++++++++++++ tests/server/api/api_test.go | 9 +++ 4 files changed, 165 insertions(+), 7 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 24eb2ad7047..5df8b739897 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1447,12 +1447,13 @@ func (c *RaftCluster) checkStores() { if err := c.ReadyToServe(storeID); err != nil { log.Error("change store to serving failed", zap.Stringer("store", store.GetMeta()), + zap.Int("region-count", c.GetTotalRegionCount()), errs.ZapError(err)) } } else if c.IsPrepared() { threshold := c.getThreshold(stores, store) - log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold)) regionSize := float64(store.GetRegionSize()) + log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold), zap.Float64("region-size", regionSize)) if regionSize >= threshold { if err := c.ReadyToServe(storeID); err != nil { log.Error("change store to serving failed", diff --git a/server/core/region.go b/server/core/region.go index e43ddd032ad..34e66bb915a 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -36,6 +36,14 @@ import ( "go.uber.org/zap" ) +<<<<<<< HEAD:server/core/region.go +======= +const ( + randomRegionMaxRetry = 10 + scanRegionLimit = 1000 +) + +>>>>>>> d651c6b91 (core: batch get region size (#7252)):pkg/core/region.go // errRegionIsStale is error info for region is stale. func errRegionIsStale(region *metapb.Region, origin *metapb.Region) error { return errors.Errorf("region is stale: region %v origin %v", region, origin) @@ -1194,13 +1202,30 @@ func (r *RegionsInfo) ScanRangeWithIterator(startKey []byte, iterator func(regio // GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range. func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 { var size int64 - r.tree.scanRange(startKey, func(region *RegionInfo) bool { - if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 { - return false + for { + r.t.RLock() + var cnt int + r.tree.scanRange(startKey, func(region *RegionInfo) bool { + if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 { + return false + } + if cnt >= scanRegionLimit { + return false + } + cnt++ + startKey = region.GetEndKey() + size += region.GetApproximateSize() + return true + }) + r.t.RUnlock() + if cnt == 0 { + break } - size += region.GetApproximateSize() - return true - }) + if len(startKey) == 0 { + break + } + } + return size } diff --git a/server/core/region_test.go b/server/core/region_test.go index c1ed83b7f46..2e2a1b611d0 100644 --- a/server/core/region_test.go +++ b/server/core/region_test.go @@ -17,10 +17,15 @@ package core import ( "fmt" "math" +<<<<<<< HEAD:server/core/region_test.go "math/rand" +======= + mrand "math/rand" +>>>>>>> d651c6b91 (core: batch get region size (#7252)):pkg/core/region_test.go "strconv" "strings" "testing" + "time" . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" @@ -662,6 +667,124 @@ func BenchmarkRandomRegion(b *testing.B) { } } +func BenchmarkRandomSetRegion(b *testing.B) { + regions := NewRegionsInfo() + var items []*RegionInfo + for i := 0; i < 1000000; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: []byte(fmt.Sprintf("%20d", i+1)), + }, peer) + origin, overlaps, rangeChanged := regions.SetRegion(region) + regions.UpdateSubTree(region, origin, overlaps, rangeChanged) + items = append(items, region) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + item := items[i%len(items)] + item.approximateKeys = int64(200000) + item.approximateSize = int64(20) + origin, overlaps, rangeChanged := regions.SetRegion(item) + regions.UpdateSubTree(item, origin, overlaps, rangeChanged) + } +} + +func TestGetRegionSizeByRange(t *testing.T) { + regions := NewRegionsInfo() + nums := 1000010 + for i := 0; i < nums; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + endKey := []byte(fmt.Sprintf("%20d", i+1)) + if i == nums-1 { + endKey = []byte("") + } + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: endKey, + }, peer, SetApproximateSize(10)) + origin, overlaps, rangeChanged := regions.SetRegion(region) + regions.UpdateSubTree(region, origin, overlaps, rangeChanged) + } + totalSize := regions.GetRegionSizeByRange([]byte(""), []byte("")) + require.Equal(t, int64(nums*10), totalSize) + for i := 1; i < 10; i++ { + verifyNum := nums / i + endKey := fmt.Sprintf("%20d", verifyNum) + totalSize := regions.GetRegionSizeByRange([]byte(""), []byte(endKey)) + require.Equal(t, int64(verifyNum*10), totalSize) + } +} + +func BenchmarkRandomSetRegionWithGetRegionSizeByRange(b *testing.B) { + regions := NewRegionsInfo() + var items []*RegionInfo + for i := 0; i < 1000000; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: []byte(fmt.Sprintf("%20d", i+1)), + }, peer, SetApproximateSize(10)) + origin, overlaps, rangeChanged := regions.SetRegion(region) + regions.UpdateSubTree(region, origin, overlaps, rangeChanged) + items = append(items, region) + } + b.ResetTimer() + go func() { + for { + regions.GetRegionSizeByRange([]byte(""), []byte("")) + time.Sleep(time.Millisecond) + } + }() + for i := 0; i < b.N; i++ { + item := items[i%len(items)] + item.approximateKeys = int64(200000) + origin, overlaps, rangeChanged := regions.SetRegion(item) + regions.UpdateSubTree(item, origin, overlaps, rangeChanged) + } +} + +func BenchmarkRandomSetRegionWithGetRegionSizeByRangeParallel(b *testing.B) { + regions := NewRegionsInfo() + var items []*RegionInfo + for i := 0; i < 1000000; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: []byte(fmt.Sprintf("%20d", i+1)), + }, peer) + origin, overlaps, rangeChanged := regions.SetRegion(region) + regions.UpdateSubTree(region, origin, overlaps, rangeChanged) + items = append(items, region) + } + b.ResetTimer() + go func() { + for { + regions.GetRegionSizeByRange([]byte(""), []byte("")) + time.Sleep(time.Millisecond) + } + }() + + b.RunParallel( + func(pb *testing.PB) { + for pb.Next() { + item := items[mrand.Intn(len(items))] + n := item.Clone(SetApproximateSize(20)) + origin, overlaps, rangeChanged := regions.SetRegion(n) + regions.UpdateSubTree(item, origin, overlaps, rangeChanged) + } + }, + ) +} + const keyLength = 100 func randomBytes(n int) []byte { diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index 8102532102f..526461316a5 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -686,7 +686,11 @@ func (s *testProgressSuite) TestPreparingProgress(c *C) { pdctl.MustPutStore(c, leader.GetServer(), store) } for i := 0; i < 100; i++ { +<<<<<<< HEAD pdctl.MustPutRegion(c, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("p%d", i)), []byte(fmt.Sprintf("%d", i+1)), core.SetApproximateSize(10)) +======= + tests.MustPutRegion(re, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("%20d", i)), []byte(fmt.Sprintf("%20d", i+1)), core.SetApproximateSize(10)) +>>>>>>> d651c6b91 (core: batch get region size (#7252)) } // no store preparing output := sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusNotFound) @@ -713,8 +717,13 @@ func (s *testProgressSuite) TestPreparingProgress(c *C) { c.Assert(p.LeftSeconds, Equals, math.MaxFloat64) // update size +<<<<<<< HEAD pdctl.MustPutRegion(c, cluster, 1000, 4, []byte(fmt.Sprintf("%d", 1000)), []byte(fmt.Sprintf("%d", 1001)), core.SetApproximateSize(10)) pdctl.MustPutRegion(c, cluster, 1001, 5, []byte(fmt.Sprintf("%d", 1001)), []byte(fmt.Sprintf("%d", 1002)), core.SetApproximateSize(40)) +======= + tests.MustPutRegion(re, cluster, 1000, 4, []byte(fmt.Sprintf("%20d", 1000)), []byte(fmt.Sprintf("%20d", 1001)), core.SetApproximateSize(10)) + tests.MustPutRegion(re, cluster, 1001, 5, []byte(fmt.Sprintf("%20d", 1001)), []byte(fmt.Sprintf("%20d", 1002)), core.SetApproximateSize(40)) +>>>>>>> d651c6b91 (core: batch get region size (#7252)) time.Sleep(2 * time.Second) output = sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusOK) c.Assert(json.Unmarshal(output, &p), IsNil) From 61253b9cc791eb57157b18a666e3d89255ac2aa2 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 6 Feb 2024 12:12:05 +0800 Subject: [PATCH 2/2] resolve the conflicts Signed-off-by: Ryan Leung --- server/cluster/cluster.go | 1 - server/core/basic_cluster.go | 31 ++++++++++++++++++++++--- server/core/region.go | 38 ------------------------------- server/core/region_test.go | 44 ++++++++++++++---------------------- tests/server/api/api_test.go | 15 +++--------- 5 files changed, 48 insertions(+), 81 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 5df8b739897..fa8e6c642b6 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1447,7 +1447,6 @@ func (c *RaftCluster) checkStores() { if err := c.ReadyToServe(storeID); err != nil { log.Error("change store to serving failed", zap.Stringer("store", store.GetMeta()), - zap.Int("region-count", c.GetTotalRegionCount()), errs.ZapError(err)) } } else if c.IsPrepared() { diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go index eee97c11d11..d82852c6bca 100644 --- a/server/core/basic_cluster.go +++ b/server/core/basic_cluster.go @@ -26,6 +26,8 @@ import ( "go.uber.org/zap" ) +var scanRegionLimit = 1000 + // BasicCluster provides basic data member and interface for a tikv cluster. type BasicCluster struct { Stores struct { @@ -384,9 +386,32 @@ func (bc *BasicCluster) GetOverlaps(region *RegionInfo) []*RegionInfo { // GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range. func (bc *BasicCluster) GetRegionSizeByRange(startKey, endKey []byte) int64 { - bc.Regions.mu.RLock() - defer bc.Regions.mu.RUnlock() - return bc.Regions.GetRegionSizeByRange(startKey, endKey) + var size int64 + for { + bc.Regions.mu.RLock() + var cnt int + bc.Regions.tree.scanRange(startKey, func(region *RegionInfo) bool { + if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 { + return false + } + if cnt >= scanRegionLimit { + return false + } + cnt++ + startKey = region.GetEndKey() + size += region.GetApproximateSize() + return true + }) + bc.Regions.mu.RUnlock() + if cnt == 0 { + break + } + if len(startKey) == 0 { + break + } + } + + return size } func (bc *BasicCluster) getWriteRate( diff --git a/server/core/region.go b/server/core/region.go index 34e66bb915a..f97c6aea7e6 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -36,14 +36,6 @@ import ( "go.uber.org/zap" ) -<<<<<<< HEAD:server/core/region.go -======= -const ( - randomRegionMaxRetry = 10 - scanRegionLimit = 1000 -) - ->>>>>>> d651c6b91 (core: batch get region size (#7252)):pkg/core/region.go // errRegionIsStale is error info for region is stale. func errRegionIsStale(region *metapb.Region, origin *metapb.Region) error { return errors.Errorf("region is stale: region %v origin %v", region, origin) @@ -1199,36 +1191,6 @@ func (r *RegionsInfo) ScanRangeWithIterator(startKey []byte, iterator func(regio r.tree.scanRange(startKey, iterator) } -// GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range. -func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 { - var size int64 - for { - r.t.RLock() - var cnt int - r.tree.scanRange(startKey, func(region *RegionInfo) bool { - if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 { - return false - } - if cnt >= scanRegionLimit { - return false - } - cnt++ - startKey = region.GetEndKey() - size += region.GetApproximateSize() - return true - }) - r.t.RUnlock() - if cnt == 0 { - break - } - if len(startKey) == 0 { - break - } - } - - return size -} - // GetAdjacentRegions returns region's info that is adjacent with specific region func (r *RegionsInfo) GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo) { p, n := r.tree.getAdjacentRegions(region) diff --git a/server/core/region_test.go b/server/core/region_test.go index 2e2a1b611d0..8717b5d6be3 100644 --- a/server/core/region_test.go +++ b/server/core/region_test.go @@ -17,11 +17,7 @@ package core import ( "fmt" "math" -<<<<<<< HEAD:server/core/region_test.go "math/rand" -======= - mrand "math/rand" ->>>>>>> d651c6b91 (core: batch get region size (#7252)):pkg/core/region_test.go "strconv" "strings" "testing" @@ -30,6 +26,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/server/id" ) @@ -668,7 +665,7 @@ func BenchmarkRandomRegion(b *testing.B) { } func BenchmarkRandomSetRegion(b *testing.B) { - regions := NewRegionsInfo() + cluster := NewBasicCluster() var items []*RegionInfo for i := 0; i < 1000000; i++ { peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} @@ -678,8 +675,7 @@ func BenchmarkRandomSetRegion(b *testing.B) { StartKey: []byte(fmt.Sprintf("%20d", i)), EndKey: []byte(fmt.Sprintf("%20d", i+1)), }, peer) - origin, overlaps, rangeChanged := regions.SetRegion(region) - regions.UpdateSubTree(region, origin, overlaps, rangeChanged) + cluster.Regions.SetRegion(region) items = append(items, region) } b.ResetTimer() @@ -687,13 +683,12 @@ func BenchmarkRandomSetRegion(b *testing.B) { item := items[i%len(items)] item.approximateKeys = int64(200000) item.approximateSize = int64(20) - origin, overlaps, rangeChanged := regions.SetRegion(item) - regions.UpdateSubTree(item, origin, overlaps, rangeChanged) + cluster.Regions.SetRegion(item) } } func TestGetRegionSizeByRange(t *testing.T) { - regions := NewRegionsInfo() + cluster := NewBasicCluster() nums := 1000010 for i := 0; i < nums; i++ { peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} @@ -707,21 +702,20 @@ func TestGetRegionSizeByRange(t *testing.T) { StartKey: []byte(fmt.Sprintf("%20d", i)), EndKey: endKey, }, peer, SetApproximateSize(10)) - origin, overlaps, rangeChanged := regions.SetRegion(region) - regions.UpdateSubTree(region, origin, overlaps, rangeChanged) + cluster.Regions.SetRegion(region) } - totalSize := regions.GetRegionSizeByRange([]byte(""), []byte("")) + totalSize := cluster.GetRegionSizeByRange([]byte(""), []byte("")) require.Equal(t, int64(nums*10), totalSize) for i := 1; i < 10; i++ { verifyNum := nums / i endKey := fmt.Sprintf("%20d", verifyNum) - totalSize := regions.GetRegionSizeByRange([]byte(""), []byte(endKey)) + totalSize := cluster.GetRegionSizeByRange([]byte(""), []byte(endKey)) require.Equal(t, int64(verifyNum*10), totalSize) } } func BenchmarkRandomSetRegionWithGetRegionSizeByRange(b *testing.B) { - regions := NewRegionsInfo() + cluster := NewBasicCluster() var items []*RegionInfo for i := 0; i < 1000000; i++ { peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} @@ -731,27 +725,25 @@ func BenchmarkRandomSetRegionWithGetRegionSizeByRange(b *testing.B) { StartKey: []byte(fmt.Sprintf("%20d", i)), EndKey: []byte(fmt.Sprintf("%20d", i+1)), }, peer, SetApproximateSize(10)) - origin, overlaps, rangeChanged := regions.SetRegion(region) - regions.UpdateSubTree(region, origin, overlaps, rangeChanged) + cluster.Regions.SetRegion(region) items = append(items, region) } b.ResetTimer() go func() { for { - regions.GetRegionSizeByRange([]byte(""), []byte("")) + cluster.GetRegionSizeByRange([]byte(""), []byte("")) time.Sleep(time.Millisecond) } }() for i := 0; i < b.N; i++ { item := items[i%len(items)] item.approximateKeys = int64(200000) - origin, overlaps, rangeChanged := regions.SetRegion(item) - regions.UpdateSubTree(item, origin, overlaps, rangeChanged) + cluster.Regions.SetRegion(item) } } func BenchmarkRandomSetRegionWithGetRegionSizeByRangeParallel(b *testing.B) { - regions := NewRegionsInfo() + cluster := NewBasicCluster() var items []*RegionInfo for i := 0; i < 1000000; i++ { peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} @@ -761,14 +753,13 @@ func BenchmarkRandomSetRegionWithGetRegionSizeByRangeParallel(b *testing.B) { StartKey: []byte(fmt.Sprintf("%20d", i)), EndKey: []byte(fmt.Sprintf("%20d", i+1)), }, peer) - origin, overlaps, rangeChanged := regions.SetRegion(region) - regions.UpdateSubTree(region, origin, overlaps, rangeChanged) + cluster.Regions.SetRegion(region) items = append(items, region) } b.ResetTimer() go func() { for { - regions.GetRegionSizeByRange([]byte(""), []byte("")) + cluster.GetRegionSizeByRange([]byte(""), []byte("")) time.Sleep(time.Millisecond) } }() @@ -776,10 +767,9 @@ func BenchmarkRandomSetRegionWithGetRegionSizeByRangeParallel(b *testing.B) { b.RunParallel( func(pb *testing.PB) { for pb.Next() { - item := items[mrand.Intn(len(items))] + item := items[rand.Intn(len(items))] n := item.Clone(SetApproximateSize(20)) - origin, overlaps, rangeChanged := regions.SetRegion(n) - regions.UpdateSubTree(item, origin, overlaps, rangeChanged) + cluster.Regions.SetRegion(n) } }, ) diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index 526461316a5..ab96e312962 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -686,11 +686,7 @@ func (s *testProgressSuite) TestPreparingProgress(c *C) { pdctl.MustPutStore(c, leader.GetServer(), store) } for i := 0; i < 100; i++ { -<<<<<<< HEAD - pdctl.MustPutRegion(c, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("p%d", i)), []byte(fmt.Sprintf("%d", i+1)), core.SetApproximateSize(10)) -======= - tests.MustPutRegion(re, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("%20d", i)), []byte(fmt.Sprintf("%20d", i+1)), core.SetApproximateSize(10)) ->>>>>>> d651c6b91 (core: batch get region size (#7252)) + pdctl.MustPutRegion(c, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("%20d", i)), []byte(fmt.Sprintf("%20d", i+1)), core.SetApproximateSize(10)) } // no store preparing output := sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusNotFound) @@ -717,13 +713,8 @@ func (s *testProgressSuite) TestPreparingProgress(c *C) { c.Assert(p.LeftSeconds, Equals, math.MaxFloat64) // update size -<<<<<<< HEAD - pdctl.MustPutRegion(c, cluster, 1000, 4, []byte(fmt.Sprintf("%d", 1000)), []byte(fmt.Sprintf("%d", 1001)), core.SetApproximateSize(10)) - pdctl.MustPutRegion(c, cluster, 1001, 5, []byte(fmt.Sprintf("%d", 1001)), []byte(fmt.Sprintf("%d", 1002)), core.SetApproximateSize(40)) -======= - tests.MustPutRegion(re, cluster, 1000, 4, []byte(fmt.Sprintf("%20d", 1000)), []byte(fmt.Sprintf("%20d", 1001)), core.SetApproximateSize(10)) - tests.MustPutRegion(re, cluster, 1001, 5, []byte(fmt.Sprintf("%20d", 1001)), []byte(fmt.Sprintf("%20d", 1002)), core.SetApproximateSize(40)) ->>>>>>> d651c6b91 (core: batch get region size (#7252)) + pdctl.MustPutRegion(c, cluster, 1000, 4, []byte(fmt.Sprintf("%20d", 1000)), []byte(fmt.Sprintf("%20d", 1001)), core.SetApproximateSize(10)) + pdctl.MustPutRegion(c, cluster, 1001, 5, []byte(fmt.Sprintf("%20d", 1001)), []byte(fmt.Sprintf("%20d", 1002)), core.SetApproximateSize(40)) time.Sleep(2 * time.Second) output = sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusOK) c.Assert(json.Unmarshal(output, &p), IsNil)