Skip to content

Commit

Permalink
erver: fix the race condition of region heartbeats from different reg…
Browse files Browse the repository at this point in the history
…ions (#2198) (#2233)

Signed-off-by: youjiali1995 <zlwgx1023@gmail.com>
  • Loading branch information
youjiali1995 authored Mar 12, 2020
1 parent 94bb17c commit 84c952a
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 21 deletions.
59 changes: 38 additions & 21 deletions server/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/coreos/go-semver/semver"
"github.com/gogo/protobuf/proto"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
log "github.com/pingcap/log"
Expand Down Expand Up @@ -600,32 +601,24 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
}
}

if saveKV && c.kv != nil {
if err := c.kv.SaveRegion(region.GetMeta()); err != nil {
// Not successfully saved to kv is not fatal, it only leads to longer warm-up
// after restart. Here we only log the error then go on updating cache.
log.Error("fail to save region to kv",
zap.Uint64("region-id", region.GetID()),
zap.Stringer("region-meta", core.RegionToHexMeta(region.GetMeta())),
zap.Error(err))
}
regionEventCounter.WithLabelValues("update_kv").Inc()
select {
case c.changedRegions <- region:
default:
}
}
if !isWriteUpdate && !isReadUpdate && !saveCache && !isNew {
if !isWriteUpdate && !isReadUpdate && !saveKV && !saveCache && !isNew {
return nil
}

c.Lock()
defer c.Unlock()
if isNew {
c.prepareChecker.collect(region)
}
failpoint.Inject("concurrentRegionHeartbeat", func() {
time.Sleep(500 * time.Millisecond)
})

c.Lock()
if saveCache {
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
// check its validation again here.
//
// However it can't solve the race condition of concurrent heartbeats from the same region.
if _, err := c.core.PreCheckPutRegion(region); err != nil {
c.Unlock()
return err
}
overlaps := c.core.Regions.SetRegion(region)
if c.kv != nil {
for _, item := range overlaps {
Expand Down Expand Up @@ -656,6 +649,10 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
regionEventCounter.WithLabelValues("update_cache").Inc()
}

if isNew {
c.prepareChecker.collect(region)
}

if c.regionStats != nil {
c.regionStats.Observe(region, c.takeRegionStoresLocked(region))
}
Expand All @@ -667,6 +664,26 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
if isReadUpdate {
c.hotSpotCache.Update(key, readItem, statistics.ReadFlow)
}
c.Unlock()

// If there are concurrent heartbeats from the same region, the last write will win even if
// writes to storage in the critical area. So don't use mutex to protect it.
if saveKV && c.kv != nil {
if err := c.kv.SaveRegion(region.GetMeta()); err != nil {
// Not successfully saved to kv is not fatal, it only leads to longer warm-up
// after restart. Here we only log the error then go on updating cache.
log.Error("fail to save region to kv",
zap.Uint64("region-id", region.GetID()),
zap.Stringer("region-meta", core.RegionToHexMeta(region.GetMeta())),
zap.Error(err))
}
regionEventCounter.WithLabelValues("update_kv").Inc()
select {
case c.changedRegions <- region:
default:
}
}

return nil
}

Expand Down
37 changes: 37 additions & 0 deletions server/cluster_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ package server

import (
"math/rand"
"sync"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/v3/pkg/mock/mockid"
Expand Down Expand Up @@ -557,6 +560,40 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
}
}

func (s *testClusterInfoSuite) TestConcurrentRegionHeartbeat(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newClusterInfo(mockid.NewIDAllocator(), opt, core.NewKV(core.NewMemoryKV()))

region := core.NewRegion([]byte{}, []byte{})
regions := core.SplitRegions([]*metapb.Region{region})
heartbeatRegions(c, cluster, regions)

// Merge regions manually
source, target := regions[0], regions[1]
target.StartKey = []byte{}
target.EndKey = []byte{}
source.GetRegionEpoch().Version++
if source.GetRegionEpoch().GetVersion() > target.GetRegionEpoch().GetVersion() {
target.GetRegionEpoch().Version = source.GetRegionEpoch().GetVersion()
}
target.GetRegionEpoch().Version++
sourceRegionInfo, targetRegionInfo := core.NewRegionInfo(source, nil), core.NewRegionInfo(target, nil)

var wg sync.WaitGroup
wg.Add(1)
c.Assert(failpoint.Enable("github.com/pingcap/pd/server/concurrentRegionHeartbeat", "return(true)"), IsNil)
go func() {
defer wg.Done()
cluster.handleRegionHeartbeat(sourceRegionInfo)
}()
time.Sleep(100 * time.Millisecond)
c.Assert(failpoint.Disable("github.com/pingcap/pd/server/concurrentRegionHeartbeat"), IsNil)
c.Assert(cluster.handleRegionHeartbeat(targetRegionInfo), IsNil)
wg.Wait()
checkRegion(c, cluster.searchRegion([]byte{}), targetRegionInfo)
}

func heartbeatRegions(c *C, cluster *clusterInfo, regions []*metapb.Region) {
// Heartbeat and check region one by one.
for _, region := range regions {
Expand Down

0 comments on commit 84c952a

Please sign in to comment.