
put bucket into region
Signed-off-by: bufferflies <1045931706@qq.com>
bufferflies committed Mar 8, 2022
1 parent ea4c1d7 commit 36d68c7
Showing 8 changed files with 44 additions and 238 deletions.
2 changes: 0 additions & 2 deletions go.sum
@@ -400,8 +400,6 @@ github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMt
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
-github.com/pingcap/kvproto v0.0.0-20220228094105-9bb22e5a97fc h1:s4ObV2nLm0n8Y1f71qNbJUseGe6r96p8Uq+XGQHb7Kc=
-github.com/pingcap/kvproto v0.0.0-20220228094105-9bb22e5a97fc/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220304032058-ccd676426a27 h1:+Ax2NXyAFIITrzgSPWBo3SBZtX/D60VeELCG0B0hqiY=
github.com/pingcap/kvproto v0.0.0-20220304032058-ccd676426a27/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
37 changes: 21 additions & 16 deletions server/cluster/cluster.go
@@ -17,11 +17,6 @@ package cluster
import (
    "context"
    "fmt"
-    "net/http"
-    "strconv"
-    "sync"
-    "time"
-
    "github.com/coreos/go-semver/semver"
    "github.com/gogo/protobuf/proto"
    "github.com/pingcap/errors"
@@ -50,6 +45,10 @@ import (
    "github.com/tikv/pd/server/versioninfo"
    "go.etcd.io/etcd/clientv3"
    "go.uber.org/zap"
+    "net/http"
+    "strconv"
+    "sync"
+    "time"
)

var backgroundJobInterval = 10 * time.Second
@@ -61,6 +60,7 @@ const (
    // since the once the store is add or remove, we shouldn't return an error even if the store limit is failed to persist.
    persistLimitRetryTimes = 5
    persistLimitWaitTime = 100 * time.Millisecond
+    retry = 3
)

// Server is the interface for cluster.
@@ -593,7 +593,22 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {

// processBucketHeartbeat update the bucket information.
func (c *RaftCluster) processBucketHeartbeat(buckets *metapb.Buckets) error {
-    c.core.PutBuckets(buckets)
+    c.RLock()
+    region := c.core.GetRegion(buckets.GetRegionId())
+    c.RUnlock()
+    if region == nil {
+        return errors.Errorf("region %v not found", buckets.GetRegionId())
+    }
+    for i := 0; i < retry; i++ {
+        old := region.GetBuckets()
+        // the buckets should not be updated if their version is not newer than the old one.
+        if old == nil || old.Version >= buckets.Version {
+            return errors.Errorf("region %v bucket version is too small ", buckets.GetRegionId())
+        }
+        if ok := region.SetBuckets(buckets); ok {
+            break
+        }
+    }
    return nil
}

@@ -771,16 +786,6 @@ func (c *RaftCluster) GetRegions() []*core.RegionInfo {
    return c.core.GetRegions()
}

-// GetBucketByRegionID returns the bucket that the region belongs to.
-func (c *RaftCluster) GetBucketByRegionID(regionID uint64) *metapb.Buckets {
-    return c.core.GetBucketByRegionID(regionID)
-}
-
-// GetStoreRegions returns the bucket that key located.
-func (c *RaftCluster) GetBucketByKey(key []byte) *metapb.Buckets {
-    return c.core.GetBucketByKey(key)
-}
-
// GetRegionCount returns total count of regions
func (c *RaftCluster) GetRegionCount() int {
    return c.core.GetRegionCount()
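
For readers unfamiliar with the pattern in processBucketHeartbeat above: the handler now resolves the region, compares bucket versions, and retries a bounded number of times when a concurrent writer wins the compare-and-swap. Below is a self-contained sketch of that version-gated retry loop; the types and the handling of the first (nil) write are illustrative, not PD's API.

package main

import (
    "fmt"
    "sync/atomic"
    "unsafe"
)

// buckets stands in for metapb.Buckets (illustrative only).
type buckets struct {
    version uint64
    keys    [][]byte
}

// region mirrors the idea behind RegionInfo.buckets: an atomically swapped pointer.
type region struct {
    buckets unsafe.Pointer // *buckets
}

func (r *region) getBuckets() *buckets {
    return (*buckets)(atomic.LoadPointer(&r.buckets))
}

// trySetBuckets installs b only if it is newer than the current value and
// retries a bounded number of times when a concurrent writer wins the race.
func (r *region) trySetBuckets(b *buckets, retry int) bool {
    for i := 0; i < retry; i++ {
        old := r.getBuckets()
        if old != nil && old.version >= b.version {
            return false // stale heartbeat, drop it
        }
        if atomic.CompareAndSwapPointer(&r.buckets, unsafe.Pointer(old), unsafe.Pointer(b)) {
            return true
        }
    }
    return false
}

func main() {
    r := &region{}
    fmt.Println(r.trySetBuckets(&buckets{version: 2}, 3)) // true: first write wins
    fmt.Println(r.trySetBuckets(&buckets{version: 1}, 3)) // false: version is not newer
    fmt.Println(r.getBuckets().version)                   // 2
}

In the commit itself the version check and the bounded retry live in processBucketHeartbeat while the CompareAndSwapPointer lives in RegionInfo.SetBuckets; the sketch folds them together for brevity.
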
29 changes: 0 additions & 29 deletions server/core/basic_cluster.go
@@ -31,32 +31,16 @@ type BasicCluster struct {
    sync.RWMutex
    Stores *StoresInfo
    Regions *RegionsInfo
-    Buckets *BucketsInfo
}

// NewBasicCluster creates a BasicCluster.
func NewBasicCluster() *BasicCluster {
    return &BasicCluster{
        Stores: NewStoresInfo(),
        Regions: NewRegionsInfo(),
-        Buckets: NewBucketsInfo(),
    }
}

-// GetBuckets returns the buckets of the given buckets.
-func (bc *BasicCluster) GetBucketByRegionID(regionID uint64) *metapb.Buckets {
-    bc.RLock()
-    defer bc.RUnlock()
-    return bc.Buckets.GetByRegionID(regionID)
-}
-
-// GetBucketByKey returns the bucket of the given key.
-func (bc *BasicCluster) GetBucketByKey(startKey []byte) *metapb.Buckets {
-    bc.RLock()
-    defer bc.RUnlock()
-    return bc.Buckets.GetKey(startKey)
-}
-
// GetStores returns all Stores in the cluster.
func (bc *BasicCluster) GetStores() []*StoreInfo {
    bc.RLock()
@@ -370,13 +354,6 @@ func isRegionRecreated(region *RegionInfo) bool {
    return region.GetRegionEpoch().GetVersion() == 1 && region.GetRegionEpoch().GetConfVer() == 1 && (len(region.GetStartKey()) != 0 || len(region.GetEndKey()) != 0)
}

-// PutBuckets puts a buckets
-func (bc *BasicCluster) PutBuckets(bucket *metapb.Buckets) []*metapb.Buckets {
-    bc.Lock()
-    defer bc.Unlock()
-    return bc.Buckets.SetBuckets(bucket)
-}
-
// PreCheckPutRegion checks if the region is valid to put.
func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, error) {
    origin, overlaps := bc.getRelevantRegions(region)
@@ -465,12 +442,6 @@ func (bc *BasicCluster) GetOverlaps(region *RegionInfo) []*RegionInfo {
    return bc.Regions.GetOverlaps(region)
}

-// BucketSetInformer provides access to a shared informer of buckets.
-type BucketSetInformer interface {
-    GetBucketByRegionID(regionID uint64) *metapb.Buckets
-    GetBucketByKey(key []byte) *metapb.Buckets
-}
-
// RegionSetInformer provides access to a shared informer of regions.
type RegionSetInformer interface {
    GetRegionCount() int
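
Since BasicCluster no longer carries a BucketsInfo map or a BucketSetInformer, callers that used GetBucketByRegionID now have to go through the region itself. A minimal sketch of that lookup path, with stand-in types rather than PD's, might look like this:

package main

import "fmt"

// fakeBuckets and fakeRegion are stand-ins for metapb.Buckets and core.RegionInfo.
type fakeBuckets struct{ Version uint64 }

type fakeRegion struct{ buckets *fakeBuckets }

func (r *fakeRegion) GetBuckets() *fakeBuckets { return r.buckets }

type fakeCluster struct{ regions map[uint64]*fakeRegion }

func (c *fakeCluster) GetRegion(id uint64) *fakeRegion { return c.regions[id] }

// bucketsByRegion mirrors the new lookup path: resolve the region first,
// then read its buckets, instead of querying a cluster-level buckets map.
func bucketsByRegion(c *fakeCluster, regionID uint64) *fakeBuckets {
    region := c.GetRegion(regionID)
    if region == nil {
        return nil // unknown region: no buckets either
    }
    return region.GetBuckets()
}

func main() {
    c := &fakeCluster{regions: map[uint64]*fakeRegion{
        1: {buckets: &fakeBuckets{Version: 3}},
    }}
    fmt.Println(bucketsByRegion(c, 1).Version) // 3
    fmt.Println(bucketsByRegion(c, 2))         // <nil>
}
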
148 changes: 0 additions & 148 deletions server/core/bucket.go

This file was deleted.

42 changes: 0 additions & 42 deletions server/core/bucket_test.go

This file was deleted.

19 changes: 19 additions & 0 deletions server/core/region.go
@@ -21,6 +21,7 @@ import (
    "reflect"
    "sort"
    "strings"
    "sync/atomic"
    "unsafe"

    "github.com/gogo/protobuf/proto"
@@ -58,6 +59,7 @@ type RegionInfo struct {
    replicationStatus *replication_modepb.RegionReplicationStatus
    QueryStats *pdpb.QueryStats
    flowRoundDivisor uint64
+    buckets unsafe.Pointer
}

// NewRegionInfo creates RegionInfo with region's meta and leader peer.
@@ -406,6 +408,23 @@ func (r *RegionInfo) GetStat() *pdpb.RegionStat {
    }
}

+// SetBuckets sets the buckets of the region.
+func (r *RegionInfo) SetBuckets(buckets *metapb.Buckets) bool {
+    // only the bucket keys and versions need to be updated.
+    newBuckets := &metapb.Buckets{
+        RegionId: buckets.RegionId,
+        Version: buckets.Version,
+        Keys: buckets.Keys,
+    }
+    return atomic.CompareAndSwapPointer(&r.buckets, r.buckets, unsafe.Pointer(newBuckets))
+}
+
+// GetBuckets returns the buckets of the region.
+func (r *RegionInfo) GetBuckets() *metapb.Buckets {
+    buckets := atomic.LoadPointer(&r.buckets)
+    return (*metapb.Buckets)(buckets)
+}
+
// GetApproximateSize returns the approximate size of the region.
func (r *RegionInfo) GetApproximateSize() int64 {
    return r.approximateSize
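
SetBuckets copies the incoming keys and version into a fresh metapb.Buckets and publishes it with an atomic pointer swap, so GetBuckets readers never take a lock and never observe a half-written value. At the time of this commit that is expressed with unsafe.Pointer and sync/atomic; on Go 1.19 and later the same publish/load/compare-and-swap idea can be written with the generic atomic.Pointer type, roughly as follows (illustrative types, not PD's):

package main

import (
    "fmt"
    "sync/atomic"
)

// snapshot stands in for metapb.Buckets: writers publish a fresh, immutable value.
type snapshot struct {
    Version uint64
    Keys    [][]byte
}

func main() {
    var buckets atomic.Pointer[snapshot]

    // Writer: publish a new snapshot; readers see either the old or the new one.
    buckets.Store(&snapshot{Version: 1, Keys: [][]byte{[]byte("a"), []byte("z")}})

    // Reader: lock-free load, no half-written state possible.
    if s := buckets.Load(); s != nil {
        fmt.Println(s.Version, len(s.Keys)) // 1 2
    }

    // Conditional update, the typed equivalent of CompareAndSwapPointer in the diff.
    old := buckets.Load()
    next := &snapshot{Version: old.Version + 1, Keys: old.Keys}
    fmt.Println(buckets.CompareAndSwap(old, next)) // true
    fmt.Println(buckets.Load().Version)            // 2
}
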
4 changes: 4 additions & 0 deletions server/grpc_service.go
@@ -781,6 +781,10 @@ func (s *GrpcServer) ReportBuckets(stream pdpb.PD_ReportBucketsServer) error {
            lastBind = time.Now()
        }
        start := time.Now()
+        buckets := request.GetBuckets()
+        if buckets == nil || len(buckets.Keys) == 0 {
+            continue
+        }
        err = rc.HandleBucketHeartbeat(request.Buckets)
        if err != nil {
            regionHeartbeatCounter.WithLabelValues("report", "err").Inc()
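
The ReportBuckets change above simply skips heartbeats that carry no buckets or no keys before handing them to the cluster. A tiny sketch of that guard, with an illustrative payload type instead of the pdpb request:

package main

import "fmt"

// reportedBuckets stands in for the buckets payload of a report request.
type reportedBuckets struct {
    RegionID uint64
    Keys     [][]byte
}

// shouldProcess mirrors the new guard: ignore empty reports instead of
// passing them to HandleBucketHeartbeat.
func shouldProcess(b *reportedBuckets) bool {
    return b != nil && len(b.Keys) > 0
}

func main() {
    fmt.Println(shouldProcess(nil))                           // false
    fmt.Println(shouldProcess(&reportedBuckets{RegionID: 1})) // false: no keys
    fmt.Println(shouldProcess(&reportedBuckets{
        RegionID: 1,
        Keys:     [][]byte{[]byte("k")},
    })) // true
}
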
1 change: 0 additions & 1 deletion server/schedule/cluster.go
@@ -25,7 +25,6 @@ type Cluster interface {
    core.RegionSetInformer
    core.StoreSetInformer
    core.StoreSetController
-    core.BucketSetInformer

    statistics.RegionStatInformer
    statistics.StoreStatInformer
