Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: pd can support bucket steam and save them. #4670

Merged
merged 23 commits into from
Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ const (
// since the once the store is add or remove, we shouldn't return an error even if the store limit is failed to persist.
persistLimitRetryTimes = 5
persistLimitWaitTime = 100 * time.Millisecond
retry = 3
)

// Server is the interface for cluster.
Expand Down Expand Up @@ -606,6 +607,33 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
return nil
}

// processBucketHeartbeat update the bucket information.
func (c *RaftCluster) processBucketHeartbeat(buckets *metapb.Buckets) error {
c.RLock()
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
region := c.core.GetRegion(buckets.GetRegionId())
c.RUnlock()
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
if region == nil {
bucketEventCounter.WithLabelValues("region_cache_miss").Inc()
return errors.Errorf("region %v not found", buckets.GetRegionId())
}

for i := 0; i < retry; i++ {
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
old := region.GetBuckets()
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
// region should not update if the version of the buckets is less than the old one.
if old != nil && old.Version >= buckets.Version {
bucketEventCounter.WithLabelValues("version_not_match").Inc()
return nil
}
if ok := region.UpdateBuckets(buckets); ok {
log.Info("update buckets successful", zap.Uint64("region-id", buckets.GetRegionId()),
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
zap.Uint64("version", buckets.Version))
bucketEventCounter.WithLabelValues("update_cache").Inc()
return nil
}
}
return nil
}

var regionGuide = core.GenerateRegionGuideFunc(true)

// processRegionHeartbeat updates the region information.
Expand All @@ -620,7 +648,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
if err != nil {
return err
}
region.CorrectApproximateSize(origin)
region.Inherit(origin)

hotStat.CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
hotStat.CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
Expand Down
41 changes: 41 additions & 0 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,10 +427,51 @@ func (s *testClusterInfoSuite) TestRegionHeartbeatHotStat(c *C) {
c.Assert(stats[4], HasLen, 1)
}

func (s *testClusterInfoSuite) TestBucketHeartbeat(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster.coordinator = newCoordinator(s.ctx, cluster, nil)

// case1: region is not exist
buckets := &metapb.Buckets{
RegionId: 0,
Version: 1,
Keys: [][]byte{{'1'}, {'2'}},
}
c.Assert(cluster.processBucketHeartbeat(buckets), NotNil)

// case2: bucket can be processed after the region update.
stores := newTestStores(3, "2.0.0")
n, np := uint64(1), uint64(1)
regions := newTestRegions(n, np)
for _, store := range stores {
c.Assert(cluster.putStoreLocked(store), IsNil)
}

c.Assert(cluster.processRegionHeartbeat(regions[0]), IsNil)
c.Assert(cluster.GetRegion(uint64(0)).GetBuckets(), IsNil)
c.Assert(cluster.processBucketHeartbeat(buckets), IsNil)
c.Assert(cluster.GetRegion(uint64(0)).GetBuckets(), DeepEquals, buckets)

// case3: the bucket version is same.
c.Assert(cluster.processBucketHeartbeat(buckets), IsNil)
// case4: the bucket version is changed.
buckets.Version = 3
c.Assert(cluster.processBucketHeartbeat(buckets), IsNil)
c.Assert(cluster.GetRegion(uint64(0)).GetBuckets(), DeepEquals, buckets)

//case5: region update should inherit buckets.
newRegion := regions[0].Clone(core.WithIncConfVer())
c.Assert(cluster.processRegionHeartbeat(newRegion), IsNil)
c.Assert(cluster.GetRegion(uint64(0)).GetBuckets(), NotNil)
}

func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())

bufferflies marked this conversation as resolved.
Show resolved Hide resolved
cluster.coordinator = newCoordinator(s.ctx, cluster, nil)

n, np := uint64(3), uint64(3)
Expand Down
5 changes: 5 additions & 0 deletions server/cluster/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,8 @@ func (c *RaftCluster) HandleBatchReportSplit(request *pdpb.ReportBatchSplitReque
zap.Int("total", last))
return &pdpb.ReportBatchSplitResponse{}, nil
}

// HandleBucketHeartbeat processes RegionInfo reports from client
func (c *RaftCluster) HandleBucketHeartbeat(buckets *metapb.Buckets) error {
return c.processBucketHeartbeat(buckets)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this abstruction?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the first, HandleBucketHeartbeat should handle the change of the bucket meta(keys and version), but in the second , it will handle the statistics.

}
9 changes: 9 additions & 0 deletions server/cluster/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ var (
Help: "Counter of the region event",
}, []string{"event"})

bucketEventCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "cluster",
Name: "bucket_event",
Help: "Counter of the bucket event",
}, []string{"event"})

schedulerStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Expand Down Expand Up @@ -90,4 +98,5 @@ func init() {
prometheus.MustRegister(clusterStateCPUGauge)
prometheus.MustRegister(clusterStateCurrent)
prometheus.MustRegister(regionListGauge)
prometheus.MustRegister(bucketEventCounter)
}
49 changes: 40 additions & 9 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"reflect"
"sort"
"strings"
"sync/atomic"
"unsafe"

"github.com/gogo/protobuf/proto"
Expand All @@ -39,7 +40,8 @@ func errRegionIsStale(region *metapb.Region, origin *metapb.Region) error {
}

// RegionInfo records detail region info.
// Read-Only once created.
// most properties is Read-Only once created exclude buckets.
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
// buckets should be modified through report buckets and the version is greater than the current.
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
type RegionInfo struct {
term uint64
meta *metapb.Region
Expand All @@ -58,6 +60,7 @@ type RegionInfo struct {
replicationStatus *replication_modepb.RegionReplicationStatus
QueryStats *pdpb.QueryStats
flowRoundDivisor uint64
buckets unsafe.Pointer
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
Expand Down Expand Up @@ -165,18 +168,21 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
return region
}

// Inherit inherits the buckets and region size from the parent region.
// CorrectApproximateSize correct approximate size by the previous size if here exists an reported RegionInfo.
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
//
// See https://github.com/tikv/tikv/issues/11114
func (r *RegionInfo) CorrectApproximateSize(origin *RegionInfo) {
if r.approximateSize != 0 {
return
func (r *RegionInfo) Inherit(origin *RegionInfo) {
// regionSize should not be zero if region is not empty.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can check whether the origin is nil at the beginning of this function and return directly if it is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

region size will be 1 if origin is nil and the size of the new region is zero.

if r.GetApproximateSize() == 0 {
if origin != nil {
r.approximateSize = origin.approximateSize
} else {
r.approximateSize = EmptyRegionApproximateSize
}
}

if origin != nil {
r.approximateSize = origin.approximateSize
} else {
r.approximateSize = EmptyRegionApproximateSize
if origin != nil && r.buckets == nil {
r.buckets = origin.buckets
}
}

Expand Down Expand Up @@ -205,6 +211,7 @@ func (r *RegionInfo) Clone(opts ...RegionCreateOption) *RegionInfo {
approximateKeys: r.approximateKeys,
interval: proto.Clone(r.interval).(*pdpb.TimeInterval),
replicationStatus: r.replicationStatus,
buckets: r.buckets,
}

for _, opt := range opts {
Expand Down Expand Up @@ -406,6 +413,30 @@ func (r *RegionInfo) GetStat() *pdpb.RegionStat {
}
}

// UpdateBuckets sets the buckets of the region.
func (r *RegionInfo) UpdateBuckets(buckets *metapb.Buckets) bool {
// bucket is only nil in test cases.
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
if buckets == nil {
return true
}
// only need to update bucket keys,versions.
newBuckets := &metapb.Buckets{
RegionId: buckets.GetRegionId(),
Version: buckets.GetVersion(),
Keys: buckets.GetKeys(),
}
return atomic.CompareAndSwapPointer(&r.buckets, r.buckets, unsafe.Pointer(newBuckets))
}

// GetBuckets returns the buckets of the region.
func (r *RegionInfo) GetBuckets() *metapb.Buckets {
if r == nil || r.buckets == nil {
return nil
}
buckets := atomic.LoadPointer(&r.buckets)
return (*metapb.Buckets)(buckets)
}

// GetApproximateSize returns the approximate size of the region.
func (r *RegionInfo) GetApproximateSize() int64 {
return r.approximateSize
Expand Down
33 changes: 31 additions & 2 deletions server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,9 @@ func (s *testRegionInfoSuite) TestSortedEqual(c *C) {
}
}

func (s *testRegionInfoSuite) TestCorrectRegionApproximateSize(c *C) {
func (s *testRegionInfoSuite) TestInherit(c *C) {
// size in MB
// case for approximateSize
testcases := []struct {
originExists bool
originSize uint64
Expand All @@ -179,9 +180,37 @@ func (s *testRegionInfoSuite) TestCorrectRegionApproximateSize(c *C) {
}
r := NewRegionInfo(&metapb.Region{Id: 100}, nil)
r.approximateSize = int64(t.size)
r.CorrectApproximateSize(origin)
r.Inherit(origin)
c.Assert(r.approximateSize, Equals, int64(t.expect))
}

// bucket
data := []struct {
originExist bool
originBuckets *metapb.Buckets
buckets *metapb.Buckets
same bool
}{
{false, nil, nil, true},
{false, nil, &metapb.Buckets{RegionId: 1, Version: 2}, false},
{true, &metapb.Buckets{RegionId: 1, Version: 2}, &metapb.Buckets{RegionId: 1, Version: 3}, false},
{true, &metapb.Buckets{RegionId: 1, Version: 2}, nil, true},
}
for _, d := range data {
var origin *RegionInfo
if d.originExist {
origin = NewRegionInfo(&metapb.Region{Id: 100}, nil)
origin.UpdateBuckets(d.originBuckets)
}
r := NewRegionInfo(&metapb.Region{Id: 100}, nil)
r.UpdateBuckets(d.buckets)
r.Inherit(origin)
if d.same {
c.Assert(r.GetBuckets(), DeepEquals, d.originBuckets)
} else {
c.Assert(r.GetBuckets(), Not(DeepEquals), d.originBuckets)
}
}
}

func (s *testRegionInfoSuite) TestRegionRoundingFlow(c *C) {
Expand Down
Loading