Skip to content

Commit

Permalink
server: fix the race condition of region heartbeats from different re…
Browse files Browse the repository at this point in the history
…gions (#2198)

Signed-off-by: youjiali1995 <zlwgx1023@gmail.com>
  • Loading branch information
youjiali1995 authored Mar 10, 2020
1 parent 52f003b commit 06ec3cd
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 23 deletions.
62 changes: 39 additions & 23 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,34 +493,24 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
}
}

if saveKV && c.storage != nil {
if err := c.storage.SaveRegion(region.GetMeta()); err != nil {
// Not successfully saved to storage is not fatal, it only leads to longer warm-up
// after restart. Here we only log the error then go on updating cache.
log.Error("failed to save region to storage",
zap.Uint64("region-id", region.GetID()),
zap.Stringer("region-meta", core.RegionToHexMeta(region.GetMeta())),
zap.Error(err))
}
regionEventCounter.WithLabelValues("update_kv").Inc()
}
if saveKV || statsChange {
select {
case c.changedRegions <- region:
default:
}
}
if len(writeItems) == 0 && len(readItems) == 0 && !saveCache && !isNew {
if len(writeItems) == 0 && len(readItems) == 0 && !saveKV && !saveCache && !isNew {
return nil
}

c.Lock()
defer c.Unlock()
if isNew {
c.prepareChecker.collect(region)
}
failpoint.Inject("concurrentRegionHeartbeat", func() {
time.Sleep(500 * time.Millisecond)
})

c.Lock()
if saveCache {
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
// check its validation again here.
//
// However it can't solve the race condition of concurrent heartbeats from the same region.
if _, err := c.core.PreCheckPutRegion(region); err != nil {
c.Unlock()
return err
}
overlaps := c.core.PutRegion(region)
if c.storage != nil {
for _, item := range overlaps {
Expand Down Expand Up @@ -551,6 +541,10 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
regionEventCounter.WithLabelValues("update_cache").Inc()
}

if isNew {
c.prepareChecker.collect(region)
}

if c.regionStats != nil {
c.regionStats.Observe(region, c.takeRegionStoresLocked(region))
}
Expand All @@ -561,6 +555,28 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
for _, readItem := range readItems {
c.hotSpotCache.Update(readItem)
}
c.Unlock()

// If there are concurrent heartbeats from the same region, the last write wins even if the
// write to storage is done inside the critical section, so don't hold the mutex for it.
if saveKV && c.storage != nil {
if err := c.storage.SaveRegion(region.GetMeta()); err != nil {
// Not successfully saved to storage is not fatal, it only leads to longer warm-up
// after restart. Here we only log the error then go on updating cache.
log.Error("failed to save region to storage",
zap.Uint64("region-id", region.GetID()),
zap.Stringer("region-meta", core.RegionToHexMeta(region.GetMeta())),
zap.Error(err))
}
regionEventCounter.WithLabelValues("update_kv").Inc()
}
if saveKV || statsChange {
select {
case c.changedRegions <- region:
default:
}
}

return nil
}

Expand Down
36 changes: 36 additions & 0 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ package cluster
import (
"fmt"
"math/rand"
"sync"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/v4/pkg/mock/mockid"
Expand Down Expand Up @@ -308,6 +311,39 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
}
}

// TestConcurrentRegionHeartbeat checks that a delayed, stale heartbeat from a
// merge source region cannot override the up-to-date heartbeat of the merge
// target region when both are processed concurrently.
//
// The "concurrentRegionHeartbeat" failpoint makes processRegionHeartbeat sleep
// before it takes the cluster lock, so the source heartbeat started first is
// still pending while the target heartbeat is fully processed.
func (s *testClusterInfoSuite) TestConcurrentRegionHeartbeat(c *C) {
	// Failpoints are addressed by the import path of the package that injects
	// them. This module is imported as github.com/pingcap/pd/v4, so the path
	// must carry the /v4 suffix; without it Enable still succeeds but the
	// failpoint never fires and the race is not exercised.
	const concurrentHeartbeatFP = "github.com/pingcap/pd/v4/server/cluster/concurrentRegionHeartbeat"

	_, opt, err := newTestScheduleConfig()
	c.Assert(err, IsNil)
	cluster := newTestRaftCluster(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV()), core.NewBasicCluster())

	regions := []*core.RegionInfo{core.NewTestRegionInfo([]byte{}, []byte{})}
	regions = core.SplitRegions(regions)
	heartbeatRegions(c, cluster, regions)

	// Merge regions manually: the target takes over the whole key range and
	// ends up with a version strictly greater than the source's.
	source, target := regions[0], regions[1]
	target.GetMeta().StartKey = []byte{}
	target.GetMeta().EndKey = []byte{}
	source.GetMeta().GetRegionEpoch().Version++
	if source.GetMeta().GetRegionEpoch().GetVersion() > target.GetMeta().GetRegionEpoch().GetVersion() {
		target.GetMeta().GetRegionEpoch().Version = source.GetMeta().GetRegionEpoch().GetVersion()
	}
	target.GetMeta().GetRegionEpoch().Version++

	var wg sync.WaitGroup
	wg.Add(1)
	c.Assert(failpoint.Enable(concurrentHeartbeatFP, "return(true)"), IsNil)
	go func() {
		defer wg.Done()
		// The result is deliberately ignored: this heartbeat is the stale one,
		// and the test only asserts on the final cached region below.
		_ = cluster.processRegionHeartbeat(source)
	}()
	// Give the goroutine time to reach the failpoint, then disable it so the
	// target heartbeat below is processed without the injected delay.
	time.Sleep(100 * time.Millisecond)
	c.Assert(failpoint.Disable(concurrentHeartbeatFP), IsNil)
	c.Assert(cluster.processRegionHeartbeat(target), IsNil)
	wg.Wait()
	checkRegion(c, cluster.GetRegionInfoByKey([]byte{}), target)
}

func heartbeatRegions(c *C, cluster *RaftCluster, regions []*core.RegionInfo) {
// Heartbeat and check region one by one.
for _, r := range regions {
Expand Down

0 comments on commit 06ec3cd

Please sign in to comment.