From 6d82778a4e0bc2d92f49bc1d70a7be428d2613b3 Mon Sep 17 00:00:00 2001 From: Yanhao Mo Date: Wed, 21 Oct 2020 16:21:08 +0800 Subject: [PATCH] etcdserver: export method EtcdServer.leaderChangedNotify (#12378) --- server/etcdserver/api/etcdhttp/peer_test.go | 7 ++++--- server/etcdserver/api/v2http/client_test.go | 11 ++++++----- server/etcdserver/api/v2v3/server.go | 7 ++++--- server/etcdserver/server.go | 11 ++++++++++- server/etcdserver/v3_server.go | 2 +- 5 files changed, 25 insertions(+), 13 deletions(-) diff --git a/server/etcdserver/api/etcdhttp/peer_test.go b/server/etcdserver/api/etcdhttp/peer_test.go index 8fe300cd173..3770f29edc0 100644 --- a/server/etcdserver/api/etcdhttp/peer_test.go +++ b/server/etcdserver/api/etcdhttp/peer_test.go @@ -72,9 +72,10 @@ func (s *fakeServer) UpdateMember(ctx context.Context, updateMemb membership.Mem func (s *fakeServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) { return nil, fmt.Errorf("PromoteMember not implemented in fakeServer") } -func (s *fakeServer) ClusterVersion() *semver.Version { return nil } -func (s *fakeServer) Cluster() api.Cluster { return s.cluster } -func (s *fakeServer) Alarms() []*pb.AlarmMember { return nil } +func (s *fakeServer) ClusterVersion() *semver.Version { return nil } +func (s *fakeServer) Cluster() api.Cluster { return s.cluster } +func (s *fakeServer) Alarms() []*pb.AlarmMember { return nil } +func (s *fakeServer) LeaderChangedNotify() <-chan struct{} { return nil } var fakeRaftHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("test data")) diff --git a/server/etcdserver/api/v2http/client_test.go b/server/etcdserver/api/v2http/client_test.go index bdb455c0e96..cb4be0b2688 100644 --- a/server/etcdserver/api/v2http/client_test.go +++ b/server/etcdserver/api/v2http/client_test.go @@ -95,11 +95,12 @@ type fakeServer struct { dummyStats } -func (s *fakeServer) Leader() types.ID { return types.ID(1) } -func (s *fakeServer) Alarms() []*etcdserverpb.AlarmMember { return nil } -func (s *fakeServer) Cluster() api.Cluster { return nil } -func (s *fakeServer) ClusterVersion() *semver.Version { return nil } -func (s *fakeServer) RaftHandler() http.Handler { return nil } +func (s *fakeServer) Leader() types.ID { return types.ID(1) } +func (s *fakeServer) Alarms() []*etcdserverpb.AlarmMember { return nil } +func (s *fakeServer) LeaderChangedNotify() <-chan struct{} { return nil } +func (s *fakeServer) Cluster() api.Cluster { return nil } +func (s *fakeServer) ClusterVersion() *semver.Version { return nil } +func (s *fakeServer) RaftHandler() http.Handler { return nil } func (s *fakeServer) Do(ctx context.Context, r etcdserverpb.Request) (rr etcdserver.Response, err error) { return } diff --git a/server/etcdserver/api/v2v3/server.go b/server/etcdserver/api/v2v3/server.go index 0cdb5c636f7..be1b43c523d 100644 --- a/server/etcdserver/api/v2v3/server.go +++ b/server/etcdserver/api/v2v3/server.go @@ -113,9 +113,10 @@ func v3MembersToMembership(v3membs []*pb.Member) []*membership.Member { return membs } -func (s *v2v3Server) ClusterVersion() *semver.Version { return s.Version() } -func (s *v2v3Server) Cluster() api.Cluster { return s } -func (s *v2v3Server) Alarms() []*pb.AlarmMember { return nil } +func (s *v2v3Server) ClusterVersion() *semver.Version { return s.Version() } +func (s *v2v3Server) Cluster() api.Cluster { return s } +func (s *v2v3Server) Alarms() []*pb.AlarmMember { return nil } +func (s *v2v3Server) LeaderChangedNotify() <-chan struct{} { return nil } func (s *v2v3Server) Do(ctx context.Context, r pb.Request) (etcdserver.Response, error) { applier := etcdserver.NewApplierV2(s.lg, s.store, nil) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index a4fa259ae08..5c5a9fb6780 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -185,6 +185,15 @@ type Server interface { ClusterVersion() *semver.Version Cluster() api.Cluster Alarms() []*pb.AlarmMember + + // LeaderChangedNotify returns a channel for application level code to be notified + // when etcd leader changes, this function is intend to be used only in application + // which embed etcd. + // Caution: + // 1. the returned channel is being closed when the leadership changes. + // 2. so the new channel needs to be obtained for each raft term. + // 3. user can loose some consecutive channel changes using this API. + LeaderChangedNotify() <-chan struct{} } // EtcdServer is the production implementation of the Server interface @@ -1743,7 +1752,7 @@ func (s *EtcdServer) getLead() uint64 { return atomic.LoadUint64(&s.lead) } -func (s *EtcdServer) leaderChangedNotify() <-chan struct{} { +func (s *EtcdServer) LeaderChangedNotify() <-chan struct{} { s.leaderChangedMu.RLock() defer s.leaderChangedMu.RUnlock() return s.leaderChanged diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index 4130d8e081f..3fa64f7413e 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -712,7 +712,7 @@ func (s *EtcdServer) linearizableReadLoop() { ctxToSend := make([]byte, 8) id1 := s.reqIDGen.Next() binary.BigEndian.PutUint64(ctxToSend, id1) - leaderChangedNotifier := s.leaderChangedNotify() + leaderChangedNotifier := s.LeaderChangedNotify() select { case <-leaderChangedNotifier: continue