From 88641d0e4b646ae253d73908cb0b32935df79150 Mon Sep 17 00:00:00 2001 From: nolouch Date: Mon, 16 Dec 2019 02:19:57 +0800 Subject: [PATCH 1/5] *: fix the issue that loadcluster does not remove overlap regions Signed-off-by: nolouch --- server/cluster/cluster.go | 35 +++++++++---------- server/cluster/cluster_test.go | 2 +- server/core/basic_cluster.go | 23 ++++++++++++ server/core/errors.go | 7 ++++ server/core/region.go | 3 ++ .../region_syncer/region_syncer_test.go | 7 ++++ 6 files changed, 58 insertions(+), 19 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 3d4eef97ef3..f5ce219a87e 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -51,11 +51,6 @@ const ( defaultChangedRegionsLimit = 10000 ) -// ErrRegionIsStale is error info for region is stale. -var ErrRegionIsStale = func(region *metapb.Region, origin *metapb.Region) error { - return errors.Errorf("region is stale: region %v origin %v", region, origin) -} - // Server is the interface for cluster. type Server interface { GetAllocator() *id.AllocatorImpl @@ -242,7 +237,17 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) { start = time.Now() - if err := c.storage.LoadRegions(c.core.PutRegion); err != nil { + // used to load region from kv storage to cache storage. + putRegionToCache := func(region *core.RegionInfo) []*core.RegionInfo { + origin, err := c.core.PreCheckPutRegion(region) + if err != nil { + log.Warn("region is stale", zap.Error(err), zap.Stringer("origin", origin.GetMeta())) + // return the state region to delete. + return []*core.RegionInfo{region} + } + return c.core.PutRegion(region) + } + if err := c.storage.LoadRegions(putRegionToCache); err != nil { return nil, err } log.Info("load regions", @@ -394,14 +399,10 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error { // processRegionHeartbeat updates the region information. 
func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { c.RLock() - origin := c.GetRegion(region.GetID()) - if origin == nil { - for _, item := range c.core.GetOverlaps(region) { - if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() { - c.RUnlock() - return ErrRegionIsStale(region.GetMeta(), item.GetMeta()) - } - } + origin, err := c.core.PreCheckPutRegion(region) + if err != nil { + c.RUnlock() + return err } writeItems := c.CheckWriteStatus(region) readItems := c.CheckReadStatus(region) @@ -420,10 +421,6 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { } else { r := region.GetRegionEpoch() o := origin.GetRegionEpoch() - // Region meta is stale, return an error. - if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() { - return ErrRegionIsStale(region.GetMeta(), origin.GetMeta()) - } if r.GetVersion() > o.GetVersion() { log.Info("region Version changed", zap.Uint64("region-id", region.GetID()), @@ -1454,6 +1451,8 @@ func (c *RaftCluster) CheckReadStatus(region *core.RegionInfo) []*statistics.Hot return c.hotSpotCache.CheckRead(region, c.storesStats) } +// TODO: remove me. +// only used in test. 
func (c *RaftCluster) putRegion(region *core.RegionInfo) error { c.Lock() defer c.Unlock() diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index b000937a6bd..ef33c14165b 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -758,7 +758,7 @@ func checkStaleRegion(origin *metapb.Region, region *metapb.Region) error { e := region.GetRegionEpoch() if e.GetVersion() < o.GetVersion() || e.GetConfVer() < o.GetConfVer() { - return ErrRegionIsStale(region, origin) + return core.ErrRegionIsStale(region, origin) } return nil diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go index 3730ff5b85c..b53c403127c 100644 --- a/server/core/basic_cluster.go +++ b/server/core/basic_cluster.go @@ -282,6 +282,29 @@ func (bc *BasicCluster) TakeStore(storeID uint64) *StoreInfo { return bc.Stores.TakeStore(storeID) } +// PreCheckPutRegion checks if the region is valid to put. +func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, error) { + bc.RLock() + for _, item := range bc.Regions.GetOverlaps(region) { + if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() { + bc.RUnlock() + return nil, ErrRegionIsStale(region.GetMeta(), item.GetMeta()) + } + } + origin := bc.Regions.GetRegion(region.GetID()) + bc.RUnlock() + if origin == nil { + return nil, nil + } + r := region.GetRegionEpoch() + o := origin.GetRegionEpoch() + // Region meta is stale, return an error. + if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() { + return origin, ErrRegionIsStale(region.GetMeta(), origin.GetMeta()) + } + return origin, nil +} + // PutRegion put a region. 
func (bc *BasicCluster) PutRegion(region *RegionInfo) []*RegionInfo { bc.Lock() diff --git a/server/core/errors.go b/server/core/errors.go index a58b9adcc53..5d0e0ea48b6 100644 --- a/server/core/errors.go +++ b/server/core/errors.go @@ -21,6 +21,8 @@ import ( "net/http" "github.com/pingcap/errcode" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pkg/errors" ) var ( @@ -61,3 +63,8 @@ func (e StoreBlockedErr) Error() string { // Code returns StoreBlockedCode func (e StoreBlockedErr) Code() errcode.Code { return StoreBlockedCode } + +// ErrRegionIsStale is error info for region is stale. +var ErrRegionIsStale = func(region *metapb.Region, origin *metapb.Region) error { + return errors.Errorf("region is stale: region %v origin %v", region, origin) +} diff --git a/server/core/region.go b/server/core/region.go index ad83611d43f..f30bf0a3b79 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -304,6 +304,9 @@ func (r *RegionInfo) GetID() uint64 { // GetMeta returns the meta information of the region. func (r *RegionInfo) GetMeta() *metapb.Region { + if r == nil { + return nil + } return r.meta } diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go index 45f3044fc35..c9482995640 100644 --- a/tests/server/region_syncer/region_syncer_test.go +++ b/tests/server/region_syncer/region_syncer_test.go @@ -94,6 +94,13 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) { err = rc.HandleRegionHeartbeat(region) c.Assert(err, IsNil) } + // merge case + // region2-> region 1 -> region 0 + regions[2] = regions[0].Clone(core.WithEndKey(regions[2].GetEndKey()), core.WithIncVersion(), core.WithIncVersion()) + err = rc.HandleRegionHeartbeat(regions[2]) + c.Assert(err, IsNil) + regionLen -= 2 + // ensure flush to region storage, we use a duration larger than the // region storage flush rate limit (3s). 
time.Sleep(4 * time.Second) From de3d6f2ead7719c9ac01c5607c16dc92aa211308 Mon Sep 17 00:00:00 2001 From: nolouch Date: Mon, 16 Dec 2019 23:24:47 +0800 Subject: [PATCH 2/5] address comments Signed-off-by: nolouch --- pkg/ui/pd-web | 2 +- server/cluster/cluster.go | 4 ++-- .../region_syncer/region_syncer_test.go | 22 +++++++++++++++++-- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/pkg/ui/pd-web b/pkg/ui/pd-web index df8bf7a0171..62112c1e5a5 160000 --- a/pkg/ui/pd-web +++ b/pkg/ui/pd-web @@ -1 +1 @@ -Subproject commit df8bf7a0171c470a0f1670f2b5bb21b1bf4c6546 +Subproject commit 62112c1e5a5c0918f5ee469237c0d4b91fb67eb2 diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index f5ce219a87e..8b14345fcf2 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -238,7 +238,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) { start = time.Now() // used to load region from kv storage to cache storage. - putRegionToCache := func(region *core.RegionInfo) []*core.RegionInfo { + putRegion := func(region *core.RegionInfo) []*core.RegionInfo { origin, err := c.core.PreCheckPutRegion(region) if err != nil { log.Warn("region is stale", zap.Error(err), zap.Stringer("origin", origin.GetMeta())) @@ -247,7 +247,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) { } return c.core.PutRegion(region) } - if err := c.storage.LoadRegions(putRegionToCache); err != nil { + if err := c.storage.LoadRegions(putRegion); err != nil { return nil, err } log.Info("load regions", diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go index c9482995640..89b83d055fa 100644 --- a/tests/server/region_syncer/region_syncer_test.go +++ b/tests/server/region_syncer/region_syncer_test.go @@ -95,12 +95,27 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) { c.Assert(err, IsNil) } // merge case - // region2-> region 1 -> region 0 - regions[2] = 
regions[0].Clone(core.WithEndKey(regions[2].GetEndKey()), core.WithIncVersion(), core.WithIncVersion()) + // region2 -> region1 -> region0 + regions[0] = regions[0].Clone(core.WithEndKey(regions[2].GetEndKey()), core.WithIncVersion(), core.WithIncVersion()) err = rc.HandleRegionHeartbeat(regions[2]) c.Assert(err, IsNil) regionLen -= 2 + // merge case + // region3 -> region4 + regions[4] = regions[3].Clone(core.WithEndKey(regions[4].GetEndKey()), core.WithIncVersion()) + err = rc.HandleRegionHeartbeat(regions[4]) + c.Assert(err, IsNil) + regionLen -= 1 + + // merge case + // region0 -> region4 + regions[4] = regions[0].Clone(core.WithEndKey(regions[4].GetEndKey()), core.WithIncVersion(), core.WithIncVersion()) + err = rc.HandleRegionHeartbeat(regions[4]) + c.Assert(err, IsNil) + regionLen -= 1 + regions = regions[4:] + // ensure flush to region storage, we use a duration larger than the // region storage flush rate limit (3s). time.Sleep(4 * time.Second) @@ -111,6 +126,9 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) { c.Assert(leaderServer, NotNil) loadRegions := leaderServer.GetServer().GetRaftCluster().GetRegions() c.Assert(len(loadRegions), Equals, regionLen) + for _, region := range regions { + c.Assert(leaderServer.GetRegionInfoByID(region.GetID()).GetMeta(), DeepEquals, region.GetMeta()) + } } func (s *serverTestSuite) TestFullSyncWithAddMember(c *C) { From 8fb7e0faa337e38bf9d2645f7a8ff2c493c680ab Mon Sep 17 00:00:00 2001 From: nolouch Date: Mon, 16 Dec 2019 23:26:41 +0800 Subject: [PATCH 3/5] fix ci Signed-off-by: nolouch --- tests/server/region_syncer/region_syncer_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go index 89b83d055fa..0d15cf79568 100644 --- a/tests/server/region_syncer/region_syncer_test.go +++ b/tests/server/region_syncer/region_syncer_test.go @@ -99,22 +99,20 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) 
{ regions[0] = regions[0].Clone(core.WithEndKey(regions[2].GetEndKey()), core.WithIncVersion(), core.WithIncVersion()) err = rc.HandleRegionHeartbeat(regions[2]) c.Assert(err, IsNil) - regionLen -= 2 // merge case // region3 -> region4 regions[4] = regions[3].Clone(core.WithEndKey(regions[4].GetEndKey()), core.WithIncVersion()) err = rc.HandleRegionHeartbeat(regions[4]) c.Assert(err, IsNil) - regionLen -= 1 // merge case // region0 -> region4 regions[4] = regions[0].Clone(core.WithEndKey(regions[4].GetEndKey()), core.WithIncVersion(), core.WithIncVersion()) err = rc.HandleRegionHeartbeat(regions[4]) c.Assert(err, IsNil) - regionLen -= 1 regions = regions[4:] + regionLen = len(regions) // ensure flush to region storage, we use a duration larger than the // region storage flush rate limit (3s). From 62401c11ae162aa194f98f82e15c23f812dc78b8 Mon Sep 17 00:00:00 2001 From: nolouch Date: Mon, 16 Dec 2019 23:32:55 +0800 Subject: [PATCH 4/5] do not update pd-web Signed-off-by: nolouch --- pkg/ui/pd-web | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ui/pd-web b/pkg/ui/pd-web index 62112c1e5a5..df8bf7a0171 160000 --- a/pkg/ui/pd-web +++ b/pkg/ui/pd-web @@ -1 +1 @@ -Subproject commit 62112c1e5a5c0918f5ee469237c0d4b91fb67eb2 +Subproject commit df8bf7a0171c470a0f1670f2b5bb21b1bf4c6546 From 375a57ff2ee8fd7aa5e128cc670e09de6d8b2a5d Mon Sep 17 00:00:00 2001 From: nolouch Date: Tue, 17 Dec 2019 13:11:47 +0800 Subject: [PATCH 5/5] address comments Signed-off-by: nolouch --- tests/server/region_syncer/region_syncer_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go index 0d15cf79568..9588deb2ed5 100644 --- a/tests/server/region_syncer/region_syncer_test.go +++ b/tests/server/region_syncer/region_syncer_test.go @@ -96,18 +96,21 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) { } // merge case // region2 -> region1 -> region0 + // merge A to B will 
increase version to max(versionA, versionB)+1, but does not increase ConfVer regions[0] = regions[0].Clone(core.WithEndKey(regions[2].GetEndKey()), core.WithIncVersion(), core.WithIncVersion()) err = rc.HandleRegionHeartbeat(regions[2]) c.Assert(err, IsNil) // merge case // region3 -> region4 + // merge A to B will increase version to max(versionA, versionB)+1, but does not increase ConfVer regions[4] = regions[3].Clone(core.WithEndKey(regions[4].GetEndKey()), core.WithIncVersion()) err = rc.HandleRegionHeartbeat(regions[4]) c.Assert(err, IsNil) // merge case // region0 -> region4 + // merge A to B will increase version to max(versionA, versionB)+1, but does not increase ConfVer regions[4] = regions[0].Clone(core.WithEndKey(regions[4].GetEndKey()), core.WithIncVersion(), core.WithIncVersion()) err = rc.HandleRegionHeartbeat(regions[4]) c.Assert(err, IsNil)