Commit b499aa9

Author: Flavio Crisciani

Cleanup node management logic

Created a method that handles a node state change together with the cleanup operations associated with it. Realigned the testing client with the new diagnostic interface.

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>

1 parent 822e5b5 commit b499aa9

File tree: 8 files changed, +323 additions, −104 deletions
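
The heart of the cleanup is a small node state machine: every cluster member is tracked in exactly one of three maps on NetworkDB (nDB.nodes, nDB.leftNodes, nDB.failedNodes), and the new helpers findNode and changeNodeState centralize lookups and transitions between them. Their definitions land in a file of this commit that is not shown in this excerpt; the state names below are taken from the diffs and tests, while the concrete values and comments are assumptions:

type nodeState int

const (
	// nodeNotFound: the name is present in none of the three maps.
	nodeNotFound nodeState = -1
	// nodeActiveState: tracked in nDB.nodes; reapTime is reset to 0.
	nodeActiveState nodeState = 0
	// nodeLeftState: tracked in nDB.leftNodes after an explicit leave;
	// reapTime is armed so the garbage collector eventually drops the entry.
	nodeLeftState nodeState = 1
	// nodeFailedState: tracked in nDB.failedNodes after an unexpected
	// disconnection; reapTime is armed as for nodeLeftState.
	nodeFailedState nodeState = 2
)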

diagnose/server.go
Lines changed: 1 addition & 2 deletions

@@ -91,7 +91,7 @@ func (s *Server) EnableDebug(ip string, port int) {
 	}
 
 	logrus.Infof("Starting the diagnose server listening on %d for commands", port)
-	srv := &http.Server{Addr: fmt.Sprintf("127.0.0.1:%d", port), Handler: s}
+	srv := &http.Server{Addr: fmt.Sprintf("%s:%d", ip, port), Handler: s}
 	s.srv = srv
 	s.enable = 1
 	go func(n *Server) {
@@ -101,7 +101,6 @@ func (s *Server) EnableDebug(ip string, port int) {
 			atomic.SwapInt32(&n.enable, 0)
 		}
 	}(s)
-
 }
 
 // DisableDebug stop the dubug and closes the tcp socket
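
The functional change here is one line but user-visible: the diagnostic server now binds to the ip argument of EnableDebug instead of the hardcoded loopback address, so a caller can expose it on a chosen interface or keep it local by passing "127.0.0.1". A minimal standalone sketch of the same pattern, not libnetwork code; the function name and port value are illustrative:

package main

import (
	"fmt"
	"log"
	"net/http"
)

// startDebugServer builds the bind address from a caller-supplied ip, the
// same pattern the diff above adopts instead of hardcoding "127.0.0.1".
func startDebugServer(ip string, port int, h http.Handler) *http.Server {
	srv := &http.Server{Addr: fmt.Sprintf("%s:%d", ip, port), Handler: h}
	go func() {
		if err := srv.ListenAndServe(); err != nil {
			log.Printf("debug server stopped: %v", err)
		}
	}()
	return srv
}

func main() {
	startDebugServer("127.0.0.1", 2000, http.DefaultServeMux) // loopback only
	// startDebugServer("0.0.0.0", 2000, http.DefaultServeMux) would accept
	// connections on every interface.
	select {}
}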

networkdb/delegate.go
Lines changed: 23 additions & 54 deletions

@@ -16,92 +16,61 @@ func (d *delegate) NodeMeta(limit int) []byte {
 	return []byte{}
 }
 
-// getNode searches the node inside the tables
-// returns true if the node was respectively in the active list, explicit node leave list or failed list
-func (nDB *NetworkDB) getNode(nEvent *NodeEvent, extract bool) (bool, bool, bool, *node) {
-	var active bool
-	var left bool
-	var failed bool
-
-	for _, nodes := range []map[string]*node{
-		nDB.failedNodes,
-		nDB.leftNodes,
-		nDB.nodes,
-	} {
-		if n, ok := nodes[nEvent.NodeName]; ok {
-			active = &nodes == &nDB.nodes
-			left = &nodes == &nDB.leftNodes
-			failed = &nodes == &nDB.failedNodes
-			if n.ltime >= nEvent.LTime {
-				return active, left, failed, nil
-			}
-			if extract {
-				delete(nodes, n.Name)
-			}
-			return active, left, failed, n
-		}
-	}
-	return active, left, failed, nil
-}
-
 func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
 	// Update our local clock if the received messages has newer
 	// time.
 	nDB.networkClock.Witness(nEvent.LTime)
 
 	nDB.RLock()
-	active, left, _, n := nDB.getNode(nEvent, false)
+	defer nDB.RUnlock()
+
+	// check if the node exists
+	n, _, _ := nDB.findNode(nEvent.NodeName)
 	if n == nil {
-		nDB.RUnlock()
 		return false
 	}
-	nDB.RUnlock()
+
+	// check if the event is fresh
+	if n.ltime >= nEvent.LTime {
+		return false
+	}
+
+	// If we are here means that the event is fresher and the node is known. Update the laport time
+	n.ltime = nEvent.LTime
+
 	// If it is a node leave event for a manager and this is the only manager we
 	// know of we want the reconnect logic to kick in. In a single manager
 	// cluster manager's gossip can't be bootstrapped unless some other node
 	// connects to it.
 	if len(nDB.bootStrapIP) == 1 && nEvent.Type == NodeEventTypeLeave {
 		for _, ip := range nDB.bootStrapIP {
 			if ip.Equal(n.Addr) {
-				n.ltime = nEvent.LTime
 				return true
 			}
 		}
 	}
 
-	n.ltime = nEvent.LTime
-
 	switch nEvent.Type {
 	case NodeEventTypeJoin:
-		if active {
-			// the node is already marked as active nothing to do
+		moved, err := nDB.changeNodeState(n.Name, nodeActiveState)
+		if err != nil {
+			logrus.WithError(err).Error("unable to find the node to move")
 			return false
 		}
-		nDB.Lock()
-		// Because the lock got released on the previous check we have to do it again and re verify the status of the node
-		// All of this is to avoid a big lock on the function
-		if active, _, _, n = nDB.getNode(nEvent, true); !active && n != nil {
-			n.reapTime = 0
-			nDB.nodes[n.Name] = n
+		if moved {
 			logrus.Infof("%v(%v): Node join event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
 		}
-		nDB.Unlock()
-		return true
+		return moved
 	case NodeEventTypeLeave:
-		if left {
-			// the node is already marked as left nothing to do.
+		moved, err := nDB.changeNodeState(n.Name, nodeLeftState)
+		if err != nil {
+			logrus.WithError(err).Error("unable to find the node to move")
 			return false
 		}
-		nDB.Lock()
-		// Because the lock got released on the previous check we have to do it again and re verify the status of the node
-		// All of this is to avoid a big lock on the function
-		if _, left, _, n = nDB.getNode(nEvent, true); !left && n != nil {
-			n.reapTime = nodeReapInterval
-			nDB.leftNodes[n.Name] = n
+		if moved {
 			logrus.Infof("%v(%v): Node leave event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
 		}
-		nDB.Unlock()
-		return true
+		return moved
 	}
 
 	return false
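
handleNodeEvent can now hold a single read lock for its whole body because the lookup and the state transition are delegated to findNode and changeNodeState. Those helpers are added by this commit in a file not shown in this excerpt; the sketch below is reconstructed from their usage above and from TestFindNode/TestChangeNodeState further down, assuming the networkdb package's existing types (node, NetworkDB, nodeReapInterval) and a fmt import. The committed code may differ in detail:

// Sketch reconstructed from usage; not the committed implementation.

// findNode looks the name up in the three state maps and reports which one
// holds it, returning the map itself so the caller can mutate it.
func (nDB *NetworkDB) findNode(nodeName string) (*node, nodeState, map[string]*node) {
	for _, entry := range []struct {
		state nodeState
		m     map[string]*node
	}{
		{nodeActiveState, nDB.nodes},
		{nodeLeftState, nDB.leftNodes},
		{nodeFailedState, nDB.failedNodes},
	} {
		if n, ok := entry.m[nodeName]; ok {
			return n, entry.state, entry.m
		}
	}
	return nil, nodeNotFound, nil
}

// changeNodeState moves the node into the map matching newState and applies
// the cleanup associated with the transition. It returns true only when the
// node actually changed state.
func (nDB *NetworkDB) changeNodeState(nodeName string, newState nodeState) (bool, error) {
	n, currState, m := nDB.findNode(nodeName)
	if n == nil {
		return false, fmt.Errorf("node %s not found", nodeName)
	}
	if currState == newState {
		return false, nil // already in the requested state, nothing to do
	}
	delete(m, nodeName)
	switch newState {
	case nodeActiveState:
		n.reapTime = 0 // active nodes must not be garbage collected
		nDB.nodes[nodeName] = n
	case nodeLeftState:
		n.reapTime = nodeReapInterval // arm the reap timer for GC
		nDB.leftNodes[nodeName] = n
	case nodeFailedState:
		n.reapTime = nodeReapInterval
		nDB.failedNodes[nodeName] = n
	}
	return true, nil
}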

networkdb/event_delegate.go
Lines changed: 17 additions & 44 deletions

@@ -21,24 +21,6 @@ func (e *eventDelegate) broadcastNodeEvent(addr net.IP, op opType) {
 	}
 }
 
-func (e *eventDelegate) purgeReincarnation(mn *memberlist.Node) {
-	for name, node := range e.nDB.failedNodes {
-		if node.Addr.Equal(mn.Addr) {
-			logrus.Infof("Node %s/%s, is the new incarnation of the failed node %s/%s", mn.Name, mn.Addr, name, node.Addr)
-			delete(e.nDB.failedNodes, name)
-			return
-		}
-	}
-
-	for name, node := range e.nDB.leftNodes {
-		if node.Addr.Equal(mn.Addr) {
-			logrus.Infof("Node %s/%s, is the new incarnation of the shutdown node %s/%s", mn.Name, mn.Addr, name, node.Addr)
-			delete(e.nDB.leftNodes, name)
-			return
-		}
-	}
-}
-
 func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
 	logrus.Infof("Node %s/%s, joined gossip cluster", mn.Name, mn.Addr)
 	e.broadcastNodeEvent(mn.Addr, opCreate)
@@ -57,44 +39,35 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
 	// Every node has a unique ID
 	// Check on the base of the IP address if the new node that joined is actually a new incarnation of a previous
 	// failed or shutdown one
-	e.purgeReincarnation(mn)
+	e.nDB.purgeReincarnation(mn)
 
 	e.nDB.nodes[mn.Name] = &node{Node: *mn}
 	logrus.Infof("Node %s/%s, added to nodes list", mn.Name, mn.Addr)
 }
 
 func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
-	var failed bool
 	logrus.Infof("Node %s/%s, left gossip cluster", mn.Name, mn.Addr)
 	e.broadcastNodeEvent(mn.Addr, opDelete)
-	// The node left or failed, delete all the entries created by it.
-	// If the node was temporary down, deleting the entries will guarantee that the CREATE events will be accepted
-	// If the node instead left because was going down, then it makes sense to just delete all its state
+
 	e.nDB.Lock()
 	defer e.nDB.Unlock()
-	e.nDB.deleteNodeFromNetworks(mn.Name)
-	e.nDB.deleteNodeTableEntries(mn.Name)
-	if n, ok := e.nDB.nodes[mn.Name]; ok {
-		delete(e.nDB.nodes, mn.Name)
-
-		// Check if a new incarnation of the same node already joined
-		// In that case this node can simply be removed and no further action are needed
-		for name, node := range e.nDB.nodes {
-			if node.Addr.Equal(mn.Addr) {
-				logrus.Infof("Node %s/%s, is the new incarnation of the failed node %s/%s", name, node.Addr, mn.Name, mn.Addr)
-				return
-			}
-		}
 
-		// In case of node failure, keep retrying to reconnect every retryInterval (1sec) for nodeReapInterval (24h)
-		// Explicit leave will have already removed the node from the list of nodes (nDB.nodes) and put it into the leftNodes map
-		n.reapTime = nodeReapInterval
-		e.nDB.failedNodes[mn.Name] = n
-		failed = true
+	n, currState, _ := e.nDB.findNode(mn.Name)
+	if n == nil {
+		logrus.Errorf("Node %s/%s not found in the node lists", mn.Name, mn.Addr)
+		return
 	}
-
-	if failed {
-		logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
+	// if the node was active means that did not send the leave cluster message, so it's probable that
+	// failed. Else would be already in the left list so nothing else has to be done
+	if currState == nodeActiveState {
+		moved, err := e.nDB.changeNodeState(mn.Name, nodeFailedState)
+		if err != nil {
+			logrus.WithError(err).Errorf("impossible condition, node %s/%s not present in the list", mn.Name, mn.Addr)
+			return
+		}
+		if moved {
+			logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
+		}
 	}
 }
 
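
purgeReincarnation now lives on NetworkDB itself and, judging from TestNodeReincarnation below, returns a bool and also handles an old incarnation that is still in the active list, retiring it to leftNodes instead of only deleting it. The committed body is in a file not shown in this excerpt; a sketch consistent with the removed eventDelegate version above and with the test's assertions:

// Sketch, not the committed implementation: reconstructed from the removed
// eventDelegate.purgeReincarnation and from TestNodeReincarnation below.
// It reports whether an old incarnation with the same IP was found and purged.
func (nDB *NetworkDB) purgeReincarnation(mn *memberlist.Node) bool {
	for name, old := range nDB.nodes {
		if old.Addr.Equal(mn.Addr) && name != mn.Name {
			logrus.Infof("Node %s/%s, is the new incarnation of the active node %s", mn.Name, mn.Addr, name)
			// Retire the stale entry; it will be reaped from leftNodes
			// after nodeReapInterval. Error ignored in this sketch.
			nDB.changeNodeState(name, nodeLeftState)
			return true
		}
	}
	for name, old := range nDB.failedNodes {
		if old.Addr.Equal(mn.Addr) {
			logrus.Infof("Node %s/%s, is the new incarnation of the failed node %s", mn.Name, mn.Addr, name)
			nDB.changeNodeState(name, nodeLeftState)
			return true
		}
	}
	for name, old := range nDB.leftNodes {
		if old.Addr.Equal(mn.Addr) {
			logrus.Infof("Node %s/%s, is the new incarnation of the shutdown node %s", mn.Name, mn.Addr, name)
			nDB.changeNodeState(name, nodeLeftState)
			return true
		}
	}
	return false
}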

networkdb/networkdb_test.go
Lines changed: 155 additions & 0 deletions

@@ -5,13 +5,15 @@ import (
 	"fmt"
 	"io/ioutil"
 	"log"
+	"net"
 	"os"
 	"sync/atomic"
 	"testing"
 	"time"
 
 	"github.com/docker/docker/pkg/stringid"
 	"github.com/docker/go-events"
+	"github.com/hashicorp/memberlist"
 	"github.com/sirupsen/logrus"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -580,3 +582,156 @@ func TestNetworkDBGarbageCollection(t *testing.T) {
 
 	closeNetworkDBInstances(dbs)
 }
+
+func TestFindNode(t *testing.T) {
+	dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
+
+	dbs[0].nodes["active"] = &node{Node: memberlist.Node{Name: "active"}}
+	dbs[0].failedNodes["failed"] = &node{Node: memberlist.Node{Name: "failed"}}
+	dbs[0].leftNodes["left"] = &node{Node: memberlist.Node{Name: "left"}}
+
+	// active nodes is 2 because the testing node is in the list
+	assert.Equal(t, 2, len(dbs[0].nodes))
+	assert.Equal(t, 1, len(dbs[0].failedNodes))
+	assert.Equal(t, 1, len(dbs[0].leftNodes))
+
+	n, currState, m := dbs[0].findNode("active")
+	assert.NotNil(t, n)
+	assert.Equal(t, "active", n.Name)
+	assert.Equal(t, nodeActiveState, currState)
+	assert.NotNil(t, m)
+	// delete the entry manually
+	delete(m, "active")
+
+	// test if can be still find
+	n, currState, m = dbs[0].findNode("active")
+	assert.Nil(t, n)
+	assert.Equal(t, nodeNotFound, currState)
+	assert.Nil(t, m)
+
+	n, currState, m = dbs[0].findNode("failed")
+	assert.NotNil(t, n)
+	assert.Equal(t, "failed", n.Name)
+	assert.Equal(t, nodeFailedState, currState)
+	assert.NotNil(t, m)
+
+	// find and remove
+	n, currState, m = dbs[0].findNode("left")
+	assert.NotNil(t, n)
+	assert.Equal(t, "left", n.Name)
+	assert.Equal(t, nodeLeftState, currState)
+	assert.NotNil(t, m)
+	delete(m, "left")
+
+	n, currState, m = dbs[0].findNode("left")
+	assert.Nil(t, n)
+	assert.Equal(t, nodeNotFound, currState)
+	assert.Nil(t, m)
+
+	closeNetworkDBInstances(dbs)
+}
+
+func TestChangeNodeState(t *testing.T) {
+	dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
+
+	dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1"}}
+	dbs[0].nodes["node2"] = &node{Node: memberlist.Node{Name: "node2"}}
+	dbs[0].nodes["node3"] = &node{Node: memberlist.Node{Name: "node3"}}
+
+	// active nodes is 4 because the testing node is in the list
+	assert.Equal(t, 4, len(dbs[0].nodes))
+
+	n, currState, m := dbs[0].findNode("node1")
+	assert.NotNil(t, n)
+	assert.Equal(t, nodeActiveState, currState)
+	assert.Equal(t, "node1", n.Name)
+	assert.NotNil(t, m)
+
+	// node1 to failed
+	dbs[0].changeNodeState("node1", nodeFailedState)
+
+	n, currState, m = dbs[0].findNode("node1")
+	assert.NotNil(t, n)
+	assert.Equal(t, nodeFailedState, currState)
+	assert.Equal(t, "node1", n.Name)
+	assert.NotNil(t, m)
+	assert.NotEqual(t, time.Duration(0), n.reapTime)
+
+	// node1 back to active
+	dbs[0].changeNodeState("node1", nodeActiveState)
+
+	n, currState, m = dbs[0].findNode("node1")
+	assert.NotNil(t, n)
+	assert.Equal(t, nodeActiveState, currState)
+	assert.Equal(t, "node1", n.Name)
+	assert.NotNil(t, m)
+	assert.Equal(t, time.Duration(0), n.reapTime)
+
+	// node1 to left
+	dbs[0].changeNodeState("node1", nodeLeftState)
+	dbs[0].changeNodeState("node2", nodeLeftState)
+	dbs[0].changeNodeState("node3", nodeLeftState)
+
+	n, currState, m = dbs[0].findNode("node1")
+	assert.NotNil(t, n)
+	assert.Equal(t, nodeLeftState, currState)
+	assert.Equal(t, "node1", n.Name)
+	assert.NotNil(t, m)
+	assert.NotEqual(t, time.Duration(0), n.reapTime)
+
+	n, currState, m = dbs[0].findNode("node2")
+	assert.NotNil(t, n)
+	assert.Equal(t, nodeLeftState, currState)
+	assert.Equal(t, "node2", n.Name)
+	assert.NotNil(t, m)
+	assert.NotEqual(t, time.Duration(0), n.reapTime)
+
+	n, currState, m = dbs[0].findNode("node3")
+	assert.NotNil(t, n)
+	assert.Equal(t, nodeLeftState, currState)
+	assert.Equal(t, "node3", n.Name)
+	assert.NotNil(t, m)
+	assert.NotEqual(t, time.Duration(0), n.reapTime)
+
+	// active nodes is 1 because the testing node is in the list
+	assert.Equal(t, 1, len(dbs[0].nodes))
+	assert.Equal(t, 0, len(dbs[0].failedNodes))
+	assert.Equal(t, 3, len(dbs[0].leftNodes))
+
+	closeNetworkDBInstances(dbs)
+}
+
+func TestNodeReincarnation(t *testing.T) {
+	dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
+
+	dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1", Addr: net.ParseIP("192.168.1.1")}}
+	dbs[0].leftNodes["node2"] = &node{Node: memberlist.Node{Name: "node2", Addr: net.ParseIP("192.168.1.2")}}
+	dbs[0].failedNodes["node3"] = &node{Node: memberlist.Node{Name: "node3", Addr: net.ParseIP("192.168.1.3")}}
+
+	// active nodes is 2 because the testing node is in the list
+	assert.Equal(t, 2, len(dbs[0].nodes))
+	assert.Equal(t, 1, len(dbs[0].failedNodes))
+	assert.Equal(t, 1, len(dbs[0].leftNodes))
+
+	b := dbs[0].purgeReincarnation(&memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")})
+	assert.True(t, b)
+	dbs[0].nodes["node4"] = &node{Node: memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")}}
+
+	b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.2")})
+	assert.True(t, b)
+	dbs[0].nodes["node5"] = &node{Node: memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.1")}}
+
+	b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.3")})
+	assert.True(t, b)
+	dbs[0].nodes["node6"] = &node{Node: memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.1")}}
+
+	b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.10")})
+	assert.False(t, b)
+
+	// active nodes is 1 because the testing node is in the list
+	assert.Equal(t, 4, len(dbs[0].nodes))
+	assert.Equal(t, 0, len(dbs[0].failedNodes))
+	assert.Equal(t, 3, len(dbs[0].leftNodes))
+
+	closeNetworkDBInstances(dbs)
+}
