Commit e46e90a

Merge branch 'master' into region-stats
rleungx authored Aug 28, 2023
2 parents f179e8e + 50368e5
Showing 10 changed files with 419 additions and 134 deletions.
pkg/election/leadership.go (156 changes: 96 additions & 60 deletions)
@@ -26,15 +26,13 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
)

const (
watchLoopUnhealthyTimeout = 60 * time.Second
detectHealthyInterval = 10 * time.Second
)
const watchLoopUnhealthyTimeout = 60 * time.Second

// GetLeader gets the corresponding leader from etcd by given leaderPath (as the key).
func GetLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, int64, error) {
@@ -136,7 +134,7 @@ func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error {
         newLease.Close()
         return errs.ErrEtcdTxnConflict.FastGenByArgs()
     }
-    log.Info("write leaderData to leaderPath ok", zap.String("leaderPath", ls.leaderKey), zap.String("purpose", ls.purpose))
+    log.Info("write leaderData to leaderPath ok", zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
     return nil
 }

@@ -178,7 +176,7 @@ func (ls *Leadership) DeleteLeaderKey() error {
     }
     // Reset the lease as soon as possible.
     ls.Reset()
-    log.Info("delete the leader key ok", zap.String("leaderPath", ls.leaderKey), zap.String("purpose", ls.purpose))
+    log.Info("delete the leader key ok", zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
     return nil
 }

@@ -189,106 +187,144 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
         return
     }
 
-    interval := detectHealthyInterval
+    var (
+        watcher       clientv3.Watcher
+        watcherCancel context.CancelFunc
+    )
+    defer func() {
+        if watcherCancel != nil {
+            watcherCancel()
+        }
+        if watcher != nil {
+            watcher.Close()
+        }
+    }()
+
     unhealthyTimeout := watchLoopUnhealthyTimeout
     failpoint.Inject("fastTick", func() {
         unhealthyTimeout = 5 * time.Second
-        interval = 1 * time.Second
     })
-    ticker := time.NewTicker(interval)
+    ticker := time.NewTicker(etcdutil.RequestProgressInterval)
     defer ticker.Stop()
-    lastHealthyTime := time.Now()
+    lastReceivedResponseTime := time.Now()
 
-    watcher := clientv3.NewWatcher(ls.client)
-    defer watcher.Close()
-    var watchChanCancel context.CancelFunc
-    defer func() {
-        if watchChanCancel != nil {
-            watchChanCancel()
-        }
-    }()
     for {
         failpoint.Inject("delayWatcher", nil)
-        if watchChanCancel != nil {
-            watchChanCancel()
-        }
-        // In order to prevent a watch stream being stuck in a partitioned node,
-        // make sure to wrap context with "WithRequireLeader".
-        watchChanCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(serverCtx))
-        watchChanCancel = cancel
-
         // When etcd is not available, the watcher.Watch will block,
         // so we check the etcd availability first.
         if !etcdutil.IsHealthy(serverCtx, ls.client) {
-            if time.Since(lastHealthyTime) > unhealthyTimeout {
-                log.Error("the connect of leadership watcher is unhealthy",
-                    zap.Int64("revision", revision),
-                    zap.String("leader-key", ls.leaderKey),
-                    zap.String("purpose", ls.purpose))
+            if time.Since(lastReceivedResponseTime) > unhealthyTimeout {
+                log.Error("the connection is unhealthy for a while, exit leader watch loop",
+                    zap.Int64("revision", revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
                 return
             }
+            log.Warn("the connection maybe unhealthy, retry to watch later",
+                zap.Int64("revision", revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
             select {
             case <-serverCtx.Done():
                 log.Info("server is closed, exit leader watch loop",
-                    zap.String("leader-key", ls.leaderKey),
-                    zap.String("purpose", ls.purpose))
+                    zap.Int64("revision", revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
                 return
             case <-ticker.C:
                 continue // continue to check the etcd availability
             }
         }
+
+        if watcherCancel != nil {
+            watcherCancel()
+        }
+        if watcher != nil {
+            watcher.Close()
+        }
+        watcher = clientv3.NewWatcher(ls.client)
+        // In order to prevent a watch stream being stuck in a partitioned node,
+        // make sure to wrap context with "WithRequireLeader".
+        watcherCtx, cancel := context.WithCancel(clientv3.WithRequireLeader(serverCtx))
+        watcherCancel = cancel
+
+        done := make(chan struct{})
+        go grpcutil.CheckStream(watcherCtx, watcherCancel, done)
+        watchChan := watcher.Watch(watcherCtx, ls.leaderKey,
+            clientv3.WithRev(revision), clientv3.WithProgressNotify())
+        done <- struct{}{}
+        if err := watcherCtx.Err(); err != nil {
+            log.Warn("error occurred while creating watch channel and retry it later in watch loop", zap.Error(err),
+                zap.Int64("revision", revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
+            select {
+            case <-serverCtx.Done():
+                log.Info("server is closed, exit leader watch loop",
+                    zap.Int64("revision", revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
+                return
+            case <-ticker.C:
+                // continue to check the etcd availability
+                continue
+            }
+        }
+        log.Info("watch channel is created", zap.Int64("revision", revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
 
-        watchChan := watcher.Watch(watchChanCtx, ls.leaderKey, clientv3.WithRev(revision))
-    WatchChanLoop:
+    watchChanLoop:
         select {
         case <-serverCtx.Done():
             log.Info("server is closed, exit leader watch loop",
-                zap.String("leader-key", ls.leaderKey),
-                zap.String("purpose", ls.purpose))
+                zap.Int64("revision", revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
             return
         case <-ticker.C:
             // When etcd is not available, the watcher.RequestProgress will block,
             // so we check the etcd availability first.
             if !etcdutil.IsHealthy(serverCtx, ls.client) {
-                if time.Since(lastHealthyTime) > unhealthyTimeout {
-                    log.Error("the connect of leadership watcher is unhealthy",
-                        zap.Int64("revision", revision),
-                        zap.String("leader-key", ls.leaderKey),
-                        zap.String("purpose", ls.purpose))
-                    return
-                }
-                goto WatchChanLoop
+                log.Warn("the connection maybe unhealthy, retry to watch later",
+                    zap.Int64("revision", revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
+                continue
             }
+            // We need to request progress to etcd to prevent etcd hold the watchChan,
+            // note: the ctx must be from watcherCtx, otherwise, the RequestProgress request cannot be sent properly.
+            ctx, cancel := context.WithTimeout(watcherCtx, etcdutil.DefaultRequestTimeout)
+            if err := watcher.RequestProgress(ctx); err != nil {
+                log.Warn("failed to request progress in leader watch loop",
+                    zap.Int64("revision", revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose), zap.Error(err))
+            }
+            cancel()
+            // If no message comes from an etcd watchChan for WatchChTimeoutDuration,
+            // create a new one and need not to reset lastReceivedResponseTime.
+            if time.Since(lastReceivedResponseTime) >= etcdutil.WatchChTimeoutDuration {
+                log.Warn("watch channel is blocked for a long time, recreating a new one",
+                    zap.Duration("timeout", time.Since(lastReceivedResponseTime)),
+                    zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
+                continue
+            }
         case wresp := <-watchChan:
             // meet compacted error, use the compact revision.
+            failpoint.Inject("watchChanBlock", func() {
+                // watchChanBlock is used to simulate the case that the watchChan is blocked for a long time.
+                // So we discard these responses when the failpoint is injected.
+                failpoint.Goto("watchChanLoop")
+            })
+            lastReceivedResponseTime = time.Now()
             if wresp.CompactRevision != 0 {
                 log.Warn("required revision has been compacted, use the compact revision",
-                    zap.Int64("required-revision", revision),
-                    zap.Int64("compact-revision", wresp.CompactRevision))
+                    zap.Int64("required-revision", revision), zap.Int64("compact-revision", wresp.CompactRevision),
+                    zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
                 revision = wresp.CompactRevision
-                lastHealthyTime = time.Now()
                 continue
-            } else if wresp.Err() != nil { // wresp.Err() contains CompactRevision not equal to 0
-                log.Error("leadership watcher is canceled with",
-                    zap.Int64("revision", revision),
-                    zap.String("leader-key", ls.leaderKey),
-                    zap.String("purpose", ls.purpose),
-                    errs.ZapError(errs.ErrEtcdWatcherCancel, wresp.Err()))
+            } else if err := wresp.Err(); err != nil { // wresp.Err() contains CompactRevision not equal to 0
+                log.Error("leadership watcher is canceled with", errs.ZapError(errs.ErrEtcdWatcherCancel, err),
+                    zap.Int64("revision", revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
                 return
+            } else if wresp.IsProgressNotify() {
+                log.Debug("watcher receives progress notify in watch loop",
+                    zap.Int64("revision", revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
+                goto watchChanLoop
             }
 
             for _, ev := range wresp.Events {
                 if ev.Type == mvccpb.DELETE {
                     log.Info("current leadership is deleted",
-                        zap.Int64("revision", wresp.Header.Revision),
-                        zap.String("leader-key", ls.leaderKey),
-                        zap.String("purpose", ls.purpose))
+                        zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
                     return
                 }
             }
             revision = wresp.Header.Revision + 1
         }
-        lastHealthyTime = time.Now()
-        goto WatchChanLoop // use goto to avoid to create a new watchChan
+        goto watchChanLoop // Use goto to avoid creating a new watchChan
     }
 }
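
For readers skimming the hunk above: the new loop keeps the etcd watch stream alive by periodically calling Watcher.RequestProgress on a ticker, and recreates the watch channel when no response (event or progress notify) has arrived within etcdutil.WatchChTimeoutDuration. Below is a minimal, hypothetical sketch of that pattern outside PD, using only the clientv3 APIs the diff itself relies on (RequestProgress, WithProgressNotify, WithRequireLeader). The endpoint, key name, revision, function name watchWithProgress, and the 10s/30s interval constants are placeholders of mine, not PD's actual values.

package main

import (
    "context"
    "fmt"
    "time"

    "go.etcd.io/etcd/clientv3"
)

// watchWithProgress watches key starting at rev and recreates the watch
// channel whenever it stays silent for longer than chTimeout.
func watchWithProgress(ctx context.Context, cli *clientv3.Client, key string, rev int64) {
    watcher := clientv3.NewWatcher(cli)
    defer watcher.Close()

    const (
        progressInterval = 10 * time.Second // placeholder stand-in for etcdutil.RequestProgressInterval
        chTimeout        = 30 * time.Second // placeholder stand-in for etcdutil.WatchChTimeoutDuration
    )
    ticker := time.NewTicker(progressInterval)
    defer ticker.Stop()
    lastResp := time.Now()

    for {
        // WithRequireLeader makes the stream fail fast on a partitioned member.
        wctx, cancel := context.WithCancel(clientv3.WithRequireLeader(ctx))
        wch := watcher.Watch(wctx, key, clientv3.WithRev(rev), clientv3.WithProgressNotify())
    recv:
        for {
            select {
            case <-ctx.Done():
                cancel()
                return
            case <-ticker.C:
                // A healthy stream answers a progress request even if the
                // watched key never changes; silence means the channel is stuck.
                if err := watcher.RequestProgress(wctx); err != nil {
                    fmt.Println("request progress failed:", err)
                }
                if time.Since(lastResp) >= chTimeout {
                    break recv // recreate the watch channel
                }
            case wresp, ok := <-wch:
                if !ok || wresp.Err() != nil {
                    break recv
                }
                lastResp = time.Now()
                if wresp.CompactRevision != 0 {
                    rev = wresp.CompactRevision // resume from the compact revision
                    break recv
                }
                for _, ev := range wresp.Events {
                    fmt.Printf("event: %s %q\n", ev.Type, ev.Kv.Key)
                }
                rev = wresp.Header.Revision + 1
            }
        }
        cancel()
    }
}

func main() {
    // Placeholder endpoint and key, for illustration only.
    cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
    if err != nil {
        panic(err)
    }
    defer cli.Close()
    watchWithProgress(context.Background(), cli, "/my/leader/key", 1)
}

PD's version additionally wraps the Watch call with grpcutil.CheckStream (a done channel plus a watchdog goroutine, as in the hunk above), which appears intended to cancel the watcher context if stream creation itself blocks; that guard is omitted from this sketch.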
