Skip to content

Commit

Permalink
solve ABA
Browse files Browse the repository at this point in the history
Signed-off-by: bufferflies <1045931706@qq.com>
  • Loading branch information
bufferflies committed Mar 31, 2022
1 parent a78e632 commit 93976a9
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 18 deletions.
21 changes: 15 additions & 6 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,13 +613,22 @@ func (c *RaftCluster) processBucketHeartbeat(buckets *metapb.Buckets) error {
bucketEventCounter.WithLabelValues("region_cache_miss").Inc()
return errors.Errorf("region %v not found", buckets.GetRegionId())
}

// region should not update if the version of the buckets is less than the old one.
if old := region.GetBuckets(); old != nil && old.Version >= buckets.Version {
bucketEventCounter.WithLabelValues("version_not_match").Inc()
return nil
// use CAS to update the bucket information.
// the two request(A:3,B:2) get the same region and need to update the buckets.
// the A will pass the check and set the version to 3, the B will fail because the region.bucket has changed.
// the retry should keep the old version and the new version will be set to the region.bucket, like two requests (A:2,B:3).
for retry := 0; retry < 3; retry++ {
old := region.GetBuckets()
// region should not update if the version of the buckets is less than the old one.
if old != nil && buckets.GetVersion() <= old.GetVersion() {
bucketEventCounter.WithLabelValues("version_not_match").Inc()
return nil
}
if ok := region.UpdateBuckets(buckets, old); ok {
return nil
}
}
region.UpdateBuckets(buckets)
bucketEventCounter.WithLabelValues("bucket_update_failed").Inc()
return nil
}

Expand Down
9 changes: 5 additions & 4 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type RegionInfo struct {
QueryStats *pdpb.QueryStats
flowRoundDivisor uint64
// buckets is not thread unsafe, it should be accessed by the request `report buckets` with greater version.
version uint64
buckets unsafe.Pointer
}

Expand Down Expand Up @@ -414,18 +415,18 @@ func (r *RegionInfo) GetStat() *pdpb.RegionStat {
}

// UpdateBuckets sets the buckets of the region.
func (r *RegionInfo) UpdateBuckets(buckets *metapb.Buckets) {
func (r *RegionInfo) UpdateBuckets(buckets, old *metapb.Buckets) bool {
// the bucket can't be nil except in the test cases.
if buckets == nil {
return
return true
}
// only need to update bucket keys,versions.
// only need to update bucket keys, versions.
newBuckets := &metapb.Buckets{
RegionId: buckets.GetRegionId(),
Version: buckets.GetVersion(),
Keys: buckets.GetKeys(),
}
atomic.StorePointer(&r.buckets, unsafe.Pointer(newBuckets))
return atomic.CompareAndSwapPointer(&r.buckets, unsafe.Pointer(old), unsafe.Pointer(newBuckets))
}

// GetBuckets returns the buckets of the region.
Expand Down
15 changes: 7 additions & 8 deletions server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,24 +186,23 @@ func (s *testRegionInfoSuite) TestInherit(c *C) {

// bucket
data := []struct {
originExist bool
originBuckets *metapb.Buckets
buckets *metapb.Buckets
same bool
}{
{false, nil, nil, true},
{false, nil, &metapb.Buckets{RegionId: 1, Version: 2}, false},
{true, &metapb.Buckets{RegionId: 1, Version: 2}, &metapb.Buckets{RegionId: 1, Version: 3}, false},
{true, &metapb.Buckets{RegionId: 1, Version: 2}, nil, true},
{nil, nil, true},
{nil, &metapb.Buckets{RegionId: 1, Version: 2}, false},
{&metapb.Buckets{RegionId: 1, Version: 2}, &metapb.Buckets{RegionId: 1, Version: 3}, false},
{&metapb.Buckets{RegionId: 1, Version: 2}, nil, true},
}
for _, d := range data {
var origin *RegionInfo
if d.originExist {
if d.originBuckets != nil {
origin = NewRegionInfo(&metapb.Region{Id: 100}, nil)
origin.UpdateBuckets(d.originBuckets)
origin.UpdateBuckets(d.originBuckets, origin.GetBuckets())
}
r := NewRegionInfo(&metapb.Region{Id: 100}, nil)
r.UpdateBuckets(d.buckets)
r.UpdateBuckets(d.buckets, r.GetBuckets())
r.Inherit(origin)
if d.same {
c.Assert(r.GetBuckets(), DeepEquals, d.originBuckets)
Expand Down

0 comments on commit 93976a9

Please sign in to comment.