diff --git a/cluster/cluster.go b/cluster/cluster.go index 7a9a337f46..7aac47c5bc 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -2,8 +2,8 @@ package cluster import ( "errors" - "math/rand" "net/http" + "sync/atomic" "time" opentracing "github.com/opentracing/opentracing-go" @@ -11,6 +11,8 @@ import ( type ModeType string +var counter uint32 + const ( ModeSingle = "single" ModeMulti = "multi" @@ -105,12 +107,13 @@ func MembersForQuery() ([]Node, error) { } continue } - if membersMap[part].priority == member.GetPriority() { + priority := member.GetPriority() + if membersMap[part].priority == priority { membersMap[part].nodes = append(membersMap[part].nodes, member) - } else if membersMap[part].priority > member.GetPriority() { + } else if membersMap[part].priority > priority { // this node has higher priority (lower number) then previously seen candidates membersMap[part] = &partitionCandidates{ - priority: member.GetPriority(), + priority: priority, nodes: []Node{member}, } } @@ -125,6 +128,8 @@ func MembersForQuery() ([]Node, error) { // we want to get the minimum number of nodes // needed to cover all partitions + count := int(atomic.AddUint32(&counter, 1)) + LOOP: for _, candidates := range membersMap { if candidates.nodes[0].GetName() == thisNode.GetName() { @@ -140,9 +145,10 @@ LOOP: continue LOOP } } - // if no nodes have been selected yet then grab a - // random node from the set of available nodes - selected := candidates.nodes[rand.Intn(len(candidates.nodes))] + // if no nodes have been selected yet then grab a node from + // the set of available nodes in such a way that nodes are + // weighted fairly across MembersForQuery calls + selected := candidates.nodes[count%len(candidates.nodes)] selectedMembers[selected.GetName()] = struct{}{} answer = append(answer, selected) } diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index 6c8cc76106..b0f45eb9e5 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -73,17 +73,17 @@ func TestPeersForQueryMulti(t *testing.T) { So(nodeNames, ShouldContain, manager.thisNode().GetName()) Convey("members should be selected randomly with even distribution", func() { peerCount := make(map[string]int) - for i := 0; i < 1000; i++ { + for i := 0; i < 100; i++ { selected, err = MembersForQuery() So(err, ShouldBeNil) for _, p := range selected { peerCount[p.GetName()]++ } } - So(peerCount["node1"], ShouldEqual, 1000) + So(peerCount["node1"], ShouldEqual, 100) So(peerCount["node2"], ShouldEqual, 0) - So(peerCount["node3"], ShouldNotAlmostEqual, 500) - So(peerCount["node4"], ShouldNotAlmostEqual, 500) + So(peerCount["node3"], ShouldEqual, 50) + So(peerCount["node4"], ShouldEqual, 50) for p, count := range peerCount { t.Logf("%s: %d", p, count) } diff --git a/cluster/manager.go b/cluster/manager.go index a6fc95c98c..26bfb26fb6 100644 --- a/cluster/manager.go +++ b/cluster/manager.go @@ -3,6 +3,7 @@ package cluster import ( "crypto/sha256" "encoding/json" + "sort" "strings" "sync" "time" @@ -163,6 +164,7 @@ func (c *MemberlistManager) memberList() []HTTPNode { i++ } c.RUnlock() + sort.Sort(HTTPNodesByName(list)) return list } diff --git a/cluster/node.go b/cluster/node.go index 6e6b852feb..88fc620cee 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -206,3 +206,9 @@ func handleResp(rsp *http.Response) ([]byte, error) { } return ioutil.ReadAll(rsp.Body) } + +type HTTPNodesByName []HTTPNode + +func (n HTTPNodesByName) Len() int { return len(n) } +func (n HTTPNodesByName) Swap(i, j int) { n[i], n[j] = n[j], n[i] } +func (n HTTPNodesByName) Less(i, j int) bool { return n[i].GetName() < n[j].GetName() }