Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix multitag cardinality bug #842

Merged
merged 1 commit into from
Nov 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 72 additions & 61 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/serf/serf"
"github.com/juliangruber/go-intersect"
"github.com/sirupsen/logrus"
"github.com/soheilhy/cmux"
"google.golang.org/grpc"
Expand Down Expand Up @@ -125,7 +124,7 @@ type Plugins struct {
// AgentOption type that defines agent options
type AgentOption func(agent *Agent)

// NewAgent return a new Agent instace capable of starting
// NewAgent returns a new Agent instance capable of starting
// and running a Dkron instance.
func NewAgent(config *Config, options ...AgentOption) *Agent {
agent := &Agent{
Expand Down Expand Up @@ -680,8 +679,7 @@ func (a *Agent) join(addrs []string, replay bool) (n int, err error) {
}

func (a *Agent) processFilteredNodes(job *Job) (map[string]string, map[string]string, error) {
// candidates will contain a set of candidates by tags
// the final set of nodes will be the intesection of all groups
// The final set of nodes will be the intersection of all groups
tags := make(map[string]string)

// Actually copy the map
Expand All @@ -693,80 +691,93 @@ func (a *Agent) processFilteredNodes(job *Job) (map[string]string, map[string]st
// on the same region.
tags["region"] = a.config.Region

candidates := [][]string{}
// Make a set of all members
execNodes := make(map[string]serf.Member)
for _, member := range a.serf.Members() {
if member.Status == serf.StatusAlive {
execNodes[member.Name] = member
}
}

execNodes, tags, cardinality, err := filterNodes(execNodes, tags)
if err != nil {
return nil, nil, err
}

// Create an array of node names to aid in computing resulting set based on cardinality
var nameIndex []string
for name := range execNodes {
nameIndex = append(nameIndex, name)
}

nodes := make(map[string]string)
rand.Seed(time.Now().UnixNano())
for ; cardinality > 0; cardinality-- {
// Pick a node, any node
randomIndex := rand.Intn(cardinality)
m := execNodes[nameIndex[randomIndex]]

// Store name and address
if addr, ok := m.Tags["rpc_addr"]; ok {
nodes[m.Name] = addr
} else {
nodes[m.Name] = m.Addr.String()
}

// Swap picked node with the first one and shorten array, so node can't get picked again
nameIndex[randomIndex], nameIndex[0] = nameIndex[0], nameIndex[randomIndex]
nameIndex = nameIndex[1:]
}

return nodes, tags, nil
}

// filterNodes determines which of the execNodes have the given tags
// Out param! The incoming execNodes map is modified.
// Returns:
// * the (modified) map of execNodes
// * a map of tag values without cardinality
// * cardinality, i.e. the max number of nodes that should be targeted, regardless of the
// number of nodes in the resulting map.
// * an error if a cardinality was malformed
func filterNodes(execNodes map[string]serf.Member, tags map[string]string) (map[string]serf.Member, map[string]string, int, error) {
cardinality := int(^uint(0) >> 1) // MaxInt

cleanTags := make(map[string]string)

// Filter nodes that lack tags
// Determine lowest cardinality along the way
for jtk, jtv := range tags {
cans := []string{}
tc := strings.Split(jtv, ":")

tv := tc[0]

// Set original tag to clean tag
tags[jtk] = tv

for _, member := range a.serf.Members() {
if member.Status == serf.StatusAlive {
for mtk, mtv := range member.Tags {
if mtk == jtk && mtv == tv {
cans = append(cans, member.Name)
}
}
cleanTags[jtk] = tv

// Remove nodes that do not have the selected tags
for name, member := range execNodes {
if mtv, tagPresent := member.Tags[jtk]; !tagPresent || mtv != tv {
delete(execNodes, name)
}
}

// In case there is cardinality in the tag, randomize the order and select the amount of nodes
// or else just add all nodes to the result.
if len(tc) == 2 {
f := []string{}
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(cans), func(i, j int) {
cans[i], cans[j] = cans[j], cans[i]
})

count, err := strconv.Atoi(tc[1])
tagCardinality, err := strconv.Atoi(tc[1])
if err != nil {
return nil, nil, err
return nil, nil, 0, err
}
for i := 1; i <= count; i++ {
if len(cans) == 0 {
break
}
f = append(f, cans[0])
cans = cans[1:]
if tagCardinality < cardinality {
cardinality = tagCardinality
}
cans = f
}

candidates = append(candidates, cans)
}

// The final result will be the intersection of all candidates.
nodes := make(map[string]string)
r := candidates[0]
for i := 1; i <= len(candidates)-1; i++ {
isec := intersect.Simple(r, candidates[i]).([]interface{})
// Empty the slice
r = []string{}

// Refill with the intersection
for _, v := range isec {
r = append(r, v.(string))
}
}

for _, n := range r {
for _, m := range a.serf.Members() {
if n == m.Name {
// If the server is missing the rpc_addr tag, default to the serf advertise addr
if addr, ok := m.Tags["rpc_addr"]; ok {
nodes[n] = addr
} else {
nodes[n] = m.Addr.String()
}
}
}
// limit the cardinality to the number of possible nodes
if len(execNodes) < cardinality {
cardinality = len(execNodes)
}

return nodes, tags, nil
return execNodes, cleanTags, cardinality, nil
}

// This function is called when a client request the RPCAddress
Expand Down
51 changes: 43 additions & 8 deletions dkron/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,10 @@ func Test_processFilteredNodes(t *testing.T) {
c.Server = true
c.LogLevel = logLevel
c.Tags = map[string]string{
"tag": "test",
"region": "global",
"tag": "test",
"region": "global",
"additional": "value",
"additional2": "value2",
}
c.DevMode = true
c.DataDir = dir
Expand All @@ -140,9 +142,11 @@ func Test_processFilteredNodes(t *testing.T) {
c.Server = true
c.LogLevel = logLevel
c.Tags = map[string]string{
"tag": "test",
"extra": "tag",
"region": "global",
"tag": "test",
"extra": "tag",
"region": "global",
"additional": "value",
"additional2": "value2",
}
c.DevMode = true
c.DataDir = dir
Expand All @@ -162,9 +166,11 @@ func Test_processFilteredNodes(t *testing.T) {
c.Server = false
c.LogLevel = logLevel
c.Tags = map[string]string{
"tag": "test_client",
"extra": "tag",
"region": "global",
"tag": "test_client",
"extra": "tag",
"region": "global",
"additional": "value",
"additional2": "value2",
}
c.DevMode = true
c.DataDir = dir
Expand All @@ -174,6 +180,7 @@ func Test_processFilteredNodes(t *testing.T) {

time.Sleep(2 * time.Second)

// Test cardinality of 2 returns correct nodes
job := &Job{
Name: "test_job_1",
Tags: map[string]string{
Expand All @@ -189,6 +196,7 @@ func Test_processFilteredNodes(t *testing.T) {
assert.Len(t, nodes, 2)
assert.Equal(t, tags["tag"], "test")

// Test cardinality of 1 with two qualified nodes returns 1 node
job2 := &Job{
Name: "test_job_2",
Tags: map[string]string{
Expand All @@ -201,6 +209,7 @@ func Test_processFilteredNodes(t *testing.T) {

assert.Len(t, nodes, 1)

// Test no cardinality specified, all nodes returned
job3 := &Job{
Name: "test_job_3",
}
Expand All @@ -213,6 +222,7 @@ func Test_processFilteredNodes(t *testing.T) {
assert.Contains(t, nodes, "test2")
assert.Contains(t, nodes, "test3")

// Test exclusive tag returns correct node
job4 := &Job{
Name: "test_job_4",
Tags: map[string]string{
Expand All @@ -226,6 +236,7 @@ func Test_processFilteredNodes(t *testing.T) {
assert.Len(t, nodes, 1)
assert.Contains(t, nodes, "test3")

// Test existing tag but no matching value returns no nodes
job5 := &Job{
Name: "test_job_5",
Tags: map[string]string{
Expand All @@ -238,6 +249,7 @@ func Test_processFilteredNodes(t *testing.T) {

assert.Len(t, nodes, 0)

// Test 1 matching and 1 not matching tag returns no nodes
job6 := &Job{
Name: "test_job_6",
Tags: map[string]string{
Expand All @@ -252,6 +264,7 @@ func Test_processFilteredNodes(t *testing.T) {
assert.Len(t, nodes, 0)
assert.Equal(t, tags["tag"], "test")

// Test matching tags with cardinality of 2 but only 1 matching node returns correct node
job7 := &Job{
Name: "test_job_7",
Tags: map[string]string{
Expand All @@ -268,6 +281,28 @@ func Test_processFilteredNodes(t *testing.T) {
assert.Equal(t, tags["tag"], "test")
assert.Equal(t, tags["extra"], "tag")

// Test two tags matching same 3 servers and cardinality of 1 should always return 1 server

// Do this 10 times: an old bug caused this to sometimes succeed and sometimes fail due to the use of math.rand
// Statistically, with 10 tries about 3 should succeed and the rest should fail, if the code is buggy.
for i := 0; i < 10; i++ {
job8 := &Job{
Name: "test_job_7",
Tags: map[string]string{
"additional": "value:1",
"additional2": "value2:1",
},
}

nodes, tags, err = a1.processFilteredNodes(job8)
require.NoError(t, err)

assert.Len(t, nodes, 1)
assert.Equal(t, tags["additional"], "value")
assert.Equal(t, tags["additional2"], "value2")
}

// Clean up
a1.Stop()
a2.Stop()
a3.Stop()
Expand Down