From 132da6af90c9ffa936b90c50ed088c35f672a1bd Mon Sep 17 00:00:00 2001
From: youjiali1995
Date: Thu, 12 Mar 2020 12:08:02 +0800
Subject: [PATCH] server: fix the race condition of region heartbeats from
 different regions (#2198)

Signed-off-by: youjiali1995
---
 server/cluster_info.go      | 59 ++++++++++++++++++++++++-------------
 server/cluster_info_test.go | 37 +++++++++++++++++++++++
 2 files changed, 75 insertions(+), 21 deletions(-)

diff --git a/server/cluster_info.go b/server/cluster_info.go
index b2bab4afec5..13dab77aede 100644
--- a/server/cluster_info.go
+++ b/server/cluster_info.go
@@ -19,6 +19,7 @@ import (
 
 	"github.com/coreos/go-semver/semver"
 	"github.com/gogo/protobuf/proto"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	log "github.com/pingcap/log"
@@ -600,32 +601,24 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
 		}
 	}
 
-	if saveKV && c.kv != nil {
-		if err := c.kv.SaveRegion(region.GetMeta()); err != nil {
-			// Not successfully saved to kv is not fatal, it only leads to longer warm-up
-			// after restart. Here we only log the error then go on updating cache.
-			log.Error("fail to save region to kv",
-				zap.Uint64("region-id", region.GetID()),
-				zap.Stringer("region-meta", core.RegionToHexMeta(region.GetMeta())),
-				zap.Error(err))
-		}
-		regionEventCounter.WithLabelValues("update_kv").Inc()
-		select {
-		case c.changedRegions <- region:
-		default:
-		}
-	}
-	if !isWriteUpdate && !isReadUpdate && !saveCache && !isNew {
+	if !isWriteUpdate && !isReadUpdate && !saveKV && !saveCache && !isNew {
 		return nil
 	}
 
-	c.Lock()
-	defer c.Unlock()
-	if isNew {
-		c.prepareChecker.collect(region)
-	}
+	failpoint.Inject("concurrentRegionHeartbeat", func() {
+		time.Sleep(500 * time.Millisecond)
+	})
 
+	c.Lock()
 	if saveCache {
+		// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
+		// check its validation again here.
+		//
+		// However it can't solve the race condition of concurrent heartbeats from the same region.
+		if _, err := c.core.PreCheckPutRegion(region); err != nil {
+			c.Unlock()
+			return err
+		}
 		overlaps := c.core.Regions.SetRegion(region)
 		if c.kv != nil {
 			for _, item := range overlaps {
@@ -656,6 +649,10 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
 		regionEventCounter.WithLabelValues("update_cache").Inc()
 	}
 
+	if isNew {
+		c.prepareChecker.collect(region)
+	}
+
 	if c.regionStats != nil {
 		c.regionStats.Observe(region, c.takeRegionStoresLocked(region))
 	}
@@ -667,6 +664,26 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
 	if isReadUpdate {
 		c.hotSpotCache.Update(key, readItem, statistics.ReadFlow)
 	}
+	c.Unlock()
+
+	// If there are concurrent heartbeats from the same region, the last write will win even if
+	// writes to storage in the critical area. So don't use mutex to protect it.
+	if saveKV && c.kv != nil {
+		if err := c.kv.SaveRegion(region.GetMeta()); err != nil {
+			// Not successfully saved to kv is not fatal, it only leads to longer warm-up
+			// after restart. Here we only log the error then go on updating cache.
+ log.Error("fail to save region to kv", + zap.Uint64("region-id", region.GetID()), + zap.Stringer("region-meta", core.RegionToHexMeta(region.GetMeta())), + zap.Error(err)) + } + regionEventCounter.WithLabelValues("update_kv").Inc() + select { + case c.changedRegions <- region: + default: + } + } + return nil } diff --git a/server/cluster_info_test.go b/server/cluster_info_test.go index 96e6ea98f2b..a51120ede99 100644 --- a/server/cluster_info_test.go +++ b/server/cluster_info_test.go @@ -15,8 +15,11 @@ package server import ( "math/rand" + "sync" + "time" . "github.com/pingcap/check" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/v3/pkg/mock/mockid" @@ -557,6 +560,40 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { } } +func (s *testClusterInfoSuite) TestConcurrentRegionHeartbeat(c *C) { + _, opt, err := newTestScheduleConfig() + c.Assert(err, IsNil) + cluster := newClusterInfo(mockid.NewIDAllocator(), opt, core.NewKV(core.NewMemoryKV())) + + region := core.NewRegion([]byte{}, []byte{}) + regions := core.SplitRegions([]*metapb.Region{region}) + heartbeatRegions(c, cluster, regions) + + // Merge regions manually + source, target := regions[0], regions[1] + target.StartKey = []byte{} + target.EndKey = []byte{} + source.GetRegionEpoch().Version++ + if source.GetRegionEpoch().GetVersion() > target.GetRegionEpoch().GetVersion() { + target.GetRegionEpoch().Version = source.GetRegionEpoch().GetVersion() + } + target.GetRegionEpoch().Version++ + sourceRegionInfo, targetRegionInfo := core.NewRegionInfo(source, nil), core.NewRegionInfo(target, nil) + + var wg sync.WaitGroup + wg.Add(1) + c.Assert(failpoint.Enable("github.com/pingcap/pd/server/concurrentRegionHeartbeat", "return(true)"), IsNil) + go func() { + defer wg.Done() + cluster.handleRegionHeartbeat(sourceRegionInfo) + }() + time.Sleep(100 * time.Millisecond) + c.Assert(failpoint.Disable("github.com/pingcap/pd/server/concurrentRegionHeartbeat"), IsNil) + c.Assert(cluster.handleRegionHeartbeat(targetRegionInfo), IsNil) + wg.Wait() + checkRegion(c, cluster.searchRegion([]byte{}), targetRegionInfo) +} + func heartbeatRegions(c *C, cluster *clusterInfo, regions []*metapb.Region) { // Heartbeat and check region one by one. for _, region := range regions {