Skip to content

Commit

Permalink
resolve the conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Feb 6, 2024
1 parent b5e9460 commit 61253b9
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 81 deletions.
1 change: 0 additions & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1447,7 +1447,6 @@ func (c *RaftCluster) checkStores() {
if err := c.ReadyToServe(storeID); err != nil {
log.Error("change store to serving failed",
zap.Stringer("store", store.GetMeta()),
zap.Int("region-count", c.GetTotalRegionCount()),
errs.ZapError(err))
}
} else if c.IsPrepared() {
Expand Down
31 changes: 28 additions & 3 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"go.uber.org/zap"
)

var scanRegionLimit = 1000

// BasicCluster provides basic data member and interface for a tikv cluster.
type BasicCluster struct {
Stores struct {
Expand Down Expand Up @@ -384,9 +386,32 @@ func (bc *BasicCluster) GetOverlaps(region *RegionInfo) []*RegionInfo {

// GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range.
func (bc *BasicCluster) GetRegionSizeByRange(startKey, endKey []byte) int64 {
bc.Regions.mu.RLock()
defer bc.Regions.mu.RUnlock()
return bc.Regions.GetRegionSizeByRange(startKey, endKey)
var size int64
for {
bc.Regions.mu.RLock()
var cnt int
bc.Regions.tree.scanRange(startKey, func(region *RegionInfo) bool {
if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 {
return false
}
if cnt >= scanRegionLimit {
return false
}
cnt++
startKey = region.GetEndKey()
size += region.GetApproximateSize()
return true
})
bc.Regions.mu.RUnlock()
if cnt == 0 {
break
}
if len(startKey) == 0 {
break
}
}

return size
}

func (bc *BasicCluster) getWriteRate(
Expand Down
38 changes: 0 additions & 38 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,6 @@ import (
"go.uber.org/zap"
)

<<<<<<< HEAD:server/core/region.go
=======
const (
randomRegionMaxRetry = 10
scanRegionLimit = 1000
)

>>>>>>> d651c6b91 (core: batch get region size (#7252)):pkg/core/region.go
// errRegionIsStale is error info for region is stale.
func errRegionIsStale(region *metapb.Region, origin *metapb.Region) error {
return errors.Errorf("region is stale: region %v origin %v", region, origin)
Expand Down Expand Up @@ -1199,36 +1191,6 @@ func (r *RegionsInfo) ScanRangeWithIterator(startKey []byte, iterator func(regio
r.tree.scanRange(startKey, iterator)
}

// GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range.
func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 {
var size int64
for {
r.t.RLock()
var cnt int
r.tree.scanRange(startKey, func(region *RegionInfo) bool {
if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 {
return false
}
if cnt >= scanRegionLimit {
return false
}
cnt++
startKey = region.GetEndKey()
size += region.GetApproximateSize()
return true
})
r.t.RUnlock()
if cnt == 0 {
break
}
if len(startKey) == 0 {
break
}
}

return size
}

// GetAdjacentRegions returns region's info that is adjacent with specific region
func (r *RegionsInfo) GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo) {
p, n := r.tree.getAdjacentRegions(region)
Expand Down
44 changes: 17 additions & 27 deletions server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@ package core
import (
"fmt"
"math"
<<<<<<< HEAD:server/core/region_test.go
"math/rand"
=======
mrand "math/rand"
>>>>>>> d651c6b91 (core: batch get region size (#7252)):pkg/core/region_test.go
"strconv"
"strings"
"testing"
Expand All @@ -30,6 +26,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/server/id"
)
Expand Down Expand Up @@ -668,7 +665,7 @@ func BenchmarkRandomRegion(b *testing.B) {
}

func BenchmarkRandomSetRegion(b *testing.B) {
regions := NewRegionsInfo()
cluster := NewBasicCluster()
var items []*RegionInfo
for i := 0; i < 1000000; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
Expand All @@ -678,22 +675,20 @@ func BenchmarkRandomSetRegion(b *testing.B) {
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer)
origin, overlaps, rangeChanged := regions.SetRegion(region)
regions.UpdateSubTree(region, origin, overlaps, rangeChanged)
cluster.Regions.SetRegion(region)
items = append(items, region)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
item := items[i%len(items)]
item.approximateKeys = int64(200000)
item.approximateSize = int64(20)
origin, overlaps, rangeChanged := regions.SetRegion(item)
regions.UpdateSubTree(item, origin, overlaps, rangeChanged)
cluster.Regions.SetRegion(item)
}
}

func TestGetRegionSizeByRange(t *testing.T) {
regions := NewRegionsInfo()
cluster := NewBasicCluster()
nums := 1000010
for i := 0; i < nums; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
Expand All @@ -707,21 +702,20 @@ func TestGetRegionSizeByRange(t *testing.T) {
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: endKey,
}, peer, SetApproximateSize(10))
origin, overlaps, rangeChanged := regions.SetRegion(region)
regions.UpdateSubTree(region, origin, overlaps, rangeChanged)
cluster.Regions.SetRegion(region)
}
totalSize := regions.GetRegionSizeByRange([]byte(""), []byte(""))
totalSize := cluster.GetRegionSizeByRange([]byte(""), []byte(""))
require.Equal(t, int64(nums*10), totalSize)
for i := 1; i < 10; i++ {
verifyNum := nums / i
endKey := fmt.Sprintf("%20d", verifyNum)
totalSize := regions.GetRegionSizeByRange([]byte(""), []byte(endKey))
totalSize := cluster.GetRegionSizeByRange([]byte(""), []byte(endKey))
require.Equal(t, int64(verifyNum*10), totalSize)
}
}

func BenchmarkRandomSetRegionWithGetRegionSizeByRange(b *testing.B) {
regions := NewRegionsInfo()
cluster := NewBasicCluster()
var items []*RegionInfo
for i := 0; i < 1000000; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
Expand All @@ -731,27 +725,25 @@ func BenchmarkRandomSetRegionWithGetRegionSizeByRange(b *testing.B) {
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer, SetApproximateSize(10))
origin, overlaps, rangeChanged := regions.SetRegion(region)
regions.UpdateSubTree(region, origin, overlaps, rangeChanged)
cluster.Regions.SetRegion(region)
items = append(items, region)
}
b.ResetTimer()
go func() {
for {
regions.GetRegionSizeByRange([]byte(""), []byte(""))
cluster.GetRegionSizeByRange([]byte(""), []byte(""))
time.Sleep(time.Millisecond)
}
}()
for i := 0; i < b.N; i++ {
item := items[i%len(items)]
item.approximateKeys = int64(200000)
origin, overlaps, rangeChanged := regions.SetRegion(item)
regions.UpdateSubTree(item, origin, overlaps, rangeChanged)
cluster.Regions.SetRegion(item)
}
}

func BenchmarkRandomSetRegionWithGetRegionSizeByRangeParallel(b *testing.B) {
regions := NewRegionsInfo()
cluster := NewBasicCluster()
var items []*RegionInfo
for i := 0; i < 1000000; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
Expand All @@ -761,25 +753,23 @@ func BenchmarkRandomSetRegionWithGetRegionSizeByRangeParallel(b *testing.B) {
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer)
origin, overlaps, rangeChanged := regions.SetRegion(region)
regions.UpdateSubTree(region, origin, overlaps, rangeChanged)
cluster.Regions.SetRegion(region)
items = append(items, region)
}
b.ResetTimer()
go func() {
for {
regions.GetRegionSizeByRange([]byte(""), []byte(""))
cluster.GetRegionSizeByRange([]byte(""), []byte(""))
time.Sleep(time.Millisecond)
}
}()

b.RunParallel(
func(pb *testing.PB) {
for pb.Next() {
item := items[mrand.Intn(len(items))]
item := items[rand.Intn(len(items))]
n := item.Clone(SetApproximateSize(20))
origin, overlaps, rangeChanged := regions.SetRegion(n)
regions.UpdateSubTree(item, origin, overlaps, rangeChanged)
cluster.Regions.SetRegion(n)
}
},
)
Expand Down
15 changes: 3 additions & 12 deletions tests/server/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,11 +686,7 @@ func (s *testProgressSuite) TestPreparingProgress(c *C) {
pdctl.MustPutStore(c, leader.GetServer(), store)
}
for i := 0; i < 100; i++ {
<<<<<<< HEAD
pdctl.MustPutRegion(c, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("p%d", i)), []byte(fmt.Sprintf("%d", i+1)), core.SetApproximateSize(10))
=======
tests.MustPutRegion(re, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("%20d", i)), []byte(fmt.Sprintf("%20d", i+1)), core.SetApproximateSize(10))
>>>>>>> d651c6b91 (core: batch get region size (#7252))
pdctl.MustPutRegion(c, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("%20d", i)), []byte(fmt.Sprintf("%20d", i+1)), core.SetApproximateSize(10))
}
// no store preparing
output := sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusNotFound)
Expand All @@ -717,13 +713,8 @@ func (s *testProgressSuite) TestPreparingProgress(c *C) {
c.Assert(p.LeftSeconds, Equals, math.MaxFloat64)

// update size
<<<<<<< HEAD
pdctl.MustPutRegion(c, cluster, 1000, 4, []byte(fmt.Sprintf("%d", 1000)), []byte(fmt.Sprintf("%d", 1001)), core.SetApproximateSize(10))
pdctl.MustPutRegion(c, cluster, 1001, 5, []byte(fmt.Sprintf("%d", 1001)), []byte(fmt.Sprintf("%d", 1002)), core.SetApproximateSize(40))
=======
tests.MustPutRegion(re, cluster, 1000, 4, []byte(fmt.Sprintf("%20d", 1000)), []byte(fmt.Sprintf("%20d", 1001)), core.SetApproximateSize(10))
tests.MustPutRegion(re, cluster, 1001, 5, []byte(fmt.Sprintf("%20d", 1001)), []byte(fmt.Sprintf("%20d", 1002)), core.SetApproximateSize(40))
>>>>>>> d651c6b91 (core: batch get region size (#7252))
pdctl.MustPutRegion(c, cluster, 1000, 4, []byte(fmt.Sprintf("%20d", 1000)), []byte(fmt.Sprintf("%20d", 1001)), core.SetApproximateSize(10))
pdctl.MustPutRegion(c, cluster, 1001, 5, []byte(fmt.Sprintf("%20d", 1001)), []byte(fmt.Sprintf("%20d", 1002)), core.SetApproximateSize(40))
time.Sleep(2 * time.Second)
output = sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusOK)
c.Assert(json.Unmarshal(output, &p), IsNil)
Expand Down

0 comments on commit 61253b9

Please sign in to comment.