Skip to content

Commit

Permalink
server: add retry mechanism for join (tikv#1643)
Browse files Browse the repository at this point in the history
* server/join: add retry for join member

Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Aug 5, 2019
1 parent aec1c1f commit a2f134b
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 16 deletions.
64 changes: 48 additions & 16 deletions server/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"os"
"path"
"strings"
"time"

"github.com/pingcap/failpoint"
log "github.com/pingcap/log"
"github.com/pingcap/pd/pkg/etcdutil"
"github.com/pkg/errors"
Expand All @@ -35,6 +37,9 @@ const (
privateDirMode = 0700
)

// listMemberRetryTimes is the maximum number of attempts PrepareJoinCluster
// makes at listing the etcd members while looking for the newly added member.
// It is a variable rather than a constant because the "add-member-failed"
// failpoint lowers it.
var listMemberRetryTimes = 20

// PrepareJoinCluster sends MemberAdd command to PD cluster,
// and returns the initial configuration of the PD cluster.
//
Expand Down Expand Up @@ -137,31 +142,58 @@ func PrepareJoinCluster(cfg *Config) error {
return errors.New("missing data or join a duplicated pd")
}

var addResp *clientv3.MemberAddResponse

failpoint.Inject("add-member-failed", func() {
listMemberRetryTimes = 2
failpoint.Goto("LabelSkipAddMember")
})
// - A new PD joins an existing cluster.
// - A deleted PD joins to previous cluster.
addResp, err := etcdutil.AddEtcdMember(client, []string{cfg.AdvertisePeerUrls})
if err != nil {
return err
{
// First adds member through the API
addResp, err = etcdutil.AddEtcdMember(client, []string{cfg.AdvertisePeerUrls})
if err != nil {
return err
}
}
failpoint.Label("LabelSkipAddMember")

listResp, err = etcdutil.ListEtcdMembers(client)
if err != nil {
return err
}
var (
pds []string
listSucc bool
)

pds := []string{}
for _, memb := range listResp.Members {
n := memb.Name
if memb.ID == addResp.Member.ID {
n = cfg.Name
for i := 0; i < listMemberRetryTimes; i++ {
listResp, err = etcdutil.ListEtcdMembers(client)
if err != nil {
return err
}
if len(n) == 0 {
return errors.New("there is a member that has not joined successfully")

pds = []string{}
for _, memb := range listResp.Members {
n := memb.Name
if addResp != nil && memb.ID == addResp.Member.ID {
n = cfg.Name
listSucc = true
}
if len(n) == 0 {
return errors.New("there is a member that has not joined successfully")
}
for _, m := range memb.PeerURLs {
pds = append(pds, fmt.Sprintf("%s=%s", n, m))
}
}
for _, m := range memb.PeerURLs {
pds = append(pds, fmt.Sprintf("%s=%s", n, m))

if listSucc {
break
}
time.Sleep(500 * time.Millisecond)
}
if !listSucc {
return errors.Errorf("join failed, adds the new member %s may failed", cfg.Name)
}

initialCluster = strings.Join(pds, ",")
cfg.InitialCluster = initialCluster
cfg.InitialClusterState = embed.ClusterStateFlagExisting
Expand Down
53 changes: 53 additions & 0 deletions tests/server/join/join_fail/join_fail_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package join_fail_test

import (
"strings"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/pd/server"
"github.com/pingcap/pd/tests"
)

// Test is the standard "go test" entry point; it hands control to gocheck,
// which runs the suites registered in this package.
func Test(t *testing.T) {
	TestingT(t)
}

// serverTestSuite groups the tests that exercise a failing PD join.
type serverTestSuite struct{}

// Register serverTestSuite with gocheck.
var _ = Suite(&serverTestSuite{})

// SetUpSuite turns on the server package's EnableZap flag for the suite.
func (s *serverTestSuite) SetUpSuite(c *C) {
	server.EnableZap = true
}

// TestFailedPDJoinInStep1 starts a one-node cluster, enables the
// "add-member-failed" failpoint, and verifies that joining a second PD
// returns an error whose message contains "join failed".
func (s *serverTestSuite) TestFailedPDJoinInStep1(c *C) {
	const addMemberFailpoint = "github.com/pingcap/pd/server/add-member-failed"

	cluster, err := tests.NewTestCluster(1)
	c.Assert(err, IsNil)
	defer cluster.Destroy()

	err = cluster.RunInitialServers()
	c.Assert(err, IsNil)
	cluster.WaitLeader()

	// Join the second PD with the failpoint active.
	c.Assert(failpoint.Enable(addMemberFailpoint, `return`), IsNil)
	// Failpoints are process-global: disable in a defer so that a failed
	// assertion below cannot leave it enabled for later tests. Deferred calls
	// run in LIFO order, so this still happens before cluster.Destroy.
	defer func() {
		c.Check(failpoint.Disable(addMemberFailpoint), IsNil)
	}()

	_, err = cluster.Join()
	c.Assert(err, NotNil)
	c.Assert(strings.Contains(err.Error(), "join failed"), IsTrue)
}

0 comments on commit a2f134b

Please sign in to comment.