Skip to content

Commit

Permalink
Merge branch 'release-4.0' into cherry-pick-4097-to-release-4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx authored Nov 24, 2021
2 parents 783302c + f11d191 commit 9fe3052
Show file tree
Hide file tree
Showing 10 changed files with 195 additions and 72 deletions.
8 changes: 4 additions & 4 deletions metrics/alertmanager/pd.rules.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ groups:
summary: PD_pending_peer_region_count

- alert: PD_leader_change
expr: count( changes(pd_server_tso{type="save"}[10m]) > 0 ) >= 2
expr: count( changes(pd_tso_events{type="save"}[10m]) > 0 ) >= 2
for: 1m
labels:
env: ENV_LABELS_ENV
level: warning
expr: count( changes(pd_server_tso{type="save"}[10m]) > 0 ) >= 2
expr: count( changes(pd_tso_events{type="save"}[10m]) > 0 ) >= 2
annotations:
description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values:{{ $value }}'
value: '{{ $value }}'
Expand All @@ -146,12 +146,12 @@ groups:
summary: TiKV_space_used_more_than_80%

- alert: PD_system_time_slow
expr: changes(pd_server_tso{type="system_time_slow"}[10m]) >= 1
expr: changes(pd_tso_events{type="system_time_slow"}[10m]) >= 1
for: 1m
labels:
env: ENV_LABELS_ENV
level: warning
expr: changes(pd_server_tso{type="system_time_slow"}[10m]) >= 1
expr: changes(pd_tso_events{type="system_time_slow"}[10m]) >= 1
annotations:
description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}'
value: '{{ $value }}'
Expand Down
4 changes: 2 additions & 2 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
start = time.Now()

// used to load region from kv storage to cache storage.
if err := c.storage.LoadRegionsOnce(c.core.CheckAndPutRegion); err != nil {
if err := c.storage.LoadRegionsOnce(c.ctx, c.core.CheckAndPutRegion); err != nil {
return nil, err
}
log.Info("load regions",
Expand Down Expand Up @@ -353,8 +353,8 @@ func (c *RaftCluster) Stop() {
}

c.running = false
close(c.quit)
c.coordinator.stop()
close(c.quit)
c.Unlock()
c.wg.Wait()
log.Info("raftcluster is stopped")
Expand Down
8 changes: 7 additions & 1 deletion server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (c *coordinator) patrolRegions() {
}
checkerIsBusy, ops := c.checkers.CheckRegion(region)
if checkerIsBusy {
continue
break
}
if len(ops) > 0 {
c.opController.AddWaitingOperator(ops...)
Expand Down Expand Up @@ -691,6 +691,12 @@ func (s *scheduleController) Stop() {

func (s *scheduleController) Schedule() []*operator.Operator {
for i := 0; i < maxScheduleRetries; i++ {
// No need to retry if the schedule should stop, so that exit is faster.
select {
case <-s.ctx.Done():
return nil
default:
}
// If we have schedule, reset interval to the minimal interval.
if op := s.Scheduler.Schedule(s.cluster); op != nil {
s.nextInterval = s.Scheduler.GetMinInterval()
Expand Down
12 changes: 11 additions & 1 deletion server/core/region_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
Expand Down Expand Up @@ -118,7 +119,7 @@ func deleteRegion(kv kv.Base, region *metapb.Region) error {
return kv.Remove(regionPath(region.GetId()))
}

func loadRegions(kv kv.Base, f func(region *RegionInfo) []*RegionInfo) error {
func loadRegions(ctx context.Context, kv kv.Base, f func(region *RegionInfo) []*RegionInfo) error {
nextID := uint64(0)
endKey := regionPath(math.MaxUint64)

Expand All @@ -127,6 +128,10 @@ func loadRegions(kv kv.Base, f func(region *RegionInfo) []*RegionInfo) error {
// a variable rangeLimit to work around.
rangeLimit := maxKVRangeLimit
for {
failpoint.Inject("slowLoadRegion", func() {
rangeLimit = 1
time.Sleep(time.Second)
})
startKey := regionPath(nextID)
_, res, err := kv.LoadRange(startKey, endKey, rangeLimit)
if err != nil {
Expand All @@ -135,6 +140,11 @@ func loadRegions(kv kv.Base, f func(region *RegionInfo) []*RegionInfo) error {
}
return err
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}

for _, s := range res {
region := &metapb.Region{}
Expand Down
13 changes: 7 additions & 6 deletions server/core/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package core

import (
"context"
"encoding/json"
"fmt"
"math"
Expand Down Expand Up @@ -160,22 +161,22 @@ func (s *Storage) LoadRegion(regionID uint64, region *metapb.Region) (bool, erro
}

// LoadRegions loads all regions from storage to RegionsInfo.
func (s *Storage) LoadRegions(f func(region *RegionInfo) []*RegionInfo) error {
func (s *Storage) LoadRegions(ctx context.Context, f func(region *RegionInfo) []*RegionInfo) error {
if atomic.LoadInt32(&s.useRegionStorage) > 0 {
return loadRegions(s.regionStorage, f)
return loadRegions(ctx, s.regionStorage, f)
}
return loadRegions(s.Base, f)
return loadRegions(ctx, s.Base, f)
}

// LoadRegionsOnce loads all regions from storage to RegionsInfo. It only loads once from regionStorage.
func (s *Storage) LoadRegionsOnce(f func(region *RegionInfo) []*RegionInfo) error {
func (s *Storage) LoadRegionsOnce(ctx context.Context, f func(region *RegionInfo) []*RegionInfo) error {
if atomic.LoadInt32(&s.useRegionStorage) == 0 {
return loadRegions(s.Base, f)
return loadRegions(ctx, s.Base, f)
}
s.mu.Lock()
defer s.mu.Unlock()
if s.regionLoaded == 0 {
if err := loadRegions(s.regionStorage, f); err != nil {
if err := loadRegions(ctx, s.regionStorage, f); err != nil {
return err
}
s.regionLoaded = 1
Expand Down
9 changes: 5 additions & 4 deletions server/core/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package core

import (
"context"
"encoding/json"
"fmt"
"math"
Expand Down Expand Up @@ -142,7 +143,7 @@ func (s *testKVSuite) TestLoadRegions(c *C) {

n := 10
regions := mustSaveRegions(c, storage, n)
c.Assert(storage.LoadRegions(cache.SetRegion), IsNil)
c.Assert(storage.LoadRegions(context.Background(), cache.SetRegion), IsNil)

c.Assert(cache.GetRegionCount(), Equals, n)
for _, region := range cache.GetMetaRegions() {
Expand All @@ -156,7 +157,7 @@ func (s *testKVSuite) TestLoadRegionsToCache(c *C) {

n := 10
regions := mustSaveRegions(c, storage, n)
c.Assert(storage.LoadRegionsOnce(cache.SetRegion), IsNil)
c.Assert(storage.LoadRegionsOnce(context.Background(), cache.SetRegion), IsNil)

c.Assert(cache.GetRegionCount(), Equals, n)
for _, region := range cache.GetMetaRegions() {
Expand All @@ -165,7 +166,7 @@ func (s *testKVSuite) TestLoadRegionsToCache(c *C) {

n = 20
mustSaveRegions(c, storage, n)
c.Assert(storage.LoadRegionsOnce(cache.SetRegion), IsNil)
c.Assert(storage.LoadRegionsOnce(context.Background(), cache.SetRegion), IsNil)
c.Assert(cache.GetRegionCount(), Equals, n)
}

Expand All @@ -175,7 +176,7 @@ func (s *testKVSuite) TestLoadRegionsExceedRangeLimit(c *C) {

n := 1000
regions := mustSaveRegions(c, storage, n)
c.Assert(storage.LoadRegions(cache.SetRegion), IsNil)
c.Assert(storage.LoadRegions(context.Background(), cache.SetRegion), IsNil)
c.Assert(cache.GetRegionCount(), Equals, n)
for _, region := range cache.GetMetaRegions() {
c.Assert(region, DeepEquals, regions[region.GetId()])
Expand Down
56 changes: 25 additions & 31 deletions server/region_syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

Expand Down Expand Up @@ -40,30 +41,22 @@ const (
// StopSyncWithLeader stop to sync the region with leader.
func (s *RegionSyncer) StopSyncWithLeader() {
s.reset()
s.Lock()
close(s.closed)
s.closed = make(chan struct{})
s.Unlock()
s.wg.Wait()
}

func (s *RegionSyncer) reset() {
s.Lock()
defer s.Unlock()
s.mu.Lock()
defer s.mu.Unlock()

if s.regionSyncerCancel == nil {
return
if s.mu.clientCancel != nil {
s.mu.clientCancel()
}
s.regionSyncerCancel()
s.regionSyncerCancel, s.regionSyncerCtx = nil, nil
s.mu.clientCancel, s.mu.clientCtx = nil, nil
}

func (s *RegionSyncer) establish(addr string) (*grpc.ClientConn, error) {
s.reset()
ctx, cancel := context.WithCancel(s.server.LoopContext())
func (s *RegionSyncer) establish(ctx context.Context, addr string) (*grpc.ClientConn, error) {
tlsCfg, err := s.securityConfig.ToTLSConfig()
if err != nil {
cancel()
return nil, err
}
cc, err := grpcutil.GetClientConn(
Expand All @@ -88,29 +81,24 @@ func (s *RegionSyncer) establish(addr string) (*grpc.ClientConn, error) {
grpc.WithBlock(),
)
if err != nil {
cancel()
return nil, errors.WithStack(err)
}

s.Lock()
s.regionSyncerCtx, s.regionSyncerCancel = ctx, cancel
s.Unlock()
return cc, nil
}

func (s *RegionSyncer) syncRegion(conn *grpc.ClientConn) (ClientStream, error) {
func (s *RegionSyncer) syncRegion(ctx context.Context, conn *grpc.ClientConn) (ClientStream, error) {
cli := pdpb.NewPDClient(conn)
syncStream, err := cli.SyncRegions(s.regionSyncerCtx)
syncStream, err := cli.SyncRegions(ctx)
if err != nil {
return syncStream, errs.ErrGRPCCreateStream.Wrap(err).FastGenWithCause()
return nil, errs.ErrGRPCCreateStream.Wrap(err).FastGenWithCause()
}
err = syncStream.Send(&pdpb.SyncRegionRequest{
Header: &pdpb.RequestHeader{ClusterId: s.server.ClusterID()},
Member: s.server.GetMemberInfo(),
StartIndex: s.history.GetNextIndex(),
})
if err != nil {
return syncStream, errs.ErrGRPCSend.Wrap(err).FastGenWithCause()
return nil, errs.ErrGRPCSend.Wrap(err).FastGenWithCause()
}

return syncStream, nil
Expand All @@ -121,27 +109,33 @@ var regionGuide = core.GenerateRegionGuideFunc(false)
// StartSyncWithLeader starts to sync with leader.
func (s *RegionSyncer) StartSyncWithLeader(addr string) {
s.wg.Add(1)
s.RLock()
closed := s.closed
s.RUnlock()

s.mu.Lock()
defer s.mu.Unlock()
s.mu.clientCtx, s.mu.clientCancel = context.WithCancel(s.server.LoopContext())
ctx := s.mu.clientCtx

go func() {
defer s.wg.Done()
// used to load region from kv storage to cache storage.
bc := s.server.GetBasicCluster()
storage := s.server.GetStorage()
err := storage.LoadRegionsOnce(bc.CheckAndPutRegion)
log.Info("region syncer start load region")
start := time.Now()
err := storage.LoadRegionsOnce(ctx, bc.CheckAndPutRegion)
log.Info("region syncer finished load region", zap.Duration("time-cost", time.Since(start)))
if err != nil {
log.Warn("failed to load regions.", errs.ZapError(err))
}
// establish client.
var conn *grpc.ClientConn
for {
select {
case <-closed:
case <-ctx.Done():
return
default:
}
conn, err = s.establish(addr)
conn, err = s.establish(ctx, addr)
if err != nil {
log.Error("cannot establish connection with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), errs.ZapError(err))
continue
Expand All @@ -153,12 +147,12 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
// Start syncing data.
for {
select {
case <-closed:
case <-ctx.Done():
return
default:
}

stream, err := s.syncRegion(conn)
stream, err := s.syncRegion(ctx, conn)
if err != nil {
if ev, ok := status.FromError(err); ok {
if ev.Code() == codes.Canceled {
Expand Down
Loading

0 comments on commit 9fe3052

Please sign in to comment.