etcdserver: adjust election timeout on restart #9364

Closed · wants to merge 6 commits
18 changes: 15 additions & 3 deletions etcdserver/membership/cluster.go
@@ -52,6 +52,9 @@ type RaftCluster struct {
	// removed contains the ids of removed members in the cluster.
	// removed id cannot be reused.
	removed map[types.ID]bool

	initAddNotifyOnce *sync.Once
	initAddNotifyCh   chan struct{}
}

func NewClusterFromURLsMap(token string, urlsmap types.URLsMap) (*RaftCluster, error) {
@@ -81,9 +84,11 @@ func NewClusterFromMembers(token string, id types.ID, membs []*Member) *RaftClus

func NewCluster(token string) *RaftCluster {
	return &RaftCluster{
		token:   token,
		members: make(map[types.ID]*Member),
		removed: make(map[types.ID]bool),
		token:             token,
		members:           make(map[types.ID]*Member),
		removed:           make(map[types.ID]bool),
		initAddNotifyOnce: new(sync.Once),
		initAddNotifyCh:   make(chan struct{}),
	}
}

@@ -295,9 +300,16 @@ func (c *RaftCluster) AddMember(m *Member) {

	c.members[m.ID] = m

	if c.initAddNotifyOnce != nil {
		c.initAddNotifyOnce.Do(func() { close(c.initAddNotifyCh) })
	}
	plog.Infof("added member %s %v to cluster %s", m.ID, m.PeerURLs, c.id)
}

// InitialAddNotify returns a channel that is closed when
// the first member is added to the cluster.
func (c *RaftCluster) InitialAddNotify() <-chan struct{} { return c.initAddNotifyCh }
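
As a usage illustration (a minimal sketch, not part of this diff; the helper name and timeout value are hypothetical, and imports of time and the membership package are assumed), a caller consumes the notify channel with a select, pairing it with a timeout since the channel only ever closes once:

// waitFirstMember blocks until the first member add is applied to the
// cluster, or until the timeout elapses. Hypothetical helper for illustration.
func waitFirstMember(cl *membership.RaftCluster, timeout time.Duration) bool {
	select {
	case <-cl.InitialAddNotify():
		return true // channel closed: at least one member has been added
	case <-time.After(timeout):
		return false // no member-add applied yet; caller decides what to do
	}
}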

// RemoveMember removes a member from the store.
// The given id MUST exist, or the function panics.
func (c *RaftCluster) RemoveMember(id types.ID) {
24 changes: 11 additions & 13 deletions etcdserver/raft.go
@@ -97,6 +97,7 @@ type raftNode struct {
	term uint64
	lead uint64

	tickMu *sync.Mutex
	raftNodeConfig

	// a chan to send/receive snapshot
@@ -133,6 +134,7 @@ type raftNodeConfig struct {

func newRaftNode(cfg raftNodeConfig) *raftNode {
	r := &raftNode{
		tickMu:         new(sync.Mutex),
		raftNodeConfig: cfg,
		// set up contention detectors for raft heartbeat message.
		// expect to send a heartbeat within 2 heartbeat intervals.
@@ -151,6 +153,14 @@ func newRaftNode(cfg raftNodeConfig) *raftNode {
	return r
}

// package raft does not have locks,
// so serialize ticks to keep racy tick operations
// from advancing election ticks concurrently
func (r *raftNode) tick() {
	r.tickMu.Lock()
	r.raftNodeConfig.Node.Tick()
	r.tickMu.Unlock()
}
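
Since raft.Node.Tick has no internal locking, the ticker goroutine in start and the tick fast-forwarding added to NewServer would otherwise race. A standalone sketch of the same guarded-tick pattern, with a hypothetical counter standing in for raft.Node:

package main

import (
	"fmt"
	"sync"
)

// fakeNode stands in for raft.Node: Tick has no internal locking.
type fakeNode struct{ elapsed int }

func (n *fakeNode) Tick() { n.elapsed++ }

type lockedNode struct {
	mu sync.Mutex
	n  fakeNode
}

// tick serializes all Tick calls, mirroring raftNode.tick above.
func (l *lockedNode) tick() {
	l.mu.Lock()
	l.n.Tick()
	l.mu.Unlock()
}

func main() {
	var (
		ln lockedNode
		wg sync.WaitGroup
	)
	// two writers: the ticker loop and an election fast-forward
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				ln.tick()
			}
		}()
	}
	wg.Wait()
	fmt.Println(ln.n.elapsed) // always 2000 with the lock; racy without it
}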

// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
func (r *raftNode) start(rh *raftReadyHandler) {
@@ -163,7 +173,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
	for {
		select {
		case <-r.ticker.C:
			r.Tick()
			r.tick()
		case rd := <-r.Ready():
			if rd.SoftState != nil {
				newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead
@@ -370,16 +380,6 @@ func (r *raftNode) resumeSending() {
	p.Resume()
}

// advanceTicksForElection advances ticks to the node for fast election.
// This reduces the time to wait for first leader election if bootstrapping the whole
// cluster, while leaving at least 1 heartbeat for possible existing leader
// to contact it.
func advanceTicksForElection(n raft.Node, electionTicks int) {
	for i := 0; i < electionTicks-1; i++ {
		n.Tick()
	}
}

func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
	var err error
	member := cl.MemberByName(cfg.Name)
@@ -418,7 +418,6 @@ func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id
	raftStatusMu.Lock()
	raftStatus = n.Status
	raftStatusMu.Unlock()
	advanceTicksForElection(n, c.ElectionTick)

Contributor: We should still advanceTicks for a newly started node. Is there a reason not to do so?

	return id, n, s, w
}

@@ -453,7 +452,6 @@ func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *member
	raftStatusMu.Lock()
	raftStatus = n.Status
	raftStatusMu.Unlock()
	advanceTicksForElection(n, c.ElectionTick)
	return id, cl, n, s, w
}

68 changes: 68 additions & 0 deletions etcdserver/server.go
@@ -179,6 +179,12 @@ type EtcdServer struct {
	consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned.
	r            raftNode        // uses 64-bit atomics; keep 64-bit aligned.

	// advanceRaftTicks advances the ticks of the Raft node.
	// This can be used to fast-forward election
	// ticks in multi-datacenter deployments, thus
	// speeding up the election process.
	advanceRaftTicks func(ticks int)

	readych chan struct{}
	Cfg     ServerConfig

@@ -445,6 +451,12 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
		hostWhitelist: cfg.HostWhitelist,
	}

	srv.advanceRaftTicks = func(ticks int) {
		for i := 0; i < ticks; i++ {
			srv.r.tick()
		}
	}

	srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}

	srv.be = be
@@ -527,6 +539,62 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
	}
	srv.r.transport = tr

	// fresh start
	if !haveWAL {

Contributor: Why do we need to care about restart vs. fresh start? See #9364 (comment).

Contributor (author): Just easier, so that a fresh start does not need to synchronize with peer connection reports. But as you suggested, let me simplify the logic (#9364 (comment)).


		ticks := cfg.ElectionTicks - 1
		plog.Infof("%s started anew; fast-forwarding %d ticks (election ticks %d) with %d found member(s)", srv.ID(), ticks, cfg.ElectionTicks, len(cl.Members()))
		srv.advanceRaftTicks(ticks)
		return srv, nil
	}
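
To make the arithmetic concrete (the numbers below are etcd's documented defaults, assumed here rather than taken from this diff): with a 100ms heartbeat interval and a 1s election timeout, ElectionTicks is 10, so a fresh node fast-forwards 9 ticks and may campaign after roughly one further tick instead of waiting a full election timeout:

package main

import "fmt"

func main() {
	const tickMs = 100       // heartbeat interval in ms (assumed default)
	const electionTicks = 10 // election timeout / heartbeat interval (assumed default)

	forwarded := electionTicks - 1 // what the fresh-start path applies
	fmt.Printf("first election possible after ~%dms instead of ~%dms\n",
		(electionTicks-forwarded)*tickMs, electionTicks*tickMs) // ~100ms vs ~1000ms
}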

	srv.goAttach(func() {
		select {
		case <-cl.InitialAddNotify():

Contributor: This is pretty complicated. Let us just get the peer list from the existing snapshot. We do not need to ensure that all the configuration changes in the WAL file are executed.

Contributor: The reason for that is that reconfiguration is infrequent, and moving from a one-node to an N-node cluster is even more infrequent. The snapshot will contain the correct information 99% of the time.

Contributor (author): I was trying to cover all cases where there is no snapshot (which requires populating the member list from the WAL). But I agree that this should be simplified by loading members from the snapshot. Will rework this.

			singleNode := len(cl.Members()) == 1
			if !singleNode {
				break // multi-node
			}

			// more member-add commands may still be applied;
			// if so, this is not a single-node cluster
			select {
			case <-time.After(srv.Cfg.electionTimeout()):
				singleNode = len(cl.Members()) == 1
			case <-tr.InitialPeerNotify():
				singleNode = false
			}

			// restarted single-node
			if singleNode {
				if !srv.isLeader() { // and leader has not been elected
					ticks := cfg.ElectionTicks - 1
					plog.Infof("%s as 1-node cluster; fast-forwarding %d ticks (election ticks %d)", srv.ID(), ticks, cfg.ElectionTicks)
					srv.advanceRaftTicks(ticks)
				} else {
					plog.Infof("%s started as leader to 1-node cluster", srv.ID())
				}
				return
			}

		case <-time.After(rafthttp.ConnReadTimeout):
			// slow raft config change apply
			plog.Infof("%s waited %s for member add apply but timed out", srv.ID(), rafthttp.ConnReadTimeout)
			return
		}

		// multi-node, wait for peer connection reports
		select {
		case <-tr.InitialPeerNotify():
			// adjust ticks in case of slow leader message delivery
			ticks := cfg.ElectionTicks - 3
			plog.Infof("%s initialized peer connection; fast-forwarding %d ticks (election ticks %d) with %d found member(s)", srv.ID(), ticks, cfg.ElectionTicks, len(cl.Members()))
			srv.advanceRaftTicks(ticks)

		case <-time.After(rafthttp.ConnReadTimeout):
			// connection failed, or no active peers
			plog.Infof("%s waited %s but no active peer found (or restarted 1-node cluster)", srv.ID(), rafthttp.ConnReadTimeout)
		}
	})
	return srv, nil
}
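
Note the different budget in the multi-node restart path: fast-forwarding only ElectionTicks-3 leaves three full ticks of headroom for an existing leader's heartbeat to arrive before this node may campaign. Under the same assumed defaults as the earlier sketch:

package main

import "fmt"

func main() {
	const tickMs, electionTicks = 100, 10 // assumed defaults, as above
	forwarded := electionTicks - 3        // restart path, after a peer connects
	fmt.Printf("leader heartbeat headroom: ~%dms\n",
		(electionTicks-forwarded)*tickMs) // ~300ms before this node may campaign
}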

1 change: 1 addition & 0 deletions etcdserver/util_test.go
@@ -86,4 +86,5 @@ func (s *nopTransporterWithActiveTime) ActiveSince(id types.ID) time.Time { re
func (s *nopTransporterWithActiveTime) Stop() {}
func (s *nopTransporterWithActiveTime) Pause() {}
func (s *nopTransporterWithActiveTime) Resume() {}
func (s *nopTransporterWithActiveTime) InitialPeerNotify() <-chan struct{} { return nil }
func (s *nopTransporterWithActiveTime) reset(am map[types.ID]time.Time) { s.activeMap = am }
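
One subtlety worth noting about the stub above: it returns a nil channel, and in Go a receive from a nil channel blocks forever. That is safe here only because every receive from InitialPeerNotify is paired with a timeout case, as in the NewServer hunk. A minimal illustration:

package main

import (
	"fmt"
	"time"
)

func main() {
	var notify <-chan struct{} // nil, like the nopTransporter stub's return value
	select {
	case <-notify: // never fires: receiving from a nil channel blocks forever
		fmt.Println("notified")
	case <-time.After(100 * time.Millisecond):
		fmt.Println("timed out") // this branch always wins with a nil channel
	}
}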
2 changes: 1 addition & 1 deletion rafthttp/peer.go
@@ -122,7 +122,7 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats
plog.Infof("starting peer %s...", peerID)
defer plog.Infof("started peer %s", peerID)

status := newPeerStatus(peerID)
status := newPeerStatus(peerID, transport.initPeerNotifyOnce, transport.initPeerNotifyCh)
picker := newURLPicker(urls)
errorc := transport.ErrorC
r := transport.Raft
16 changes: 14 additions & 2 deletions rafthttp/peer_status.go
@@ -32,11 +32,16 @@ type peerStatus struct {
	mu     sync.Mutex // protect variables below
	active bool
	since  time.Time

	once   *sync.Once
	notify chan struct{}
}

func newPeerStatus(id types.ID) *peerStatus {
func newPeerStatus(id types.ID, once *sync.Once, notify chan struct{}) *peerStatus {
	return &peerStatus{
		id: id,
		id:     id,
		once:   once,
		notify: notify,
	}
}

@@ -47,6 +52,13 @@ func (s *peerStatus) activate() {
plog.Infof("peer %s became active", s.id)
s.active = true
s.since = time.Now()

if s.once != nil {
s.once.Do(func() {
plog.Infof("notifying of active peer %q", s.id)
close(s.notify)
})
}
}
}

24 changes: 24 additions & 0 deletions rafthttp/peer_test.go
@@ -15,8 +15,11 @@
package rafthttp

import (
	"sync"
	"testing"
	"time"

	"github.com/coreos/etcd/pkg/testutil"
	"github.com/coreos/etcd/raft/raftpb"
)

@@ -85,3 +88,24 @@ func TestPeerPick(t *testing.T) {
		}
	}
}

func TestInitialPeerNotify(t *testing.T) {
	tr := &roundTripperRecorder{rec: testutil.NewRecorderStream()}
	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
	tp := &Transport{
		pipelineRt:         tr,
		initPeerNotifyOnce: &sync.Once{},
		initPeerNotifyCh:   make(chan struct{}),
	}
	p := startTestPipeline(tp, picker)
	defer p.stop()

	p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
	tr.rec.Wait(1)

	select {
	case <-tp.InitialPeerNotify():
	case <-time.After(3 * time.Second):
		t.Fatal("took too long to receive initial peer notify")
	}
}
2 changes: 1 addition & 1 deletion rafthttp/pipeline_test.go
@@ -301,7 +301,7 @@ func startTestPipeline(tr *Transport, picker *urlPicker) *pipeline {
		peerID:        types.ID(1),
		tr:            tr,
		picker:        picker,
		status:        newPeerStatus(types.ID(1)),
		status:        newPeerStatus(types.ID(1), tr.initPeerNotifyOnce, tr.initPeerNotifyCh),
		raft:          &fakeRaft{},
		followerStats: &stats.FollowerStats{},
		errorc:        make(chan error, 1),
2 changes: 1 addition & 1 deletion rafthttp/remote.go
@@ -27,7 +27,7 @@ type remote struct {

func startRemote(tr *Transport, urls types.URLs, id types.ID) *remote {
	picker := newURLPicker(urls)
	status := newPeerStatus(id)
	status := newPeerStatus(id, tr.initPeerNotifyOnce, tr.initPeerNotifyCh)
	pipeline := &pipeline{
		peerID: id,
		tr:     tr,
2 changes: 1 addition & 1 deletion rafthttp/snapshot_test.go
@@ -107,7 +107,7 @@ func testSnapshotSend(t *testing.T, sm *raftsnap.Message) (bool, []os.FileInfo)
	defer srv.Close()

	picker := mustNewURLPicker(t, []string{srv.URL})
	snapsend := newSnapshotSender(tr, picker, types.ID(1), newPeerStatus(types.ID(1)))
	snapsend := newSnapshotSender(tr, picker, types.ID(1), newPeerStatus(types.ID(1), nil, nil))
	defer snapsend.stop()

	snapsend.send(*sm)
10 changes: 5 additions & 5 deletions rafthttp/stream_test.go
@@ -40,7 +40,7 @@ import (
// to streamWriter. After that, streamWriter can use it to send messages
// continuously, and closes it when stopped.
func TestStreamWriterAttachOutgoingConn(t *testing.T) {
	sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
	sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1), nil, nil), &stats.FollowerStats{}, &fakeRaft{})
	// the expected initial state of streamWriter is not working
	if _, ok := sw.writec(); ok {
		t.Errorf("initial working status = %v, want false", ok)
@@ -92,7 +92,7 @@ func TestStreamWriterAttachOutgoingConn(t *testing.T) {
// TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad
// outgoingConn will close the outgoingConn and fall back to non-working status.
func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
	sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
	sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1), nil, nil), &stats.FollowerStats{}, &fakeRaft{})
	defer sw.stop()
	wfc := newFakeWriteFlushCloser(errors.New("blah"))
	sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
@@ -196,7 +196,7 @@ func TestStreamReaderStopOnDial(t *testing.T) {
		picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
		errorc: make(chan error, 1),
		typ:    streamTypeMessage,
		status: newPeerStatus(types.ID(2)),
		status: newPeerStatus(types.ID(2), nil, nil),
		rl:     rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
	}
	tr.onResp = func() {
@@ -303,7 +303,7 @@ func TestStream(t *testing.T) {
	srv := httptest.NewServer(h)
	defer srv.Close()

	sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
	sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1), nil, nil), &stats.FollowerStats{}, &fakeRaft{})
	defer sw.stop()
	h.sw = sw

@@ -315,7 +315,7 @@
		typ:    tt.t,
		tr:     tr,
		picker: picker,
		status: newPeerStatus(types.ID(2)),
		status: newPeerStatus(types.ID(2), nil, nil),
		recvc:  recvc,
		propc:  propc,
		rl:     rate.NewLimiter(rate.Every(100*time.Millisecond), 1),