From 4566481aa2e7d951e1ca34b17855cba2efcccd2c Mon Sep 17 00:00:00 2001
From: Lei Zhao
Date: Tue, 10 Mar 2020 14:59:03 +0800
Subject: [PATCH] server: fix the race condition of region heartbeats from
 different regions (#2198)

Signed-off-by: youjiali1995
---
 server/cluster/cluster.go      | 58 ++++++++++++++++++++++------------
 server/cluster/cluster_test.go | 36 +++++++++++++++++++++
 2 files changed, 73 insertions(+), 21 deletions(-)

diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 27740e46b7d..03a68cf6802 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -482,32 +482,24 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
 		}
 	}
 
-	if saveKV && c.storage != nil {
-		if err := c.storage.SaveRegion(region.GetMeta()); err != nil {
-			// Not successfully saved to storage is not fatal, it only leads to longer warm-up
-			// after restart. Here we only log the error then go on updating cache.
-			log.Error("failed to save region to storage",
-				zap.Uint64("region-id", region.GetID()),
-				zap.Stringer("region-meta", core.RegionToHexMeta(region.GetMeta())),
-				zap.Error(err))
-		}
-		regionEventCounter.WithLabelValues("update_kv").Inc()
-		select {
-		case c.changedRegions <- region:
-		default:
-		}
-	}
-	if len(writeItems) == 0 && len(readItems) == 0 && !saveCache && !isNew {
+	if len(writeItems) == 0 && len(readItems) == 0 && !saveKV && !saveCache && !isNew {
 		return nil
 	}
 
-	c.Lock()
-	defer c.Unlock()
-	if isNew {
-		c.prepareChecker.collect(region)
-	}
+	failpoint.Inject("concurrentRegionHeartbeat", func() {
+		time.Sleep(500 * time.Millisecond)
+	})
 
+	c.Lock()
 	if saveCache {
+		// To prevent a concurrent heartbeat of another region from overwriting the
+		// up-to-date region info with a stale one, validate the region again here.
+		//
+		// However, this can't resolve the race between concurrent heartbeats from the same region.
+		if _, err := c.core.PreCheckPutRegion(region); err != nil {
+			c.Unlock()
+			return err
+		}
 		overlaps := c.core.PutRegion(region)
 		if c.storage != nil {
 			for _, item := range overlaps {
@@ -538,6 +530,10 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
 		regionEventCounter.WithLabelValues("update_cache").Inc()
 	}
 
+	if isNew {
+		c.prepareChecker.collect(region)
+	}
+
 	if c.regionStats != nil {
 		c.regionStats.Observe(region, c.takeRegionStoresLocked(region))
 	}
@@ -548,6 +544,26 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
 	for _, readItem := range readItems {
 		c.hotSpotCache.Update(readItem)
 	}
+	c.Unlock()
+
+	// If there are concurrent heartbeats from the same region, the last write wins even if
+	// the storage write is done inside the critical section, so there is no need to hold the mutex here.
+	if saveKV && c.storage != nil {
+		if err := c.storage.SaveRegion(region.GetMeta()); err != nil {
+			// Failing to save the region to storage is not fatal; it only leads to a longer
+			// warm-up after restart. Just log the error and move on.
+			log.Error("failed to save region to storage",
+				zap.Uint64("region-id", region.GetID()),
+				zap.Stringer("region-meta", core.RegionToHexMeta(region.GetMeta())),
+				zap.Error(err))
+		}
+		regionEventCounter.WithLabelValues("update_kv").Inc()
+		select {
+		case c.changedRegions <- region:
+		default:
+		}
+	}
+
 	return nil
 }
 
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index 5777a4c07f1..cf8b71b9e23 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -16,9 +16,12 @@ package cluster
 import (
 	"fmt"
 	"math/rand"
+	"sync"
 	"testing"
+	"time"
 
 	. "github.com/pingcap/check"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/pd/v4/pkg/mock/mockid"
@@ -308,6 +311,39 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
 	}
 }
 
+func (s *testClusterInfoSuite) TestConcurrentRegionHeartbeat(c *C) {
+	_, opt, err := newTestScheduleConfig()
+	c.Assert(err, IsNil)
+	cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())
+
+	regions := []*core.RegionInfo{core.NewTestRegionInfo([]byte{}, []byte{})}
+	regions = core.SplitRegions(regions)
+	heartbeatRegions(c, cluster, regions)
+
+	// Merge regions manually: make target cover the whole key range with a newer epoch than source.
+	source, target := regions[0], regions[1]
+	target.GetMeta().StartKey = []byte{}
+	target.GetMeta().EndKey = []byte{}
+	source.GetMeta().GetRegionEpoch().Version++
+	if source.GetMeta().GetRegionEpoch().GetVersion() > target.GetMeta().GetRegionEpoch().GetVersion() {
+		target.GetMeta().GetRegionEpoch().Version = source.GetMeta().GetRegionEpoch().GetVersion()
+	}
+	target.GetMeta().GetRegionEpoch().Version++
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	c.Assert(failpoint.Enable("github.com/pingcap/pd/v4/server/cluster/concurrentRegionHeartbeat", "return(true)"), IsNil)
+	go func() {
+		defer wg.Done()
+		cluster.processRegionHeartbeat(source) // the delayed stale heartbeat; its error doesn't matter
+	}()
+	time.Sleep(100 * time.Millisecond)
+	c.Assert(failpoint.Disable("github.com/pingcap/pd/v4/server/cluster/concurrentRegionHeartbeat"), IsNil)
+	c.Assert(cluster.processRegionHeartbeat(target), IsNil)
+	wg.Wait()
+	checkRegion(c, cluster.GetRegionInfoByKey([]byte{}), target)
+}
+
 func heartbeatRegions(c *C, cluster *RaftCluster, regions []*core.RegionInfo) {
 	// Heartbeat and check region one by one.
 	for _, r := range regions {
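
Editor's note: the core of this fix is a common concurrency pattern: re-validate
shared state after acquiring the lock (the PreCheckPutRegion call), then do the
slow storage I/O outside the critical section. The following is a minimal,
self-contained Go sketch of that pattern, not PD code; Item, Cache, Put, and
errStale are hypothetical names for illustration.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// Item carries a version so stale writers can be detected; it plays the role
// of a region with its epoch.
type Item struct {
	Key     string
	Version uint64
}

// Cache is a mutex-guarded map of the latest Item per key.
type Cache struct {
	mu    sync.Mutex
	items map[string]Item
}

var errStale = errors.New("stale update")

// Put re-checks the version while holding the lock, so a writer that was
// delayed between its first validity check and the lock acquisition cannot
// overwrite a newer entry with a stale one.
func (c *Cache) Put(it Item, persist func(Item) error) error {
	c.mu.Lock()
	if old, ok := c.items[it.Key]; ok && old.Version >= it.Version {
		c.mu.Unlock()
		return errStale // re-validation failed: a newer update won the race
	}
	c.items[it.Key] = it
	c.mu.Unlock()

	// The slow storage write happens outside the critical section. As the
	// patch comment notes, for concurrent updates of the same key the last
	// write wins either way, so holding the mutex here would buy nothing.
	return persist(it)
}

func main() {
	c := &Cache{items: map[string]Item{}}
	persist := func(it Item) error { fmt.Println("persisted", it); return nil }
	fmt.Println(c.Put(Item{Key: "r1", Version: 2}, persist)) // persisted, then <nil>
	fmt.Println(c.Put(Item{Key: "r1", Version: 1}, persist)) // stale update
}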
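
Editor's note: the regression test makes the race deterministic with
github.com/pingcap/failpoint. A minimal sketch of that cycle, assuming a
hypothetical package with a failpoint named "slowPath" (failpoint markers are
rewritten by failpoint-ctl before the build; in unrewritten source
failpoint.Inject compiles to a no-op):

package example

import (
	"time"

	"github.com/pingcap/failpoint"
)

// DoWork widens its race window only while the "slowPath" failpoint is
// activated, so a test can reliably interleave a concurrent caller.
func DoWork() {
	failpoint.Inject("slowPath", func() {
		time.Sleep(500 * time.Millisecond)
	})
	// ... the code under test continues here ...
}

A test then activates the failpoint by its full package path plus name, starts
the slow goroutine, and deactivates the failpoint before issuing the competing
call, much as TestConcurrentRegionHeartbeat does above (the package path below
is a placeholder):

	failpoint.Enable("github.com/example/pkg/slowPath", "return(true)")
	defer failpoint.Disable("github.com/example/pkg/slowPath")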