feat: avoid transfer leader to follower not ready or applying snapshot
absolute8511 committed Jun 18, 2021
1 parent 9f2eff2 commit 1181268
Showing 7 changed files with 207 additions and 34 deletions.
22 changes: 18 additions & 4 deletions cluster/datanode_coord/data_node_coordinator.go
@@ -541,7 +541,7 @@ func (dc *DataCoordinator) getNamespaceRaftLeader(nsInfo *cluster.PartitionMetaI
return m.NodeID
}

func (dc *DataCoordinator) transferMyNamespaceLeader(nsInfo *cluster.PartitionMetaInfo, nid string, force bool, checkAll bool) bool {
func (dc *DataCoordinator) TransferMyNamespaceLeader(nsInfo *cluster.PartitionMetaInfo, nid string, force bool, checkAll bool) bool {
nsNode := dc.localNSMgr.GetNamespaceNode(nsInfo.GetDesp())
if nsNode == nil {
return false
@@ -779,7 +779,7 @@ func (dc *DataCoordinator) checkForUnsyncedNamespaces() {
// removing node should transfer leader immediately since others partitions may wait for ready ,
// so it may never all ready for transfer. we transfer only check local partition.
_, removed := namespaceMeta.Removings[dc.GetMyID()]
done = dc.transferMyNamespaceLeader(namespaceMeta, isrList[0], false, !removed)
done = dc.TransferMyNamespaceLeader(namespaceMeta, isrList[0], false, !removed)
lastTransferCheckedTime = time.Now()
}
if !done {
@@ -797,7 +797,7 @@ func (dc *DataCoordinator) checkForUnsyncedNamespaces() {
// also we should avoid transfer leader while some node is catchuping while recover from restart
done := false
if time.Since(lastTransferCheckedTime) >= TransferLeaderWait {
done = dc.transferMyNamespaceLeader(namespaceMeta, isrList[0], false, true)
done = dc.TransferMyNamespaceLeader(namespaceMeta, isrList[0], false, true)
lastTransferCheckedTime = time.Now()
}
if !done {
@@ -1369,15 +1369,29 @@ func (dc *DataCoordinator) prepareLeavingCluster() {
if leader != dc.GetMyRegID() {
continue
}
// try not force for the first time, if failed try force again
transferSuccess := false
for _, newLeader := range nsInfo.GetISR() {
if newLeader == dc.GetMyID() {
continue
}
done := dc.transferMyNamespaceLeader(nsInfo.GetCopy(), newLeader, true, false)
done := dc.TransferMyNamespaceLeader(nsInfo.GetCopy(), newLeader, false, true)
if done {
transferSuccess = true
break
}
}
if !transferSuccess {
for _, newLeader := range nsInfo.GetISR() {
if newLeader == dc.GetMyID() {
continue
}
done := dc.TransferMyNamespaceLeader(nsInfo.GetCopy(), newLeader, true, false)
if done {
break
}
}
}
}
}
if dc.register != nil {
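The prepareLeavingCluster change above is a two-pass handoff: the first pass tries every other ISR member without force and with the full readiness check (force=false, checkAll=true), and only if no candidate accepts does a second pass retry with force (force=true, checkAll=false), so a leaving node does not keep leadership merely because no follower looked ready. Below is a condensed, self-contained sketch of that pattern; the transfer callback and node IDs are simplified stand-ins, not the real coordinator types.

package main

import "fmt"

// transferFn mimics the shape of TransferMyNamespaceLeader: it returns true
// when the chosen candidate accepted the leadership transfer.
type transferFn func(candidate string, force bool, checkAll bool) bool

// handOffLeader tries a gentle transfer first (force=false, checkAll=true),
// then falls back to a forced transfer (force=true, checkAll=false) so a
// leaving node is never stuck holding leadership because no follower is ready.
func handOffLeader(myID string, isr []string, transfer transferFn) bool {
	// Pass 1: prefer a candidate that passes all readiness checks.
	for _, candidate := range isr {
		if candidate == myID {
			continue
		}
		if transfer(candidate, false, true) {
			return true
		}
	}
	// Pass 2: no ready candidate accepted, so force the transfer.
	for _, candidate := range isr {
		if candidate == myID {
			continue
		}
		if transfer(candidate, true, false) {
			return true
		}
	}
	return false
}

func main() {
	isr := []string{"node-1", "node-2", "node-3"}
	ok := handOffLeader("node-1", isr, func(candidate string, force, checkAll bool) bool {
		// Pretend only node-3 accepts, and only when forced.
		return candidate == "node-3" && force
	})
	fmt.Println("hand-off succeeded:", ok)
}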
16 changes: 4 additions & 12 deletions node/namespace.go
@@ -215,19 +215,11 @@ func (nn *NamespaceNode) Start(forceStandaloneCluster bool) error {
}

func (nn *NamespaceNode) TransferMyLeader(to uint64, toRaftID uint64) error {
waitTimeout := time.Duration(nn.Node.machineConfig.ElectionTick) * time.Duration(nn.Node.machineConfig.TickMs) * time.Millisecond
ctx, cancel := context.WithTimeout(context.Background(), waitTimeout)
defer cancel()
oldLeader := nn.Node.rn.Lead()
nn.Node.rn.node.TransferLeadership(ctx, oldLeader, toRaftID)
for nn.Node.rn.Lead() != toRaftID {
select {
case <-ctx.Done():
return errTimeoutLeaderTransfer
case <-time.After(200 * time.Millisecond):
}
err := nn.Node.TransferLeadership(toRaftID)
if err != nil {
return err
}
nodeLog.Infof("finished transfer from %v to %v:%v", oldLeader, to, toRaftID)
nodeLog.Infof("finished transfer to %v:%v", to, toRaftID)
return nil
}

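With this change TransferMyLeader becomes a thin wrapper: the timeout and the 200ms polling loop now live in KVNode.TransferLeadership (see node.go below), which derives its deadline from ElectionTick * TickMs and reports failure as an error instead of looping at this layer. The shape of that wait loop, reduced to a generic poll-until-condition-or-deadline helper with illustrative names, is sketched here.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errWaitTimeout = errors.New("wait condition timeout")

// waitUntil polls cond every interval until it returns true or ctx expires.
// This mirrors the shape of the leader-transfer wait: the caller derives the
// context timeout from ElectionTick * TickMs and polls the observed leader.
func waitUntil(ctx context.Context, interval time.Duration, cond func() bool) error {
	for !cond() {
		select {
		case <-ctx.Done():
			return errWaitTimeout
		case <-time.After(interval):
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	start := time.Now()
	err := waitUntil(ctx, 200*time.Millisecond, func() bool {
		// Stand-in for "current raft leader == target raft ID".
		return time.Since(start) > 600*time.Millisecond
	})
	fmt.Println("wait result:", err)
}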
104 changes: 89 additions & 15 deletions node/node.go
@@ -24,12 +24,15 @@ import (
"github.com/youzan/ZanRedisDB/raft"
"github.com/youzan/ZanRedisDB/raft/raftpb"
"github.com/youzan/ZanRedisDB/rockredis"
"github.com/youzan/ZanRedisDB/settings"
"github.com/youzan/ZanRedisDB/transport/rafthttp"
)

var enableSnapTransferTest = false
var enableSnapSaveTest = false
var enableSnapApplyTest = false
var enableSnapApplyBlockingTest = false
var snapApplyBlockingC = make(chan time.Duration, 1)
var enableSnapApplyRestoreStorageTest = false
var UseRedisV2 = false

@@ -40,14 +40,24 @@ func EnableSnapForTest(transfer bool, save bool, apply bool, restore bool) {
enableSnapApplyRestoreStorageTest = restore
}

func EnableSnapBlockingForTest(b bool) {
enableSnapApplyBlockingTest = b
}

func PutSnapBlockingTime(d time.Duration) {
snapApplyBlockingC <- d
}

var (
errInvalidResponse = errors.New("Invalid response type")
errSyntaxError = errors.New("syntax error")
errUnknownData = errors.New("unknown request data type")
errTooMuchBatchSize = errors.New("the batch size exceed the limit")
errRaftNotReadyForWrite = errors.New("ERR_CLUSTER_CHANGED: the raft is not ready for write")
errWrongNumberArgs = errors.New("ERR wrong number of arguments for redis command")
ErrReadIndexTimeout = errors.New("wait read index timeout")
errInvalidResponse = errors.New("Invalid response type")
errSyntaxError = errors.New("syntax error")
errUnknownData = errors.New("unknown request data type")
errTooMuchBatchSize = errors.New("the batch size exceed the limit")
errRaftNotReadyForWrite = errors.New("ERR_CLUSTER_CHANGED: the raft is not ready for write")
errWrongNumberArgs = errors.New("ERR wrong number of arguments for redis command")
ErrReadIndexTimeout = errors.New("wait read index timeout")
ErrNotLeader = errors.New("not raft leader")
ErrTransferLeaderSelfErr = errors.New("transfer leader to self not allowed")
)

const (
@@ -215,11 +228,12 @@ type KVNode struct {
remoteSyncedStates *remoteSyncedStateMgr
applyWait wait.WaitTime
// used for read index
readMu sync.RWMutex
readWaitC chan struct{}
readNotifier *notifier
wrPools waitReqPoolArray
slowLimiter *SlowLimiter
readMu sync.RWMutex
readWaitC chan struct{}
readNotifier *notifier
wrPools waitReqPoolArray
slowLimiter *SlowLimiter
applyingSnapshot int32
}

type KVSnapInfo struct {
@@ -477,13 +491,20 @@ func (nd *KVNode) GetRaftStatus() raft.Status {

// this is used for leader to determine whether a follower is up to date.
func (nd *KVNode) IsReplicaRaftReady(raftID uint64) bool {
s := nd.rn.node.Status()
allowLagCnt := settings.Soft.LeaderTransferLag
s := nd.GetRaftStatus()
if s.Progress == nil {
return false
}
pg, ok := s.Progress[raftID]
if !ok {
return false
}
if pg.IsPaused() {
return false
}
if pg.State.String() == "ProgressStateReplicate" {
if pg.Match+maxInflightMsgs >= s.Commit {
if pg.Match+allowLagCnt >= s.Commit {
return true
}
} else if pg.State.String() == "ProgressStateProbe" {
@@ -1041,9 +1062,13 @@ func (nd *KVNode) IsRaftSynced(checkCommitIndex bool) bool {
// leader always raft synced.
return true
}
// here, the raft may still not started, so do not try wait raft event
ai := nd.GetAppliedIndex()
ci := nd.GetRaftStatus().Commit
nd.rn.Infof("check raft synced, apply: %v, commit: %v", ai, ci)
if nd.IsApplyingSnapshot() {
return false
}
if !checkCommitIndex {
return true
}
@@ -1166,6 +1191,8 @@ func (nd *KVNode) applySnapshot(np *nodeProgress, applyEvent *applyInfo) {
nodeLog.Panicf("snapshot index [%d] should > progress.appliedIndex [%d] + 1",
applyEvent.snapshot.Metadata.Index, np.appliedi)
}
atomic.StoreInt32(&nd.applyingSnapshot, 1)
defer atomic.StoreInt32(&nd.applyingSnapshot, 0)
err := nd.PrepareSnapshot(applyEvent.snapshot)
if enableSnapTransferTest {
err = errors.New("auto test failed in snapshot transfer")
@@ -1216,6 +1243,10 @@ func (nd *KVNode) applySnapshot(np *nodeProgress, applyEvent *applyInfo) {
<-nd.stopChan
return
}
if enableSnapApplyBlockingTest {
wt := <-snapApplyBlockingC
time.Sleep(wt)
}

np.confState = applyEvent.snapshot.Metadata.ConfState
np.snapi = applyEvent.snapshot.Metadata.Index
@@ -1551,16 +1582,35 @@ func (nd *KVNode) GetLastLeaderChangedTime() int64 {
return nd.rn.getLastLeaderChangedTime()
}

func (nd *KVNode) IsApplyingSnapshot() bool {
return atomic.LoadInt32(&nd.applyingSnapshot) == 1
}

func (nd *KVNode) ReportMeLeaderToCluster() {
if nd.clusterInfo == nil {
return
}
if nd.rn.IsLead() {
if nd.IsApplyingSnapshot() {
nd.rn.Errorf("should not update raft leader to me while applying snapshot")
// should give up the leader in raft
stats := nd.GetRaftStatus()
for rid, pr := range stats.Progress {
if pr.IsLearner || !nd.IsReplicaRaftReady(rid) {
continue
}
err := nd.TransferLeadership(rid)
if err == nil {
break
}
}
return
}
changed, err := nd.clusterInfo.UpdateMeForNamespaceLeader(nd.ns)
if err != nil {
nd.rn.Infof("update raft leader to me failed: %v", err)
} else if changed {
nd.rn.Infof("update %v raft leader to me : %v", nd.ns, nd.rn.config.ID)
nd.rn.Infof("update %v raft leader to me : %v, at %v-%v", nd.ns, nd.rn.config.ID, nd.GetAppliedIndex(), nd.GetRaftStatus().Commit)
}
}
}
@@ -1572,6 +1622,30 @@ func (nd *KVNode) OnRaftLeaderChanged() {
}
}

func (nd *KVNode) TransferLeadership(toRaftID uint64) error {
nd.rn.Infof("begin transfer leader to %v", toRaftID)
if !nd.rn.IsLead() {
return ErrNotLeader
}
oldLeader := nd.rn.Lead()
if oldLeader == toRaftID {
return ErrTransferLeaderSelfErr
}
waitTimeout := time.Duration(nd.machineConfig.ElectionTick) * time.Duration(nd.machineConfig.TickMs) * time.Millisecond
ctx, cancel := context.WithTimeout(context.Background(), waitTimeout)
defer cancel()
nd.rn.node.TransferLeadership(ctx, oldLeader, toRaftID)
for nd.rn.Lead() != toRaftID {
select {
case <-ctx.Done():
return errTimeoutLeaderTransfer
case <-time.After(200 * time.Millisecond):
}
}
nd.rn.Infof("finished transfer from %v to %v", oldLeader, toRaftID)
return nil
}

func (nd *KVNode) Process(ctx context.Context, m raftpb.Message) error {
// avoid prepare snapshot while the node is starting
if m.Type == raftpb.MsgSnap && !raft.IsEmptySnap(m.Snapshot) {
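Taken together, the node.go changes gate leader transfer on the target replica's raft progress: the follower must not be paused, its lag behind the leader's commit index must stay within the configurable settings.Soft.LeaderTransferLag (replacing the old maxInflightMsgs bound), and a node that is still applying a snapshot refuses to report itself as leader and instead tries to hand leadership to a ready peer. The sketch below restates the readiness check over a simplified progress struct; the field names and the lag constant are stand-ins, and the real IsReplicaRaftReady also has a probe-state branch that is truncated in the hunk above.

package main

import "fmt"

// progress is a simplified stand-in for the per-replica progress carried by
// raft.Status; the real type has more state than shown here.
type progress struct {
	State  string // e.g. "ProgressStateReplicate", "ProgressStateProbe"
	Match  uint64 // highest log index known to be replicated on the follower
	Paused bool
}

// allowLagCnt stands in for settings.Soft.LeaderTransferLag, the configurable
// lag bound that replaces the old maxInflightMsgs comparison.
const allowLagCnt = 4096

// replicaReadyForTransfer mirrors the replicate-state path of
// IsReplicaRaftReady: a paused follower is never a transfer target, and a
// replicating follower must be within allowLagCnt of the leader's commit
// index. (The real function also has a probe-state branch, omitted here.)
func replicaReadyForTransfer(pg progress, leaderCommit uint64) bool {
	if pg.Paused {
		return false
	}
	if pg.State != "ProgressStateReplicate" {
		return false
	}
	return pg.Match+allowLagCnt >= leaderCommit
}

func main() {
	commit := uint64(100000)
	fmt.Println(replicaReadyForTransfer(progress{State: "ProgressStateReplicate", Match: 99990}, commit)) // true: nearly caught up
	fmt.Println(replicaReadyForTransfer(progress{State: "ProgressStateProbe", Match: 99990}, commit))     // false: still probing
	fmt.Println(replicaReadyForTransfer(progress{State: "ProgressStateReplicate", Match: 1000}, commit))  // false: lagging too far
}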
7 changes: 4 additions & 3 deletions node/raft.go
@@ -999,6 +999,7 @@ func (rc *raftNode) processReady(rd raft.Ready) {
if !raft.IsEmptySnap(rd.Snapshot) {
applySnapshotTransferResult = make(chan error, 1)
if !waitApply {
// this is only needed if the recover from snapshot is not atomic
waitApply = true
applyWaitDone = make(chan struct{})
}
@@ -1110,13 +1111,13 @@ func (rc *raftNode) processReady(rd raft.Ready) {
rc.Infof("wait apply for pending configure or snapshot")
s := time.Now()
// wait and handle pending config change
confDone := false
for !confDone {
done := false
for !done {
select {
case cc := <-rc.node.ConfChangedCh():
rc.node.HandleConfChanged(cc)
case <-applyWaitDone:
confDone = true
done = true
case <-rc.stopc:
return
}
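The processReady hunk keeps the ready-processing loop parked until a pending config change or snapshot apply has finished, while still draining config-change notifications so they are not lost in the meantime. A minimal sketch of that select pattern, detached from the raftNode internals and with illustrative channel types, is shown below.

package main

import (
	"fmt"
	"time"
)

// waitForApply blocks until applyDone is closed, but keeps draining
// confChanges in the meantime, mirroring the loop in processReady that
// handles ConfChangedCh while a snapshot apply is pending. It returns false
// if stop fires first.
func waitForApply(applyDone <-chan struct{}, confChanges <-chan string, stop <-chan struct{}) bool {
	for {
		select {
		case cc := <-confChanges:
			fmt.Println("handled conf change while waiting:", cc)
		case <-applyDone:
			return true
		case <-stop:
			return false
		}
	}
}

func main() {
	applyDone := make(chan struct{})
	confChanges := make(chan string, 1)
	stop := make(chan struct{})

	confChanges <- "add-node-4"
	go func() {
		time.Sleep(100 * time.Millisecond)
		close(applyDone) // pretend the snapshot apply finished
	}()

	fmt.Println("apply finished:", waitForApply(applyDone, confChanges, stop))
}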
84 changes: 84 additions & 0 deletions pdserver/server_test.go
@@ -1295,6 +1295,90 @@ func TestTransferLeaderWhileReplicaNotReady(t *testing.T) {
// should only transfer leader when replica has almost the newest raft logs
}

func TestTransferLeaderWhileReplicaApplyingSnapshot(t *testing.T) {
// apply snapshot and transfer leader should fail
defer node.EnableSnapBlockingForTest(false)

ensureClusterReady(t, 3)
time.Sleep(time.Second)
ns := "test_cluster_transfer_leader_snap_applying"
partNum := 1

pduri := "http://127.0.0.1:" + pdHttpPort

ensureDataNodesReady(t, pduri, len(gkvList))
enableAutoBalance(t, pduri, true)
ensureNamespace(t, pduri, ns, partNum, 3)
defer ensureDeleteNamespace(t, pduri, ns)
dnw, nsNode := waitForLeader(t, ns, 0)
leader := dnw.s
assert.NotNil(t, leader)
// call this to propose some request to write raft logs
for i := 0; i < 5; i++ {
nsNode.Node.OptimizeDB("")
}
oldNs := getNsInfo(t, ns, 0)
t.Logf("old isr is: %v", oldNs)
assert.Equal(t, 3, len(oldNs.GetISR()))

foWrap, _ := getFollowerNode(t, ns, 0)
foWrap.s.Stop()

for i := 0; i < 50; i++ {
nsNode.Node.OptimizeDB("")
}
c := getTestRedisConn(t, dnw.redisPort)
defer c.Close()
key := fmt.Sprintf("%s:%s", ns, "snap_apply:k1")
rsp, err := goredis.String(c.Do("set", key, "1234"))
assert.Nil(t, err)
assert.Equal(t, "OK", rsp)

for i := 0; i < 50; i++ {
nsNode.Node.OptimizeDB("")
}
leaderV, err := goredis.String(c.Do("get", key))
assert.True(t, err == nil || err == goredis.ErrNil)
assert.Equal(t, "1234", leaderV)
time.Sleep(time.Second * 5)

// make sure the snapshot applying is blocked
// and then transfer leader to this follower
node.EnableSnapBlockingForTest(true)
foWrap.s.Start()
time.Sleep(time.Second)
node.PutSnapBlockingTime(time.Second * 20)
fn := foWrap.s.GetNamespaceFromFullName(ns + "-0")
assert.True(t, fn.Node.IsApplyingSnapshot())
foRaftID := fn.GetRaftID()
err = nsNode.Node.TransferLeadership(foRaftID)
assert.NotNil(t, err)
nsInfo := getNsInfo(t, ns, 0)
transferOK := leader.GetCoord().TransferMyNamespaceLeader(&nsInfo, foWrap.s.GetCoord().GetMyID(), false, true)
assert.False(t, transferOK, "should not transfer while snapshot applying")
transferOK = leader.GetCoord().TransferMyNamespaceLeader(&nsInfo, foWrap.s.GetCoord().GetMyID(), false, false)
assert.False(t, transferOK, "should not transfer while snapshot applying")
transferOK = leader.GetCoord().TransferMyNamespaceLeader(&nsInfo, foWrap.s.GetCoord().GetMyID(), true, false)
assert.False(t, transferOK, "should not transfer while snapshot applying")

time.Sleep(time.Second * 20)
assert.False(t, fn.Node.IsApplyingSnapshot())
transferOK = leader.GetCoord().TransferMyNamespaceLeader(&nsInfo, foWrap.s.GetCoord().GetMyID(), false, true)
assert.True(t, transferOK, "should transfer ok")
_, newLeaderNode := waitForLeader(t, ns, 0)
assert.Equal(t, foRaftID, newLeaderNode.GetRaftID())

waitForAllFullReady(t, ns, 0)
followerConn := getTestRedisConn(t, foWrap.redisPort)
defer followerConn.Close()

getV, err := goredis.String(followerConn.Do("get", key))
assert.True(t, err == nil || err == goredis.ErrNil)
assert.Equal(t, "1234", getV)
}

func TestTransferLeaderWhileReplicaLagToomuch(t *testing.T) {
}
func TestClusterRestartNodeCatchup(t *testing.T) {
// test restarted node catchup while writing
ensureClusterReady(t, 3)
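The new TestTransferLeaderWhileReplicaApplyingSnapshot relies on the blocking hook added in node.go: a buffered channel carries a sleep duration, and applySnapshot pauses for that long when the hook is enabled, giving the test a deterministic window in which the follower is observably still applying a snapshot. A stripped-down version of that hook, with simplified names rather than the actual package API, is sketched here.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// A minimal version of the snapshot-blocking hook: a test enables the hook,
// pushes a duration, and the apply path sleeps for that duration so the test
// can observe the "applying snapshot" state deterministically.
var (
	blockApplyEnabled int32
	blockApplyC       = make(chan time.Duration, 1)
)

func enableBlockApply(b bool) {
	v := int32(0)
	if b {
		v = 1
	}
	atomic.StoreInt32(&blockApplyEnabled, v)
}

func putBlockApplyTime(d time.Duration) { blockApplyC <- d }

// applySnapshot is a stand-in for the real apply path: it marks the applying
// state, optionally blocks for the injected duration, then clears the flag.
func applySnapshot(applying *int32) {
	atomic.StoreInt32(applying, 1)
	defer atomic.StoreInt32(applying, 0)
	if atomic.LoadInt32(&blockApplyEnabled) == 1 {
		time.Sleep(<-blockApplyC)
	}
}

func main() {
	var applying int32
	enableBlockApply(true)
	putBlockApplyTime(300 * time.Millisecond)

	go applySnapshot(&applying)
	time.Sleep(50 * time.Millisecond)
	fmt.Println("applying snapshot:", atomic.LoadInt32(&applying) == 1) // true while the hook blocks
	time.Sleep(400 * time.Millisecond)
	fmt.Println("applying snapshot:", atomic.LoadInt32(&applying) == 1) // false once the apply finished
}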
