Skip to content

Commit

Permalink
server: fix join a member with the empty name (#564)
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch authored and siddontang committed Mar 10, 2017
1 parent 62bc1e4 commit 7dda2f9
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 27 deletions.
8 changes: 6 additions & 2 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,22 @@ func main() {
case flag.ErrHelp:
os.Exit(0)
default:
log.Fatalf("parse cmd flags err %s\n", err)
log.Fatalf("parse cmd flags error %s\n", err)
}

err = server.InitLogger(cfg)
if err != nil {
log.Fatalf("initalize logger err %s\n", err)
log.Fatalf("initalize logger error %s\n", err)
}

server.LogPDInfo()

metricutil.Push(&cfg.Metric)

err = server.PrepareJoinCluster(cfg)
if err != nil {
log.Fatal("join error ", err)
}
svr := server.CreateServer(cfg)
err = svr.StartEtcd(api.NewHandler(svr))
if err != nil {
Expand Down
9 changes: 0 additions & 9 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,6 @@ func (c *Config) adjust() error {
adjustString(&c.PeerUrls, defaultPeerUrls)
adjustString(&c.AdvertisePeerUrls, c.PeerUrls)

if c.Join != "" {
initialCluster, state, err := prepareJoinCluster(c)
if err != nil {
return errors.Trace(err)
}
c.InitialCluster = initialCluster
c.InitialClusterState = state
}

if len(c.InitialCluster) == 0 {
// The advertise peer urls may be http://127.0.0.1:2380,http://127.0.0.1:2381
// so the initial cluster is pd=http://127.0.0.1:2380,pd=http://127.0.0.1:2381
Expand Down
29 changes: 18 additions & 11 deletions server/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func genClientV3Config(cfg *Config) clientv3.Config {
}
}

// prepareJoinCluster sends MemberAdd command to PD cluster,
// PrepareJoinCluster sends MemberAdd command to PD cluster,
// and returns the initial configuration of the PD cluster.
//
// TL;TR: The join functionality is safe. With data, join does nothing, w/o data
Expand Down Expand Up @@ -70,30 +70,36 @@ func genClientV3Config(cfg *Config) clientv3.Config {
// What join does: return "" (as etcd will read data directory and find
// that the PD itself has been removed, so an empty string
// is fine.)
func prepareJoinCluster(cfg *Config) (string, string, error) {
func PrepareJoinCluster(cfg *Config) error {
// - A PD tries to join itself.
if cfg.Join == "" {
return nil
}

if cfg.Join == cfg.AdvertiseClientUrls {
return "", "", errors.New("join self is forbidden")
return errors.New("join self is forbidden")
}

// Cases with data directory.

initialCluster := ""
if wal.Exist(cfg.DataDir) {
return initialCluster, embed.ClusterStateFlagExisting, nil
cfg.InitialCluster = initialCluster
cfg.InitialClusterState = embed.ClusterStateFlagExisting
return nil
}

// Below are cases without data directory.

client, err := clientv3.New(genClientV3Config(cfg))
if err != nil {
return "", "", errors.Trace(err)
return errors.Trace(err)
}
defer client.Close()

listResp, err := etcdutil.ListEtcdMembers(client)
if err != nil {
return "", "", errors.Trace(err)
return errors.Trace(err)
}

existed := false
Expand All @@ -105,19 +111,19 @@ func prepareJoinCluster(cfg *Config) (string, string, error) {

// - A failed PD re-joins the previous cluster.
if existed {
return "", "", errors.New("missing data or join a duplicated pd")
return errors.New("missing data or join a duplicated pd")
}

// - A new PD joins an existing cluster.
// - A deleted PD joins to previous cluster.
addResp, err := etcdutil.AddEtcdMember(client, []string{cfg.AdvertisePeerUrls})
if err != nil {
return "", "", errors.Trace(err)
return errors.Trace(err)
}

listResp, err = etcdutil.ListEtcdMembers(client)
if err != nil {
return "", "", errors.Trace(err)
return errors.Trace(err)
}

pds := []string{}
Expand All @@ -131,6 +137,7 @@ func prepareJoinCluster(cfg *Config) (string, string, error) {
}
}
initialCluster = strings.Join(pds, ",")

return initialCluster, embed.ClusterStateFlagExisting, nil
cfg.InitialCluster = initialCluster
cfg.InitialClusterState = embed.ClusterStateFlagExisting
return nil
}
14 changes: 11 additions & 3 deletions server/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,18 @@ func startPdWith(cfg *Config) (*Server, error) {
abortCh := make(chan struct{}, 1)

go func() {
// TODO: Decouple join from cfg.adjust().
cfg.adjust()
err := cfg.adjust()
if err != nil {
errCh <- errors.Trace(err)
return
}
err = PrepareJoinCluster(cfg)
if err != nil {
errCh <- errors.Trace(err)
return
}
svr := CreateServer(cfg)
err := svr.StartEtcd(nil)
err = svr.StartEtcd(nil)
if err != nil {
errCh <- errors.Trace(err)
svr.Close()
Expand Down
12 changes: 10 additions & 2 deletions server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net"
"net/http"
"os"
"path"
"reflect"
"strings"
"time"
Expand All @@ -41,6 +42,8 @@ import (
const (
requestTimeout = etcdutil.DefaultRequestTimeout
slowRequestTime = etcdutil.DefaultSlowRequestTime

logDirMode = 0755
)

// Version information.
Expand Down Expand Up @@ -305,9 +308,14 @@ func (rf *redirectFormatter) Format(pkg string, level capnslog.LogLevel, depth i
func (rf *redirectFormatter) Flush() {}

// setLogOutput sets output path for all logs.
func setLogOutput(path string) error {
func setLogOutput(logFile string) error {
// PD log.
log.SetOutputByName(path)
dir := path.Dir(logFile)
err := os.MkdirAll(dir, logDirMode)
if err != nil {
return errors.Trace(err)
}
log.SetOutputByName(logFile)
log.SetRotateByDay()

// ETCD log.
Expand Down

0 comments on commit 7dda2f9

Please sign in to comment.