From 93976a9b81c405496393274719d7e3a2d5c4dd18 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Thu, 31 Mar 2022 15:17:52 +0800 Subject: [PATCH] solve ABA Signed-off-by: bufferflies <1045931706@qq.com> --- server/cluster/cluster.go | 21 +++++++++++++++------ server/core/region.go | 9 +++++---- server/core/region_test.go | 15 +++++++-------- 3 files changed, 27 insertions(+), 18 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index c437e8b9aab0..6dbfea9314d5 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -613,13 +613,22 @@ func (c *RaftCluster) processBucketHeartbeat(buckets *metapb.Buckets) error { bucketEventCounter.WithLabelValues("region_cache_miss").Inc() return errors.Errorf("region %v not found", buckets.GetRegionId()) } - - // region should not update if the version of the buckets is less than the old one. - if old := region.GetBuckets(); old != nil && old.Version >= buckets.Version { - bucketEventCounter.WithLabelValues("version_not_match").Inc() - return nil + // use CAS to update the bucket information. + // the two request(A:3,B:2) get the same region and need to update the buckets. + // the A will pass the check and set the version to 3, the B will fail because the region.bucket has changed. + // the retry should keep the old version and the new version will be set to the region.bucket, like two requests (A:2,B:3). + for retry := 0; retry < 3; retry++ { + old := region.GetBuckets() + // region should not update if the version of the buckets is less than the old one. + if old != nil && buckets.GetVersion() <= old.GetVersion() { + bucketEventCounter.WithLabelValues("version_not_match").Inc() + return nil + } + if ok := region.UpdateBuckets(buckets, old); ok { + return nil + } } - region.UpdateBuckets(buckets) + bucketEventCounter.WithLabelValues("bucket_update_failed").Inc() return nil } diff --git a/server/core/region.go b/server/core/region.go index ebf34a3a5dda..8336f645b6fc 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -61,6 +61,7 @@ type RegionInfo struct { QueryStats *pdpb.QueryStats flowRoundDivisor uint64 // buckets is not thread unsafe, it should be accessed by the request `report buckets` with greater version. + version uint64 buckets unsafe.Pointer } @@ -414,18 +415,18 @@ func (r *RegionInfo) GetStat() *pdpb.RegionStat { } // UpdateBuckets sets the buckets of the region. -func (r *RegionInfo) UpdateBuckets(buckets *metapb.Buckets) { +func (r *RegionInfo) UpdateBuckets(buckets, old *metapb.Buckets) bool { // the bucket can't be nil except in the test cases. if buckets == nil { - return + return true } - // only need to update bucket keys,versions. + // only need to update bucket keys, versions. newBuckets := &metapb.Buckets{ RegionId: buckets.GetRegionId(), Version: buckets.GetVersion(), Keys: buckets.GetKeys(), } - atomic.StorePointer(&r.buckets, unsafe.Pointer(newBuckets)) + return atomic.CompareAndSwapPointer(&r.buckets, unsafe.Pointer(old), unsafe.Pointer(newBuckets)) } // GetBuckets returns the buckets of the region. diff --git a/server/core/region_test.go b/server/core/region_test.go index cb776ec9e4c6..2b37ba73e25d 100644 --- a/server/core/region_test.go +++ b/server/core/region_test.go @@ -186,24 +186,23 @@ func (s *testRegionInfoSuite) TestInherit(c *C) { // bucket data := []struct { - originExist bool originBuckets *metapb.Buckets buckets *metapb.Buckets same bool }{ - {false, nil, nil, true}, - {false, nil, &metapb.Buckets{RegionId: 1, Version: 2}, false}, - {true, &metapb.Buckets{RegionId: 1, Version: 2}, &metapb.Buckets{RegionId: 1, Version: 3}, false}, - {true, &metapb.Buckets{RegionId: 1, Version: 2}, nil, true}, + {nil, nil, true}, + {nil, &metapb.Buckets{RegionId: 1, Version: 2}, false}, + {&metapb.Buckets{RegionId: 1, Version: 2}, &metapb.Buckets{RegionId: 1, Version: 3}, false}, + {&metapb.Buckets{RegionId: 1, Version: 2}, nil, true}, } for _, d := range data { var origin *RegionInfo - if d.originExist { + if d.originBuckets != nil { origin = NewRegionInfo(&metapb.Region{Id: 100}, nil) - origin.UpdateBuckets(d.originBuckets) + origin.UpdateBuckets(d.originBuckets, origin.GetBuckets()) } r := NewRegionInfo(&metapb.Region{Id: 100}, nil) - r.UpdateBuckets(d.buckets) + r.UpdateBuckets(d.buckets, r.GetBuckets()) r.Inherit(origin) if d.same { c.Assert(r.GetBuckets(), DeepEquals, d.originBuckets)