Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix the issue that loadcluster does not remove overlap regions #2022

Merged
merged 9 commits into from
Dec 17, 2019
35 changes: 17 additions & 18 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@ const (
defaultChangedRegionsLimit = 10000
)

// ErrRegionIsStale is error info for region is stale.
var ErrRegionIsStale = func(region *metapb.Region, origin *metapb.Region) error {
return errors.Errorf("region is stale: region %v origin %v", region, origin)
}

// Server is the interface for cluster.
type Server interface {
GetAllocator() *id.AllocatorImpl
Expand Down Expand Up @@ -242,7 +237,17 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {

start = time.Now()

if err := c.storage.LoadRegions(c.core.PutRegion); err != nil {
// used to load region from kv storage to cache storage.
putRegion := func(region *core.RegionInfo) []*core.RegionInfo {
origin, err := c.core.PreCheckPutRegion(region)
if err != nil {
log.Warn("region is stale", zap.Error(err), zap.Stringer("origin", origin.GetMeta()))
// return the state region to delete.
return []*core.RegionInfo{region}
}
return c.core.PutRegion(region)
}
if err := c.storage.LoadRegions(putRegion); err != nil {
return nil, err
}
log.Info("load regions",
Expand Down Expand Up @@ -394,14 +399,10 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
// processRegionHeartbeat updates the region information.
func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
c.RLock()
origin := c.GetRegion(region.GetID())
if origin == nil {
for _, item := range c.core.GetOverlaps(region) {
if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() {
c.RUnlock()
return ErrRegionIsStale(region.GetMeta(), item.GetMeta())
}
}
origin, err := c.core.PreCheckPutRegion(region)
if err != nil {
c.RUnlock()
return err
}
writeItems := c.CheckWriteStatus(region)
readItems := c.CheckReadStatus(region)
Expand All @@ -420,10 +421,6 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
} else {
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
// Region meta is stale, return an error.
if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() {
return ErrRegionIsStale(region.GetMeta(), origin.GetMeta())
}
if r.GetVersion() > o.GetVersion() {
log.Info("region Version changed",
zap.Uint64("region-id", region.GetID()),
Expand Down Expand Up @@ -1454,6 +1451,8 @@ func (c *RaftCluster) CheckReadStatus(region *core.RegionInfo) []*statistics.Hot
return c.hotSpotCache.CheckRead(region, c.storesStats)
}

// TODO: remove me.
// only used in test.
func (c *RaftCluster) putRegion(region *core.RegionInfo) error {
c.Lock()
defer c.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ func checkStaleRegion(origin *metapb.Region, region *metapb.Region) error {
e := region.GetRegionEpoch()

if e.GetVersion() < o.GetVersion() || e.GetConfVer() < o.GetConfVer() {
return ErrRegionIsStale(region, origin)
return core.ErrRegionIsStale(region, origin)
}

return nil
Expand Down
23 changes: 23 additions & 0 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,29 @@ func (bc *BasicCluster) TakeStore(storeID uint64) *StoreInfo {
return bc.Stores.TakeStore(storeID)
}

// PreCheckPutRegion checks if the region is valid to put.
func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, error) {
bc.RLock()
for _, item := range bc.Regions.GetOverlaps(region) {
if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() {
bc.RUnlock()
return nil, ErrRegionIsStale(region.GetMeta(), item.GetMeta())
}
}
origin := bc.Regions.GetRegion(region.GetID())
bc.RUnlock()
if origin == nil {
return nil, nil
}
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
// Region meta is stale, return an error.
if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() {
return origin, ErrRegionIsStale(region.GetMeta(), origin.GetMeta())
}
return origin, nil
}

// PutRegion put a region.
func (bc *BasicCluster) PutRegion(region *RegionInfo) []*RegionInfo {
bc.Lock()
Expand Down
7 changes: 7 additions & 0 deletions server/core/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"net/http"

"github.com/pingcap/errcode"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pkg/errors"
)

var (
Expand Down Expand Up @@ -61,3 +63,8 @@ func (e StoreBlockedErr) Error() string {

// Code returns StoreBlockedCode
func (e StoreBlockedErr) Code() errcode.Code { return StoreBlockedCode }

// ErrRegionIsStale is error info for region is stale.
var ErrRegionIsStale = func(region *metapb.Region, origin *metapb.Region) error {
return errors.Errorf("region is stale: region %v origin %v", region, origin)
}
3 changes: 3 additions & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ func (r *RegionInfo) GetID() uint64 {

// GetMeta returns the meta information of the region.
func (r *RegionInfo) GetMeta() *metapb.Region {
if r == nil {
return nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why need this?

Copy link
Contributor Author

@nolouch nolouch Dec 16, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not may panic in this line.

return r.meta
}

Expand Down
26 changes: 26 additions & 0 deletions tests/server/region_syncer/region_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,29 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) {
err = rc.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)
}
// merge case
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to add test with different merge order and epoch?

// region2 -> region1 -> region0
// merge A to B will increases version to max(versionA, versionB)+1, but does not increase conver
regions[0] = regions[0].Clone(core.WithEndKey(regions[2].GetEndKey()), core.WithIncVersion(), core.WithIncVersion())
err = rc.HandleRegionHeartbeat(regions[2])
c.Assert(err, IsNil)

// merge case
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
// region3 -> region4
// merge A to B will increases version to max(versionA, versionB)+1, but does not increase conver
regions[4] = regions[3].Clone(core.WithEndKey(regions[4].GetEndKey()), core.WithIncVersion())
err = rc.HandleRegionHeartbeat(regions[4])
c.Assert(err, IsNil)

// merge case
// region0 -> region4
// merge A to B will increases version to max(versionA, versionB)+1, but does not increase conver
regions[4] = regions[0].Clone(core.WithEndKey(regions[4].GetEndKey()), core.WithIncVersion(), core.WithIncVersion())
err = rc.HandleRegionHeartbeat(regions[4])
c.Assert(err, IsNil)
regions = regions[4:]
regionLen = len(regions)

// ensure flush to region storage, we use a duration larger than the
// region storage flush rate limit (3s).
time.Sleep(4 * time.Second)
Expand All @@ -104,6 +127,9 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) {
c.Assert(leaderServer, NotNil)
loadRegions := leaderServer.GetServer().GetRaftCluster().GetRegions()
c.Assert(len(loadRegions), Equals, regionLen)
for _, region := range regions {
c.Assert(leaderServer.GetRegionInfoByID(region.GetID()).GetMeta(), DeepEquals, region.GetMeta())
}
}

func (s *serverTestSuite) TestFullSyncWithAddMember(c *C) {
Expand Down