From dba39168e3cc8f92df396d0bd4b42d0f368dbab2 Mon Sep 17 00:00:00 2001 From: disksing Date: Tue, 28 Sep 2021 12:25:41 +0800 Subject: [PATCH 1/8] allow cancel load region Signed-off-by: disksing --- server/cluster/cluster.go | 2 +- server/core/region_storage.go | 11 +++++++++++ server/core/storage.go | 13 +++++++------ server/core/storage_test.go | 9 +++++---- server/region_syncer/client.go | 12 ++++++------ server/region_syncer/server.go | 5 +++-- tests/server/cluster/cluster_test.go | 2 +- 7 files changed, 34 insertions(+), 20 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 30dbb1f5eb2..30ad5e390ce 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -299,7 +299,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) { start = time.Now() // used to load region from kv storage to cache storage. - if err := c.storage.LoadRegionsOnce(c.core.CheckAndPutRegion); err != nil { + if err := c.storage.LoadRegionsOnce(c.ctx, c.core.CheckAndPutRegion); err != nil { return nil, err } log.Info("load regions", diff --git a/server/core/region_storage.go b/server/core/region_storage.go index 68518303cfb..2b216a01d95 100644 --- a/server/core/region_storage.go +++ b/server/core/region_storage.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/encryption" @@ -130,6 +131,7 @@ func deleteRegion(kv kv.Base, region *metapb.Region) error { } func loadRegions( + ctx context.Context, kv kv.Base, encryptionKeyManager *encryptionkm.KeyManager, f func(region *RegionInfo) []*RegionInfo, @@ -142,6 +144,10 @@ func loadRegions( // a variable rangeLimit to work around. rangeLimit := maxKVRangeLimit for { + failpoint.Inject("slowLoadRegion", func() { + rangeLimit = 1 + time.Sleep(time.Second) + }) startKey := regionPath(nextID) _, res, err := kv.LoadRange(startKey, endKey, rangeLimit) if err != nil { @@ -150,6 +156,11 @@ func loadRegions( } return err } + select { + case <-ctx.Done(): + return ctx.Err() + default: + } for _, s := range res { region := &metapb.Region{} diff --git a/server/core/storage.go b/server/core/storage.go index da9ef691322..bd40c9695f4 100644 --- a/server/core/storage.go +++ b/server/core/storage.go @@ -15,6 +15,7 @@ package core import ( + "context" "encoding/json" "fmt" "math" @@ -195,22 +196,22 @@ func (s *Storage) LoadRegion(regionID uint64, region *metapb.Region) (ok bool, e } // LoadRegions loads all regions from storage to RegionsInfo. -func (s *Storage) LoadRegions(f func(region *RegionInfo) []*RegionInfo) error { +func (s *Storage) LoadRegions(ctx context.Context, f func(region *RegionInfo) []*RegionInfo) error { if atomic.LoadInt32(&s.useRegionStorage) > 0 { - return loadRegions(s.regionStorage, s.encryptionKeyManager, f) + return loadRegions(ctx, s.regionStorage, s.encryptionKeyManager, f) } - return loadRegions(s.Base, s.encryptionKeyManager, f) + return loadRegions(ctx, s.Base, s.encryptionKeyManager, f) } // LoadRegionsOnce loads all regions from storage to RegionsInfo.Only load one time from regionStorage. -func (s *Storage) LoadRegionsOnce(f func(region *RegionInfo) []*RegionInfo) error { +func (s *Storage) LoadRegionsOnce(ctx context.Context, f func(region *RegionInfo) []*RegionInfo) error { if atomic.LoadInt32(&s.useRegionStorage) == 0 { - return loadRegions(s.Base, s.encryptionKeyManager, f) + return loadRegions(ctx, s.Base, s.encryptionKeyManager, f) } s.mu.Lock() defer s.mu.Unlock() if s.regionLoaded == 0 { - if err := loadRegions(s.regionStorage, s.encryptionKeyManager, f); err != nil { + if err := loadRegions(ctx, s.regionStorage, s.encryptionKeyManager, f); err != nil { return err } s.regionLoaded = 1 diff --git a/server/core/storage_test.go b/server/core/storage_test.go index a2e4a094d12..7fd272790d9 100644 --- a/server/core/storage_test.go +++ b/server/core/storage_test.go @@ -15,6 +15,7 @@ package core import ( + "context" "encoding/json" "fmt" "math" @@ -144,7 +145,7 @@ func (s *testKVSuite) TestLoadRegions(c *C) { n := 10 regions := mustSaveRegions(c, storage, n) - c.Assert(storage.LoadRegions(cache.SetRegion), IsNil) + c.Assert(storage.LoadRegions(context.Background(), cache.SetRegion), IsNil) c.Assert(cache.GetRegionCount(), Equals, n) for _, region := range cache.GetMetaRegions() { @@ -158,7 +159,7 @@ func (s *testKVSuite) TestLoadRegionsToCache(c *C) { n := 10 regions := mustSaveRegions(c, storage, n) - c.Assert(storage.LoadRegionsOnce(cache.SetRegion), IsNil) + c.Assert(storage.LoadRegionsOnce(context.Background(), cache.SetRegion), IsNil) c.Assert(cache.GetRegionCount(), Equals, n) for _, region := range cache.GetMetaRegions() { @@ -167,7 +168,7 @@ func (s *testKVSuite) TestLoadRegionsToCache(c *C) { n = 20 mustSaveRegions(c, storage, n) - c.Assert(storage.LoadRegionsOnce(cache.SetRegion), IsNil) + c.Assert(storage.LoadRegionsOnce(context.Background(), cache.SetRegion), IsNil) c.Assert(cache.GetRegionCount(), Equals, n) } @@ -177,7 +178,7 @@ func (s *testKVSuite) TestLoadRegionsExceedRangeLimit(c *C) { n := 1000 regions := mustSaveRegions(c, storage, n) - c.Assert(storage.LoadRegions(cache.SetRegion), IsNil) + c.Assert(storage.LoadRegions(context.Background(), cache.SetRegion), IsNil) c.Assert(cache.GetRegionCount(), Equals, n) for _, region := range cache.GetMetaRegions() { c.Assert(region, DeepEquals, regions[region.GetId()]) diff --git a/server/region_syncer/client.go b/server/region_syncer/client.go index 3991fab5e17..90613875ac2 100644 --- a/server/region_syncer/client.go +++ b/server/region_syncer/client.go @@ -42,8 +42,8 @@ const ( func (s *RegionSyncer) StopSyncWithLeader() { s.reset() s.mu.Lock() - close(s.mu.closed) - s.mu.closed = make(chan struct{}) + s.mu.clientCancel() + s.mu.clientCtx, s.mu.clientCancel = context.WithCancel(context.Background()) s.mu.Unlock() s.wg.Wait() } @@ -130,14 +130,14 @@ var regionGuide = core.GenerateRegionGuideFunc(false) func (s *RegionSyncer) StartSyncWithLeader(addr string) { s.wg.Add(1) s.mu.RLock() - closed := s.mu.closed + ctx := s.mu.clientCtx s.mu.RUnlock() go func() { defer s.wg.Done() // used to load region from kv storage to cache storage. bc := s.server.GetBasicCluster() storage := s.server.GetStorage() - err := storage.LoadRegionsOnce(bc.CheckAndPutRegion) + err := storage.LoadRegionsOnce(ctx, bc.CheckAndPutRegion) if err != nil { log.Warn("failed to load regions.", errs.ZapError(err)) } @@ -145,7 +145,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { var conn *grpc.ClientConn for { select { - case <-closed: + case <-ctx.Done(): return default: } @@ -161,7 +161,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { // Start syncing data. for { select { - case <-closed: + case <-ctx.Done(): return default: } diff --git a/server/region_syncer/server.go b/server/region_syncer/server.go index 010bdc83314..3b022b7f633 100644 --- a/server/region_syncer/server.go +++ b/server/region_syncer/server.go @@ -73,7 +73,8 @@ type RegionSyncer struct { streams map[string]ServerStream regionSyncerCtx context.Context regionSyncerCancel context.CancelFunc - closed chan struct{} + clientCtx context.Context + clientCancel context.CancelFunc } server Server wg sync.WaitGroup @@ -95,7 +96,7 @@ func NewRegionSyncer(s Server) *RegionSyncer { tlsConfig: s.GetTLSConfig(), } syncer.mu.streams = make(map[string]ServerStream) - syncer.mu.closed = make(chan struct{}) + syncer.mu.clientCtx, syncer.mu.clientCancel = context.WithCancel(context.Background()) return syncer } diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index c1f2a49127e..afb7ccd0297 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -727,7 +727,7 @@ func (s *clusterTestSuite) TestLoadClusterInfo(c *C) { for _, region := range regions { c.Assert(storage.SaveRegion(region), IsNil) } - raftCluster.GetStorage().LoadRegionsOnce(raftCluster.GetCacheCluster().PutRegion) + raftCluster.GetStorage().LoadRegionsOnce(s.ctx, raftCluster.GetCacheCluster().PutRegion) c.Assert(raftCluster.GetRegionCount(), Equals, n) } From 201f21a3a5149f5c35d5ccac64dfd106a4d27b67 Mon Sep 17 00:00:00 2001 From: disksing Date: Tue, 28 Sep 2021 15:26:23 +0800 Subject: [PATCH 2/8] add test Signed-off-by: disksing --- server/region_syncer/client_test.go | 104 ++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 server/region_syncer/client_test.go diff --git a/server/region_syncer/client_test.go b/server/region_syncer/client_test.go new file mode 100644 index 00000000000..c8c698bc3aa --- /dev/null +++ b/server/region_syncer/client_test.go @@ -0,0 +1,104 @@ +// Copyright 2018 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncer + +import ( + "context" + "os" + "time" + + . "github.com/pingcap/check" + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/tikv/pd/pkg/grpcutil" + "github.com/tikv/pd/server/core" + "github.com/tikv/pd/server/kv" +) + +var _ = Suite(&testClientSuite{}) + +type testClientSuite struct{} + +// For issue https://github.com/tikv/pd/issues/3936 +func (t *testClientSuite) TestLoadRegion(c *C) { + tempDir, err := os.MkdirTemp(os.TempDir(), "region_syncer_load_region") + c.Assert(err, IsNil) + defer os.RemoveAll(tempDir) + rs, err := core.NewRegionStorage(context.Background(), tempDir, nil) + c.Assert(err, IsNil) + + server := &mockServer{ + ctx: context.Background(), + storage: core.NewStorage(kv.NewMemoryKV(), core.WithRegionStorage(rs)), + bc: core.NewBasicCluster(), + } + for i := 0; i < 30; i++ { + rs.SaveRegion(&metapb.Region{Id: uint64(i) + 1}) + } + c.Assert(failpoint.Enable("github.com/tikv/pd/server/core/slowLoadRegion", "return(true)"), IsNil) + defer func() { c.Assert(failpoint.Disable("github.com/tikv/pd/server/core/slowLoadRegion"), IsNil) }() + + rc := NewRegionSyncer(server) + start := time.Now() + rc.StartSyncWithLeader("") + time.Sleep(time.Second) + rc.StopSyncWithLeader() + c.Assert(time.Since(start), Greater, time.Second) // make sure failpoint is injected + c.Assert(time.Since(start), Less, time.Second*2) +} + +type mockServer struct { + ctx context.Context + member, leader *pdpb.Member + storage *core.Storage + bc *core.BasicCluster +} + +func (s *mockServer) LoopContext() context.Context { + return s.ctx +} + +func (s *mockServer) ClusterID() uint64 { + return 1 +} + +func (s *mockServer) GetMemberInfo() *pdpb.Member { + return s.member +} + +func (s *mockServer) GetLeader() *pdpb.Member { + return s.leader +} + +func (s *mockServer) GetStorage() *core.Storage { + return s.storage +} + +func (s *mockServer) Name() string { + return "mock-server" +} + +func (s *mockServer) GetRegions() []*core.RegionInfo { + return s.bc.GetRegions() +} + +func (s *mockServer) GetTLSConfig() *grpcutil.TLSConfig { + return &grpcutil.TLSConfig{} +} + +func (s *mockServer) GetBasicCluster() *core.BasicCluster { + return s.bc +} From 4081e7551796b9cdb21d475fab7d53ff04fa365e Mon Sep 17 00:00:00 2001 From: disksing Date: Tue, 28 Sep 2021 15:27:08 +0800 Subject: [PATCH 3/8] minor update Signed-off-by: disksing --- server/region_syncer/client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/region_syncer/client_test.go b/server/region_syncer/client_test.go index c8c698bc3aa..e0268bf52a6 100644 --- a/server/region_syncer/client_test.go +++ b/server/region_syncer/client_test.go @@ -1,4 +1,4 @@ -// Copyright 2018 TiKV Project Authors. +// Copyright 2021 TiKV Project Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. From 5b69c4a2928aebcebbfffb5f422d91c266ab7add Mon Sep 17 00:00:00 2001 From: disksing Date: Sun, 10 Oct 2021 10:54:32 +0800 Subject: [PATCH 4/8] address comment Signed-off-by: disksing --- server/region_syncer/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/region_syncer/server.go b/server/region_syncer/server.go index 3b022b7f633..8f9c4fdefb4 100644 --- a/server/region_syncer/server.go +++ b/server/region_syncer/server.go @@ -96,7 +96,7 @@ func NewRegionSyncer(s Server) *RegionSyncer { tlsConfig: s.GetTLSConfig(), } syncer.mu.streams = make(map[string]ServerStream) - syncer.mu.clientCtx, syncer.mu.clientCancel = context.WithCancel(context.Background()) + syncer.mu.clientCtx, syncer.mu.clientCancel = context.WithCancel(s.LoopContext()) return syncer } From e80e59118e0ab241ab593ef47b5d55ce7a8e97f0 Mon Sep 17 00:00:00 2001 From: disksing Date: Mon, 11 Oct 2021 13:32:13 +0800 Subject: [PATCH 5/8] Revert "address comment" This reverts commit 5b69c4a2928aebcebbfffb5f422d91c266ab7add. Signed-off-by: disksing --- server/region_syncer/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/region_syncer/server.go b/server/region_syncer/server.go index 8f9c4fdefb4..3b022b7f633 100644 --- a/server/region_syncer/server.go +++ b/server/region_syncer/server.go @@ -96,7 +96,7 @@ func NewRegionSyncer(s Server) *RegionSyncer { tlsConfig: s.GetTLSConfig(), } syncer.mu.streams = make(map[string]ServerStream) - syncer.mu.clientCtx, syncer.mu.clientCancel = context.WithCancel(s.LoopContext()) + syncer.mu.clientCtx, syncer.mu.clientCancel = context.WithCancel(context.Background()) return syncer } From 94e3effaa30d6a4f372dba95aeec5257a79079ee Mon Sep 17 00:00:00 2001 From: disksing Date: Tue, 12 Oct 2021 15:23:14 +0800 Subject: [PATCH 6/8] add log Signed-off-by: disksing --- server/region_syncer/client.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/region_syncer/client.go b/server/region_syncer/client.go index 90613875ac2..e218217610f 100644 --- a/server/region_syncer/client.go +++ b/server/region_syncer/client.go @@ -137,7 +137,10 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { // used to load region from kv storage to cache storage. bc := s.server.GetBasicCluster() storage := s.server.GetStorage() + log.Info("region syncer start load region") + start := time.Now() err := storage.LoadRegionsOnce(ctx, bc.CheckAndPutRegion) + log.Info("region syncer finished load region", zap.Duration("time_cost", time.Since(start))) if err != nil { log.Warn("failed to load regions.", errs.ZapError(err)) } From 886b78874a9ba9eda78bb4aff9e9186264b63c40 Mon Sep 17 00:00:00 2001 From: HunDunDM Date: Tue, 12 Oct 2021 16:48:19 +0800 Subject: [PATCH 7/8] merge clientCtx and regionSyncerCtx Signed-off-by: HunDunDM --- server/region_syncer/client.go | 45 +++++++++++----------------------- server/region_syncer/server.go | 9 +++---- 2 files changed, 17 insertions(+), 37 deletions(-) diff --git a/server/region_syncer/client.go b/server/region_syncer/client.go index e218217610f..bc01712dc61 100644 --- a/server/region_syncer/client.go +++ b/server/region_syncer/client.go @@ -41,10 +41,6 @@ const ( // StopSyncWithLeader stop to sync the region with leader. func (s *RegionSyncer) StopSyncWithLeader() { s.reset() - s.mu.Lock() - s.mu.clientCancel() - s.mu.clientCtx, s.mu.clientCancel = context.WithCancel(context.Background()) - s.mu.Unlock() s.wg.Wait() } @@ -52,19 +48,15 @@ func (s *RegionSyncer) reset() { s.mu.Lock() defer s.mu.Unlock() - if s.mu.regionSyncerCancel == nil { - return + if s.mu.clientCancel != nil { + s.mu.clientCancel() } - s.mu.regionSyncerCancel() - s.mu.regionSyncerCancel, s.mu.regionSyncerCtx = nil, nil + s.mu.clientCancel, s.mu.clientCtx = nil, nil } -func (s *RegionSyncer) establish(addr string) (*grpc.ClientConn, error) { - s.reset() - ctx, cancel := context.WithCancel(s.server.LoopContext()) +func (s *RegionSyncer) establish(ctx context.Context, addr string) (*grpc.ClientConn, error) { tlsCfg, err := s.tlsConfig.ToTLSConfig() if err != nil { - cancel() return nil, err } cc, err := grpcutil.GetClientConn( @@ -89,28 +81,16 @@ func (s *RegionSyncer) establish(addr string) (*grpc.ClientConn, error) { grpc.WithBlock(), ) if err != nil { - cancel() return nil, errors.WithStack(err) } - - s.mu.Lock() - s.mu.regionSyncerCtx, s.mu.regionSyncerCancel = ctx, cancel - s.mu.Unlock() return cc, nil } -func (s *RegionSyncer) syncRegion(conn *grpc.ClientConn) (ClientStream, error) { +func (s *RegionSyncer) syncRegion(ctx context.Context, conn *grpc.ClientConn) (ClientStream, error) { cli := pdpb.NewPDClient(conn) - var ctx context.Context - s.mu.RLock() - ctx = s.mu.regionSyncerCtx - s.mu.RUnlock() - if ctx == nil { - return nil, errors.New("syncRegion failed due to regionSyncerCtx is nil") - } syncStream, err := cli.SyncRegions(ctx) if err != nil { - return syncStream, errs.ErrGRPCCreateStream.Wrap(err).FastGenWithCause() + return nil, errs.ErrGRPCCreateStream.Wrap(err).FastGenWithCause() } err = syncStream.Send(&pdpb.SyncRegionRequest{ Header: &pdpb.RequestHeader{ClusterId: s.server.ClusterID()}, @@ -118,7 +98,7 @@ func (s *RegionSyncer) syncRegion(conn *grpc.ClientConn) (ClientStream, error) { StartIndex: s.history.GetNextIndex(), }) if err != nil { - return syncStream, errs.ErrGRPCSend.Wrap(err).FastGenWithCause() + return nil, errs.ErrGRPCSend.Wrap(err).FastGenWithCause() } return syncStream, nil @@ -129,9 +109,12 @@ var regionGuide = core.GenerateRegionGuideFunc(false) // StartSyncWithLeader starts to sync with leader. func (s *RegionSyncer) StartSyncWithLeader(addr string) { s.wg.Add(1) - s.mu.RLock() + + s.mu.Lock() + defer s.mu.Unlock() + s.mu.clientCtx, s.mu.clientCancel = context.WithCancel(s.server.LoopContext()) ctx := s.mu.clientCtx - s.mu.RUnlock() + go func() { defer s.wg.Done() // used to load region from kv storage to cache storage. @@ -152,7 +135,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { return default: } - conn, err = s.establish(addr) + conn, err = s.establish(ctx, addr) if err != nil { log.Error("cannot establish connection with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), errs.ZapError(err)) continue @@ -169,7 +152,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { default: } - stream, err := s.syncRegion(conn) + stream, err := s.syncRegion(ctx, conn) if err != nil { if ev, ok := status.FromError(err); ok { if ev.Code() == codes.Canceled { diff --git a/server/region_syncer/server.go b/server/region_syncer/server.go index 3b022b7f633..b6a5dfdcaba 100644 --- a/server/region_syncer/server.go +++ b/server/region_syncer/server.go @@ -70,11 +70,9 @@ type Server interface { type RegionSyncer struct { mu struct { sync.RWMutex - streams map[string]ServerStream - regionSyncerCtx context.Context - regionSyncerCancel context.CancelFunc - clientCtx context.Context - clientCancel context.CancelFunc + streams map[string]ServerStream + clientCtx context.Context + clientCancel context.CancelFunc } server Server wg sync.WaitGroup @@ -96,7 +94,6 @@ func NewRegionSyncer(s Server) *RegionSyncer { tlsConfig: s.GetTLSConfig(), } syncer.mu.streams = make(map[string]ServerStream) - syncer.mu.clientCtx, syncer.mu.clientCancel = context.WithCancel(context.Background()) return syncer } From 3dba7043b1fe697f9ff1b10bd3735696ec8f3c24 Mon Sep 17 00:00:00 2001 From: disksing Date: Tue, 12 Oct 2021 21:46:48 +0800 Subject: [PATCH 8/8] Update server/region_syncer/client.go Signed-off-by: disksing Co-authored-by: Ryan Leung --- server/region_syncer/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/region_syncer/client.go b/server/region_syncer/client.go index bc01712dc61..2ba9ffd11cf 100644 --- a/server/region_syncer/client.go +++ b/server/region_syncer/client.go @@ -123,7 +123,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { log.Info("region syncer start load region") start := time.Now() err := storage.LoadRegionsOnce(ctx, bc.CheckAndPutRegion) - log.Info("region syncer finished load region", zap.Duration("time_cost", time.Since(start))) + log.Info("region syncer finished load region", zap.Duration("time-cost", time.Since(start))) if err != nil { log.Warn("failed to load regions.", errs.ZapError(err)) }