Skip to content

Commit

Permalink
cherry pick tikv#3305 to release-3.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
rleungx authored and ti-srebot committed Dec 28, 2020
1 parent 635fef2 commit bb5b2f9
Show file tree
Hide file tree
Showing 3 changed files with 311 additions and 0 deletions.
118 changes: 118 additions & 0 deletions server/id/id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2016 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package id

import (
"path"
"sync"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/etcdutil"
"github.com/tikv/pd/pkg/typeutil"
"github.com/tikv/pd/server/kv"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

// Allocator is the allocator to generate unique ID.
type Allocator interface {
Alloc() (uint64, error)
}

const allocStep = uint64(1000)

// AllocatorImpl is used to allocate ID.
type AllocatorImpl struct {
mu sync.Mutex
base uint64
end uint64

client *clientv3.Client
rootPath string
member string
}

// NewAllocatorImpl creates a new IDAllocator.
func NewAllocatorImpl(client *clientv3.Client, rootPath string, member string) *AllocatorImpl {
return &AllocatorImpl{client: client, rootPath: rootPath, member: member}
}

// Alloc returns a new id.
func (alloc *AllocatorImpl) Alloc() (uint64, error) {
alloc.mu.Lock()
defer alloc.mu.Unlock()

if alloc.base == alloc.end {
err := alloc.Generate()
if err != nil {
return 0, err
}

alloc.base = alloc.end - allocStep
}

alloc.base++

return alloc.base, nil
}

// Generate synchronizes and generates id range.
func (alloc *AllocatorImpl) Generate() error {
key := alloc.getAllocIDPath()
value, err := etcdutil.GetValue(alloc.client, key)
if err != nil {
return err
}

var (
cmp clientv3.Cmp
end uint64
)

if value == nil {
// create the key
cmp = clientv3.Compare(clientv3.CreateRevision(key), "=", 0)
} else {
// update the key
end, err = typeutil.BytesToUint64(value)
if err != nil {
return err
}

cmp = clientv3.Compare(clientv3.Value(key), "=", string(value))
}

end += allocStep
value = typeutil.Uint64ToBytes(end)
txn := kv.NewSlowLogTxn(alloc.client)
leaderPath := path.Join(alloc.rootPath, "leader")
t := txn.If(append([]clientv3.Cmp{cmp}, clientv3.Compare(clientv3.Value(leaderPath), "=", alloc.member))...)
resp, err := t.Then(clientv3.OpPut(key, string(value))).Commit()
if err != nil {
return errs.ErrEtcdTxn.Wrap(err).GenWithStackByArgs()
}
if !resp.Succeeded {
return errs.ErrEtcdTxn.FastGenByArgs()
}

log.Info("idAllocator allocates a new id", zap.Uint64("alloc-id", end))
idGauge.WithLabelValues("idalloc").Set(float64(end))
alloc.end = end
return nil
}

func (alloc *AllocatorImpl) getAllocIDPath() string {
return path.Join(alloc.rootPath, "alloc_id")
}
45 changes: 45 additions & 0 deletions server/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,48 @@ func (s *testAllocIDSuite) TestCommand(c *C) {
last = resp.GetId()
}
}

func (s *testAllocIDSuite) TestMonotonicID(c *C) {
var err error
cluster, err := tests.NewTestCluster(s.ctx, 2)
defer cluster.Destroy()
c.Assert(err, IsNil)

err = cluster.RunInitialServers()
c.Assert(err, IsNil)
cluster.WaitLeader()

leaderServer := cluster.GetServer(cluster.GetLeader())
var last1 uint64
for i := uint64(0); i < 10; i++ {
id, err := leaderServer.GetAllocator().Alloc()
c.Assert(err, IsNil)
c.Assert(id, Greater, last1)
last1 = id
}
err = cluster.ResignLeader()
c.Assert(err, IsNil)
cluster.WaitLeader()
leaderServer = cluster.GetServer(cluster.GetLeader())
var last2 uint64
for i := uint64(0); i < 10; i++ {
id, err := leaderServer.GetAllocator().Alloc()
c.Assert(err, IsNil)
c.Assert(id, Greater, last2)
last2 = id
}
err = cluster.ResignLeader()
c.Assert(err, IsNil)
cluster.WaitLeader()
leaderServer = cluster.GetServer(cluster.GetLeader())
id, err := leaderServer.GetAllocator().Alloc()
c.Assert(err, IsNil)
c.Assert(id, Greater, last2)
var last3 uint64
for i := uint64(0); i < 1000; i++ {
id, err := leaderServer.GetAllocator().Alloc()
c.Assert(err, IsNil)
c.Assert(id, Greater, last3)
last3 = id
}
}
148 changes: 148 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,10 +796,158 @@ func (s *Server) SetMemberLeaderPriority(id uint64, priority int) error {
return nil
}

<<<<<<< HEAD
// DeleteMemberLeaderPriority removes a member's priority config.
func (s *Server) DeleteMemberLeaderPriority(id uint64) error {
key := s.getMemberLeaderPriorityPath(id)
res, err := s.leaderTxn().Then(clientv3.OpDelete(key)).Commit()
=======
func (s *Server) leaderLoop() {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

for {
if s.IsClosed() {
log.Info("server is closed, return pd leader loop")
return
}

leader, rev, checkAgain := s.member.CheckLeader()
if checkAgain {
continue
}
if leader != nil {
err := s.reloadConfigFromKV()
if err != nil {
log.Error("reload config failed", errs.ZapError(err))
continue
}
// Check the cluster dc-location after the PD leader is elected
go s.tsoAllocatorManager.ClusterDCLocationChecker()
syncer := s.cluster.GetRegionSyncer()
if s.persistOptions.IsUseRegionStorage() {
syncer.StartSyncWithLeader(leader.GetClientUrls()[0])
}
log.Info("start to watch pd leader", zap.Stringer("pd-leader", leader))
// WatchLeader will keep looping and never return unless the PD leader has changed.
s.member.WatchLeader(s.serverLoopCtx, leader, rev)
syncer.StopSyncWithLeader()
log.Info("pd leader has changed, try to re-campaign a pd leader")
}

// To make sure the etcd leader and PD leader are on the same server.
etcdLeader := s.member.GetEtcdLeader()
if etcdLeader != s.member.ID() {
log.Info("skip campaigning of pd leader and check later",
zap.String("server-name", s.Name()),
zap.Uint64("etcd-leader-id", etcdLeader),
zap.Uint64("member-id", s.member.ID()))
time.Sleep(200 * time.Millisecond)
continue
}
s.campaignLeader()
}
}

func (s *Server) campaignLeader() {
log.Info("start to campaign pd leader", zap.String("campaign-pd-leader-name", s.Name()))
if err := s.member.CampaignLeader(s.cfg.LeaderLease); err != nil {
log.Error("campaign pd leader meet error", errs.ZapError(err))
return
}

// Start keepalive the leadership and enable TSO service.
// TSO service is strictly enabled/disabled by PD leader lease for 2 reasons:
// 1. lease based approach is not affected by thread pause, slow runtime schedule, etc.
// 2. load region could be slow. Based on lease we can recover TSO service faster.

ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
defer s.member.ResetLeader()
// maintain the PD leader
go s.member.KeepLeader(ctx)
log.Info("campaign pd leader ok", zap.String("campaign-pd-leader-name", s.Name()))

log.Info("setting up the global TSO allocator")
if err := s.tsoAllocatorManager.SetUpAllocator(ctx, config.GlobalDCLocation, s.member.GetLeadership()); err != nil {
log.Error("failed to set up the global TSO allocator", errs.ZapError(err))
return
}
// Check the cluster dc-location after the PD leader is elected
go s.tsoAllocatorManager.ClusterDCLocationChecker()

if err := s.reloadConfigFromKV(); err != nil {
log.Error("failed to reload configuration", errs.ZapError(err))
return
}

if err := s.encryptionKeyManager.SetLeadership(s.member.GetLeadership()); err != nil {
log.Error("failed to initialize encryption", errs.ZapError(err))
return
}

// Try to create raft cluster.
if err := s.createRaftCluster(); err != nil {
log.Error("failed to create raft cluster", errs.ZapError(err))
return
}
defer s.stopRaftCluster()
if err := s.persistOptions.LoadTTLFromEtcd(s.ctx, s.client); err != nil {
log.Error("failed to load persistOptions from etcd", errs.ZapError(err))
return
}
if err := s.idAllocator.Generate(); err != nil {
log.Error("failed to sync id from etcd", errs.ZapError(err))
return
}
s.member.EnableLeader()

CheckPDVersion(s.persistOptions)
log.Info("PD cluster leader is ready to serve", zap.String("pd-leader-name", s.Name()))

leaderTicker := time.NewTicker(leaderTickInterval)
defer leaderTicker.Stop()

for {
select {
case <-leaderTicker.C:
if !s.member.IsLeader() {
log.Info("no longer a leader because lease has expired, pd leader will step down")
return
}
etcdLeader := s.member.GetEtcdLeader()
if etcdLeader != s.member.ID() {
log.Info("etcd leader changed, resigns pd leadership", zap.String("old-pd-leader-name", s.Name()))
return
}
case <-ctx.Done():
// Server is closed and it should return nil.
log.Info("server is closed")
return
}
}
}

func (s *Server) etcdLeaderLoop() {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
for {
select {
case <-time.After(s.cfg.LeaderPriorityCheckInterval.Duration):
s.member.CheckPriority(ctx)
case <-ctx.Done():
log.Info("server is closed, exit etcd leader loop")
return
}
}
}

func (s *Server) reloadConfigFromKV() error {
err := s.persistOptions.Reload(s.storage)
>>>>>>> 65e08e2b... fix the id allocator is not monotonic (#3305)
if err != nil {
return errors.WithStack(err)
}
Expand Down

0 comments on commit bb5b2f9

Please sign in to comment.