Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: cherry pick some fixes #1321

Merged
merged 2 commits into from
Nov 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/integration_test/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ func (s *testServer) GetClusterID() uint64 {
return s.server.ClusterID()
}

func (s *testServer) GetLeader() *pdpb.Member {
s.RLock()
defer s.RUnlock()
return s.server.GetLeader()
}

func (s *testServer) GetClusterVersion() semver.Version {
s.RLock()
defer s.RUnlock()
Expand Down
6 changes: 6 additions & 0 deletions pkg/integration_test/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package integration

import (
"context"
"os"
"path"
"time"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -44,6 +46,8 @@ func (s *integrationTestSuite) TestSimpleJoin(c *C) {
c.Assert(err, IsNil)
err = pd2.Run(context.TODO())
c.Assert(err, IsNil)
_, err = os.Stat(path.Join(pd2.GetConfig().DataDir, "join"))
c.Assert(os.IsNotExist(err), IsFalse)
members, err = etcdutil.ListEtcdMembers(client)
c.Assert(err, IsNil)
c.Assert(members.Members, HasLen, 2)
Expand All @@ -57,6 +61,8 @@ func (s *integrationTestSuite) TestSimpleJoin(c *C) {
c.Assert(err, IsNil)
err = pd3.Run(context.TODO())
c.Assert(err, IsNil)
_, err = os.Stat(path.Join(pd3.GetConfig().DataDir, "join"))
c.Assert(os.IsNotExist(err), IsFalse)
members, err = etcdutil.ListEtcdMembers(client)
c.Assert(err, IsNil)
c.Assert(members.Members, HasLen, 3)
Expand Down
58 changes: 58 additions & 0 deletions pkg/integration_test/leader_watch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"context"
"time"

gofail "github.com/etcd-io/gofail/runtime"
. "github.com/pingcap/check"
"github.com/pingcap/pd/pkg/testutil"
)

func (s *integrationTestSuite) TestWatcher(c *C) {
c.Parallel()
cluster, err := newTestCluster(1)
c.Assert(err, IsNil)
defer cluster.Destroy()

err = cluster.RunInitialServers()
c.Assert(err, IsNil)
cluster.WaitLeader()
pd1 := cluster.GetServer(cluster.GetLeader())
c.Assert(pd1, NotNil)

pd2, err := cluster.Join()
c.Assert(err, IsNil)
err = pd2.Run(context.TODO())
c.Assert(err, IsNil)
cluster.WaitLeader()

time.Sleep(5 * time.Second)
pd3, err := cluster.Join()
c.Assert(err, IsNil)
gofail.Enable("github.com/pingcap/pd/server/delayWatcher", `sleep("15s")`)
err = pd3.Run(context.Background())
c.Assert(err, IsNil)
time.Sleep(200 * time.Millisecond)
c.Assert(pd3.GetLeader().GetName(), Equals, pd1.GetConfig().Name)
pd1.Stop()
cluster.WaitLeader()
c.Assert(pd2.GetLeader().GetName(), Equals, pd2.GetConfig().Name)
testutil.WaitUntil(c, func(c *C) bool {
return c.Check(pd3.GetLeader().GetName(), Equals, pd2.GetConfig().Name)
})
c.Succeed()
}
36 changes: 34 additions & 2 deletions server/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package server

import (
"fmt"
"io/ioutil"
"os"
"path"
"strings"
Expand All @@ -26,6 +27,13 @@ import (
log "github.com/sirupsen/logrus"
)

const (
// privateFileMode grants owner to read/write a file.
privateFileMode = 0600
// privateDirMode grants owner to make/remove files inside the directory.
privateDirMode = 0700
)

// PrepareJoinCluster sends MemberAdd command to PD cluster,
// and returns the initial configuration of the PD cluster.
//
Expand Down Expand Up @@ -73,8 +81,20 @@ func PrepareJoinCluster(cfg *Config) error {
return errors.New("join self is forbidden")
}

// Cases with data directory.
filePath := path.Join(cfg.DataDir, "join")
// Read the persist join config
if _, err := os.Stat(filePath); !os.IsNotExist(err) {
s, err := ioutil.ReadFile(filePath)
if err != nil {
log.Fatal("read the join config meet error: ", err)
}
cfg.InitialCluster = strings.TrimSpace(string(s))
cfg.InitialClusterState = embed.ClusterStateFlagExisting
return nil
}

initialCluster := ""
// Cases with data directory.
if isDataExist(path.Join(cfg.DataDir, "member")) {
cfg.InitialCluster = initialCluster
cfg.InitialClusterState = embed.ClusterStateFlagExisting
Expand Down Expand Up @@ -103,6 +123,9 @@ func PrepareJoinCluster(cfg *Config) error {

existed := false
for _, m := range listResp.Members {
if len(m.Name) == 0 {
return errors.New("there is a member that has not joined successfully")
}
if m.Name == cfg.Name {
existed = true
}
Expand Down Expand Up @@ -131,14 +154,23 @@ func PrepareJoinCluster(cfg *Config) error {
if memb.ID == addResp.Member.ID {
n = cfg.Name
}
if len(n) == 0 {
return errors.New("there is a member that has not joined successfully")
}
for _, m := range memb.PeerURLs {
pds = append(pds, fmt.Sprintf("%s=%s", n, m))
}
}
initialCluster = strings.Join(pds, ",")
cfg.InitialCluster = initialCluster
cfg.InitialClusterState = embed.ClusterStateFlagExisting
return nil
err = os.MkdirAll(cfg.DataDir, privateDirMode)
if err != nil && !os.IsExist(err) {
return errors.WithStack(err)
}

err = ioutil.WriteFile(filePath, []byte(cfg.InitialCluster), privateFileMode)
return errors.WithStack(err)
}

func isDataExist(d string) bool {
Expand Down
19 changes: 10 additions & 9 deletions server/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (s *Server) leaderLoop() {
continue
}

leader, err := getLeader(s.client, s.getLeaderPath())
leader, rev, err := getLeader(s.client, s.getLeaderPath())
if err != nil {
log.Errorf("get leader err %v", err)
time.Sleep(200 * time.Millisecond)
Expand All @@ -100,7 +100,7 @@ func (s *Server) leaderLoop() {
}
} else {
log.Infof("leader is %s, watch it", leader)
s.watchLeader(leader)
s.watchLeader(leader, rev)
log.Info("leader changed, try to campaign leader")
}
}
Expand Down Expand Up @@ -157,17 +157,17 @@ func (s *Server) etcdLeaderLoop() {
}

// getLeader gets server leader from etcd.
func getLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, error) {
func getLeader(c *clientv3.Client, leaderPath string) (*pdpb.Member, int64, error) {
leader := &pdpb.Member{}
ok, err := getProtoMsg(c, leaderPath, leader)
ok, rev, err := getProtoMsgWithModRev(c, leaderPath, leader)
if err != nil {
return nil, err
return nil, 0, err
}
if !ok {
return nil, nil
return nil, 0, nil
}

return leader, nil
return leader, rev, nil
}

// GetEtcdLeader returns the etcd leader ID.
Expand Down Expand Up @@ -289,7 +289,7 @@ func (s *Server) campaignLeader() error {
}
}

func (s *Server) watchLeader(leader *pdpb.Member) {
func (s *Server) watchLeader(leader *pdpb.Member, revision int64) {
s.leader.Store(leader)
defer s.leader.Store(&pdpb.Member{})

Expand All @@ -300,7 +300,8 @@ func (s *Server) watchLeader(leader *pdpb.Member) {
defer cancel()

for {
rch := watcher.Watch(ctx, s.getLeaderPath())
// gofail: var delayWatcher struct{}
rch := watcher.Watch(ctx, s.getLeaderPath(), clientv3.WithRev(revision))
for wresp := range rch {
if wresp.Canceled {
return
Expand Down
2 changes: 1 addition & 1 deletion server/tso_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (s *testTimeFallBackSuite) TestTimeFallBack(c *C) {

func mustGetLeader(c *C, client *clientv3.Client, leaderPath string) *pdpb.Member {
for i := 0; i < 20; i++ {
leader, err := getLeader(client, leaderPath)
leader, _, err := getLeader(client, leaderPath)
c.Assert(err, IsNil)
if leader != nil {
return leader
Expand Down
33 changes: 20 additions & 13 deletions server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,18 @@ func CheckPDVersion(opt *scheduleOption) {
}

// A helper function to get value with key from etcd.
// TODO: return the value revision for outer use.
func getValue(c *clientv3.Client, key string, opts ...clientv3.OpOption) ([]byte, error) {
resp, err := get(c, key, opts...)
if err != nil {
return nil, err
}
if resp == nil {
return nil, nil
}
return resp.Kvs[0].Value, nil
}

func get(c *clientv3.Client, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
resp, err := kvGet(c, key, opts...)
if err != nil {
return nil, err
Expand All @@ -93,26 +103,23 @@ func getValue(c *clientv3.Client, key string, opts ...clientv3.OpOption) ([]byte
} else if n > 1 {
return nil, errors.Errorf("invalid get value resp %v, must only one", resp.Kvs)
}

return resp.Kvs[0].Value, nil
return resp, nil
}

// Return boolean to indicate whether the key exists or not.
// TODO: return the value revision for outer use.
func getProtoMsg(c *clientv3.Client, key string, msg proto.Message, opts ...clientv3.OpOption) (bool, error) {
value, err := getValue(c, key, opts...)
func getProtoMsgWithModRev(c *clientv3.Client, key string, msg proto.Message, opts ...clientv3.OpOption) (bool, int64, error) {
resp, err := get(c, key, opts...)
if err != nil {
return false, err
return false, 0, err
}
if value == nil {
return false, nil
if resp == nil {
return false, 0, nil
}

value := resp.Kvs[0].Value
if err = proto.Unmarshal(value, msg); err != nil {
return false, errors.WithStack(err)
return false, 0, errors.WithStack(err)
}

return true, nil
return true, resp.Kvs[0].ModRevision, nil
}

func initOrGetClusterID(c *clientv3.Client, key string) (uint64, error) {
Expand Down