Skip to content

Commit

Permalink
Recover from transient gossip failures
Browse files Browse the repository at this point in the history
Currently if there is any transient gossip failure in any node the
recovery process depends on other nodes propagating the information
indirectly. In cases where these transient failures affect all the nodes
that this node has in its memberlist, this node will be permanently
cut off from the gossip channel. Added node state management code in
networkdb to address these problems by trying to rejoin the cluster via
the failed nodes when there is a failure. This also necessitates adding
new messages called node event messages to differentiate
between node leave and node failure.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
  • Loading branch information
mrjana committed Sep 15, 2016
1 parent 971c5e0 commit 3f28ac9
Show file tree
Hide file tree
Showing 7 changed files with 654 additions and 84 deletions.
52 changes: 52 additions & 0 deletions networkdb/broadcast.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package networkdb

import (
"fmt"
"time"

"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/serf"
)

const broadcastTimeout = 5 * time.Second

type networkEventMessage struct {
id string
node string
Expand Down Expand Up @@ -44,6 +49,53 @@ func (nDB *NetworkDB) sendNetworkEvent(nid string, event NetworkEvent_Type, ltim
return nil
}

// nodeEventMessage is a memberlist.Broadcast that carries an encoded
// NodeEvent (join/leave) plus an optional channel which is closed once
// the broadcast has been handed off or dropped by the queue.
type nodeEventMessage struct {
	msg    []byte          // wire-encoded NodeEvent payload
	notify chan<- struct{} // closed in Finished; nil when nobody waits
}

// Invalidates implements memberlist.Broadcast. Node event broadcasts
// never supersede one another here, so nothing is ever invalidated.
func (m *nodeEventMessage) Invalidates(other memberlist.Broadcast) bool {
	return false
}

// Message implements memberlist.Broadcast and returns the raw payload.
func (m *nodeEventMessage) Message() []byte {
	return m.msg
}

// Finished implements memberlist.Broadcast. It signals any waiter
// (see sendNodeEvent) that the broadcast left the transmit queue.
func (m *nodeEventMessage) Finished() {
	if m.notify != nil {
		close(m.notify)
	}
}

// sendNodeEvent broadcasts a node event (join/leave) for the local node
// to the cluster and blocks until the broadcast has been transmitted or
// broadcastTimeout elapses, in which case an error is returned.
func (nDB *NetworkDB) sendNodeEvent(event NodeEvent_Type) error {
	payload, err := encodeMessage(MessageTypeNodeEvent, &NodeEvent{
		Type:     event,
		LTime:    nDB.networkClock.Increment(),
		NodeName: nDB.config.NodeName,
	})
	if err != nil {
		return err
	}

	done := make(chan struct{})
	nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{
		msg:    payload,
		notify: done,
	})

	// Block until the queue reports the broadcast as finished, or
	// give up after broadcastTimeout.
	select {
	case <-done:
		return nil
	case <-time.After(broadcastTimeout):
		return fmt.Errorf("timed out broadcasting node event")
	}
}

type tableEventMessage struct {
id string
tname string
Expand Down
68 changes: 63 additions & 5 deletions networkdb/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"math/big"
rnd "math/rand"
"net"
"strings"
"time"

Expand Down Expand Up @@ -111,6 +112,13 @@ func (nDB *NetworkDB) clusterInit() error {
RetransmitMult: config.RetransmitMult,
}

nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(nDB.nodes)
},
RetransmitMult: config.RetransmitMult,
}

mlist, err := memberlist.Create(config)
if err != nil {
return fmt.Errorf("failed to create memberlist: %v", err)
Expand All @@ -127,6 +135,7 @@ func (nDB *NetworkDB) clusterInit() error {
{reapInterval, nDB.reapState},
{config.GossipInterval, nDB.gossip},
{config.PushPullInterval, nDB.bulkSyncTables},
{1 * time.Second, nDB.reconnectNode},
} {
t := time.NewTicker(trigger.interval)
go nDB.triggerFunc(trigger.interval, t.C, nDB.stopCh, trigger.fn)
Expand All @@ -143,12 +152,20 @@ func (nDB *NetworkDB) clusterJoin(members []string) error {
return fmt.Errorf("could not join node to memberlist: %v", err)
}

if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil {
return fmt.Errorf("failed to send node join: %v", err)
}

return nil
}

func (nDB *NetworkDB) clusterLeave() error {
mlist := nDB.memberlist

if err := nDB.sendNodeEvent(NodeEventTypeLeave); err != nil {
return fmt.Errorf("failed to send node leave: %v", err)
}

if err := mlist.Leave(time.Second); err != nil {
return err
}
Expand Down Expand Up @@ -180,6 +197,42 @@ func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, sto
}
}

// reconnectNode periodically attempts to rejoin the gossip cluster
// through one of the previously failed nodes, so that a transient
// gossip failure affecting all known peers does not leave this node
// permanently partitioned. It is driven by the 1s trigger installed in
// clusterInit and is a no-op while there are no failed nodes.
func (nDB *NetworkDB) reconnectNode() {
	nDB.RLock()
	if len(nDB.failedNodes) == 0 {
		nDB.RUnlock()
		return
	}

	// Snapshot the failed nodes so we don't hold the lock across I/O.
	nodes := make([]*node, 0, len(nDB.failedNodes))
	for _, n := range nDB.failedNodes {
		nodes = append(nodes, n)
	}
	nDB.RUnlock()

	// Update all the local state to a new time to force an update on
	// the node we are trying to rejoin, just in case that node still
	// has these in leaving/deleting state. This is to facilitate fast
	// convergence after recovering from a gossip failure.
	nDB.updateLocalStateTime()

	// Pick one failed node at random and try to rejoin through it.
	node := nodes[randomOffset(len(nodes))]
	addr := net.UDPAddr{IP: node.Addr, Port: int(node.Port)}

	if _, err := nDB.memberlist.Join([]string{addr.String()}); err != nil {
		// The peer may simply still be down; log at debug level and
		// let the next tick retry rather than failing loudly.
		logrus.Debugf("Failed to rejoin cluster through node %s: %v", addr.String(), err)
		return
	}

	if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil {
		logrus.Errorf("failed to send node join during reconnect: %v", err)
		return
	}

	logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name)
	nDB.bulkSync([]string{node.Name}, true)
}

func (nDB *NetworkDB) reapState() {
nDB.reapNetworks()
nDB.reapTableEntries()
Expand Down Expand Up @@ -288,7 +341,7 @@ func (nDB *NetworkDB) gossip() {
}

// Send the compound message
if err := nDB.memberlist.SendToUDP(mnode, compound); err != nil {
if err := nDB.memberlist.SendToUDP(&mnode.Node, compound); err != nil {
logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err)
}
}
Expand Down Expand Up @@ -323,7 +376,7 @@ func (nDB *NetworkDB) bulkSyncTables() {
continue
}

completed, err := nDB.bulkSync(nid, nodes, false)
completed, err := nDB.bulkSync(nodes, false)
if err != nil {
logrus.Errorf("periodic bulk sync failure for network %s: %v", nid, err)
continue
Expand All @@ -350,7 +403,7 @@ func (nDB *NetworkDB) bulkSyncTables() {
}
}

func (nDB *NetworkDB) bulkSync(nid string, nodes []string, all bool) ([]string, error) {
func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {
if !all {
// If not all, then just pick one.
nodes = nDB.mRandomNodes(1, nodes)
Expand Down Expand Up @@ -388,7 +441,12 @@ func (nDB *NetworkDB) bulkSync(nid string, nodes []string, all bool) ([]string,
func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited bool) error {
var msgs [][]byte

logrus.Debugf("%s: Initiating bulk sync for networks %v with node %s", nDB.config.NodeName, networks, node)
var unsolMsg string
if unsolicited {
unsolMsg = "unsolicited"
}

logrus.Debugf("%s: Initiating %s bulk sync for networks %v with node %s", nDB.config.NodeName, unsolMsg, networks, node)

nDB.RLock()
mnode := nDB.nodes[node]
Expand Down Expand Up @@ -454,7 +512,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
nDB.bulkSyncAckTbl[node] = ch
nDB.Unlock()

err = nDB.memberlist.SendToTCP(mnode, buf)
err = nDB.memberlist.SendToTCP(&mnode.Node, buf)
if err != nil {
nDB.Lock()
delete(nDB.bulkSyncAckTbl, node)
Expand Down
87 changes: 85 additions & 2 deletions networkdb/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,56 @@ func (d *delegate) NodeMeta(limit int) []byte {
return []byte{}
}

// checkAndGetNode looks up the node named in nEvent across the three
// node-state maps (failed, left, active). It returns the node only if
// the event's Lamport time is strictly newer than the node's recorded
// time; otherwise it returns nil, meaning the event is stale and must
// be ignored. Callers are responsible for re-inserting the returned
// node into the map appropriate for the event type.
func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
	nDB.Lock()
	defer nDB.Unlock()

	for _, nodes := range []map[string]*node{
		nDB.failedNodes,
		nDB.leftNodes,
		nDB.nodes,
	} {
		if n, ok := nodes[nEvent.NodeName]; ok {
			// Stale or duplicate event: we already saw this or a
			// newer logical time for this node.
			if n.ltime >= nEvent.LTime {
				return nil
			}

			// NOTE(review): only failedNodes is purged here; when the
			// node was found in leftNodes or nodes it stays in that
			// map too — confirm the caller compensates, otherwise a
			// node can end up tracked in two maps at once.
			delete(nDB.failedNodes, n.Name)
			return n
		}
	}

	return nil
}

// handleNodeEvent applies a remote node join/leave event to the local
// node-state maps. It returns true when the event carried new
// information and should be rebroadcast to the rest of the cluster,
// false when it was stale or of an unknown type.
func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
	// Update our local clock if the received message has newer time.
	nDB.networkClock.Witness(nEvent.LTime)

	n := nDB.checkAndGetNode(nEvent)
	if n == nil {
		return false
	}

	n.ltime = nEvent.LTime

	switch nEvent.Type {
	case NodeEventTypeJoin:
		nDB.Lock()
		// Make sure the node is tracked only in the active map; it
		// may still be sitting in leftNodes after a voluntary leave
		// (checkAndGetNode only purges failedNodes).
		delete(nDB.leftNodes, n.Name)
		nDB.nodes[n.Name] = n
		nDB.Unlock()
		return true
	case NodeEventTypeLeave:
		nDB.Lock()
		// A voluntary leave: drop the node from the active map so
		// gossip stops targeting it, and remember it left on purpose
		// so NotifyLeave won't treat it as a failure.
		delete(nDB.nodes, n.Name)
		nDB.leftNodes[n.Name] = n
		nDB.Unlock()
		return true
	}

	return false
}

func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
// Update our local clock if the received messages has newer
// time.
Expand Down Expand Up @@ -188,6 +238,27 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
}
}

// handleNodeMessage decodes a gossiped node event, applies it to the
// local state and, if it carried new information, queues the raw
// message for rebroadcast to the rest of the cluster.
func (nDB *NetworkDB) handleNodeMessage(buf []byte) {
	var nEvent NodeEvent
	if err := proto.Unmarshal(buf, &nEvent); err != nil {
		logrus.Errorf("Error decoding node event message: %v", err)
		return
	}

	if !nDB.handleNodeEvent(&nEvent) {
		// Stale event: nothing new to propagate.
		return
	}

	rawMsg, err := encodeRawMessage(MessageTypeNodeEvent, buf)
	if err != nil {
		logrus.Errorf("Error marshalling gossip message for node event rebroadcast: %v", err)
		return
	}

	nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{
		msg: rawMsg,
	})
}

func (nDB *NetworkDB) handleNetworkMessage(buf []byte) {
var nEvent NetworkEvent
if err := proto.Unmarshal(buf, &nEvent); err != nil {
Expand Down Expand Up @@ -256,6 +327,8 @@ func (nDB *NetworkDB) handleMessage(buf []byte, isBulkSync bool) {
}

switch mType {
case MessageTypeNodeEvent:
nDB.handleNodeMessage(data)
case MessageTypeNetworkEvent:
nDB.handleNetworkMessage(data)
case MessageTypeTableEvent:
Expand All @@ -278,15 +351,18 @@ func (d *delegate) NotifyMsg(buf []byte) {
}

// GetBroadcasts implements memberlist.Delegate. It drains pending
// network-event broadcasts first and then fills only the remaining
// space with pending node-event broadcasts, so the combined payload
// stays within the transport's size limit.
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
	msgs := d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit)

	// Account for bytes already consumed: handing both queues the
	// full limit could overflow the packet memberlist is building.
	used := 0
	for _, m := range msgs {
		used += overhead + len(m)
	}

	if rem := limit - used; rem > overhead {
		msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, rem)...)
	}

	return msgs
}

func (d *delegate) LocalState(join bool) []byte {
d.nDB.RLock()
defer d.nDB.RUnlock()

pp := NetworkPushPull{
LTime: d.nDB.networkClock.Time(),
LTime: d.nDB.networkClock.Time(),
NodeName: d.nDB.config.NodeName,
}

for name, nn := range d.nDB.networks {
Expand Down Expand Up @@ -336,6 +412,13 @@ func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
d.nDB.networkClock.Witness(pp.LTime)
}

nodeEvent := &NodeEvent{
LTime: pp.LTime,
NodeName: pp.NodeName,
Type: NodeEventTypeJoin,
}
d.nDB.handleNodeEvent(nodeEvent)

for _, n := range pp.Networks {
nEvent := &NetworkEvent{
LTime: n.LTime,
Expand Down
26 changes: 20 additions & 6 deletions networkdb/event_delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,31 @@ type eventDelegate struct {
nDB *NetworkDB
}

func (e *eventDelegate) NotifyJoin(n *memberlist.Node) {
// NotifyJoin is invoked by memberlist when a peer becomes reachable.
func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
	e.nDB.Lock()
	defer e.nDB.Unlock()

	// In case the node is rejoining after a failure or leave, wait
	// until an explicit join message arrives before adding it to the
	// active nodes, just to make sure this is not a stale join. If
	// the node is unknown, add it immediately.
	if _, failed := e.nDB.failedNodes[mn.Name]; failed {
		return
	}
	if _, left := e.nDB.leftNodes[mn.Name]; left {
		return
	}

	e.nDB.nodes[mn.Name] = &node{Node: *mn}
}

func (e *eventDelegate) NotifyLeave(n *memberlist.Node) {
e.nDB.deleteNodeTableEntries(n.Name)
e.nDB.deleteNetworkNodeEntries(n.Name)
func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
e.nDB.deleteNodeTableEntries(mn.Name)
e.nDB.deleteNetworkEntriesForNode(mn.Name)
e.nDB.Lock()
delete(e.nDB.nodes, n.Name)
if n, ok := e.nDB.nodes[mn.Name]; ok {
delete(e.nDB.nodes, mn.Name)
e.nDB.failedNodes[mn.Name] = n
}
e.nDB.Unlock()
}

Expand Down
Loading

0 comments on commit 3f28ac9

Please sign in to comment.