diff --git a/etcdserver/membership/cluster.go b/etcdserver/membership/cluster.go index dccfa17f4dd..8fa380a14d6 100644 --- a/etcdserver/membership/cluster.go +++ b/etcdserver/membership/cluster.go @@ -52,6 +52,9 @@ type RaftCluster struct { // removed contains the ids of removed members in the cluster. // removed id cannot be reused. removed map[types.ID]bool + + initAddNotifyOnce *sync.Once + initAddNotifyCh chan struct{} } func NewClusterFromURLsMap(token string, urlsmap types.URLsMap) (*RaftCluster, error) { @@ -81,9 +84,11 @@ func NewClusterFromMembers(token string, id types.ID, membs []*Member) *RaftClus func NewCluster(token string) *RaftCluster { return &RaftCluster{ - token: token, - members: make(map[types.ID]*Member), - removed: make(map[types.ID]bool), + token: token, + members: make(map[types.ID]*Member), + removed: make(map[types.ID]bool), + initAddNotifyOnce: new(sync.Once), + initAddNotifyCh: make(chan struct{}), } } @@ -295,9 +300,16 @@ func (c *RaftCluster) AddMember(m *Member) { c.members[m.ID] = m + if c.initAddNotifyOnce != nil { + c.initAddNotifyOnce.Do(func() { close(c.initAddNotifyCh) }) + } plog.Infof("added member %s %v to cluster %s", m.ID, m.PeerURLs, c.id) } +// InitialAddNotify returns a channel that closes when +// the first member is added to the cluster +func (c *RaftCluster) InitialAddNotify() <-chan struct{} { return c.initAddNotifyCh } + // RemoveMember removes a member from the store. // The given id MUST exist, or the function panics. func (c *RaftCluster) RemoveMember(id types.ID) { diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 1fa08590548..ebbfa0f94e4 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -97,6 +97,7 @@ type raftNode struct { term uint64 lead uint64 + tickMu *sync.Mutex raftNodeConfig // a chan to send/receive snapshot @@ -133,6 +134,7 @@ type raftNodeConfig struct { func newRaftNode(cfg raftNodeConfig) *raftNode { r := &raftNode{ + tickMu: new(sync.Mutex), raftNodeConfig: cfg, // set up contention detectors for raft heartbeat message. // expect to send a heartbeat within 2 heartbeat intervals. @@ -151,6 +153,14 @@ func newRaftNode(cfg raftNodeConfig) *raftNode { return r } +// package raft does not have locks +// prevent racey tick operation from advancing election ticks +func (r *raftNode) tick() { + r.tickMu.Lock() + r.raftNodeConfig.Node.Tick() + r.tickMu.Unlock() +} + // start prepares and starts raftNode in a new goroutine. It is no longer safe // to modify the fields after it has been started. func (r *raftNode) start(rh *raftReadyHandler) { @@ -163,7 +173,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { for { select { case <-r.ticker.C: - r.Tick() + r.tick() case rd := <-r.Ready(): if rd.SoftState != nil { newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead @@ -370,16 +380,6 @@ func (r *raftNode) resumeSending() { p.Resume() } -// advanceTicksForElection advances ticks to the node for fast election. -// This reduces the time to wait for first leader election if bootstrapping the whole -// cluster, while leaving at least 1 heartbeat for possible existing leader -// to contact it. -func advanceTicksForElection(n raft.Node, electionTicks int) { - for i := 0; i < electionTicks-1; i++ { - n.Tick() - } -} - func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { var err error member := cl.MemberByName(cfg.Name) @@ -418,7 +418,6 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id raftStatusMu.Lock() raftStatus = n.Status raftStatusMu.Unlock() - advanceTicksForElection(n, c.ElectionTick) return id, n, s, w } @@ -453,7 +452,6 @@ func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *member raftStatusMu.Lock() raftStatus = n.Status raftStatusMu.Unlock() - advanceTicksForElection(n, c.ElectionTick) return id, cl, n, s, w } diff --git a/etcdserver/server.go b/etcdserver/server.go index aa2321752de..38439e8e8b6 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -179,6 +179,12 @@ type EtcdServer struct { consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned. r raftNode // uses 64-bit atomics; keep 64-bit aligned. + // advanceRaftTicks advances ticks of Raft node. + // This can be used for fast-forwarding election + // ticks in multi data-center deployments, thus + // speeding up election process. + advanceRaftTicks func(ticks int) + readych chan struct{} Cfg ServerConfig @@ -445,6 +451,12 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { hostWhitelist: cfg.HostWhitelist, } + srv.advanceRaftTicks = func(ticks int) { + for i := 0; i < ticks; i++ { + srv.r.tick() + } + } + srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} srv.be = be @@ -527,6 +539,62 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { } srv.r.transport = tr + // fresh start + if !haveWAL { + ticks := cfg.ElectionTicks - 1 + plog.Infof("%s started anew; fast-forwarding %d ticks (election ticks %d) with %d found member(s)", srv.ID(), ticks, cfg.ElectionTicks, len(cl.Members())) + srv.advanceRaftTicks(ticks) + return srv, nil + } + + srv.goAttach(func() { + select { + case <-cl.InitialAddNotify(): + singleNode := len(cl.Members()) == 1 + if !singleNode { + break // multi-node + } + + // more member-add commands may be applied + // then not single-node cluster + select { + case <-time.After(srv.Cfg.electionTimeout()): + singleNode = len(cl.Members()) == 1 + case <-tr.InitialPeerNotify(): + singleNode = false + } + + // restarted single-node + if singleNode { + if !srv.isLeader() { // and leader has not been elected + ticks := cfg.ElectionTicks - 1 + plog.Infof("%s as 1-node cluster; fast-forwarding %d ticks (election ticks %d)", srv.ID(), ticks, cfg.ElectionTicks) + srv.advanceRaftTicks(ticks) + } else { + plog.Infof("%s started as leader to 1-node cluster", srv.ID()) + } + return + } + + case <-time.After(rafthttp.ConnReadTimeout): + // slow raft config change apply + plog.Infof("%s waited %s for member add apply but timed out", srv.ID(), rafthttp.ConnReadTimeout) + return + } + + // multi-node, wait for peer connection reports + select { + case <-tr.InitialPeerNotify(): + // adjust ticks in case slow leader message receive + ticks := cfg.ElectionTicks - 3 + plog.Infof("%s initialzed peer connection; fast-forwarding %d ticks (election ticks %d) with %d found member(s)", srv.ID(), ticks, cfg.ElectionTicks, len(cl.Members())) + srv.advanceRaftTicks(ticks) + + case <-time.After(rafthttp.ConnReadTimeout): + // connection failed, or no active peers + plog.Infof("%s waited %s but no active peer found (or restarted 1-node cluster)", srv.ID(), rafthttp.ConnReadTimeout) + } + }) return srv, nil } diff --git a/etcdserver/util_test.go b/etcdserver/util_test.go index e0b75454c38..640b54c4100 100644 --- a/etcdserver/util_test.go +++ b/etcdserver/util_test.go @@ -86,4 +86,5 @@ func (s *nopTransporterWithActiveTime) ActiveSince(id types.ID) time.Time { re func (s *nopTransporterWithActiveTime) Stop() {} func (s *nopTransporterWithActiveTime) Pause() {} func (s *nopTransporterWithActiveTime) Resume() {} +func (s *nopTransporterWithActiveTime) InitialPeerNotify() <-chan struct{} { return nil } func (s *nopTransporterWithActiveTime) reset(am map[types.ID]time.Time) { s.activeMap = am } diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 58b51f03494..17cb0851cba 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -122,7 +122,7 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats plog.Infof("starting peer %s...", peerID) defer plog.Infof("started peer %s", peerID) - status := newPeerStatus(peerID) + status := newPeerStatus(peerID, transport.initPeerNotifyOnce, transport.initPeerNotifyCh) picker := newURLPicker(urls) errorc := transport.ErrorC r := transport.Raft diff --git a/rafthttp/peer_status.go b/rafthttp/peer_status.go index 706144f6466..31d7f4410e6 100644 --- a/rafthttp/peer_status.go +++ b/rafthttp/peer_status.go @@ -32,11 +32,16 @@ type peerStatus struct { mu sync.Mutex // protect variables below active bool since time.Time + + once *sync.Once + notify chan struct{} } -func newPeerStatus(id types.ID) *peerStatus { +func newPeerStatus(id types.ID, once *sync.Once, notify chan struct{}) *peerStatus { return &peerStatus{ - id: id, + id: id, + once: once, + notify: notify, } } @@ -47,6 +52,13 @@ func (s *peerStatus) activate() { plog.Infof("peer %s became active", s.id) s.active = true s.since = time.Now() + + if s.once != nil { + s.once.Do(func() { + plog.Infof("notifying of active peer %q", s.id) + close(s.notify) + }) + } } } diff --git a/rafthttp/peer_test.go b/rafthttp/peer_test.go index 00e00853bf9..ac58101a94a 100644 --- a/rafthttp/peer_test.go +++ b/rafthttp/peer_test.go @@ -15,8 +15,11 @@ package rafthttp import ( + "sync" "testing" + "time" + "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/raft/raftpb" ) @@ -85,3 +88,24 @@ func TestPeerPick(t *testing.T) { } } } + +func TestInitialPeerNotify(t *testing.T) { + tr := &roundTripperRecorder{rec: testutil.NewRecorderStream()} + picker := mustNewURLPicker(t, []string{"http://localhost:2380"}) + tp := &Transport{ + pipelineRt: tr, + initPeerNotifyOnce: &sync.Once{}, + initPeerNotifyCh: make(chan struct{}), + } + p := startTestPipeline(tp, picker) + defer p.stop() + + p.msgc <- raftpb.Message{Type: raftpb.MsgApp} + tr.rec.Wait(1) + + select { + case <-tp.InitialPeerNotify(): + case <-time.After(3 * time.Second): + t.Fatal("took too long to receive initial peer notify") + } +} diff --git a/rafthttp/pipeline_test.go b/rafthttp/pipeline_test.go index bdcdbc87053..dcc1c82e13c 100644 --- a/rafthttp/pipeline_test.go +++ b/rafthttp/pipeline_test.go @@ -301,7 +301,7 @@ func startTestPipeline(tr *Transport, picker *urlPicker) *pipeline { peerID: types.ID(1), tr: tr, picker: picker, - status: newPeerStatus(types.ID(1)), + status: newPeerStatus(types.ID(1), tr.initPeerNotifyOnce, tr.initPeerNotifyCh), raft: &fakeRaft{}, followerStats: &stats.FollowerStats{}, errorc: make(chan error, 1), diff --git a/rafthttp/remote.go b/rafthttp/remote.go index c62c818235a..095a9db1a01 100644 --- a/rafthttp/remote.go +++ b/rafthttp/remote.go @@ -27,7 +27,7 @@ type remote struct { func startRemote(tr *Transport, urls types.URLs, id types.ID) *remote { picker := newURLPicker(urls) - status := newPeerStatus(id) + status := newPeerStatus(id, tr.initPeerNotifyOnce, tr.initPeerNotifyCh) pipeline := &pipeline{ peerID: id, tr: tr, diff --git a/rafthttp/snapshot_test.go b/rafthttp/snapshot_test.go index 7e8503c0af0..5331084a94a 100644 --- a/rafthttp/snapshot_test.go +++ b/rafthttp/snapshot_test.go @@ -107,7 +107,7 @@ func testSnapshotSend(t *testing.T, sm *raftsnap.Message) (bool, []os.FileInfo) defer srv.Close() picker := mustNewURLPicker(t, []string{srv.URL}) - snapsend := newSnapshotSender(tr, picker, types.ID(1), newPeerStatus(types.ID(1))) + snapsend := newSnapshotSender(tr, picker, types.ID(1), newPeerStatus(types.ID(1), nil, nil)) defer snapsend.stop() snapsend.send(*sm) diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index 29ceaaafd4d..7dbac437166 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -40,7 +40,7 @@ import ( // to streamWriter. After that, streamWriter can use it to send messages // continuously, and closes it when stopped. func TestStreamWriterAttachOutgoingConn(t *testing.T) { - sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{}) + sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1), nil, nil), &stats.FollowerStats{}, &fakeRaft{}) // the expected initial state of streamWriter is not working if _, ok := sw.writec(); ok { t.Errorf("initial working status = %v, want false", ok) @@ -92,7 +92,7 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) { // TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad // outgoingConn will close the outgoingConn and fall back to non-working status. func TestStreamWriterAttachBadOutgoingConn(t *testing.T) { - sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{}) + sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1), nil, nil), &stats.FollowerStats{}, &fakeRaft{}) defer sw.stop() wfc := newFakeWriteFlushCloser(errors.New("blah")) sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc}) @@ -196,7 +196,7 @@ func TestStreamReaderStopOnDial(t *testing.T) { picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), errorc: make(chan error, 1), typ: streamTypeMessage, - status: newPeerStatus(types.ID(2)), + status: newPeerStatus(types.ID(2), nil, nil), rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), } tr.onResp = func() { @@ -303,7 +303,7 @@ func TestStream(t *testing.T) { srv := httptest.NewServer(h) defer srv.Close() - sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{}) + sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1), nil, nil), &stats.FollowerStats{}, &fakeRaft{}) defer sw.stop() h.sw = sw @@ -315,7 +315,7 @@ func TestStream(t *testing.T) { typ: tt.t, tr: tr, picker: picker, - status: newPeerStatus(types.ID(2)), + status: newPeerStatus(types.ID(2), nil, nil), recvc: recvc, propc: propc, rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 938482517ae..462af800dd1 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -87,6 +87,10 @@ type Transporter interface { ActiveSince(id types.ID) time.Time // Stop closes the connections and stops the transporter. Stop() + // InitialPeerNotify returns a channel that closes when an initial + // peer connection has been established. Use this to wait until the + // first peer connection becomes active. + InitialPeerNotify() <-chan struct{} } // Transport implements Transporter interface. It provides the functionality @@ -126,6 +130,9 @@ type Transport struct { peers map[types.ID]Peer // peers map prober probing.Prober + + initPeerNotifyOnce *sync.Once + initPeerNotifyCh chan struct{} } func (t *Transport) Start() error { @@ -148,6 +155,10 @@ func (t *Transport) Start() error { if t.DialRetryFrequency == 0 { t.DialRetryFrequency = rate.Every(100 * time.Millisecond) } + + t.initPeerNotifyOnce = &sync.Once{} + t.initPeerNotifyCh = make(chan struct{}) + return nil } @@ -375,6 +386,8 @@ func (t *Transport) Resume() { } } +func (t *Transport) InitialPeerNotify() <-chan struct{} { return t.initPeerNotifyCh } + type nopTransporter struct{} func NewNopTransporter() Transporter { @@ -394,6 +407,7 @@ func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time func (s *nopTransporter) Stop() {} func (s *nopTransporter) Pause() {} func (s *nopTransporter) Resume() {} +func (s *nopTransporter) InitialPeerNotify() <-chan struct{} { return nil } type snapTransporter struct { nopTransporter