diff --git a/go.sum b/go.sum index 46adfeadafc3..e4ce87ca4e75 100644 --- a/go.sum +++ b/go.sum @@ -400,8 +400,6 @@ github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMt github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20220228094105-9bb22e5a97fc h1:s4ObV2nLm0n8Y1f71qNbJUseGe6r96p8Uq+XGQHb7Kc= -github.com/pingcap/kvproto v0.0.0-20220228094105-9bb22e5a97fc/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20220304032058-ccd676426a27 h1:+Ax2NXyAFIITrzgSPWBo3SBZtX/D60VeELCG0B0hqiY= github.com/pingcap/kvproto v0.0.0-20220304032058-ccd676426a27/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 752ed8fa5da0..9e91e9ce9e68 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -17,11 +17,6 @@ package cluster import ( "context" "fmt" - "net/http" - "strconv" - "sync" - "time" - "github.com/coreos/go-semver/semver" "github.com/gogo/protobuf/proto" "github.com/pingcap/errors" @@ -50,6 +45,10 @@ import ( "github.com/tikv/pd/server/versioninfo" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" + "net/http" + "strconv" + "sync" + "time" ) var backgroundJobInterval = 10 * time.Second @@ -61,6 +60,7 @@ const ( // since the once the store is add or remove, we shouldn't return an error even if the store limit is failed to persist. persistLimitRetryTimes = 5 persistLimitWaitTime = 100 * time.Millisecond + retry = 3 ) // Server is the interface for cluster. 
@@ -593,7 +593,25 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
 
-// processBucketHeartbeat update the bucket information.
+// processBucketHeartbeat records the reported buckets on the region they belong to.
+// It returns an error when the region is unknown, when the report is not newer than
+// the buckets already stored, or when every attempt to store the report fails.
 func (c *RaftCluster) processBucketHeartbeat(buckets *metapb.Buckets) error {
-	c.core.PutBuckets(buckets)
+	c.RLock()
+	region := c.core.GetRegion(buckets.GetRegionId())
+	c.RUnlock()
+	if region == nil {
+		return errors.Errorf("region %v not found", buckets.GetRegionId())
+	}
+	for i := 0; i < retry; i++ {
+		old := region.GetBuckets()
+		// The first report for a region (old == nil) is always accepted; after that
+		// only a strictly newer version may replace the stored buckets.
+		if old != nil && old.Version >= buckets.Version {
+			return errors.Errorf("region %v bucket version is too small ", buckets.GetRegionId())
+		}
+		if ok := region.SetBuckets(buckets); ok {
+			return nil
+		}
+	}
-	return nil
+	return errors.Errorf("region %v buckets not updated after %d attempts", buckets.GetRegionId(), retry)
 }
 
@@ -771,16 +789,6 @@ func (c *RaftCluster) GetRegions() []*core.RegionInfo {
 	return c.core.GetRegions()
 }
 
-// GetBucketByRegionID returns the bucket that the region belongs to.
-func (c *RaftCluster) GetBucketByRegionID(regionID uint64) *metapb.Buckets {
-	return c.core.GetBucketByRegionID(regionID)
-}
-
-// GetStoreRegions returns the bucket that key located.
-func (c *RaftCluster) GetBucketByKey(key []byte) *metapb.Buckets {
-	return c.core.GetBucketByKey(key)
-}
-
 // GetRegionCount returns total count of regions
 func (c *RaftCluster) GetRegionCount() int {
 	return c.core.GetRegionCount()
diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go
index 351e704e2414..fe5eeec84207 100644
--- a/server/core/basic_cluster.go
+++ b/server/core/basic_cluster.go
@@ -31,7 +31,6 @@ type BasicCluster struct {
 	sync.RWMutex
 	Stores  *StoresInfo
 	Regions *RegionsInfo
-	Buckets *BucketsInfo
 }
 
 // NewBasicCluster creates a BasicCluster.
@@ -39,24 +38,9 @@ func NewBasicCluster() *BasicCluster {
 	return &BasicCluster{
 		Stores:  NewStoresInfo(),
 		Regions: NewRegionsInfo(),
-		Buckets: NewBucketsInfo(),
 	}
 }
 
-// GetBuckets returns the buckets of the given buckets.
-func (bc *BasicCluster) GetBucketByRegionID(regionID uint64) *metapb.Buckets { - bc.RLock() - defer bc.RUnlock() - return bc.Buckets.GetByRegionID(regionID) -} - -// GetBucketByKey returns the bucket of the given key. -func (bc *BasicCluster) GetBucketByKey(startKey []byte) *metapb.Buckets { - bc.RLock() - defer bc.RUnlock() - return bc.Buckets.GetKey(startKey) -} - // GetStores returns all Stores in the cluster. func (bc *BasicCluster) GetStores() []*StoreInfo { bc.RLock() @@ -370,13 +354,6 @@ func isRegionRecreated(region *RegionInfo) bool { return region.GetRegionEpoch().GetVersion() == 1 && region.GetRegionEpoch().GetConfVer() == 1 && (len(region.GetStartKey()) != 0 || len(region.GetEndKey()) != 0) } -// PutBuckets puts a buckets -func (bc *BasicCluster) PutBuckets(bucket *metapb.Buckets) []*metapb.Buckets { - bc.Lock() - defer bc.Unlock() - return bc.Buckets.SetBuckets(bucket) -} - // PreCheckPutRegion checks if the region is valid to put. func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, error) { origin, overlaps := bc.getRelevantRegions(region) @@ -465,12 +442,6 @@ func (bc *BasicCluster) GetOverlaps(region *RegionInfo) []*RegionInfo { return bc.Regions.GetOverlaps(region) } -// BucketSetInformer provides access to a shared informer of buckets. -type BucketSetInformer interface { - GetBucketByRegionID(regionID uint64) *metapb.Buckets - GetBucketByKey(key []byte) *metapb.Buckets -} - // RegionSetInformer provides access to a shared informer of regions. type RegionSetInformer interface { GetRegionCount() int diff --git a/server/core/bucket.go b/server/core/bucket.go deleted file mode 100644 index 34ff47d3a86b..000000000000 --- a/server/core/bucket.go +++ /dev/null @@ -1,148 +0,0 @@ -// Copyright 2022 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package core - -import ( - "bytes" - "sync" - - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/tikv/pd/pkg/btree" -) - -var _ btree.Item = &regionKeyItem{} - -// bucketItem is a btree item. -type regionKeyItem struct { - ID uint64 - startKey []byte - endKey []byte -} - -// Less returns true if the region start key is less than the other. -func (r *regionKeyItem) Less(other btree.Item) bool { - left := r.startKey - right := other.(*regionKeyItem).startKey - return bytes.Compare(left, right) < 0 -} - -func (r *regionKeyItem) contains(key []byte) bool { - start, end := r.startKey, r.endKey - return bytes.Compare(key, start) >= 0 && (len(end) == 0 || bytes.Compare(key, end) < 0) -} - -// BucketsInfo for export -type BucketsInfo struct { - mu sync.RWMutex - buckets map[uint64]*metapb.Buckets // regionID -> buckets - tree *btree.BTree // for sort -} - -// NewBucketsInfo creates new BucketsInfo. -func NewBucketsInfo() *BucketsInfo { - return &BucketsInfo{ - buckets: make(map[uint64]*metapb.Buckets), - tree: btree.New(defaultBTreeDegree), - } -} - -// SetBuckets puts a buckets and the origin's bucket will be deleted. -func (b *BucketsInfo) SetBuckets(buckets *metapb.Buckets) []*metapb.Buckets { - count := len(buckets.Keys) - startKey := buckets.Keys[0] - endKey := buckets.Keys[count-1] - // the key ranges of the bucket don't change. - if origin := b.GetByRegionID(buckets.RegionId); origin != nil && origin.Version == buckets.Version { - // only update state if version is the same.
- // todo: stats should be merged if the new buckets is the same as the old buckets. - b.buckets[buckets.RegionId] = buckets - return []*metapb.Buckets{origin} - } - - origins := b.GetByRange(startKey, endKey) - for _, origin := range origins { - item := regionKeyItem{ID: origin.RegionId, startKey: origin.Keys[0], endKey: origin.Keys[len(origin.Keys)-1]} - b.tree.Delete(&item) - } - // todo : stats should be merged if the new buckets is the same as the old buckets. - b.buckets[buckets.RegionId] = buckets - b.tree.ReplaceOrInsert(&regionKeyItem{ - ID: buckets.RegionId, - startKey: startKey, - endKey: endKey, - }) - return origins -} - -// GetByRange returns buckets array by key range. -func (b *BucketsInfo) GetByRange(startKey, endKey []byte) []*metapb.Buckets { - var res []*metapb.Buckets - b.scanRange(startKey, func(item *regionKeyItem) bool { - if len(endKey) > 0 && bytes.Compare(item.startKey, endKey) >= 0 { - return false - } - res = append(res, b.buckets[item.ID]) - return true - }) - return res -} - -// GetByRegionID returns buckets by regionID -func (b *BucketsInfo) GetByRegionID(regionID uint64) *metapb.Buckets { - b.mu.RLock() - defer b.mu.RUnlock() - return b.buckets[regionID] -} - -// GetKey returns the key of the bucket.
-func (b *BucketsInfo) GetKey(key []byte) *metapb.Buckets { - b.mu.RLock() - defer b.mu.RUnlock() - item := b.find(key) - if item != nil { - return b.buckets[item.ID] - } - return nil -} - -func (b *BucketsInfo) find(startKey []byte) *regionKeyItem { - item := &regionKeyItem{ - startKey: startKey, - } - var result *regionKeyItem - b.mu.RLock() - defer b.mu.RUnlock() - b.tree.DescendLessOrEqual(item, func(i btree.Item) bool { - result = i.(*regionKeyItem) - return false - }) - if result != nil && !result.contains(item.startKey) { - return nil - } - return result -} - -func (b *BucketsInfo) scanRange(startKey []byte, f func(item *regionKeyItem) bool) { - startItem := b.find(startKey) - if startItem == nil { - startItem = &regionKeyItem{ - startKey: startKey, - } - } - b.tree.AscendGreaterOrEqual(startItem, func(item btree.Item) bool { - i := item.(*regionKeyItem) - return f(i) - }) -} diff --git a/server/core/bucket_test.go b/server/core/bucket_test.go deleted file mode 100644 index 4610ae1b5cec..000000000000 --- a/server/core/bucket_test.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2022 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package core - -import ( - .
"github.com/pingcap/check"
-	"github.com/pingcap/kvproto/pkg/metapb"
-)
-
-var _ = Suite(&testBucketSuite{})
-
-type testBucketSuite struct{}
-
-func (b *testBucketSuite) TestBucket(c *C) {
-	bucketsInfo := NewBucketsInfo()
-
-	// case1: add a new bucket
-	bucket := &metapb.Buckets{RegionId: 1, Version: 1, Keys: [][]byte{{1}, {10}, {20}}}
-	bucketsInfo.SetBuckets(bucket)
-	c.Assert(bucketsInfo.GetByRegionID(1), Equals, bucket)
-	c.Assert(bucketsInfo.GetByRange([]byte{1}, []byte{10}), HasLen, 1)
-	c.Assert(bucketsInfo.GetByRange([]byte{1}, []byte{10})[0], Equals, bucket)
-
-	// case2: update a bucket with a new version and new range
-	// the origin bucket key range will be deleted
-	bucket = &metapb.Buckets{RegionId: 1, Version: 2, Keys: [][]byte{{2}, {10}}}
-	bucketsInfo.SetBuckets(bucket)
-	c.Assert(bucketsInfo.GetByRegionID(1), Equals, bucket)
-	c.Assert(bucketsInfo.GetByRange([]byte{1}, []byte{10}), HasLen, 1)
-	c.Assert(bucketsInfo.GetByRange([]byte{10}, []byte{20}), IsNil)
-}
diff --git a/server/core/region.go b/server/core/region.go
index eb5b67b1a6ed..0e5727ce55e2 100644
--- a/server/core/region.go
+++ b/server/core/region.go
@@ -21,6 +21,7 @@ import (
 	"reflect"
 	"sort"
 	"strings"
+	"sync/atomic"
 	"unsafe"
 
 	"github.com/gogo/protobuf/proto"
@@ -58,6 +59,7 @@ type RegionInfo struct {
 	replicationStatus *replication_modepb.RegionReplicationStatus
 	QueryStats        *pdpb.QueryStats
 	flowRoundDivisor  uint64
+	buckets           unsafe.Pointer
 }
 
 // NewRegionInfo creates RegionInfo with region's meta and leader peer.
@@ -406,6 +408,33 @@ func (r *RegionInfo) GetStat() *pdpb.RegionStat {
 	}
 }
 
+// SetBuckets stores a new Buckets value carrying the region id, version and keys
+// of the given buckets, and reports whether the region now holds that value.
+// It returns false and leaves the region untouched when the region already holds
+// buckets whose version is not lower than the given one, or when a concurrent
+// update replaces the stored buckets first.
+func (r *RegionInfo) SetBuckets(buckets *metapb.Buckets) bool {
+	// only need to update bucket keys,versions.
+	newBuckets := &metapb.Buckets{
+		RegionId: buckets.RegionId,
+		Version:  buckets.Version,
+		Keys:     buckets.Keys,
+	}
+	// Load the current pointer atomically and compare-and-swap against exactly
+	// that value. Reading r.buckets directly is an unsynchronized read racing
+	// with concurrent swaps, and it makes the comparison succeed unconditionally.
+	old := atomic.LoadPointer(&r.buckets)
+	if cur := (*metapb.Buckets)(old); cur != nil && cur.Version >= buckets.Version {
+		return false
+	}
+	return atomic.CompareAndSwapPointer(&r.buckets, old, unsafe.Pointer(newBuckets))
+}
+
+// GetBuckets returns the buckets of the region.
+func (r *RegionInfo) GetBuckets() *metapb.Buckets {
+	buckets := atomic.LoadPointer(&r.buckets)
+	return (*metapb.Buckets)(buckets)
+}
+
 // GetApproximateSize returns the approximate size of the region.
 func (r *RegionInfo) GetApproximateSize() int64 {
 	return r.approximateSize
diff --git a/server/grpc_service.go b/server/grpc_service.go
index 8a2735689818..6528c88029f0 100644
--- a/server/grpc_service.go
+++ b/server/grpc_service.go
@@ -781,6 +781,10 @@ func (s *GrpcServer) ReportBuckets(stream pdpb.PD_ReportBucketsServer) error {
 			lastBind = time.Now()
 		}
 		start := time.Now()
+		buckets := request.GetBuckets()
+		if buckets == nil || len(buckets.Keys) == 0 {
+			continue
+		}
 		err = rc.HandleBucketHeartbeat(request.Buckets)
 		if err != nil {
 			regionHeartbeatCounter.WithLabelValues("report", "err").Inc()
diff --git a/server/schedule/cluster.go b/server/schedule/cluster.go
index 41ab4c7a8e21..e5cee73645df 100644
--- a/server/schedule/cluster.go
+++ b/server/schedule/cluster.go
@@ -25,7 +25,6 @@ type Cluster interface {
 	core.RegionSetInformer
 	core.StoreSetInformer
 	core.StoreSetController
-	core.BucketSetInformer
 	statistics.RegionStatInformer
 	statistics.StoreStatInformer