Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: pd can support bucket steam and save them. #4670

Merged
merged 23 commits into from
Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,35 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
return nil
}

// processReportBuckets update the bucket information.
func (c *RaftCluster) processReportBuckets(buckets *metapb.Buckets) error {
region := c.core.GetRegion(buckets.GetRegionId())
if region == nil {
bucketEventCounter.WithLabelValues("region_cache_miss").Inc()
return errors.Errorf("region %v not found", buckets.GetRegionId())
}
// use CAS to update the bucket information.
// the two request(A:3,B:2) get the same region and need to update the buckets.
// the A will pass the check and set the version to 3, the B will fail because the region.bucket has changed.
// the retry should keep the old version and the new version will be set to the region.bucket, like two requests (A:2,B:3).
for retry := 0; retry < 3; retry++ {
old := region.GetBuckets()
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
// region should not update if the version of the buckets is less than the old one.
if old != nil && buckets.GetVersion() <= old.GetVersion() {
bucketEventCounter.WithLabelValues("version_not_match").Inc()
return nil
}
failpoint.Inject("concurrentBucketHeartbeat", func() {
time.Sleep(500 * time.Millisecond)
})
if ok := region.UpdateBuckets(buckets, old); ok {
return nil
}
}
bucketEventCounter.WithLabelValues("update_failed").Inc()
return nil
}

var regionGuide = core.GenerateRegionGuideFunc(true)

// processRegionHeartbeat updates the region information.
Expand All @@ -623,7 +652,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
if err != nil {
return err
}
region.CorrectApproximateSize(origin)
region.Inherit(origin)

hotStat.CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
hotStat.CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
Expand Down Expand Up @@ -880,6 +909,11 @@ func (c *RaftCluster) GetStores() []*core.StoreInfo {
return c.core.GetStores()
}

// GetLeaderStoreByRegionID returns the leader store of the given region.
func (c *RaftCluster) GetLeaderStoreByRegionID(regionID uint64) *core.StoreInfo {
return c.core.GetLeaderStoreByRegionID(regionID)
}

// GetStore gets store from cluster.
func (c *RaftCluster) GetStore(storeID uint64) *core.StoreInfo {
return c.core.GetStore(storeID)
Expand Down
66 changes: 66 additions & 0 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,46 @@ func (s *testClusterInfoSuite) TestRegionHeartbeatHotStat(c *C) {
c.Assert(stats[4], HasLen, 1)
}

func (s *testClusterInfoSuite) TestBucketHeartbeat(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster.coordinator = newCoordinator(s.ctx, cluster, nil)

// case1: region is not exist
buckets := &metapb.Buckets{
RegionId: 0,
Version: 1,
Keys: [][]byte{{'1'}, {'2'}},
}
c.Assert(cluster.processReportBuckets(buckets), NotNil)

// case2: bucket can be processed after the region update.
stores := newTestStores(3, "2.0.0")
n, np := uint64(1), uint64(1)
regions := newTestRegions(n, np)
for _, store := range stores {
c.Assert(cluster.putStoreLocked(store), IsNil)
}

c.Assert(cluster.processRegionHeartbeat(regions[0]), IsNil)
c.Assert(cluster.GetRegion(uint64(0)).GetBuckets(), IsNil)
c.Assert(cluster.processReportBuckets(buckets), IsNil)
c.Assert(cluster.GetRegion(uint64(0)).GetBuckets(), DeepEquals, buckets)

// case3: the bucket version is same.
c.Assert(cluster.processReportBuckets(buckets), IsNil)
// case4: the bucket version is changed.
buckets.Version = 3
c.Assert(cluster.processReportBuckets(buckets), IsNil)
c.Assert(cluster.GetRegion(uint64(0)).GetBuckets(), DeepEquals, buckets)

//case5: region update should inherit buckets.
newRegion := regions[0].Clone(core.WithIncConfVer())
c.Assert(cluster.processRegionHeartbeat(newRegion), IsNil)
c.Assert(cluster.GetRegion(uint64(0)).GetBuckets(), NotNil)
}

func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
Expand Down Expand Up @@ -662,6 +702,32 @@ func (s *testClusterInfoSuite) TestRegionFlowChanged(c *C) {
c.Assert(newRegion.GetBytesRead(), Equals, uint64(1000))
}

func (s *testClusterInfoSuite) TestConcurrentReportBucket(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster.coordinator = newCoordinator(s.ctx, cluster, nil)

regions := []*core.RegionInfo{core.NewTestRegionInfo([]byte{}, []byte{})}
heartbeatRegions(c, cluster, regions)
c.Assert(cluster.GetRegion(0), NotNil)

bucket1 := &metapb.Buckets{RegionId: 0, Version: 3}
bucket2 := &metapb.Buckets{RegionId: 0, Version: 2}
var wg sync.WaitGroup
wg.Add(1)
c.Assert(failpoint.Enable("github.com/tikv/pd/server/cluster/concurrentBucketHeartbeat", "return(true)"), IsNil)
go func() {
defer wg.Done()
cluster.processReportBuckets(bucket1)
}()
time.Sleep(100 * time.Millisecond)
c.Assert(failpoint.Disable("github.com/tikv/pd/server/cluster/concurrentBucketHeartbeat"), IsNil)
c.Assert(cluster.processReportBuckets(bucket2), IsNil)
wg.Wait()
c.Assert(cluster.GetRegion(0).GetBuckets(), DeepEquals, bucket1)
}

func (s *testClusterInfoSuite) TestConcurrentRegionHeartbeat(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
Expand Down
5 changes: 5 additions & 0 deletions server/cluster/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,8 @@ func (c *RaftCluster) HandleBatchReportSplit(request *pdpb.ReportBatchSplitReque
zap.Int("total", last))
return &pdpb.ReportBatchSplitResponse{}, nil
}

// HandleReportBuckets processes RegionInfo reports from client
func (c *RaftCluster) HandleReportBuckets(buckets *metapb.Buckets) error {
return c.processReportBuckets(buckets)
}
9 changes: 9 additions & 0 deletions server/cluster/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ var (
Help: "Counter of the region event",
}, []string{"event"})

bucketEventCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "cluster",
Name: "bucket_event",
Help: "Counter of the bucket event",
}, []string{"event"})

schedulerStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Expand Down Expand Up @@ -90,4 +98,5 @@ func init() {
prometheus.MustRegister(clusterStateCPUGauge)
prometheus.MustRegister(clusterStateCurrent)
prometheus.MustRegister(regionListGauge)
prometheus.MustRegister(bucketEventCounter)
}
11 changes: 11 additions & 0 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,17 @@ func (bc *BasicCluster) GetFollowerStores(region *RegionInfo) []*StoreInfo {
return Stores
}

// GetLeaderStoreByRegionID returns the leader store of the given region.
func (bc *BasicCluster) GetLeaderStoreByRegionID(regionID uint64) *StoreInfo {
bc.RLock()
defer bc.RUnlock()
region := bc.Regions.GetRegion(regionID)
if region == nil || region.GetLeader() == nil {
return nil
}
return bc.Stores.GetStore(region.GetLeader().GetStoreId())
}

// GetLeaderStore returns all Stores that contains the region's leader peer.
func (bc *BasicCluster) GetLeaderStore(region *RegionInfo) *StoreInfo {
bc.RLock()
Expand Down
53 changes: 42 additions & 11 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"reflect"
"sort"
"strings"
"sync/atomic"
"unsafe"

"github.com/gogo/protobuf/proto"
Expand All @@ -39,7 +40,8 @@ func errRegionIsStale(region *metapb.Region, origin *metapb.Region) error {
}

// RegionInfo records detail region info.
// Read-Only once created.
// the properties are Read-Only once created except buckets.
// the `buckets` could be modified by the request `report buckets` with greater version.
type RegionInfo struct {
term uint64
meta *metapb.Region
Expand All @@ -58,6 +60,8 @@ type RegionInfo struct {
replicationStatus *replication_modepb.RegionReplicationStatus
QueryStats *pdpb.QueryStats
flowRoundDivisor uint64
// buckets is not thread unsafe, it should be accessed by the request `report buckets` with greater version.
buckets unsafe.Pointer
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
Expand Down Expand Up @@ -165,18 +169,20 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
return region
}

// CorrectApproximateSize correct approximate size by the previous size if here exists an reported RegionInfo.
//
// Inherit inherits the buckets and region size from the parent region.
// correct approximate size and buckets by the previous size if here exists a reported RegionInfo.
// See https://github.com/tikv/tikv/issues/11114
func (r *RegionInfo) CorrectApproximateSize(origin *RegionInfo) {
if r.approximateSize != 0 {
return
func (r *RegionInfo) Inherit(origin *RegionInfo) {
// regionSize should not be zero if region is not empty.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can check whether the origin is nil at the beginning of this function and return directly if it is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

region size will be 1 if origin is nil and the size of the new region is zero.

if r.GetApproximateSize() == 0 {
if origin != nil {
r.approximateSize = origin.approximateSize
} else {
r.approximateSize = EmptyRegionApproximateSize
}
}

if origin != nil {
r.approximateSize = origin.approximateSize
} else {
r.approximateSize = EmptyRegionApproximateSize
if origin != nil && r.buckets == nil {
r.buckets = origin.buckets
}
}

Expand Down Expand Up @@ -205,6 +211,7 @@ func (r *RegionInfo) Clone(opts ...RegionCreateOption) *RegionInfo {
approximateKeys: r.approximateKeys,
interval: proto.Clone(r.interval).(*pdpb.TimeInterval),
replicationStatus: r.replicationStatus,
buckets: r.buckets,
}

for _, opt := range opts {
Expand Down Expand Up @@ -406,6 +413,30 @@ func (r *RegionInfo) GetStat() *pdpb.RegionStat {
}
}

// UpdateBuckets sets the buckets of the region.
func (r *RegionInfo) UpdateBuckets(buckets, old *metapb.Buckets) bool {
// the bucket can't be nil except in the test cases.
if buckets == nil {
return true
}
// only need to update bucket keys, versions.
newBuckets := &metapb.Buckets{
RegionId: buckets.GetRegionId(),
Version: buckets.GetVersion(),
Keys: buckets.GetKeys(),
}
return atomic.CompareAndSwapPointer(&r.buckets, unsafe.Pointer(old), unsafe.Pointer(newBuckets))
}

// GetBuckets returns the buckets of the region.
func (r *RegionInfo) GetBuckets() *metapb.Buckets {
if r == nil || r.buckets == nil {
return nil
}
buckets := atomic.LoadPointer(&r.buckets)
return (*metapb.Buckets)(buckets)
}

// GetApproximateSize returns the approximate size of the region.
func (r *RegionInfo) GetApproximateSize() int64 {
return r.approximateSize
Expand Down
44 changes: 42 additions & 2 deletions server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,9 @@ func (s *testRegionInfoSuite) TestSortedEqual(c *C) {
}
}

func (s *testRegionInfoSuite) TestCorrectRegionApproximateSize(c *C) {
func (s *testRegionInfoSuite) TestInherit(c *C) {
// size in MB
// case for approximateSize
testcases := []struct {
originExists bool
originSize uint64
Expand All @@ -179,9 +180,36 @@ func (s *testRegionInfoSuite) TestCorrectRegionApproximateSize(c *C) {
}
r := NewRegionInfo(&metapb.Region{Id: 100}, nil)
r.approximateSize = int64(t.size)
r.CorrectApproximateSize(origin)
r.Inherit(origin)
c.Assert(r.approximateSize, Equals, int64(t.expect))
}

// bucket
data := []struct {
originBuckets *metapb.Buckets
buckets *metapb.Buckets
same bool
}{
{nil, nil, true},
{nil, &metapb.Buckets{RegionId: 1, Version: 2}, false},
{&metapb.Buckets{RegionId: 1, Version: 2}, &metapb.Buckets{RegionId: 1, Version: 3}, false},
{&metapb.Buckets{RegionId: 1, Version: 2}, nil, true},
}
for _, d := range data {
var origin *RegionInfo
if d.originBuckets != nil {
origin = NewRegionInfo(&metapb.Region{Id: 100}, nil)
origin.UpdateBuckets(d.originBuckets, origin.GetBuckets())
}
r := NewRegionInfo(&metapb.Region{Id: 100}, nil)
r.UpdateBuckets(d.buckets, r.GetBuckets())
r.Inherit(origin)
if d.same {
c.Assert(r.GetBuckets(), DeepEquals, d.originBuckets)
} else {
c.Assert(r.GetBuckets(), Not(DeepEquals), d.originBuckets)
}
}
}

func (s *testRegionInfoSuite) TestRegionRoundingFlow(c *C) {
Expand Down Expand Up @@ -572,6 +600,18 @@ func checkRegions(c *C, regions *RegionsInfo) {
}
}

func BenchmarkUpdateBuckets(b *testing.B) {
region := NewTestRegionInfo([]byte{}, []byte{})
b.ResetTimer()
for i := 0; i < b.N; i++ {
buckets := &metapb.Buckets{RegionId: 0, Version: uint64(i)}
region.UpdateBuckets(buckets, region.GetBuckets())
}
if region.GetBuckets().GetVersion() != uint64(b.N-1) {
b.Fatal("update buckets failed")
}
}

func BenchmarkRandomRegion(b *testing.B) {
regions := NewRegionsInfo()
for i := 0; i < 5000000; i++ {
Expand Down
Loading