From f0710fe506a24d903adaa187259a6f82a2dadf3b Mon Sep 17 00:00:00 2001 From: youjiali1995 Date: Thu, 5 Mar 2020 16:51:22 +0800 Subject: [PATCH 1/3] server: fix the race condition of region heartbeats from different regions Signed-off-by: youjiali1995 --- server/cluster/cluster.go | 54 ++++++++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index e2440f2e26c..a14e92cc547 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -466,32 +466,20 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { } } - if saveKV && c.storage != nil { - if err := c.storage.SaveRegion(region.GetMeta()); err != nil { - // Not successfully saved to storage is not fatal, it only leads to longer warm-up - // after restart. Here we only log the error then go on updating cache. - log.Error("failed to save region to storage", - zap.Uint64("region-id", region.GetID()), - zap.Stringer("region-meta", core.RegionToHexMeta(region.GetMeta())), - zap.Error(err)) - } - regionEventCounter.WithLabelValues("update_kv").Inc() - select { - case c.changedRegions <- region: - default: - } - } - if len(writeItems) == 0 && len(readItems) == 0 && !saveCache && !isNew { + if len(writeItems) == 0 && len(readItems) == 0 && !saveKV && !saveCache && !isNew { return nil } c.Lock() - defer c.Unlock() - if isNew { - c.prepareChecker.collect(region) - } - if saveCache { + // To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one, + // check its validation again here. + // + // However it can't solve the race condition of concurrent heartbeats from the same region. + if _, err := c.core.PreCheckPutRegion(region); err != nil { + c.Unlock() + return err + } overlaps := c.core.PutRegion(region) if c.storage != nil { for _, item := range overlaps { @@ -522,6 +510,10 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { regionEventCounter.WithLabelValues("update_cache").Inc() } + if isNew { + c.prepareChecker.collect(region) + } + if c.regionStats != nil { c.regionStats.Observe(region, c.takeRegionStoresLocked(region)) } @@ -532,6 +524,26 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { for _, readItem := range readItems { c.hotSpotCache.Update(readItem) } + c.Unlock() + + // If there are concurrent heartbeats from the same region, the last write will win even if + // writes to storage in the critical area. So don't use mutex to protect it. + if saveKV && c.storage != nil { + if err := c.storage.SaveRegion(region.GetMeta()); err != nil { + // Not successfully saved to storage is not fatal, it only leads to longer warm-up + // after restart. Here we only log the error then go on updating cache. + log.Error("failed to save region to storage", + zap.Uint64("region-id", region.GetID()), + zap.Stringer("region-meta", core.RegionToHexMeta(region.GetMeta())), + zap.Error(err)) + } + regionEventCounter.WithLabelValues("update_kv").Inc() + select { + case c.changedRegions <- region: + default: + } + } + return nil } From 81d7bf6d05e961d7994db599d2e7d437ce44b99c Mon Sep 17 00:00:00 2001 From: youjiali1995 Date: Fri, 6 Mar 2020 16:05:34 +0800 Subject: [PATCH 2/3] add test Signed-off-by: youjiali1995 --- server/cluster/cluster.go | 4 ++++ server/cluster/cluster_test.go | 36 ++++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index a14e92cc547..71632821817 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -470,6 +470,10 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { return nil } + failpoint.Inject("concurrentRegionHeartbeat", func() { + time.Sleep(500 * time.Millisecond) + }) + c.Lock() if saveCache { // To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one, diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 882d0c78882..2025a60b389 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -16,9 +16,12 @@ package cluster import ( "fmt" "math/rand" + "sync" "testing" + "time" . "github.com/pingcap/check" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/pkg/mock/mockid" @@ -308,6 +311,39 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { } } +func (s *testClusterInfoSuite) TestConcurrentRegionHeartbeat(c *C) { + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) + cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster()) + + regions := []*core.RegionInfo{core.NewTestRegionInfo([]byte{}, []byte{})} + regions = core.SplitRegions(regions) + heartbeatRegions(c, cluster, regions) + + // Merge regions manually + source, target := regions[0], regions[1] + target.GetMeta().StartKey = []byte{} + target.GetMeta().EndKey = []byte{} + source.GetMeta().GetRegionEpoch().Version++ + if source.GetMeta().GetRegionEpoch().GetVersion() > target.GetMeta().GetRegionEpoch().GetVersion() { + target.GetMeta().GetRegionEpoch().Version = source.GetMeta().GetRegionEpoch().GetVersion() + } + target.GetMeta().GetRegionEpoch().Version++ + + var wg sync.WaitGroup + wg.Add(1) + c.Assert(failpoint.Enable("github.com/pingcap/pd/server/cluster/concurrentRegionHeartbeat", "return(true)"), IsNil) + go func() { + defer wg.Done() + cluster.processRegionHeartbeat(source) + }() + time.Sleep(200 * time.Millisecond) + c.Assert(failpoint.Disable("github.com/pingcap/pd/server/cluster/concurrentRegionHeartbeat"), IsNil) + c.Assert(cluster.processRegionHeartbeat(target), IsNil) + wg.Wait() + checkRegion(c, cluster.GetRegionInfoByKey([]byte{}), target) +} + func heartbeatRegions(c *C, cluster *RaftCluster, regions []*core.RegionInfo) { // Heartbeat and check region one by one. for _, r := range regions { From bd5b629a3301ef8e7a4a523eda1e7644e5013f29 Mon Sep 17 00:00:00 2001 From: youjiali1995 Date: Fri, 6 Mar 2020 16:11:29 +0800 Subject: [PATCH 3/3] shrink sleep duration Signed-off-by: youjiali1995 --- server/cluster/cluster_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 8fe33d93aea..c4868a7884e 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -337,7 +337,7 @@ func (s *testClusterInfoSuite) TestConcurrentRegionHeartbeat(c *C) { defer wg.Done() cluster.processRegionHeartbeat(source) }() - time.Sleep(200 * time.Millisecond) + time.Sleep(100 * time.Millisecond) c.Assert(failpoint.Disable("github.com/pingcap/pd/server/cluster/concurrentRegionHeartbeat"), IsNil) c.Assert(cluster.processRegionHeartbeat(target), IsNil) wg.Wait()