Skip to content

Commit

Permalink
server: broadcast leader changed
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch committed Sep 17, 2018
1 parent fd5ef74 commit 9125b83
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
20 changes: 14 additions & 6 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ type EtcdServer struct {
// done is closed when all goroutines from start() complete.
done chan struct{}
// leaderChanged is used to notify the linearizable read loop to drop the old read requests.
leaderChanged chan struct{}
leaderChanged chan struct{}
leaderChangedMu sync.RWMutex

errorc chan error
id types.ID
Expand Down Expand Up @@ -754,7 +755,7 @@ func (s *EtcdServer) start() {
s.ctx, s.cancel = context.WithCancel(context.Background())
s.readwaitc = make(chan struct{}, 1)
s.readNotifier = newNotifier()
s.leaderChanged = make(chan struct{}, 1)
s.leaderChanged = make(chan struct{})
if s.ClusterVersion() != nil {
if lg != nil {
lg.Info(
Expand Down Expand Up @@ -942,10 +943,11 @@ func (s *EtcdServer) run() {
}
}
if newLeader {
select {
case s.leaderChanged <- struct{}{}:
default:
}
s.leaderChangedMu.Lock()
lc := s.leaderChanged
s.leaderChanged = make(chan struct{})
s.leaderChangedMu.Unlock()
close(lc)
}
// TODO: remove the nil checking
// current test utility does not provide the stats
Expand Down Expand Up @@ -1696,6 +1698,12 @@ func (s *EtcdServer) getLead() uint64 {
return atomic.LoadUint64(&s.lead)
}

func (s *EtcdServer) leaderChangedNotify() <-chan struct{} {
s.leaderChangedMu.RLock()
defer s.leaderChangedMu.RUnlock()
return s.leaderChanged
}

// RaftStatusGetter represents etcd server and Raft progress.
type RaftStatusGetter interface {
ID() types.ID
Expand Down
4 changes: 2 additions & 2 deletions etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ func (s *EtcdServer) linearizableReadLoop() {
binary.BigEndian.PutUint64(ctxToSend, id1)

select {
case <-s.leaderChanged:
case <-s.leaderChangedNotify():
continue
case <-s.readwaitc:
case <-s.stopping:
Expand Down Expand Up @@ -694,7 +694,7 @@ func (s *EtcdServer) linearizableReadLoop() {
}
slowReadIndex.Inc()
}
case <-s.leaderChanged:
case <-s.leaderChangedNotify():
timeout = true
readIndexFailed.Inc()
// return a retryable error.
Expand Down

0 comments on commit 9125b83

Please sign in to comment.