Adjust corner case for reconnect logic
The previous logic did not account for the fact that each node
is always present in its own node list, so a bootstrap node
would never retry to reconnect: it would always find itself
in the node map.
Added a test that validates the gossip island condition.

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
Flavio Crisciani committed Jun 22, 2018
1 parent c5e832e commit d611ef2
Showing 3 changed files with 91 additions and 17 deletions.
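For context, the following standalone sketch (hypothetical names and data, not code from this commit) illustrates the corner case described in the commit message: a bootstrap node always appears in its own node map, so a membership check that does not exclude the local node can never conclude that it is alone and should retry the join.

package main

import "fmt"

// hasOtherBootstrapMember reports whether some node other than ourselves is
// reachable at the given bootstrap address. The "id != selfID" exclusion is
// the essence of the fix: without it a bootstrap node matches itself and the
// rejoin is skipped forever.
func hasOtherBootstrapMember(nodes map[string]string, selfID, bootstrapAddr string) bool {
	for id, addr := range nodes {
		if addr == bootstrapAddr && id != selfID {
			return true
		}
	}
	return false
}

func main() {
	// A bootstrap node that only knows about itself must not consider the
	// bootstrap list satisfied, otherwise it never retries the join.
	nodes := map[string]string{"self-id": "10.0.0.1"}
	fmt.Println(hasOtherBootstrapMember(nodes, "self-id", "10.0.0.1")) // false -> retry the join
}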
29 changes: 23 additions & 6 deletions networkdb/cluster.go
@@ -285,18 +285,35 @@ func (nDB *NetworkDB) rejoinClusterBootStrap() {
 		return
 	}
 
+	myself, _ := nDB.nodes[nDB.config.NodeID]
 	bootStrapIPs := make([]string, 0, len(nDB.bootStrapIP))
 	for _, bootIP := range nDB.bootStrapIP {
-		for _, node := range nDB.nodes {
-			if node.Addr.Equal(bootIP) {
-				// One of the bootstrap nodes is part of the cluster, return
-				nDB.RUnlock()
-				return
+		// bootstrap IPs are usually IP:port entries coming from Join
+		var bootstrapIP net.IP
+		ipStr, _, err := net.SplitHostPort(bootIP)
+		if err != nil {
+			// no port in the entry, try to parse the whole string as an IP
+			// Note: this seems to be the case for swarm nodes that do not specify any port
+			ipStr = bootIP
+		}
+		bootstrapIP = net.ParseIP(ipStr)
+		if bootstrapIP != nil {
+			for _, node := range nDB.nodes {
+				if node.Addr.Equal(bootstrapIP) && !node.Addr.Equal(myself.Addr) {
+					// One of the bootstrap nodes (and not myself) is part of the cluster, return
+					nDB.RUnlock()
+					return
+				}
 			}
+			bootStrapIPs = append(bootStrapIPs, bootIP)
 		}
-		bootStrapIPs = append(bootStrapIPs, bootIP.String())
 	}
 	nDB.RUnlock()
+	if len(bootStrapIPs) == 0 {
+		// this also avoids calling Join with an empty list, which would erase the current bootstrap IP list
+		logrus.Debug("rejoinClusterBootStrap did not find any valid IP")
+		return
+	}
 	// None of the bootStrap nodes are in the cluster, call memberlist join
 	logrus.Debugf("rejoinClusterBootStrap, calling cluster join with bootStrap %v", bootStrapIPs)
 	ctx, cancel := context.WithTimeout(nDB.ctx, rejoinClusterDuration)
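As a note on the parsing added above: bootstrap entries can be either "IP:port" strings coming from Join or bare IPs (as swarm appears to pass them), which is why the code first tries net.SplitHostPort and falls back to the raw string. A minimal standalone sketch of that behaviour (parseBootstrapIP is an illustrative helper, not part of the commit):

package main

import (
	"fmt"
	"net"
)

// parseBootstrapIP mirrors the parsing step in rejoinClusterBootStrap:
// strip an optional ":port" suffix, then parse the remainder as an IP.
// It returns nil when the entry is not a valid IP at all.
func parseBootstrapIP(boot string) net.IP {
	ipStr, _, err := net.SplitHostPort(boot)
	if err != nil {
		// no port in the entry, treat the whole string as an IP
		ipStr = boot
	}
	return net.ParseIP(ipStr)
}

func main() {
	fmt.Println(parseBootstrapIP("192.168.1.10:7946")) // 192.168.1.10
	fmt.Println(parseBootstrapIP("192.168.1.10"))      // 192.168.1.10
	fmt.Println(parseBootstrapIP("not-an-ip"))         // <nil>
}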
9 changes: 3 additions & 6 deletions networkdb/networkdb.go
@@ -5,7 +5,6 @@ package networkdb
 import (
 	"context"
 	"fmt"
-	"net"
 	"os"
 	"strings"
 	"sync"
@@ -96,7 +95,7 @@ type NetworkDB struct {
 
 	// bootStrapIP is the list of IPs that can be used to bootstrap
 	// the gossip.
-	bootStrapIP []net.IP
+	bootStrapIP []string
 
 	// lastStatsTimestamp is the last timestamp when the stats got printed
 	lastStatsTimestamp time.Time
@@ -268,10 +267,8 @@ func New(c *Config) (*NetworkDB, error) {
 // instances passed by the caller in the form of addr:port
 func (nDB *NetworkDB) Join(members []string) error {
 	nDB.Lock()
-	nDB.bootStrapIP = make([]net.IP, 0, len(members))
-	for _, m := range members {
-		nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m))
-	}
+	nDB.bootStrapIP = members
 	logrus.Infof("The new bootstrap node list is:%v", nDB.bootStrapIP)
 	nDB.Unlock()
 	return nDB.clusterJoin(members)
 }
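Since bootStrapIP is now kept as plain strings, the addr:port members handed to Join are preserved verbatim (including the port) and can be fed straight back to memberlist by the rejoin logic. A minimal usage sketch, assuming the libnetwork import path and hypothetical addresses:

package main

import (
	"log"

	"github.com/docker/libnetwork/networkdb"
)

func main() {
	// Hypothetical bootstrap addresses in addr:port form; the list is stored
	// as-is and reused by the periodic rejoin logic.
	db, err := networkdb.New(networkdb.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Join([]string{"192.168.1.10:7946", "192.168.1.11:7946"}); err != nil {
		log.Printf("initial join failed: %v", err)
	}
}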
70 changes: 65 additions & 5 deletions networkdb/networkdb_test.go
@@ -31,19 +31,22 @@ func TestMain(m *testing.M) {
 	os.Exit(m.Run())
 }
 
+func launchNode(t *testing.T, conf Config) *NetworkDB {
+	db, err := New(&conf)
+	require.NoError(t, err)
+	return db
+}
+
 func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Config) []*NetworkDB {
 	var dbs []*NetworkDB
 	for i := 0; i < num; i++ {
 		localConfig := *conf
 		localConfig.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
 		localConfig.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
 		localConfig.BindPort = int(atomic.AddInt32(&dbPort, 1))
-		db, err := New(&localConfig)
-		require.NoError(t, err)
-
+		db := launchNode(t, localConfig)
 		if i != 0 {
-			err = db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)})
-			assert.NoError(t, err)
+			assert.NoError(t, db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)}))
 		}
 
 		dbs = append(dbs, db)
@@ -803,3 +806,60 @@ func TestParallelDelete(t *testing.T) {
 
 	closeNetworkDBInstances(dbs)
 }
+
+func TestNetworkDBIslands(t *testing.T) {
+	logrus.SetLevel(logrus.DebugLevel)
+	dbs := createNetworkDBInstances(t, 5, "node", DefaultConfig())
+
+	// Get the node IP currently in use
+	node, _ := dbs[0].nodes[dbs[0].config.NodeID]
+	baseIPStr := node.Addr.String()
+	// Nodes 0, 1 and 2 are going to be the 3 bootstrap nodes
+	members := []string{fmt.Sprintf("%s:%d", baseIPStr, dbs[0].config.BindPort),
+		fmt.Sprintf("%s:%d", baseIPStr, dbs[1].config.BindPort),
+		fmt.Sprintf("%s:%d", baseIPStr, dbs[2].config.BindPort)}
+	// Joining will update the list of the bootstrap members
+	for i := 3; i < 5; i++ {
+		assert.NoError(t, dbs[i].Join(members))
+	}
+
+	// Now the 3 bootstrap nodes will cleanly leave and will be properly removed from the other 2 nodes
+	for i := 0; i < 3; i++ {
+		logrus.Infof("node %d leaving", i)
+		dbs[i].Close()
+		time.Sleep(2 * time.Second)
+	}
+
+	// Give some time to let the system propagate the messages and free up the ports
+	time.Sleep(10 * time.Second)
+
+	// Verify that the nodes are actually all gone and marked appropriately
+	for i := 3; i < 5; i++ {
+		assert.Len(t, dbs[i].leftNodes, 3)
+		assert.Len(t, dbs[i].failedNodes, 0)
+	}
+
+	// Spawn the first 3 nodes again with different node IDs but the same IP:port
+	for i := 0; i < 3; i++ {
+		logrus.Infof("node %d coming back", i)
+		dbs[i].config.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
+		dbs[i] = launchNode(t, *dbs[i].config)
+		time.Sleep(2 * time.Second)
+	}
+
+	// Give some time for the reconnect routine to run, it runs every 60s
+	time.Sleep(50 * time.Second)
+
+	// Verify that the cluster is fully connected again. Note that the 3 restarted nodes did not call Join
+	for i := 0; i < 5; i++ {
+		assert.Len(t, dbs[i].nodes, 5)
+		assert.Len(t, dbs[i].failedNodes, 0)
+		if i < 3 {
+			// nodes 0 to 2 have no left nodes
+			assert.Len(t, dbs[i].leftNodes, 0)
+		} else {
+			// nodes 3 and 4 still track the 3 nodes that left
+			assert.Len(t, dbs[i].leftNodes, 3)
+		}
+	}
+}
