Commit

two bug fixes
or-else committed Oct 5, 2017
1 parent e01e09d commit 12017b2
Showing 3 changed files with 121 additions and 54 deletions.
85 changes: 67 additions & 18 deletions server/cluster.go
@@ -14,7 +14,7 @@ import (
"github.com/tinode/chat/server/store/types"
)

const DEFAULT_CLUSTER_RECONNECT = 1000 * time.Millisecond
const DEFAULT_CLUSTER_RECONNECT = 200 * time.Millisecond
const CLUSTER_HASH_REPLICAS = 20

type ClusterNodeConfig struct {
@@ -37,10 +37,10 @@ type ClusterNode struct {

// RPC endpoint
endpoint *rpc.Client
// true if the endpoint is believed to be connected
// True if the endpoint is believed to be connected
connected bool
// Error returned by the last call, could be nil
lastError error
// True if a go routine is trying to reconnect the node
reconnecting bool
// TCP address in the form host:port
address string
// Name of the node
@@ -106,21 +106,26 @@ type ClusterResp struct {
func (n *ClusterNode) reconnect() {
var reconnTicker *time.Ticker

// Avoid parallel reconnection threads
n.lock.Lock()
if n.reconnecting {
n.lock.Unlock()
return
}
n.reconnecting = true
n.lock.Unlock()

var count = 0
var err error
for {
if n.connected {
// Avoid parallel reconnection threads
return
}

// Attempt to reconnect right away
if n.endpoint, err = rpc.Dial("tcp", n.address); err == nil {
if reconnTicker != nil {
reconnTicker.Stop()
}
n.lock.Lock()
n.connected = true
n.reconnecting = false
n.lock.Unlock()
log.Printf("cluster: connection to '%s' established", n.name)
return
@@ -140,7 +145,10 @@ func (n *ClusterNode) reconnect() {
if n.endpoint != nil {
n.endpoint.Close()
}
n.lock.Lock()
n.connected = false
n.reconnecting = false
n.lock.Unlock()
log.Printf("cluster: node '%s' shut down completed", n.name)
return
}
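
The first bug fix above serializes reconnection attempts: reconnect() now checks and sets a reconnecting flag under the node's mutex, so only one goroutine dials at a time, and both the success and shutdown paths clear the flag under the same lock. Below is a minimal, self-contained sketch of that guard pattern; the node type, field names, and timings are hypothetical stand-ins, not the actual ClusterNode.

package main

import (
	"log"
	"sync"
	"time"
)

// node is a stripped-down stand-in for ClusterNode, used only to
// illustrate the reconnect guard; the field names are hypothetical.
type node struct {
	lock         sync.Mutex
	connected    bool
	reconnecting bool
}

// reconnect runs its body at most once at a time: the flag is checked and
// set under the lock, so concurrent callers return immediately.
func (n *node) reconnect() {
	n.lock.Lock()
	if n.connected || n.reconnecting {
		n.lock.Unlock()
		return
	}
	n.reconnecting = true
	n.lock.Unlock()

	// Simulated dial attempt; the real code loops with a ticker until
	// rpc.Dial succeeds or shutdown is requested.
	time.Sleep(10 * time.Millisecond)

	n.lock.Lock()
	n.connected = true
	n.reconnecting = false
	n.lock.Unlock()
	log.Println("connection established")
}

func main() {
	n := &node{}
	for i := 0; i < 3; i++ {
		go n.reconnect() // only one goroutine actually "dials"
	}
	time.Sleep(50 * time.Millisecond)
}
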
@@ -149,7 +157,7 @@

func (n *ClusterNode) call(proc string, msg interface{}, resp interface{}) error {
if !n.connected {
return errors.New("cluster: node not connected")
return errors.New("cluster: node '" + n.name + "' not connected")
}

if err := n.endpoint.Call(proc, msg, resp); err != nil {
@@ -168,6 +176,51 @@ func (n *ClusterNode) call(proc string, msg interface{}, resp interface{}) error
return nil
}

func (n *ClusterNode) callAsync(proc string, msg interface{}, resp interface{}, done chan *rpc.Call) *rpc.Call {
if done != nil && cap(done) == 0 {
log.Panic("cluster: RPC done channel is unbuffered")
}

if !n.connected {
call := &rpc.Call{
ServiceMethod: proc,
Args: msg,
Reply: resp,
Error: errors.New("cluster: node '" + n.name + "' not connected"),
Done: done,
}
if done != nil {
done <- call
}
return call
}

myDone := make(chan *rpc.Call, 1)
go func() {
select {
case call := <-myDone:
if call.Error != nil {
n.lock.Lock()
if n.connected {
n.endpoint.Close()
n.connected = false
go n.reconnect()
}
n.lock.Unlock()
}

if done != nil {
done <- call
}
}
}()

call := n.endpoint.Go(proc, msg, resp, myDone)
call.Done = done

return call
}

// Proxy forwards message to master
func (n *ClusterNode) forward(msg *ClusterReq) error {
log.Printf("cluster: forwarding request to node '%s'", n.name)
@@ -279,7 +332,7 @@ func (c *Cluster) nodeForTopic(topic string) *ClusterNode {
} else {
node := globals.cluster.nodes[key]
if node == nil {
log.Println("cluster: node has disconnected")
log.Println("cluster: no node for topic", topic, key)
}
return node
}
@@ -371,14 +424,10 @@ func clusterInit(configString json.RawMessage, self *string) {
}
globals.cluster = &Cluster{
thisNodeName: thisName,
ring: rh.New(CLUSTER_HASH_REPLICAS, nil),
nodes: make(map[string]*ClusterNode)}

ringKeys := make([]string, 0, len(config.Nodes))
listenOn := ""
for _, host := range config.Nodes {
ringKeys = append(ringKeys, host.Name)

if host.Name == globals.cluster.thisNodeName {
listenOn = host.Addr
// Don't create a cluster member for this local instance
@@ -400,7 +449,7 @@ func clusterInit(configString json.RawMessage, self *string) {
}

if !globals.cluster.failoverInit(config.Failover) {
globals.cluster.ring.Add(ringKeys...)
globals.cluster.rehash(nil)
}

addr, err := net.ResolveTCPAddr("tcp", listenOn)
@@ -443,7 +492,7 @@ func (sess *Session) rpcWriteLoop() {
}
// The error is returned if the remote node is down. Which means the remote
// session is also disconnected.
if err := sess.rpcnode.endpoint.Call("Cluster.Proxy",
if err := sess.rpcnode.call("Cluster.Proxy",
&ClusterResp{Msg: msg, FromSID: sess.sid}, &unused); err != nil {

log.Println("sess.writeRPC: " + err.Error())
@@ -452,7 +501,7 @@
case msg := <-sess.stop:
// Shutdown is requested, don't care if the message is delivered
if msg != nil {
sess.rpcnode.endpoint.Call("Cluster.Proxy", &ClusterResp{Msg: msg, FromSID: sess.sid},
sess.rpcnode.call("Cluster.Proxy", &ClusterResp{Msg: msg, FromSID: sess.sid},
&unused)
}
return
84 changes: 49 additions & 35 deletions server/cluster_leader.go
@@ -84,12 +84,27 @@ func (c *Cluster) failoverInit(config *ClusterFailoverConfig) bool {
return false
}
if len(c.nodes) < 2 {
log.Printf("cluster: failover disabled; need at least 3 nodes, got %d", len(c.nodes)+1)
log.Printf("cluster: failover disabled; need at least 3 nodes, got %d only", len(c.nodes)+1)
return false
}

// Generate ring hash on the assumption that all nodes are alive and well.
// This minimizes rehashing during normal operations.
var activeNodes []string
for _, node := range c.nodes {
activeNodes = append(activeNodes, node.name)
}
activeNodes = append(activeNodes, c.thisNodeName)
c.rehash(activeNodes)

// Random heartbeat ticker: 0.75 * config.HeartBeat + random(0, 0.5 * config.HeartBeat)
rand.Seed(time.Now().UnixNano())
hb := time.Duration(config.Heartbeat) * time.Millisecond
hb = (hb >> 1) + (hb >> 2) + time.Duration(rand.Intn(int(hb>>1)))

c.fo = &ClusterFailover{
heartBeat: time.Duration(config.Heartbeat) * time.Millisecond,
activeNodes: activeNodes,
heartBeat: hb,
voteTimeout: config.VoteAfter,
nodeFailCountLimit: config.NodeFailAfter,
leaderPing: make(chan *ClusterPing, config.VoteAfter),
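
The second fix moves ring construction into failoverInit (built on the assumption that all nodes are alive, which minimizes rehashing during normal operation) and jitters the heartbeat period per node so pings and election timeouts do not fire in lockstep across the cluster. A small sketch of the jitter arithmetic, with an arbitrary example base interval standing in for config.Heartbeat:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())

	base := 500 * time.Millisecond // arbitrary stand-in for config.Heartbeat

	// (base >> 1) + (base >> 2) is 0.75 * base; the random term adds up to
	// another 0.5 * base, so each node ends up with a period somewhere in
	// [0.75, 1.25) * base.
	hb := (base >> 1) + (base >> 2) + time.Duration(rand.Intn(int(base>>1)))

	fmt.Println(hb)
}
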
@@ -141,7 +156,6 @@ func (c *Cluster) sendPings() {
node.failCount++
if node.failCount == c.fo.nodeFailCountLimit {
// Node failed too many times
log.Printf("cluster: node %s failed too many times", node.name)
rehash = true
}
} else {
@@ -171,53 +185,49 @@
}

func (c *Cluster) electLeader() {
// Increment the term and clear the leader
// Increment the term (voting for myself in this term) and clear the leader
c.fo.term++
c.fo.leader = ""

log.Println("cluster: leading new election for term", c.fo.term)

nodeCount := len(c.nodes)
// Number of votes needed to elect the leader
expectVotes := (nodeCount+1)>>1 + 1
done := make(chan *rpc.Call, nodeCount)

// Send async requests for votes to other nodes
for _, node := range c.nodes {
if node.endpoint == nil {
// Nodes may not have been connected yet.
continue
}
response := ClusterVoteResponse{}
node.endpoint.Go("Cluster.Vote", &ClusterVoteRequest{
node.callAsync("Cluster.Vote", &ClusterVoteRequest{
Node: c.thisNodeName,
Term: c.fo.term}, &response, done)

}

// Number of votes received (1 vote for self)
voteCount := 1
timeout := time.NewTimer(c.fo.heartBeat>>1 + c.fo.heartBeat)
// Wait for one of the following
// 1. More than half of the nodes voting in favor
// 2. Timeout.
// 2. All nodes responded.
// 3. Timeout.
for i := 0; i < nodeCount && voteCount < expectVotes; {
select {
case call := <-done:
if call.Error == nil {
if call.Reply.(*ClusterVoteResponse).Result {
// Vote in my favor
voteCount++
log.Printf("cluster: %d vote(s) in my favor", voteCount)
} else {
// Vote against me
if c.fo.term < call.Reply.(*ClusterVoteResponse).Term {
// Abandon vote: this node's term is behind the cluster
i = nodeCount
voteCount = 0
c.fo.term = call.Reply.(*ClusterVoteResponse).Term
}
log.Println("cluster: vote against me")
}
} else {
// log.Println("cluster: failed to contact node", call.Error)
}

i++
case <-timeout.C:
// break the loop
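
For reference, the quorum arithmetic used above: expectVotes is a strict majority of the whole cluster, where nodeCount counts the peers excluding this node (whose own vote is the initial voteCount of 1). A tiny sketch of that calculation, written only to illustrate the expression:

package main

import "fmt"

// votesNeeded mirrors the expectVotes expression in electLeader: a strict
// majority of the full cluster, where others is the number of peer nodes
// excluding the one doing the counting.
func votesNeeded(others int) int {
	return (others+1)>>1 + 1
}

func main() {
	for _, others := range []int{2, 3, 4, 5} {
		total := others + 1
		fmt.Printf("cluster of %d nodes: %d votes needed\n", total, votesNeeded(others))
	}
}
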
@@ -228,22 +238,20 @@ func (c *Cluster) electLeader() {
if voteCount >= expectVotes {
// Current node elected as the leader
c.fo.leader = c.thisNodeName
log.Printf("Elected myself as a new leader with %d votes", voteCount)
log.Println("Elected myself as a new leader")
}
}

// Go routine that processes calls related to leader election and maintenance.
func (c *Cluster) run() {
// Heartbeat ticker
rand.Seed(time.Now().UnixNano())

// Random ticker 0.75 * heartBeat + random(0, 0.5 * heartBeat)
ticker := time.NewTicker((c.fo.heartBeat >> 1) + (c.fo.heartBeat >> 2) +
time.Duration(rand.Intn(int(c.fo.heartBeat>>1))))
ticker := time.NewTicker(c.fo.heartBeat)

missed := 0
// Don't rehash immediately on the first ping. If this node just came online, the leader will
// account for it on the next ping. Otherwise it would rehash twice.
rehashSkipped := false

for {
select {
case <-ticker.C:
@@ -254,34 +262,39 @@ func (c *Cluster) run() {
missed++
if missed >= c.fo.voteTimeout {
// Elect the leader
log.Println("cluster: initiating election after failed pings:", missed)
c.electLeader()
missed = 0
c.electLeader()
}
}
case ping := <-c.fo.leaderPing:
// Ping from a leader.

if ping.Term < c.fo.term {
// This is a ping from a stale leader. Ignore.
log.Println("cluster: ping from a stale leader", ping.Term, c.fo.term, ping.Leader, c.fo.leader)
continue
}

if ping.Term > c.fo.term {
c.fo.term = ping.Term
c.fo.leader = ping.Leader
log.Printf("cluster: leader set to '%s'", c.fo.leader)
} else if ping.Leader != c.fo.leader && c.fo.leader != "" {
// Wrong leader. It's a bug, should never happen!
log.Printf("cluster: wrong leader '%s' while expecting '%s'; term %d",
ping.Leader, c.fo.leader, ping.Term)
log.Printf("cluster: leader '%s' elected", c.fo.leader)
} else if ping.Leader != c.fo.leader {
if c.fo.leader != "" {
// Wrong leader. It's a bug, should never happen!
log.Printf("cluster: wrong leader '%s' while expecting '%s'; term %d",
ping.Leader, c.fo.leader, ping.Term)
} else {
log.Printf("cluster: leader set to '%s'", ping.Leader)
}
c.fo.leader = ping.Leader
}

missed = 0
if ping.Signature != c.ring.Signature() {
if rehashSkipped {
log.Println("cluster: rehashing at request of a leader", ping.Leader, ping.Nodes)
log.Println("cluster: rehashing at a request of",
ping.Leader, ping.Nodes, ping.Signature, c.ring.Signature())
c.rehash(ping.Nodes)
rehashSkipped = false

@@ -293,15 +306,16 @@

case vreq := <-c.fo.electionVote:
if c.fo.term < vreq.req.Term {
// This is a new election. This node has not voted yet. Vote for the requestor.
// This is a new election. This node has not voted yet. Vote for the requestor and
// clear the current leader.
log.Printf("Voting YES for %s, my term %d, vote term %d", vreq.req.Node, c.fo.term, vreq.req.Term)
c.fo.term = vreq.req.Term
c.fo.leader = ""
vreq.resp <- ClusterVoteResponse{Result: true, Term: c.fo.term}
log.Printf("Voted YES for %s, terms %d, %d", vreq.req.Node, c.fo.term, vreq.req.Term)
} else {
// This node has voted already, reject.
// This node has voted already or stale election, reject.
log.Printf("Voting NO for %s, my term %d, vote term %d", vreq.req.Node, c.fo.term, vreq.req.Term)
vreq.resp <- ClusterVoteResponse{Result: false, Term: c.fo.term}

log.Printf("Voted NO for %s, terms %d, %d", vreq.req.Node, c.fo.term, vreq.req.Term)
}
case <-c.fo.done:
return
6 changes: 5 additions & 1 deletion server/run-cluster.sh
@@ -1,3 +1,7 @@
#!/bin/bash

./server --config=./tinode-two.conf
# Start test cluster on one host. This is provided just as an example.

./server -config=./cluster.conf -cluster_self=one -listen=:6060 &
./server -config=./cluster.conf -cluster_self=two -listen=:6061 &
./server -config=./cluster.conf -cluster_self=three -listen=:6062 &
