Skip to content

Commit

Permalink
Merge pull request #12410 from Yanhao/master
Browse files Browse the repository at this point in the history
etcdserver: export method EtcdServer.leaderChangedNotify (#12378)
  • Loading branch information
ptabor authored Feb 2, 2021
2 parents e897daa + 6d82778 commit 50ca440
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 13 deletions.
7 changes: 4 additions & 3 deletions server/etcdserver/api/etcdhttp/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ func (s *fakeServer) UpdateMember(ctx context.Context, updateMemb membership.Mem
func (s *fakeServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
return nil, fmt.Errorf("PromoteMember not implemented in fakeServer")
}
func (s *fakeServer) ClusterVersion() *semver.Version { return nil }
func (s *fakeServer) Cluster() api.Cluster { return s.cluster }
func (s *fakeServer) Alarms() []*pb.AlarmMember { return nil }
func (s *fakeServer) ClusterVersion() *semver.Version { return nil }
func (s *fakeServer) Cluster() api.Cluster { return s.cluster }
func (s *fakeServer) Alarms() []*pb.AlarmMember { return nil }
func (s *fakeServer) LeaderChangedNotify() <-chan struct{} { return nil }

var fakeRaftHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("test data"))
Expand Down
11 changes: 6 additions & 5 deletions server/etcdserver/api/v2http/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,12 @@ type fakeServer struct {
dummyStats
}

func (s *fakeServer) Leader() types.ID { return types.ID(1) }
func (s *fakeServer) Alarms() []*etcdserverpb.AlarmMember { return nil }
func (s *fakeServer) Cluster() api.Cluster { return nil }
func (s *fakeServer) ClusterVersion() *semver.Version { return nil }
func (s *fakeServer) RaftHandler() http.Handler { return nil }
func (s *fakeServer) Leader() types.ID { return types.ID(1) }
func (s *fakeServer) Alarms() []*etcdserverpb.AlarmMember { return nil }
func (s *fakeServer) LeaderChangedNotify() <-chan struct{} { return nil }
func (s *fakeServer) Cluster() api.Cluster { return nil }
func (s *fakeServer) ClusterVersion() *semver.Version { return nil }
func (s *fakeServer) RaftHandler() http.Handler { return nil }
func (s *fakeServer) Do(ctx context.Context, r etcdserverpb.Request) (rr etcdserver.Response, err error) {
return
}
Expand Down
7 changes: 4 additions & 3 deletions server/etcdserver/api/v2v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ func v3MembersToMembership(v3membs []*pb.Member) []*membership.Member {
return membs
}

func (s *v2v3Server) ClusterVersion() *semver.Version { return s.Version() }
func (s *v2v3Server) Cluster() api.Cluster { return s }
func (s *v2v3Server) Alarms() []*pb.AlarmMember { return nil }
func (s *v2v3Server) ClusterVersion() *semver.Version { return s.Version() }
func (s *v2v3Server) Cluster() api.Cluster { return s }
func (s *v2v3Server) Alarms() []*pb.AlarmMember { return nil }
func (s *v2v3Server) LeaderChangedNotify() <-chan struct{} { return nil }

func (s *v2v3Server) Do(ctx context.Context, r pb.Request) (etcdserver.Response, error) {
applier := etcdserver.NewApplierV2(s.lg, s.store, nil)
Expand Down
11 changes: 10 additions & 1 deletion server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,15 @@ type Server interface {
ClusterVersion() *semver.Version
Cluster() api.Cluster
Alarms() []*pb.AlarmMember

// LeaderChangedNotify returns a channel for application level code to be notified
// when etcd leader changes, this function is intend to be used only in application
// which embed etcd.
// Caution:
// 1. the returned channel is being closed when the leadership changes.
// 2. so the new channel needs to be obtained for each raft term.
// 3. user can loose some consecutive channel changes using this API.
LeaderChangedNotify() <-chan struct{}
}

// EtcdServer is the production implementation of the Server interface
Expand Down Expand Up @@ -1743,7 +1752,7 @@ func (s *EtcdServer) getLead() uint64 {
return atomic.LoadUint64(&s.lead)
}

func (s *EtcdServer) leaderChangedNotify() <-chan struct{} {
func (s *EtcdServer) LeaderChangedNotify() <-chan struct{} {
s.leaderChangedMu.RLock()
defer s.leaderChangedMu.RUnlock()
return s.leaderChanged
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ func (s *EtcdServer) linearizableReadLoop() {
ctxToSend := make([]byte, 8)
id1 := s.reqIDGen.Next()
binary.BigEndian.PutUint64(ctxToSend, id1)
leaderChangedNotifier := s.leaderChangedNotify()
leaderChangedNotifier := s.LeaderChangedNotify()
select {
case <-leaderChangedNotifier:
continue
Expand Down

0 comments on commit 50ca440

Please sign in to comment.