Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server/leader: use the last modify revision to watch leader #1317

Merged
merged 3 commits into from
Nov 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/integration_test/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ func (s *testServer) GetClusterID() uint64 {
return s.server.ClusterID()
}

// GetLeader returns the leader member reported by the wrapped server.
// The call is made while holding the test server's read lock.
func (s *testServer) GetLeader() *pdpb.Member {
	s.RLock()
	defer s.RUnlock()

	leader := s.server.GetLeader()
	return leader
}

func (s *testServer) GetClusterVersion() semver.Version {
s.RLock()
defer s.RUnlock()
Expand Down
58 changes: 58 additions & 0 deletions pkg/integration_test/leader_watch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"context"
"time"

gofail "github.com/etcd-io/gofail/runtime"
. "github.com/pingcap/check"
"github.com/pingcap/pd/pkg/testutil"
)

// TestWatcher starts a cluster, adds a second and a third server, and checks
// that the third server (started while the delayWatcher failpoint is active)
// first reports pd1 as leader and, after pd1 is stopped, eventually reports
// pd2 as leader.
func (s *integrationTestSuite) TestWatcher(c *C) {
	c.Parallel()
	cluster, err := newTestCluster(1)
	c.Assert(err, IsNil)
	defer cluster.Destroy()

	err = cluster.RunInitialServers()
	c.Assert(err, IsNil)
	cluster.WaitLeader()
	pd1 := cluster.GetServer(cluster.GetLeader())
	c.Assert(pd1, NotNil)

	pd2, err := cluster.Join()
	c.Assert(err, IsNil)
	err = pd2.Run(context.Background())
	c.Assert(err, IsNil)
	cluster.WaitLeader()

	// NOTE(review): presumably gives pd2 time to settle before pd3 joins;
	// the reason for the exact duration is not visible here.
	time.Sleep(5 * time.Second)
	pd3, err := cluster.Join()
	c.Assert(err, IsNil)

	// Failpoints are process-global: verify the failpoint was really armed
	// (otherwise the test would not exercise the delayed path) and always
	// disarm it so the 15s delay does not leak into later tests.
	const delayWatcher = "github.com/pingcap/pd/server/delayWatcher"
	err = gofail.Enable(delayWatcher, `sleep("15s")`)
	c.Assert(err, IsNil)
	defer gofail.Disable(delayWatcher) // best-effort cleanup

	err = pd3.Run(context.Background())
	c.Assert(err, IsNil)
	time.Sleep(200 * time.Millisecond)
	c.Assert(pd3.GetLeader().GetName(), Equals, pd1.GetConfig().Name)

	pd1.Stop()
	cluster.WaitLeader()
	c.Assert(pd2.GetLeader().GetName(), Equals, pd2.GetConfig().Name)

	// Poll with a plain comparison: c.Check would record a test failure on
	// every unsuccessful poll, which then has to be erased with c.Succeed().
	testutil.WaitUntil(c, func(c *C) bool {
		return pd3.GetLeader().GetName() == pd2.GetConfig().Name
	})
}
19 changes: 10 additions & 9 deletions server/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (s *Server) leaderLoop() {
continue
}

leader, err := getLeader(s.client, s.getLeaderPath())
leader, rev, err := getLeader(s.client, s.getLeaderPath())
if err != nil {
log.Errorf("get leader err %v", err)
time.Sleep(200 * time.Millisecond)
Expand All @@ -100,7 +100,7 @@ func (s *Server) leaderLoop() {
}
} else {
log.Infof("leader is %s, watch it", leader)
s.watchLeader(leader)
s.watchLeader(leader, rev)
log.Info("leader changed, try to campaign leader")
}
}
Expand Down Expand Up @@ -157,17 +157,17 @@ func (s *Server) etcdLeaderLoop() {
}

// getLeader gets server leader from etcd.
func getLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, error) {
func getLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, int64, error) {
leader := &pdpb.Member{}
ok, err := getProtoMsg(c, leaderPath, leader)
ok, rev, err := getProtoMsgWithModRev(c, leaderPath, leader)
if err != nil {
return nil, err
return nil, 0, err
}
if !ok {
return nil, nil
return nil, 0, nil
}

return leader, nil
return leader, rev, nil
}

// GetEtcdLeader returns the etcd leader ID.
Expand Down Expand Up @@ -289,7 +289,7 @@ func (s *Server) campaignLeader() error {
}
}

func (s *Server) watchLeader(leader *pdpb.Member) {
func (s *Server) watchLeader(leader *pdpb.Member, revision int64) {
s.leader.Store(leader)
defer s.leader.Store(&pdpb.Member{})

Expand All @@ -309,7 +309,8 @@ func (s *Server) watchLeader(leader *pdpb.Member) {
}

for {
rch := watcher.Watch(ctx, s.getLeaderPath())
// gofail: var delayWatcher struct{}
rch := watcher.Watch(ctx, s.getLeaderPath(), clientv3.WithRev(revision))
for wresp := range rch {
if wresp.Canceled {
return
Expand Down
2 changes: 1 addition & 1 deletion server/tso_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (s *testTimeFallBackSuite) TestTimeFallBack(c *C) {

func mustGetLeader(c *C, client *clientv3.Client, leaderPath string) *pdpb.Member {
for i := 0; i < 20; i++ {
leader, err := getLeader(client, leaderPath)
leader, _, err := getLeader(client, leaderPath)
c.Assert(err, IsNil)
if leader != nil {
return leader
Expand Down
33 changes: 20 additions & 13 deletions server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,18 @@ func CheckPDVersion(opt *scheduleOption) {
}

// A helper function to get value with key from etcd.
// TODO: return the value revision for outer use.
func getValue(c *clientv3.Client, key string, opts ...clientv3.OpOption) ([]byte, error) {
	getResp, err := get(c, key, opts...)
	switch {
	case err != nil:
		// Propagate the lookup error unchanged.
		return nil, err
	case getResp == nil:
		// get produced no response: report "no value" without an error.
		return nil, nil
	default:
		// Hand back the value of the first returned key-value pair.
		return getResp.Kvs[0].Value, nil
	}
}

func get(c *clientv3.Client, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
resp, err := kvGet(c, key, opts...)
if err != nil {
return nil, err
Expand All @@ -93,26 +103,23 @@ func getValue(c *clientv3.Client, key string, opts ...clientv3.OpOption) ([]byte
} else if n > 1 {
return nil, errors.Errorf("invalid get value resp %v, must only one", resp.Kvs)
}

return resp.Kvs[0].Value, nil
return resp, nil
}

// Return boolean to indicate whether the key exists or not.
// TODO: return the value revision for outer use.
func getProtoMsg(c *clientv3.Client, key string, msg proto.Message, opts ...clientv3.OpOption) (bool, error) {
value, err := getValue(c, key, opts...)
func getProtoMsgWithModRev(c *clientv3.Client, key string, msg proto.Message, opts ...clientv3.OpOption) (bool, int64, error) {
resp, err := get(c, key, opts...)
if err != nil {
return false, err
return false, 0, err
}
if value == nil {
return false, nil
if resp == nil {
return false, 0, nil
}

value := resp.Kvs[0].Value
if err = proto.Unmarshal(value, msg); err != nil {
return false, errors.WithStack(err)
return false, 0, errors.WithStack(err)
}

return true, nil
return true, resp.Kvs[0].ModRevision, nil
}

func initOrGetClusterID(c *clientv3.Client, key string) (uint64, error) {
Expand Down